本篇文章主要讲解mysqlclient操作MySQL关系型数据库,安装mysqlclient的命令行:pip install mysqlclient
;
然后创建一个名为XKD_Python_Course的数据库和一张名为students的数据库表,我们先在命令行工具里面查看一下表名是否存在,登录mysql数据库的命令行:mysql -uroot -p
,然后show databases;
,发现没有XKD_Python_Course这个数据库,那我们就创建一个:create database XKD_Python_Course;
,创建完再:show databases;
一下,可以看到XKD_Python_Course数据库已经存在了
- 然后我们可以使用命令行:
use XKD_Python_Course;
进入这个库,然后开始创建数据库表;
CREATE TABLE `students` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
`age` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
-
然后可以通过命令行:
show create table students;
,查看一下创建表的创建;
-
还可以通过命令行:
desc students;
,查看表的结构;
此时我们已经创建好了数据库和数据库表,然后我们需要授权用户,设置密码,授权成功之后一定要记得刷新权限;
grant all on XKD_Python_Course.* to 'zengzeng'@'%' identified by '123456';
# 刷新权限
flush privileges;
-
授权后我们可以退出,然后登录我们刚刚授权的用户:
mysql -uzengzeng -p123456
,就可以成功登录,登录后可以show databases;
查看一下刚刚创建的数据库
然后我们可以看到里面有两个数据库,如果我们想操作XKD_Python_Cours,那我们就需要使用:
use XKD_Python_Course
操作数据库
刚刚我们安装的mysqlclient中有一个MySQLdb类,我们可以用这个MySQLdb类去操作mysql数据库
-
插入操作,在插入之前我们可以先查看一下数据表里是否已经存在这条数据:
select * from students;
,然后可以放心的进行插入了
import MySQLdb
connect = None # 连接对象
cursor = None # 游标对象
try:
# 连接对象
connect = MySQLdb.connect(host='localhost', # 主机地址
user='zengzeng', # 账号
password='123456', # 密码
database='XKD_Python_Course', # 数据库名
use_unicode=True,
charset='utf8') # 指定字符集
# 游标对象
cursor = connect.cursor() # 通过连接对象调用cursor()
except Exception as e:
print(e)
connect.close()
try:
if cursor:
result = cursor.execute("insert into students (name, age) values ('Angle', 18)") # 插入操作
print('result = {}'.format(result))
connect.commit() # 提交
except Exception as e:
print(e)
connect.rollback() # 回滚
finally:
if cursor:
cursor.close()
if connect:
connect.close()
执行代码,返回的结果是result = 1,表示操作了一行数据,这时我们查看数据库表select * from students;
,是不是发现表中多了一条数据啊;
- 批量插入数据,就是在插入数据的地方增加一个for循环,我们来试一下一次插入10条数据;
import MySQLdb
connect = None # 连接对象
cursor = None # 游标对象
try:
# 连接对象
connect = MySQLdb.connect(host='localhost', # 主机地址
user='zengzeng', # 账号
password='123456', # 密码
database='XKD_Python_Course', # 数据库名
use_unicode=True,
charset='utf8') # 指定字符集
# 游标对象
cursor = connect.cursor() # 通过连接对象调用cursor()
except Exception as e:
print(e)
connect.close()
try:
if cursor:
for i in range(10):
result = cursor.execute("insert into students (name, age) values ('Angle', {})".format(i)) # 插入操作
connect.commit() # 提交
except Exception as e:
print(e)
connect.rollback() # 回滚
finally:
if cursor:
cursor.close()
if connect:
connect.close()
- 查询数据,查询结果的id会随着游标往下走
import MySQLdb
from pprint import pprint # 换行
connect = None # 连接对象
cursor = None # 游标对象
try:
# 连接对象
connect = MySQLdb.connect(host='localhost', # 主机地址
user='zengzeng', # 账号
password='123456', # 密码
database='XKD_Python_Course', # 数据库名
use_unicode=True,
charset='utf8') # 指定字符集
# 游标对象
cursor = connect.cursor() # 通过连接对象调用cursor()
except Exception as e:
print(e)
connect.close()
try:
if cursor:
cursor.execute('select * from students') # 不会返回任何对象,想要拿到数据可以通过cursor.方法名
one_result = cursor.fetchone() # 查询结果集的下一条数据
many_result = cursor.fetchmany(5) # 查询结果集的下五条数据
all_result = cursor.fetchall() # 查询结果集的剩余所有数据
# 换行打印
pprint(one_result)
print('*' * 100)
pprint(many_result)
print('*' * 100)
pprint(all_result)
connect.commit() # 提交
except Exception as e:
print(e)
connect.rollback() # 回滚
finally:
if cursor:
cursor.close()
if connect:
connect.close()
cursor
- cursor是游标对象,用于执行查询和获取结果;
cursor游标支持的方法
execute(op[,args])
:执行一个数据库的查询和命令;fetchmany(size)
:获取结果集的下几行务;fetchone()
:获取结果集的下一行;fetchall()
:获取结果集中剩下的所有行;rowcount()
:最近一次execute返回数据的行数或影响的行数;close()
:关闭游标对象;
查询参数化
在使用子查询的时候,我们就可以使用查询参数
# 位置参数
cursor.execute('select * from students where id = %s', args=(10, ))
# 关键字参数
cursor.execute('select * from students where id = %(id)s', args={'id': 10})
使用上下文管理
会自动关闭游标
import MySQLdb
connect = MySQLdb.connect(host='localhost',
user='zengzeng',
password='123456',
database='XKD_Python_Course',
use_unicode=True,
charset='utf8')
with connect as cursor:
# 会自动关闭cursor对象
cursor.execute("insert into students (name, age) values ('zengzeng', 22)")
# 此时连接还没有关闭
cursor.execute("insert into students (name, age) values ('Mark', 23)")
connect.close()
Queue模块
Queue模块实现了多生产者多消费者队列,尤其适合多线程编程,Queue类中实现了所有需要的锁原语,Queue模块实现了三种类型队列:
第一种,FIFO(先进先出)队列,第一加入队列的任务,被第一个取出;
第二种,LIFO(后进先出)队列,最后加入队列的任务,被第一个取出(操作类似与栈,总是从栈顶取出,这个队列还不清楚内部的实现);
第三种,PriorityQueue(优先级)队列,保持队列数据有序,最小值被先取出;
queue模块中的Queue与multiprocessing模块的Queue的区别
- queue模块中的Queue:是普通的队列模式,先进先出模式,get方法会阻塞请求,直到有数据get出来为止,适用于多线程的场景
from threading import Thread, Event
from queue import Queue
import time
def write(q: Queue, e: Event):
for value in range(100):
print('put {} to queue'.format(value))
q.put(value)
time.sleep(0.5)
else:
e.set()
def read(q: Queue, e: Event):
while True :
if not q.empty() or not e.is_set():
value = q.get()
print('get {} from queue'.format(value))
time.sleep(1)
else:
break
if __name__ == '__main__':
q = Queue()
e = Event()
tw = Thread(target=write, args=(q,e))
tr = Thread(target=read, args=(q,e))
tw.start()
tr.start()
tw.join()
tr.join()
print('finished ')
- multiprocessing模块的Queue:是多进程并发的Queue队列,用于解决多进程间的通信问题。
from multiprocessing import Process,Queue, Event
import time
def write(q: Queue, e: Event):
for value in range(100):
print('put {} to queue'.format(value))
q.put(value)
time.sleep(0.5)
else:
e.set()
def read(q: Queue, e: Event):
while True :
if not q.empty() or not e.is_set():
value = q.get()
print('get {} from queue'.format(value))
time.sleep(1)
else:
break
if __name__ == '__main__':
q = Queue()
e = Event()
pw = Process(target=write, args=(q,e))
pr = Process(target=read, args=(q,e))
pw.start()
pr.start()
pw.join()
pr.join()
print('finished ')
Queue队列对象的方法
qsize()
:返回队列的大致大小;empty()
:判断队列是否为空,如果队列为空,返回True,反之False;full()
:判断是否满了;put()
:将项目放入队列;put_nowait
:相当于put(item, False);get()
:从队列中删除并返回一个项目;get_nowait()
:提供了两种方法来支持跟踪守护进程消费者线程是否已完全处理入队任务;task_done()
:表示以前排队的任务已完成;join()
:阻止直到队列中的所有项目都已获取并处理完毕;
使用Queue构建连接池
from queue import Queue
import MySQLdb
class ConnectPool:
def __init__(self, size=5, *args, **kwargs):
if not isinstance(size, int) or size < 1:
size = 10
self.__pool = Queue(size)
for i in range(size):
self.__pool.put(MySQLdb.connect(*args, **kwargs))
@property
def connect(self):
return self.__pool.get()
@connect.setter
def connect(self, conn):
self.__pool.put(conn)
if __name__ == '__main__':
# 构建连接池
pool = ConnectPool(host='localhost',
user='zengzeng',
password='123456',
database='XKD_Python_Course',
use_unicode=True,
charset='utf8')
# 获取一个连接
connect = pool.connect
#
with connect as cursor:
with cursor:
sql = 'select * from students'
cursor.execute(sql)
print(cursor.fetchall())
线程连接池实现
from queue import Queue
import MySQLdb
import threading
class ConnectPool:
def __init__(self, size=5, *args, **kwargs):
if not isinstance(size, int) or size < 1:
size = 10
self.__pool = Queue(size)
for i in range(size):
self.__pool.put(MySQLdb.connect(*args, **kwargs))
# 创建一个local对象
self.local = threading.local()
@property
def connect(self):
return self.__pool.get()
@connect.setter
def connect(self, conn):
self.__pool.put(conn)
def __enter__(self):
if getattr(self.local, 'conn', None) is None:
self.local.conn = self.connect
return self.local.conn.cursor()
def __exit__(self, *exc_info):
if exc_info:
self.local.conn.rollback()
else:
self.local.conn.commit()
# 将连接对象归还到连接池
self.connect = self.local.conn
# 线程级别的连接对象引用计算器减一
self.local.conn = None
if __name__ == '__main__':
pool = ConnectPool(host='localhost',
user='zengzeng',
password='123456',
database='XKD_Python_Course',
use_unicode=True,
charset='utf8')
def foo(pool):
with pool as cursor:
with cursor: # 对cursor做上下文管理
sql = 'select * from students'
cursor.execute(sql)
print(cursor.fetchall())
for i in range(5):
t = threading.Thread(target=foo, args=(pool, ))
t.start()