python 数据库连接池

因为数据库的连接和关闭比较消耗资源,所以通常创建一个连接池,需要就获取,用完则放回连接池

主要有两个模块PooledDBPersistentDB

1、PooledDBPersistentDB的使用场景

1-1 PersistentDB

1、此模块好处是能保证线程安全为每一个线程建立一个连接,所以线程之间的链接是不共享的。
每个线程每次获取都是同一个链接,当线程完成或杀死的时候,链接就关闭了。
2、有稳定个数的线程用PersistentDB
3、使用多线程时没有返回所有的数据【太水不知道为什么】

1-2 PooledDB

1、当有一个线程想获取一个链接时
①如果要求非共享链接,直接拿出一个空闲链接返回
②如果要求是共享链接,【参数maxshared】限制共享链接的总个数,如果到达maxshared的上限,就等待/报错【参数blocking –true:则等待 –False(默认):报错】
2、经常开关线程用PooledDB
3、使用多线程时返回所有的数据

**2、数据库封装 **

因为我执行希望能使其为一个事务,所以这样写

mysqlutility.py

from loguru import logger
import pymysql

from dbutils.persistent_db import PersistentDB
from dbutils.pooled_db import PooledDB

TEXT_DB = {"host": "127.0.0.1", "port": 3306, "user": "leslie", "password": "cheung",
           "database": "test", "cursorclass": pymysql.cursors.DictCursor}


class MysqlUtility:
    __pool = None

    def __init__(self, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
                 maxusage=100, setsession=None, reset=True, data_info=None):
        """

        :param mincached:连接池中空闲连接的初始数量
        :param maxcached:连接池中空闲连接的最大数量
        :param maxshared:共享连接的最大数量
        :param maxconnections:创建连接池的最大数量
        :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
        :param maxusage:单个连接的最大重复使用次数
        :param setsession:optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        :param reset:how connections should be reset when returned to the pool
            (False or None to rollback transcations started with begin(),
            True to always issue a rollback for safety's sake)
        """
        if not self.__pool:
            self.data_info = data_info if data_info else TEXT_DB
            # self.__class__.__pool = PooledDB(creator=pymysql,  **self.data_info)
            self.__class__.__pool = PersistentDB(creator=pymysql, **self.data_info)

    def get_conn(self):
        conn = self.__pool.connection()  # 从连接池获取一个链接
        cursor = conn.cursor()
        return conn, cursor

    @staticmethod
    def select(sql, cursor):
        try:
            cursor.execute(sql)
            # data = cursor.fetchall()
            data = cursor.fetchone()
            return data
        except Exception as e:
            logger.exception(f"执行select语句异常:{e}")

    def execute(self, sql, cursor):
        return self._execute_db(sql=sql, cursor=cursor, execute_type=1)

    def executemany(self, sql, cursor):
        return self._execute_db(sql=sql, cursor=cursor, execute_type=2)

    @staticmethod
    def _execute_db(sql, cursor, execute_type, data=None):
        func = None
        try:
            if execute_type == 1:
                func = "execute"
                rows = cursor.execute(sql)
            elif execute_type == 2:
                func = "executemany"
                rows = cursor.executemany(sql, data)
            else:
                raise ValueError(f"没有类型为{execute_type}的执行方法")
            return rows
        except Exception as e:
            logger.exception(f"执行{func}语句异常:{e}")

    @staticmethod
    def commit(conn):
        conn.commit()

    @staticmethod
    def close(conn, cursor):
        cursor.close()
        conn.close()

    @staticmethod
    def rollback(conn):
        conn.rollback()


mysql_utility = MysqlUtility()

调用

import threading

from mysqlutility import mysql_utility


def handle(i):
    sql = "SELECT * FROM base_info"
    conn, cursor = mysql_utility.get_conn()
    data = mysql_utility.select(sql=sql, cursor=cursor)
    print(f"num:{i} 数据:{data}")

def transaction(s_id):
    conn, cursor = mysql_utility.get_conn()
    try:
        ## 如果语句中某个字段是字符串,那么根据它查找时要带引号
        del_sql = f"delete from base_info where id = '{s_id}'"
        save_key = '(name, age)'
        insert_val = [('张国荣', 55), ('林正英', 69)]
        inser_sql = f"insert into base_info {save_key} values {insert_val}"
        mysql_utility.execute(sql=del_sql, cursor=cursor)
        mysql_utility.executemany(sql=inser_sql, cursor=cursor, data=insert_val)
    except Exception as e:
        logger.error(f"执行事务异常:{e}")
        mysql_utility.rollback(conn=conn)
    finally:
        mysql_utility.commit(conn=conn)
        mysql_utility.close(conn=conn, cursor=cursor)
        

if __name__ == '__main__':
    # tasks = []
    # for i in range(8):
    #     t = threading.Thread(target=handle, args=(i,))
    #     tasks.append(t)
    # for t in tasks:
    #     t.start()
    # for t in tasks:
    #     t.join()

    s_id = "1212"
    transaction(s_id=s_id)

来源:ai__hashimoto

物联沃分享整理
物联沃-IOTWORD物联网 » python 数据库连接池

发表评论