1、应用环境
数据库连接池,主要用于多线程,为了防止多线程同时对数据库进行操作出现混乱。
2、使用
2.1安装
pip install DBUtils
2.2、导入
from dbutils.pooled_db import PooledDB
2.3、创建数据库连接池对象
self.mysql_pool_list = PooledDB(creator=pymysql,#数据库类型
maxcached=200,#最大空闲数
blocking=True,#默认False,即达到最大连接数时,再取新连接将会报错,True,达到最大连接数时,新连接阻塞,等待连接数减少再连接
ping=4,
host=self.MYSQL_HOST, port=self.MYSQL_PORT, user=self.MYSQL_USER,
password=self.MYSQL_PASSWORD,
db=self.MYSQL_DB,
charset='utf8'
)
参数说明:
1.creator:数据库驱动模块,如常见的pymysql,pymssql,cx_Oracle模块。无默认值
2.mincached:初始化连接池时创建的连接数。默认为0,即初始化时不创建连接。(建议默认0,假如非0的话,在某些数据库不可用时,整个项目会启动不了)
3.maxcached:池中空闲连接的最大数量。默认为0,即无最大数量限制。(建议默认)
4.maxshared:池中共享连接的最大数量。默认为0,即每个连接都是专用的,不可共享(不常用,建议默认)
5.maxconnections:被允许的最大连接数。默认为0,无最大数量限制。(视情况而定)
6.blocking:连接数达到最大时,新连接是否可阻塞。默认False,即达到最大连接数时,再取新连接将会报错。(建议True,达到最大连接数时,新连接阻塞,等待连接数减少再连接)
7.maxusage:连接的最大使用次数。默认0,即无使用次数限制。(建议默认)
8.setsession:可选的SQL命令列表,可用于准备会话。(例如设置时区)
9.reset:当连接返回到池中时,重置连接的方式。默认True,总是执行回滚。
10.ping:确定何时使用ping()检查连接。默认1,即当连接被取走,做一次ping操作。0是从不ping,1是默认,2是当该连接创建游标时ping,4是执行sql语句时ping,7是总是ping
2.4获取数据库链接
从连接池中获取一条数据库链接
conn = self.mysql_pool_list.connection()
完整代码
from dbutils.pooled_db import PooledDB
import configparser
import pymysql
import time
class Operation_mysql():
def __init__(self):
self.cofig()
self.mysql_pool()
# 数据接连接池
def mysql_pool(self):
while True:
try:
self.mysql_pool_list = PooledDB(creator=pymysql,#数据库类型
maxcached=200,#最大空闲数
blocking=True,#默认False,即达到最大连接数时,再取新连接将会报错,True,达到最大连接数时,新连接阻塞,等待连接数减少再连接
ping=4,
host=self.MYSQL_HOST, port=self.MYSQL_PORT, user=self.MYSQL_USER,
password=self.MYSQL_PASSWORD,
db=self.MYSQL_DB,
charset='utf8'
)
except BaseException as e:
print(f'数据库链接错误{e}')
self.mysql_pool_list = None
if self.mysql_pool_list:
print('数据库链接成功')
break
time.sleep(5)
#获取一条数据库链接
def get_conn(self):
conn = self.mysql_pool_list.connection()
cur = conn.cursor()
return conn,cur
#关闭数据库链接
def close_conn(self,conn,cur):
cur.close()
conn.close()
#查询数据库
def select_infor(self,insert):
conn,cur = self.get_conn()
try:
cur.execute(insert)
return cur.fetchall()
except BaseException as e:
print('数据库查询错误')
finally:
self.close_conn(conn,cur)
#更新数据库
def update_infor(self,insert):
conn, cur = self.get_conn()
try:
cur.execute(insert)
conn.commit()
return True
except BaseException as e:
print(f'数据库更新错误{e}')
finally:
self.close_conn(conn, cur)
#读取配置文件
def cofig(self):
# 读取信息
self.config = configparser.RawConfigParser()
try:
self.config.read('setting.ini')
self.MYSQL_HOST = self.config.get('USER', 'MYSQL_HOST').strip('\' ')
self.MYSQL_PORT = int(self.config.get('USER', 'MYSQL_PORT').strip('\' '))
self.MYSQL_USER = self.config.get('USER', 'MYSQL_USER').strip('\' ')
self.MYSQL_PASSWORD = self.config.get('USER', 'MYSQL_PASSWORD').strip('\' ')
self.MYSQL_DB = self.config.get('USER', 'MYSQL_DB').strip('\' ')
except BaseException as e:
print(f'读取配置文件错误:{e}')