并行:真的多任务 cpu大于当前执行的任务
并发:假的多任务 cpu小于当前执行的任务
多线程
import threading
def demo():
# 子线程
print("hello girls")
if __name__ == "__main__":
for i in range(5):
t = threading.Thread(target=demo)
t.start()
查看线程数量
threading.enumerate()
至少包含一个主线程
继承Thread类创建线程
必须重写run()
方法
import threading
import time
class A(threading.Thread):
def __init__(self,name):
super().__init__(name=name)
def run(self):
for i in range(5):
print(i)
if __name__ == "__main__":
t = A('test_name')
t.start()
多线程共享全局变量(线程间通信)
import threading
import time
num = 100
# 多线程共享全局变量的!!!
def demo1():
global num
num += 1
print("demo1---%d" % num)
def demo2():
print("demo2---%d" % num)
def main():
t1 = threading.Thread(target=demo1)
t2 = threading.Thread(target=demo2)
t1.start()
# time.sleep(1)
t2.start()
# time.sleep(1)
print("main---%d" % num)
多线程参数-args , kwargs
threading.Thread(target=test, args=(num,))
threading.Thread(target=demo, kwargs={'name': "join"})
Python字节码
查看Python转字节码的顺序
import dis
# 1 load a
# 2 load 1
# 3 执行add
# 4 赋值给a
def add_num(a):
a+=1
print(dis.dis(add_num))
互斥锁
当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制
创建锁
mutex = threading.Lock()
锁定
mutex.acquire()
解锁
mutex.release()
import threading
num = 0
mutex = threading.Lock()
def demo(nums):
global num
# 加锁
mutex.acquire()
for i in range(nums):
num += 1
# 解锁
mutex.release()
t = threading.Thread(target=demo1, args=(100,))
t.start()
可使用threading.RLock()
创建可重入的锁,多次加锁。(解锁次数和加锁次数相对应)
死锁
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
- 避免死锁
- 程序设计时要尽量避免(银行家算法)
- 添加超时时间等
线程同步
Lock是比较低级的同步原语,当被锁定以后不属于特定的线程。一个所有两种状态:locked和unlocked。如果锁处于unlocked状态,acquire()
方法将其修改为locked并立即返回;如果锁已处于locked状态,则阻塞当前线程并等待其他线程释放锁,然后将其修改为locked并立即返回。release()
方法用来将锁的状态由locked修改为unlocked并立即返回,如果锁已经处于unlocked状态,调用该方法将抛出异常。
可重入锁RLock对象也是一种常用的线程同步原语,可以被同一个线程acquire()
多次。当处于locked状态时,某线程拥有该锁;当处于unlocked状态时,该锁不属于任何线程。
RLock对象的acquire() / release()
调用对可以嵌套,仅当最后一个或者最外层release()
执行结束后,锁才被设置为unlocked。
-
acquire()
获得锁。该方法等待锁被解锁,将其设置为locked并返回True。 -
release()
释放锁。当锁被锁定时,将其重置为解锁并返回。如果锁未锁定,则会引发RuntimeError。 -
locked()
如果锁被锁定,返回True。
import threading
import time
# 自定义线程类
class MyThread1(threading.Thread):
def __init__(self,cond):
super().__init__(name='MyThread')
self.cond = cond
# 重写线程代码
def run(self):
global x
# 获得锁
self.cond.acquire()
# 等待
self.cond.wait()
for i in range(3):
x = x + i
time.sleep(2)
print(x)
# 通知
self.cond.notify()
class MyThread2(threading.Thread):
def __init__(self,cond):
super().__init__(name='MyThread')
self.cond = cond
# 重写线程代码
def run(self):
global x
# 获得锁
self.cond.acquire()
# 通知
self.cond.notify()
for i in range(3):
x = x + i
time.sleep(2)
print(x)
# 等待
self.cond.wait()
print(x)
# 创建锁
cond = threading.Condition()
x = 0
# 先等待再通知,顺序不能反
t1 = MyThread1(cond)
t1.start()
t2 = MyThread2(cond)
t2.start()
多任务版udp聊天
- 创建套接字
- 绑定本地信息
- 获取对方IP和端口
- 发送、接收数据
- 创建两个线程,去执行功能
import socket
import threading
def recv_msg(udp_socket):
"""发送数据"""
while True:
recv_data = udp_socket.recvfrom(1024)
print(recv_data)
def send_msg(udp_socket, dest_ip, dest_port):
"""接收数据"""
while True:
send_data = input("输入要发送的数据:")
udp_socket.sendto(send_data.encode('gbk'), (dest_ip, dest_port))
def main():
"""完成udp聊天器"""
# 创建套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定
udp_socket.bind(("", 7890))
# 获取对方的IP和端口
dest_ip = input('请输入对方的IP:')
dest_port = int(input('请输入对方的port:'))
t_recv = threading.Thread(target=recv_msg, args=(udp_socket,))
t_send = threading.Thread(target=send_msg, args=(udp_socket, dest_ip, dest_port))
t_recv.start()
t_send.start()
if __name__ == '__main__':
main()
多进程
- 进程和程序
进程:正在执行的程序
程序:没有执行的代码,是一个静态的
import os
import time
# pid
pid = os.fork()
print("123")
# pid == 0 子进程
if pid == 0:
print("子进程:{}, 父进程:{}".format(os.getpid(), os.getppid()))
else:
print("父进程:{}".format(os.getpid()))
time.sleep(2)
'''
123
父进程:3456
123
子进程:3457,父进程:3456
'''
使用进程实现多任务
multiprocessing
模块就是跨平台的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。
import multiprocessing
def demo():
while True:
print("--1--")
time.sleep(1)
def demo1():
while True:
print("--2--")
time.sleep(1)
p1 = multiprocessing.Process(target=demo)
p2 = multiprocessing.Process(target=demo1)
p1.start()
p2.start()
线程和进程之间的对比
- 进程:能够完成多任务,一台电脑上可以同时运行多个QQ
- 线程:能够完成多任务,一个QQ中的多个聊天窗口
- 根本区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位
进程间通信-Queue
Queue-队列 先进先出
from multiprocessing import Queue
# 创建队列 最多可以存放3条数据
q = Queue(3)
# 存数据
q.put(1)
q.put("name")
q.put([11, 22])
# 获取大小
print(q.qsize())
# 判断队列是否为满
print(q.full())
# 判断队列是否为空
print(q.empty())
队列间简单通信
模拟下载数据,与数据处理
import multiprocessing
def download(q):
"""下载数据"""
lis = [11, 22, 33]
for item in lis:
q.put(item)
print("下载完成,并且保存到队列中...")
def analysis(q):
"""数据处理"""
analysis_data = list()
while True:
data = q.get()
print(data)
analysis_data.append(data)
if q.empty():
break
print(analysis_data)
# 创建一个队列 跨进程通信的队列
q = multiprocessing.Queue(2)
t1 = multiprocessing.Process(target=download, args=(q, ))
t2 = multiprocessing.Process(target=analysis, args=(q, ))
t1.start()
t2.start()
多进程共享全局变量
共享全局变量不适用于多进程编程
from queue import Queue
普通队列无法多进程通信
进程池
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但是如果是上百甚至上千个目标,手动的去创建的进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求,但是如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_start = time.time()
print('%s开始执行,进程号为%d' % (msg, os.getpid()))
time.sleep(random.random() * 2)
t_stop = time.time()
# 0.2f
print(msg, "执行完成,耗时%0.2f" % (t_stop - t_start))
def demo():
pass
if __name__ == '__main__':
po = Pool(4) # 定义一个进程池 3个进程
for i in range(0, 10):
po.apply_async(worker, (i,))
print("--start--")
# 关闭进程池 不在接收新的请求
po.close()
# po.apply_async(demo)
# 等待子进程执行完成
po.join()
print("--end--")
进程池间的进程通信
进程池通信需要用到multiprocessing.Manager().Queue()
实现
import multiprocessing
def demo1(q):
# 进程池里面的进程 如果出现异常 不会主动抛出
try:
q.put('a')
except Exception as e:
print(e)
def demo2(q):
try:
data = q.get()
print(data)
except Exception as e:
print(e)
if __name__ == '__main__':
# q = Queue() 不能完成进程之间的通信
# q = multiprocessing.Queue() # 进程间通信
q = multiprocessing.Manager().Queue() # 进程池中的进程通信
po = multiprocessing.Pool(2)
po.apply_async(demo1, args=(q,))
po.apply_async(demo2, args=(q,))
po.close()
po.join()
多任务文件夹复制
- 获取用户要复制的文件夹名字
- 创建一个新的文件夹
- 获取文件夹的所有待拷贝的文件名字
- 创建进程池
- 添加拷贝任务
import multiprocessing
import os
def copy_file(q, file_name, new_folder_name, old_folder_name):
"""完成文件拷贝"""
# print("拷贝的文件名称为:%s" % file_name)
with open(old_folder_name + "/" + file_name, "rb") as f:
content = f.read()
# 保存到新的文件夹中
new_file = open(new_folder_name + "/" + file_name, "wb")
new_file.write(content)
new_file.close()
q.put(file_name)
def main():
# 获取用户要复制的文件夹名字 test
old_folder_name = input("请输入要复制的文件夹名字:")
# 创建一个新的文件夹 test[复件]
new_folder_name = old_folder_name + "复件"
if not os.path.exists(new_folder_name):
os.mkdir(new_folder_name)
# 获取文件夹的所有待拷贝的文件名字
file_names = os.listdir(old_folder_name)
# print(file_names)
# 创建进程池
po = multiprocessing.Pool(5)
# 创建队列
q = multiprocessing.Manager().Queue()
# 添加拷贝任务
for file_name in file_names:
po.apply_async(copy_file, args=(q, file_name, new_folder_name, old_folder_name))
po.close()
# 文件的总数
file_count = len(file_names)
coly_file_num = 0
while True:
file_name = q.get()
coly_file_num += 1
# 拷贝的进度
print("拷贝的进度%f%%" % (coly_file_num*100/file_count), end='')
if coly_file_num >= file_count:
break
if __name__ == '__main__':
main()
协程
同步、异步
- 同步:是指代码调用IO操作时,必须等待IO操作完成才返回的调用方式
- 异步:是指代码调用IO操作时,不必等IO操作完成就返回的调用方式
阻塞、非阻塞
- 阻塞:从调用者的角度出发,如果在调用的时候,被卡住,不能再继续向下运行,需要等待,就说是阻塞
- 非阻塞:从调用者的角度出发, 如果在调用的时候,没有被卡住,能够继续向下运行,无需等待,就说是非阻塞
生成器-send方法
-
send()
方法有一个参数,该参数指定的是上一次被挂起的yield语句的返回值 - 生成器启动时候
send()
参数必须为None
def a():
print('aaa')
p = yield '123'
print(p)
print('bbb')
p1 = yield '789'
r = a()
print(r.send(None))
print(r.send('456'))
>> aaa
>> 123
>> 456
>> bbb
>> 789
- 接收生成器
return
的值需要用异常抛出
def a():
print('aaa')
yield '789'
return "end"
r = a()
print(r.send(None))
try:
print(next(r))
except Exception as e:
print(e)
>> aaa
>> 789
>> end
-
close()
方法关闭生成器后,next
和send
直接抛出异常
使用yield
完成多任务
import time
def task1():
while True:
print("--1--")
time.sleep(0.1)
yield
def task2():
while True:
print("--2--")
time.sleep(0.1)
yield
def main():
t1 = task1()
t2 = task2()
while True:
next(t1)
next(t2)
if __name__ == "__main__":
main()
yield from介绍
python3.3新加了yield from语法
【子生成器】:yield from后的generator_1()生成器函数是子生成器
【委托生成器】:generator_2()是程序中的委托生成器,它负责委托子生成器完成具体任务。
【调用方】:main()是程序中的调用方,负责调用委托生成器。
lis = [1, 2, 3]
def generator_1(lis):
yield lis
def generator_2(lis):
yield from lis
for i in generator_1(lis):
print(i) # [1, 2, 3]
for i in generator_2(lis):
print(i) # 1, 2, 3
yield from
还可以做委托生成器,获取return
返回的值
def generator_1():
total = 0
while True:
x = yield
print('加', x)
if not x:
break
total += x
return total
def generator_2(): # 委托生成器
while True:
total = yield from generator_1() # 子生成器
print('加和总数是:', total)
def main(): # 调用方
# g1 = generator_1()
# g1.send(None)
# g1.send(2)
# g1.send(3)
# g1.send(None)
g2 = generator_2()
g2.send(None)
g2.send(2)
g2.send(3)
g2.send(None)
if __name__ == '__main__':
main()
协程
协程,又称微线程
协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)
- Python中的协程大概经历了如下三个阶段:
- 最初的生成器变形yield/send
- yield from
- 在最近的Python3.5版本中引入async/await关键字
使用greenlet
完成多任务
greenlet
必须手动切换
from greenlet import greenlet
import time
# 协程利用程序的IO 来切换任务
def demo1():
while True:
print("demo1")
gr2.switch()
time.sleep(0.5)
def demo2():
while True:
print("demo2")
gr1.switch()
time.sleep(0.5)
gr1 = greenlet(demo1)
# print(greenlet.__doc__)
gr2 = greenlet(demo2)
gr1.switch()
使用gevent
完成多任务
使用gevent
必须使用gevent.sleep()
睡眠后才会切换
gevent
启动会在time.sleep()
之后执行,gevent.sleep()
才会打断
monkey.patch_all()
会将程序中用到的耗时操作 换为gevent
中实现的模块
import gevent
import time
from gevent import monkey
# 将程序中用到的耗时操作 换为gevent中实现的模块
monkey.patch_all()
def f1(n):
for i in range(n):
print(gevent.getcurrent(), i)
time.sleep(0.5)
# gevent.sleep(0.5)
def f2(n):
for i in range(n):
print(gevent.getcurrent(), i)
time.sleep(0.5)
# gevent.sleep(0.5)
def f3(n):
for i in range(n):
print(gevent.getcurrent(), i)
time.sleep(0.5)
# gevent.sleep(0.5)
print("--1--")
g1 = gevent.spawn(f1, 5)
print("--2--")
time.sleep(1)
# gevent.sleep(0.5)
g2 = gevent.spawn(f2, 5)
print("--3--")
g3 = gevent.spawn(f3, 5)
print("--4--")
g1.join()
g2.join()
g3.join()
下载器
import gevent
from gevent import monkey
monkey.patch_all()
# 必须放到patch_all()之下,否则会报警告
import requests # urllib 进行封装 爬虫 80-90
# 并行 并发
# 协程 并发
def download(url):
print("get: %s" % url)
res = requests.get(url)
data = res.text
print(len(data), url)
# g1 = gevent.spawn(download, 'https://www.baidu.com/')
# g2 = gevent.spawn(download, 'https://www.python.org/')
# g3 = gevent.spawn(download, 'https://www.baidu.com/')
# g1.join()
# g2.join()
# g3.join()
gevent.joinall([
gevent.spawn(download, 'https://www.baidu.com/'),
gevent.spawn(download, 'https://www.python.org/'),
gevent.spawn(download, 'https://www.baidu.com/')
])
总结
- 进程是资源分配的单位
- 线程是操作系统调度的单位
- 进程切换需要的资源很最大,效率很低
- 线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)
- 协程切换任务资源很小,效率高
- 多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中 所以是并发