2.Python与Java
1.Python相关
1.Python PEP介绍
1.PEP是什么?
引用链接: https://www.cnblogs.com/abella/p/10056875.html
PEP的全称是Python Enhancement Proposals
,其中Enhancement是增强改进的意思,Proposals则可译为提案或建议书,所以合起来,比较常见的翻译是Python增强提案
或Python改进建议书
。
我个人倾向于前一个翻译,因为它更贴切。Python核心开发者主要通过邮件列表讨论问题、提议、计划等,PEP通常是汇总了多方信息,经过了部分核心开发者review和认可,最终形成的正式文档,起到了对外公示的作用,所以我认为翻译成“提案”更恰当。
PEP的官网是:https://www.python.org/dev/peps/,这也就是PEP 0 的地址。其它PEP的地址是将编号拼接在后面,例如:https://www.python.org/dev/peps/pep-0020/ 就是PEP 20 的链接,以此类推
2.PEP状态表示
I - Informational PEP
P - Process PEP
S - Standards Track PEP
信息类:这类PEP就是提供信息,有告知类信息,也有指导类信息等等。例如PEP 20(The Zen of Python,即著名的Python之禅)、PEP 404 (Python 2.8 Un-release Schedule,即宣告不会有Python2.8版本)。
流程类:这类PEP主要是Python本身之外的周边信息。例如PEP 1(PEP Purpose and Guidelines,即关于PEP的指南)、PEP 347(Migrating the Python CVS to Subversion,即关于迁移Python代码仓)。
标准类:这类PEP主要描述了Python的新功能和新实践(implementation),是数量最多的提案。例如我之前提到过的f-string方式,它出自PEP 498(Literal String Interpolation,字面字符串插值)。
每个PEP最初都是一个草案(Draft),随后会经历一个过程,因此也就出现了不同的状态
A – Accepted (Standards Track only) or Active proposal 已接受(仅限标准跟踪)或有效提案
D – Deferred proposal 延期提案
F – Final proposal 最终提案
P – Provisional proposal 暂定提案
R – Rejected proposal 被否决的提案
S – Superseded proposal 被取代的提案
W – Withdrawn proposal 撤回提案
PEP提案已经累积产生了478个,我们并不需要对每个PEP都熟知,没有必要。下面,我列举了一些PEP,推荐大家一读:
PEP 0 -- Index of Python Enhancement Proposals
PEP 7 -- Style Guide for C Code,C扩展
PEP 8 -- Style Guide for Python Code,编码规范(必读)
PEP 20 -- The Zen of Python,Python之禅
PEP 202 -- List Comprehensions,列表生成式
PEP 274 -- Dict Comprehensions,字典生成式
PEP 234 -- Iterators,迭代器
PEP 257 -- Docstring Conventions,文档注释规范
PEP 279 -- The enumerate() built-in function,enumerate枚举
PEP 282 -- A Logging System,日志模块
PEP 285 -- Adding a bool type,布尔值(建议阅读《Python对象的身份迷思:从全体公民到万物皆数》)
PEP 289 -- Generator Expressions,生成器表达式
PEP 318 -- Decorators for Functions and Methods,装饰器
PEP 342 -- Coroutines via Enhanced Generators,协程
PEP 343 -- The "with" Statement,with语句
PEP 380 -- Syntax for Delegating to a Subgenerator,yield from语法
PEP 405 -- Python Virtual Environments,虚拟环境
PEP 471 -- os.scandir() function,遍历目录
PEP 484 -- Type Hints,类型约束
PEP 492 -- Coroutines with async and await syntax,async/await语法
PEP 498 -- Literal String Interpolation Python,字符串插值
PEP 525 -- Asynchronous Generators,异步生成器
PEP 572 -- Assignment Expressions,表达式内赋值(最争议)
PEP 3105 -- Make print a function,print改为函数
PEP 3115 -- Metaclasses in Python 3000,元类
PEP 3120 -- Using UTF-8 as the default source encoding
PEP 3333 -- Python Web Server Gateway Interface v1.0.1,Web开发
PEP 8000 -- Python Language Governance Proposal Overview,GvR老爹推出决策层后,事关新决策方案
中文翻译:
PEP8 -- https://dwz.cn/W01HexFD
PEP257 -- https://dwz.cn/JLctlNLC
PEP328 -- https://dwz.cn/4vCQJpEP
PEP333 -- https://dwz.cn/TAXIZdzc
PEP484 -- https://dwz.cn/dSLZgg5B
PEP492 -- http://t.cn/EALeaL0
PEP541 -- https://dwz.cn/ce98vc27
PEP3107 -- http://suo.im/4xFESR
PEP3333 -- https://dwz.cn/si3xylgw
3.python之禅
优美胜于丑陋(Python以编写优美的代码为目标)
明了胜于晦涩(优美的代码应当是明了的,命名规范,风格相似)
简洁胜于复杂(优美的代码应当是简洁的,不要有复杂的内部实现)
复杂胜于凌乱(如果复杂不可避免,那代码间也不能有难懂的关系,要保持接口简洁)
扁平胜于嵌套(优美的代码应当是扁平的,不能有太多的嵌套)
间隔胜于紧凑(优美的代码有适当的间隔,不要奢望一行代码解决问题)
可读性很重要(优美的代码是可读的)
即便假借特例的实用性之名,也不可违背这些规则(这些规则至高无上)
不要包容所有错误,除非您确定需要这样做(精准地捕获异常,不写 except:pass 风格的代码)
当存在多种可能,不要尝试去猜测
而是尽量找一种,最好是唯一一种明显的解决方案(如果不确定,就用穷举法)
虽然这并不容易,因为您不是 Python 之父(这里的 Dutch 是指 Guido )
做也许好过不做,但不假思索就动手还不如不做(动手之前要细思量)
如果您无法向人描述您的方案,那肯定不是一个好方案;反之亦然(方案测评标准)
命名空间是一种绝妙的理念,我们应当多加利用(倡导与号召)
引用链接:https://www.jianshu.com/p/30fee49b8d55
2.Python基础
1.GIL 全局解释器锁
GIL(全局解释器锁,GIL 只有cpython有):在同一个时刻,只能有一个线程在一个cpu上执行字节码,没法像c和Java一样将多个线程映射到多个CPU上执行,但是GIL会根据执行的字节码行数(为了让各个线程能够平均利用CPU时间,python会计算当前已执行的微代码数量,达到一定阈值后就强制释放GIL)和时间片以及遇到IO操作的时候主动释放锁,让其他字节码执行。
作用:限制多线程同时执行,保证同一个时刻只有一个线程执行。
原因:线程并非独立,在一个进程中多个线程共享变量的,多个线程执行会导致数据被污染造成数据混乱,这就是线程的不安全性,为此引入了互斥锁。
互斥锁:即确保某段关键代码的数据只能又一个线程从头到尾完整执行,保证了这段代码数据的安全性,但是这样就会导致死锁。
死锁:多个子线程在等待对方解除占用状态,但是都不先解锁,互相等待,这就是死锁。
基于GIL的存在,在遇到大量的IO操作(文件读写,网络等待)代码时,使用多线程效率更高。
3.并发编程
1.多进程模块 multiprocess
参考链接: https://thief.one/2016/11/23/Python-multiprocessing/
1.基础用法
import os
import multiprocessing
# Main
print('Main:', os.getpid())
# worker function
def worker(sign, lock):
lock.acquire()
print(sign, os.getpid())
lock.release()
# Multi-process
record = []
lock = multiprocessing.Lock()
if __name__ == '__main__':
for i in range(5):
process = multiprocessing.Process(target=worker, args=('process', lock))
process.start()
record.append(process)
for process in record:
process.join()
# Main: 90827
# process 90828
# process 90829
# process 90830
# process 90831
# process 90832
2.数据通信
ipc:就是进程间的通信模式,常用的一半是socke,rpc,pipe和消息队列等。
multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
3.使用Array共享数据
对于Array数组类,括号内的“i”表示它内部的元素全部是int类型,而不是指字符“i”,数组内的元素可以预先指定,也可以只指定数组的长度。Array类在实例化的时候必须指定数组的数据类型和数组的大小,类似temp = Array(‘i’, 5)。对于数据类型有下面的对应关系:
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
代码:
from multiprocessing import Process
from multiprocessing import Array
def func(i,temp):
temp[0] += 100
print("进程%s " % i, ' 修改数组第一个元素后----->', temp[0])
if __name__ == '__main__':
temp = Array('i', [1, 2, 3, 4])
for i in range(5):
p = Process(target=func, args=(i, temp))
p.start()
# 进程0 修改数组第一个元素后-----> 101
# 进程1 修改数组第一个元素后-----> 201
# 进程2 修改数组第一个元素后-----> 301
# 进程3 修改数组第一个元素后-----> 401
# 进程4 修改数组第一个元素后-----> 501
4.使用Manager共享数据
通过Manager类也可以实现进程间数据的共享,主要用于线程池之间通信,Manager()返回的manager对象提供一个服务进程,使得其他进程可以通过代理的方式操作Python对象。manager对象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array等多种格式.��````````````````
from multiprocessing import Process
from multiprocessing import Manager
def func(i, dic):
dic["num"] = 100+i
print(dic.items())
if __name__ == '__main__':
dic = Manager().dict()
for i in range(5):
p = Process(target=func, args=(i, dic))
p.start()
p.join()
# [('num', 100)]
# [('num', 101)]
# [('num', 102)]
# [('num', 103)]
# [('num', 104)]
5.使用queues的Queue类共享数据
multiprocessing是一个包,它内部有一个queues模块,提供了一个Queue队列类,可以实现进程间的数据共享,如下例所示:
import multiprocessing
from multiprocessing import Process
from multiprocessing import queues
def func(i, q):
ret = q.get()
print("进程%s从队列里获取了一个%s,然后又向队列里放入了一个%s" % (i, ret, i))
q.put(i)
if __name__ == "__main__":
lis = queues.Queue(20, ctx=multiprocessing)
lis.put(0)
for i in range(10):
p = Process(target=func, args=(i, lis,))
p.start()
# 进程1从队列里获取了一个0,然后又向队列里放入了一个1
# 进程4从队列里获取了一个1,然后又向队列里放入了一个4
# 进程2从队列里获取了一个4,然后又向队列里放入了一个2
# 进程6从队列里获取了一个2,然后又向队列里放入了一个6
# 进程0从队列里获取了一个6,然后又向队列里放入了一个0
# 进程5从队列里获取了一个0,然后又向队列里放入了一个5
# 进程9从队列里获取了一个5,然后又向队列里放入了一个9
# 进程7从队列里获取了一个9,然后又向队列里放入了一个7
# 进程3从队列里获取了一个7,然后又向队列里放入了一个3
# 进程8从队列里获取了一个3,然后又向队列里放入了一个8
6.使用pipe实现进程间通信
pipe只能适用于两个进程间通信,queue则没这个限制,他有两个方法
from multiprocessing import Pipe,Process
import time
def produce(pipe):
pipe.send('666')
time.sleep(1)
def consumer(pipe):
print(pipe.recv())
# 有些类似socket的recv方法
if __name__ == '__main__':
send_pi,recv_pi = Pipe()
my_pro = Process(target=produce,args=(send_pi,))
my_con = Process(target=consumer,args=(recv_pi,))
my_pro.start()
my_con.start()
my_pro.join()
my_con.join()
7.进程锁
一般来说每个进程使用单独的空间,不必加进程锁的,但是如果你需要先实现进程数据共享,使用案例二中的代码,又害怕造成数据抢夺和脏数据的问题。就可以设置进程锁,与threading类似,在multiprocessing里也有同名的锁类RLock,Lock,Event,Condition和 Semaphore,连用法都是一样样的。
如果有多个进程要上锁,使用multiprocessing.Manager().BoundedSemaphore(1)
from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import time
def func(i,lis,lc):
lc.acquire()
lis[0] = lis[0] - 1
time.sleep(1)
print('say hi', lis[0])
lc.release()
if __name__ == "__main__":
array = Array('i', 1)
array[0] = 10
lock = RLock()
for i in range(10):
p = Process(target=func, args=(i, array, lock))
p.start()
say hi 9
say hi 8
say hi 7
say hi 6
say hi 5
8.进程池
from multiprocessing import Pool导入就行,非常容易使用的。进程池内部维护了一个进程序列,需要时就去进程池中拿取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
from multiprocessing import Pool
import time
def func(args):
time.sleep(1)
print("正在执行进程 ", args)
if __name__ == '__main__':
p = Pool(5) # 创建一个包含5个进程的进程池
for i in range(30):
# 有30个任务
p.apply_async(func=func, args=(i,))
# 异步执行,并发。这里不用target,要用func
p.close() # 等子进程执行完毕后关闭进程池
# time.sleep(2)
# p.terminate() # 立刻关闭进程池
p.join()
from multiprocessing.dummy import Pool as ThreadPool 是多线程进程池,绑定一个cpu核心。from multiprocessing import Pool多进程,运行于多个cpu核心。multiprocessing 是多进程模块, 而multiprocessing.dummy是以相同API实现的多线程模块。
没有绕过GIL情况下,多线程一定受GIL限制。
from multiprocessing.dummy import Pool as tp
def fun(i):
print i+i+i+i
list_i=[range(100)]
px = tp(processes=8)
# 开启8个线程池
px.map(fun,list_i)
px.close()
px.join()
9.Process介绍
构造方法:
- Process([group [, target [, name [, args [, kwargs]]]]])
- group: 线程组,目前还没有实现,库引用中提示必须是None;
- target: 要执行的方法;
- name: 进程名;
- args/kwargs: 要传入方法的参数。
实例方法:
- is_alive():返回进程是否在运行。
- join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的3. timeout(可选参数)。
- start():进程准备就绪,等待CPU调度。
- run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
- terminate():不管任务是否完成,立即停止工作进程。
属性:
- authkey
- daemon:和线程的setDeamon功能一样(将父进程设置为守护进程,当父进程结束时,子进程也结束)。
- exitcode(进程在运行时为None、如果为–N,表示被信号N结束)。
- name:进程名字。
- pid:进程号。
10.Pool介绍
Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来执行它。在共享资源时,只能使用Multiprocessing.Manager类,而不能使用Queue或者Array。Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。
构造方法:
- Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
- initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
- maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
- context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。
实例方法:
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞。
- apply(func[, args[, kwds]])是阻塞的
- close() 关闭pool,使其不在接受新的任务。
- terminate() 关闭pool,结束工作进程,不在处理未完成的任务。
- join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
Pool使用方法
Pool+map函数
说明:此写法缺点在于只能通过map向函数传递一个参数。
from multiprocessing import Pool
def test(i):
print i
if __name__=="__main__":
lists=[1,2,3]
pool=Pool(processes=2) #定义最大的进程数
pool.map(test,lists) #p必须是一个可迭代变量。
pool.close()
pool.join()
异步进程池(非阻塞)
from multiprocessing import Pool
def test(i):
print i
if __name__=="__main__":
pool = Pool(processes=10)
for i in xrange(500):
'''
For循环中执行步骤:
(1)循环遍历,将500个子进程添加到进程池(相对父进程会阻塞)
(2)每次执行10个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)
apply_async为异步进程池写法。
异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而For循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的。
'''
pool.apply_async(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
print “test”
pool.close()
pool.join()
执行顺序:For循环内执行了2个步骤,第一步:将500个对象放入进程池(阻塞)。第二步:同时执行10个子进程(非阻塞),有结束的就立即添加,维持10个子进程运行。(apply_async方法的会在执行完for循环的添加步骤后,直接执行后面的print语句,而apply方法会等所有进程池中的子进程运行完以后再执行后面的print语句)
注意:调用join之前,先调用close或者terminate方法,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束。
同步进程池(阻塞)
from multiprocessing import Pool
def test(p):
print p
time.sleep(3)
if __name__=="__main__":
pool = Pool(processes=10)
for i in xrange(500):
'''
实际测试发现,for循环内部执行步骤:
(1)遍历500个可迭代对象,往进程池放一个子进程
(2)执行这个子进程,等子进程执行完毕,再往进程池放一个子进程,再执行。(同时只执行一个子进程)
for循环执行完毕,再执行print函数。
'''
pool.apply(test, args=(i,)) #维持执行的进程总数为10,当一个进程执行完后启动一个新进程.
print “test”
pool.close()
pool.join()
说明:for循环内执行的步骤顺序,往进程池中添加一个子进程,执行子进程,等待执行完毕再添加一个子进程…..等500个子进程都执行完了,再执行print “test”。(从结果来看,并没有多进程并发)
11.子进程返回值
在实际使用多进程的时候,可能需要获取到子进程运行的返回值。如果只是用来存储,则可以将返回值保存到一个数据结构中;如果需要判断此返回值,从而决定是否继续执行所有子进程,则会相对比较复杂。另外在Multiprocessing中,可以利用Process与Pool创建子进程,这两种用法在获取子进程返回值上的写法上也不相同。这篇中,我们直接上代码,分析多进程中获取子进程返回值的不同用法,以及优缺点。
初级用法(Pool)
目的:存储子进程返回值
说明:如果只是单纯的存储子进程返回值,则可以使用Pool的apply_async异步进程池;当然也可以使用Process,用法与threading中的相同,这里只介绍前者。
实例:当进程池中所有子进程执行完毕后,输出每个子进程的返回值。
from multiprocessing import Pool
def test(p):
return p
if __name__=="__main__":
pool = Pool(processes=10)
result=[]
for i in xrange(50000):
'''
for循环执行流程:
(1)添加子进程到pool,并将这个对象(子进程)添加到result这个列表中。(此时子进程并没有运行)
(2)执行子进程(同时执行10个)
'''
result.append(pool.apply_async(test, args=(i,)))#维持执行的进程总数为10,当一个进程执行完后添加新进程.
pool.join()
'''
遍历result列表,取出子进程对象,访问get()方法,获取返回值。(此时所有子进程已执行完毕)
'''
for i in result:
print i.get()
错误写法:
for i in xrange(50000):
t=pool.apply_async(test, args=(i,)))
print t.get()
说明:这样会造成阻塞,因为get()方法只能等子进程运行完毕后才能调用成功,否则会一直阻塞等待。如果写在for循环内容,相当于变成了同步,执行效率将会非常低。
高级用法(Pool)
目的:父进程实时获取子进程返回值,以此为标记结束所有进程。
实例(一)
执行子进程的过程中,不断获取返回值并校验,如果返回值为True则结果所有进程。
from multiprocessing import Pool
import Queue
import time
def test(p):
time.sleep(0.001)
if p==10000:
return True
else:
return False
if __name__=="__main__":
pool = Pool(processes=10)
q=Queue.Queue()
for i in xrange(50000):
'''
将子进程对象存入队列中。
'''
q.put(pool.apply_async(test, args=(i,)))#维持执行的进程总数为10,当一个进程执行完后添加新进程.
'''
因为这里使用的为pool.apply_async异步方法,因此子进程执行的过程中,父进程会执行while,获取返回值并校验。
'''
while 1:
if q.get().get():
pool.terminate() #结束进程池中的所有子进程。
break
pool.join()
说明:总共要执行50000个子进程(并发数量为10),当其中一个子进程返回True时,结束进程池。因为使用了apply_async为异步进程,因此在执行完for循环的添加子进程操作后(只是添加并没有执行完所有的子进程),可以直接执行while代码,实时判断子进程返回值是否有True,有的话结束所有进程。
优点:不必等到所有子进程结束再结束程序,只要得到想要的结果就可以提前结束,节省资源。
不足:当需要执行的子进程非常大时,不适用,因为for循环在添加子进程时,要花费很长的时间,虽然是异步,但是也需要等待for循环添加子进程操作结束才能执行while代码,因此会比较慢。
实例(二)
多线程+多进程,添加执行子进程的过程中,不断获取返回值并校验,如果返回值为True则结果所有进程。
from multiprocessing import Pool
import Queue
import threading
import time
def test(p):
time.sleep(0.001)
if p==10000:
return True
else:
return False
if __name__=="__main__":
result=Queue.Queue() #队列
pool = Pool()
def pool_th():
for i in xrange(50000000): ##这里需要创建执行的子进程非常多
try:
result.put(pool.apply_async(test, args=(i,)))
except:
break
def result_th():
while 1:
a=result.get().get() #获取子进程返回值
if a:
pool.terminate() #结束所有子进程
break
'''
利用多线程,同时运行Pool函数创建执行子进程,以及运行获取子进程返回值函数。
'''
t1=threading.Thread(target=pool_th)
t2=threading.Thread(target=result_th)
t1.start()
t2.start()
t1.join()
t2.join()
pool.join()
执行流程:利用多线程,创建一个执行pool_th函数线程,一个执行result_th函数线程,pool_th函数用来添加进程池,开启进程执行功能函数并将子进程对象存入队列,而result_th()函数用来不停地从队列中取子进程对象,调用get()方法获取返回值。等发现其中存在子进程的返回值为True时,结束所有进程,最后结束线程。
优点:弥补了实例(一)的不足,即使for循环的子进程数量很多,也能提高性能,因为for循环与判断子进程返回值同时进行。
2.多线程模块
线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务,多线程就是在一个进程中的多个线程,如果使用多线程默认开启一个主线程,按照程序需求自动开启多个线程(也可以自己定义线程数)
1.多线程知识点
- Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
- 多线程共享主进程的资源,所以可能还会改变其中的变量,这个时候就要加上线程锁,每次执行完一个线程在执行下一个线程。
- 因为每次只能有一个线程运行,多线程怎么实现的呢?Python解释器中一个线程做完了任务然后做IO(文件读写)操作的时候,这个线程就退出,然后下一个线程开始运行,循环之。
- 当你读完上面三点你就知道多线程如何运行起来,并且知道多线程常用在那些需要等待然后执行的应用程序上(比如爬虫读取到数据,然后保存的时候下一个线程开始启动)也就是说多线程适用于IO密集型的任务量(文件存储,网络通信)。
- 注意一点,定义多线程,然后传递参数的时候,如果是有一个参数就是用args=(i,)一定要加上逗号,如果有两个或者以上的参数就不用这样
2.案例一 多线程核心用法
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding('utf-8')
def loop():
#定义一个要循环的函数,当然后面肯定会定义好几个函数
print 'thread %s is running...' % threading.current_thread().name
#threading.current_thread().name就是当前线程的名字 在声明线程的时候可以自定义子线程的名字
n = 0
while n < 10:
n = n + 1
print '%s >>> %s' % (threading.current_thread().name, n)
#输出当前线程名字 和循环的参数n
print 'thread %s ended.' % threading.current_thread().name
print 'thread %s is running...' % threading.current_thread().name
#下面的一部分就是threading的核心用法
#包括target name args 之类的 一般我只用targer=你定义的函数名
t = threading.Thread(target=loop, name='线程名:')
# 在这里就申明了这个线程的名字
t.start()
#开始
t.join()
#关于join的相关信息我会在后面的代码详说
print 'thread %s ended.' % threading.current_thread().name
运行结果:
thread MainThread is running...
thread 线程名: is running...
线程名: >>> 1
线程名: >>> 2
线程名: >>> 3
线程名: >>> 4
线程名: >>> 5
线程名: >>> 6
线程名: >>> 7
线程名: >>> 8
线程名: >>> 9
线程名: >>> 10
thread 线程名: ended.
thread MainThread ended.
3.案例二 线程锁
前面有说到过,多线程是共享内存的,所以其中的变量如果发生了改变的话就会改变后边的变量,导致异常,这个时候可以加上线程锁。线程锁的概念就是主要这个线程运行完后再运行下一个线程。
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding('utf-8')
def loop():
l.acquire()
# 这里相当于把线程加了锁,目前只允许这一个线程运行
print 'thread %s is running...' % threading.current_thread().name
#threading.current_thread().name就是当前线程的名字 在声明线程的时候可以自定义子线程的名字
n = 0
while n < 10:
n = n + 1
print '%s >>> %s' % (threading.current_thread().name, n)
#输出当前线程名字 和循环的参数n
print 'thread %s ended.' % threading.current_thread().name
l.release()
# 这里是把线程锁解开,可以再运行写一个线程
print 'thread %s is running...' % threading.current_thread().name
#下面的一部分就是threading的核心用法
#包括target name args 之类的 一般我只用targer=你定义的函数名
t = threading.Thread(target=loop, name='线程名:')
l = threading.Lock()
# 这里申明一个线程锁
t.start()
#开始
t.join()
#关于join的相关信息我会在后面的代码详说
print 'thread %s ended.' % threading.current_thread().name
使用线程锁后,程序按照一个一个有序执行。其中lock还有Rlock的方法,RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。否则会出现死循环,程序不知道解哪一把锁。注意:如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的锁
4.案例三 join()方法的使用
在多线程中,每个线程自顾执行自己的任务,当最后一个线程运行完毕后再退出,所以这个时候如果你要打印信息的话,会看到打印出来的信息错乱无章,有的时候希望主线程能够等子线程执行完毕后在继续执行,就是用join()方法。
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding('utf-8')
t00 = time.time()
# 获取当前时间戳
def cs1():
time0 = time.time()
for x in range(9):
print x + time.time()-time0
# 计算用了多少时间
print threading.current_thread().name
# 打印这个线程名字
def cs2():
for x1 in range(6,9):
print x1
print threading.current_thread().name
threads=[]
# 定义一个空的列表
t1 = threading.Thread(target=cs1)
t2 = threading.Thread(target=cs2)
threads.append(t1)
threads.append(t2)
# 把这两个线程的任务加载到这个列表中
for x in threads:
x.start()
# 然后执行,这个案例很常用,就是有多个函数要多线程执行的时候用到
# 如果一个程序有多个函数,但是你只想其中的某一个或者某两个函数多线程,用法一样加入空的列表即可
x.join()
#线程堵塞 先运行第一个在运行第二个
#x.join()
#注意你的join放在这里是没有意义的,和不加join一样。线程不堵塞 但是会出现不匀称的表现 并且会修改不同线程中的变量
print 'use time.{}'.format(time.time()-t00)
关于setDaemon()的概念就是:主线程A中,创建了子线程B,并且在主线程A中调用了B.setDaemon(),这个的意思是,把主线程A设置为守护线程,这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出.这就是setDaemon方法的含义,这基本和join是相反的。此外,还有个要特别注意的:必须在start() 方法调用之前设置,如果不设置为守护线程,程序会被无限挂起。
5.案例四 线程锁之信号Semaphore
类名:BoundedSemaphore。这种锁允许一定数量的线程同时更改数据,它不是互斥锁。比如地铁安检,排队人很多,工作人员只允许一定数量的人进入安检区,其它的人继续排队。
import time
import threading
def run(n, se):
se.acquire()
print("run the thread: %s" % n)
time.sleep(1)
se.release()
# 设置允许5个线程同时运行
semaphore = threading.BoundedSemaphore(5)
for i in range(20):
t = threading.Thread(target=run, args=(i,semaphore))
t.start()
运行后,可以看到5个一批的线程被放行。
6.案例五 线程锁之事件Event
事件线程锁的运行机制:
全局定义了一个Flag,如果Flag的值为False,那么当程序执行wait()方法时就会阻塞,如果Flag值为True,线程不再阻塞。这种锁,类似交通红绿灯(默认是红灯),它属于在红灯的时候一次性阻挡所有线程,在绿灯的时候,一次性放行所有排队中的线程。
事件主要提供了四个方法set()、wait()、clear()和is_set()。
调用clear()方法会将事件的Flag设置为False。
调用set()方法会将Flag设置为True。
调用wait()方法将等待“红绿灯”信号。
is_set():判断当前是否"绿灯放行"状态
下面是一个模拟红绿灯,然后汽车通行的例子:
#利用Event类模拟红绿灯
import threading
import time
event = threading.Event()
# 定义一个事件的对象
def lighter():
green_time = 5
# 绿灯时间
red_time = 5
# 红灯时间
event.set()
# 初始设为绿灯
while True:
print("\33[32;0m 绿灯亮...\033[0m")
time.sleep(green_time)
event.clear()
print("\33[31;0m 红灯亮...\033[0m")
time.sleep(red_time)
event.set()
def run(name):
while True:
if event.is_set():
# 判断当前是否"放行"状态
print("一辆[%s] 呼啸开过..." % name)
time.sleep(1)
else:
print("一辆[%s]开来,看到红灯,无奈的停下了..." % name)
event.wait()
print("[%s] 看到绿灯亮了,瞬间飞起....." % name)
if __name__ == '__main__':
light = threading.Thread(target=lighter,)
light.start()
for name in ['奔驰', '宝马', '奥迪']:
car = threading.Thread(target=run, args=(name,))
car.start()
运行结果:
绿灯亮...
一辆[奔驰] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[奥迪] 呼啸开过...
......
红灯亮...
一辆[宝马]开来,看到红灯,无奈的停下了...
一辆[奥迪]开来,看到红灯,无奈的停下了...
一辆[奔驰]开来,看到红灯,无奈的停下了...
绿灯亮...
[奥迪] 看到绿灯亮了,瞬间飞起.....
一辆[奥迪] 呼啸开过...
[奔驰] 看到绿灯亮了,瞬间飞起.....
一辆[奔驰] 呼啸开过...
[宝马] 看到绿灯亮了,瞬间飞起.....
一辆[宝马] 呼啸开过...
一辆[奥迪] 呼啸开过...
......
7.案例六 线程锁之条件Condition
Condition称作条件锁,依然是通过acquire()/release()加锁解锁。
wait([timeout])方法将使线程进入Condition的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。
notify()方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用acquire()尝试获得锁定(进入锁定池),其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
notifyAll()方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
实际案例
import threading
import time
num = 0
con = threading.Condition()
class Foo(threading.Thread):
def __init__(self, name, action):
super(Foo, self).__init__()
self.name = name
self.action = action
def run(self):
global num
con.acquire()
print("%s开始执行..." % self.name)
while True:
if self.action == "add":
num += 1
elif self.action == 'reduce':
num -= 1
else:
exit(1)
print("num当前为:", num)
time.sleep(1)
if num == 5 or num == 0:
print("暂停执行%s!" % self.name)
con.notify()
con.wait()
print("%s开始执行..." % self.name)
con.release()
if __name__ == '__main__':
a = Foo("线程A", 'add')
b = Foo("线程B", 'reduce')
a.start()
b.start()
如果不强制停止,程序会一直执行下去,并循环下面的结果:
线程A开始执行...
num当前为: 1
num当前为: 2
num当前为: 3
num当前为: 4
num当前为: 5
暂停执行线程A!
线程B开始执行...
num当前为: 4
num当前为: 3
num当前为: 2
num当前为: 1
num当前为: 0
暂停执行线程B!
线程A开始执行...
num当前为: 1
num当前为: 2
num当前为: 3
num当前为: 4
num当前为: 5
暂停执行线程A!
线程B开始执行...
8.案例 七定时器
定时器Timer类是threading模块中的一个小工具,用于指定n秒后执行某操作。一个简单但很实用的东西。
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello)
# 表示1秒后执行hello函数
t.start()
9.案例八 通过with语句使用线程锁
类似于上下文管理器,所有的线程锁都有一个加锁和释放锁的动作,非常类似文件的打开和关闭。在加锁后,如果线程执行过程中出现异常或者错误,没有正常的释放锁,那么其他的线程会造到致命性的影响。通过with上下文管理器,可以确保锁被正常释放。其格式如下:
with some_lock:
# 执行任务...
这相当于:
some_lock.acquire()
try:
# 执行任务..
finally:
some_lock.release()
10.threading 的常用属性
current_thread() 返回当前线程
active_count() 返回当前活跃的线程数,1个主线程+n个子线程
get_ident() 返回当前线程
enumerater() 返回当前活动 Thread 对象列表
main_thread() 返回主 Thread 对象
settrace(func) 为所有线程设置一个 trace 函数
setprofile(func) 为所有线程设置一个 profile 函数
stack_size([size]) 返回新创建线程栈大小;或为后续创建的线程设定栈大小为 size
TIMEOUT_MAX Lock.acquire(), RLock.acquire(), Condition.wait() 允许的最大超时时间
11.线程池 threadingpool
在使用多线程处理任务时也不是线程越多越好。因为在切换线程的时候,需要切换上下文环境,线程很多的时候,依然会造成CPU的大量开销。为解决这个问题,线程池的概念被提出来了。
预先创建好一个数量较为优化的线程组,在需要的时候立刻能够使用,就形成了线程池。在Python2中,没有内置的较好的线程池模块,需要自己实现或使用第三方模块。
需要注意的是,线程池的整体构造需要自己精心设计,比如某个函数定义存在多少个线程,某个函数定义什么时候运行这个线程,某个函数定义去获取线程获取任务,某个线程设置线程守护(线程锁之类的),等等…
Python3 线程池:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # 线程池,进程池
import threading, time
def test(arg):
print(arg, threading.current_thread().name)
if __name__ == "__main__":
thread_pool = ThreadPoolExecutor(5) # 定义5个线程执行此任务
# process_pool = ProcessPoolExecutor(5) # 定义5个进程
for i in range(5):
thread_pool.submit(test, i)
# process_pool.submit(test,i)
下面是一个简单的线程池:
import queue
import time
import threading
class MyThreadPool:
def __init__(self, maxsize=5):
self.maxsize = maxsize
self._pool = queue.Queue(maxsize) # 使用queue队列,创建一个线程池
for _ in range(maxsize):
self._pool.put(threading.Thread)
def get_thread(self):
return self._pool.get()
def add_thread(self):
self._pool.put(threading.Thread)
def run(i, pool):
print('执行任务', i)
time.sleep(1)
pool.add_thread() # 执行完毕后,再向线程池中添加一个线程类
if __name__ == '__main__':
pool = MyThreadPool(5) # 设定线程池中最多只能有5个线程类
for i in range(20):
t = pool.get_thread() # 每个t都是一个线程类
obj = t(target=run, args=(i, pool)) # 这里的obj才是正真的线程对象
obj.start()
print("活动的子线程数: ", threading.active_count()-1)
分析一下上面的代码:
- 实例化一个MyThreadPool的对象,在其内部建立了一个最多包含5个元素的阻塞队列,并一次性将5个Thread类型添加进去。
- 循环100次,每次从pool中获取一个thread类,利用该类,传递参数,实例化线程对象。
- 在run()方法中,每当任务完成后,又为pool添加一个thread类,保持队列中始终有5个thread类。
- 一定要分清楚,代码里各个变量表示的内容。t表示的是一个线程类,也就是threading.Thread,而obj才是正真的线程对象。
上面的例子是把线程类当做元素添加到队列内,从而实现的线程池。这种方法比较糙,每个线程使用后就被抛弃,并且一开始就将线程开到满,因此性能较差。下面是一个相对好一点的例子,在这个例子中,队列里存放的不再是线程类,而是任务,线程池也不是一开始就直接开辟所有线程,而是根据需要,逐步建立,直至池满。
# -*- coding:utf-8 -*-
"""
一个基于thread和queue的线程池,以任务为队列元素,动态创建线程,重复利用线程,
通过close和terminate方法关闭线程池。
"""
import queue
import threading
import contextlib
import time
# 创建空对象,用于停止线程
StopEvent = object()
def callback(status, result):
"""
根据需要进行的回调函数,默认不执行。
:param status: action函数的执行状态
:param result: action函数的返回值
:return:
"""
pass
def action(thread_name, arg):
"""
真实的任务定义在这个函数里
:param thread_name: 执行该方法的线程名
:param arg: 该函数需要的参数
:return:
"""
# 模拟该函数执行了0.1秒
time.sleep(0.1)
print("第%s个任务调用了线程 %s,并打印了这条信息!" % (arg+1, thread_name))
class ThreadPool:
def __init__(self, max_num, max_task_num=None):
"""
初始化线程池
:param max_num: 线程池最大线程数量
:param max_task_num: 任务队列长度
"""
# 如果提供了最大任务数的参数,则将队列的最大元素个数设置为这个值。
if max_task_num:
self.q = queue.Queue(max_task_num)
# 默认队列可接受无限多个的任务
else:
self.q = queue.Queue()
# 设置线程池最多可实例化的线程数
self.max_num = max_num
# 任务取消标识
self.cancel = False
# 任务中断标识
self.terminal = False
# 已实例化的线程列表
self.generate_list = []
# 处于空闲状态的线程列表
self.free_list = []
def put(self, func, args, callback=None):
"""
往任务队列里放入一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数
1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
# 先判断标识,看看任务是否取消了
if self.cancel:
return
# 如果没有空闲的线程,并且已创建的线程的数量小于预定义的最大线程数,则创建新线程。
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
# 构造任务参数元组,分别是调用的函数,该函数的参数,回调函数。
w = (func, args, callback,)
# 将任务放入队列
self.q.put(w)
def generate_thread(self):
"""
创建一个线程
"""
# 每个线程都执行call方法
t = threading.Thread(target=self.call)
t.start()
def call(self):
"""
循环去获取任务函数并执行任务函数。在正常情况下,每个线程都保存生存状态, 直到获取线程终止的flag。
"""
# 获取当前线程的名字
current_thread = threading.currentThread().getName()
# 将当前线程的名字加入已实例化的线程列表中
self.generate_list.append(current_thread)
# 从任务队列中获取一个任务
event = self.q.get()
# 让获取的任务不是终止线程的标识对象时
while event != StopEvent:
# 解析任务中封装的三个参数
func, arguments, callback = event
# 抓取异常,防止线程因为异常退出
try:
# 正常执行任务函数
result = func(current_thread, *arguments)
success = True
except Exception as e:
# 当任务执行过程中弹出异常
result = None
success = False
# 如果有指定的回调函数
if callback is not None:
# 执行回调函数,并抓取异常
try:
callback(success, result)
except Exception as e:
pass
# 当某个线程正常执行完一个任务时,先执行worker_state方法
with self.worker_state(self.free_list, current_thread):
# 如果强制关闭线程的flag开启,则传入一个StopEvent元素
if self.terminal:
event = StopEvent
# 否则获取一个正常的任务,并回调worker_state方法的yield语句
else:
# 从这里开始又是一个正常的任务循环
event = self.q.get()
else:
# 一旦发现任务是个终止线程的标识元素,将线程从已创建线程列表中删除
self.generate_list.remove(current_thread)
def close(self):
"""
执行完所有的任务后,让所有线程都停止的方法
"""
# 设置flag
self.cancel = True
# 计算已创建线程列表中线程的个数,
# 然后往任务队列里推送相同数量的终止线程的标识元素
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1
def terminate(self):
"""
在任务执行过程中,终止线程,提前退出。
"""
self.terminal = True
# 强制性的停止线程
while self.generate_list:
self.q.put(StopEvent)
# 该装饰器用于上下文管理
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录空闲的线程,或从空闲列表中取出线程处理任务
"""
# 将当前线程,添加到空闲线程列表中
state_list.append(worker_thread)
# 捕获异常
try:
# 在此等待
yield
finally:
# 将线程从空闲列表中移除
state_list.remove(worker_thread)
# 调用方式
if __name__ == '__main__':
# 创建一个最多包含5个线程的线程池
pool = ThreadPool(5)
# 创建100个任务,让线程池进行处理
for i in range(100):
pool.put(action, (i,), callback)
# 等待一定时间,让线程执行任务
time.sleep(3)
print("-" * 50)
print("\033[32;0m任务停止之前线程池中有%s个线程,空闲的线程有%s个!\033[0m"
% (len(pool.generate_list), len(pool.free_list)))
# 正常关闭线程池
pool.close()
print("任务执行完毕,正常退出!")
# 强制关闭线程池
# pool.terminate()
# print("强制停止任务!")
3.协程模块
协程的机制使得我们可以用同步的方式写出异步运行的代码。
总所周知,Python因为有GIL(全局解释锁)这玩意,不可能有真正的多线程的存在,因此很多情况下都会用multiprocessing实现并发,而且在Python中应用多线程还要注意关键地方的同步,不太方便,用协程代替多线程和多进程是一个很好的选择,因为它吸引人的特性:主动调用/退出,状态保存,避免cpu上下文切换等
1.什么是协程?
协程,又称作Coroutine。从字面上来理解,即协同运行的例程,它是比是线程(thread)更细量级的用户态线程,特点是允许用户的主动调用和主动退出,挂起当前的例程然后返回值或去执行其他任务,接着返回原来停下的点继续执行。等下,这是否有点奇怪?我们都知道一般函数都是线性执行的,不可能说执行到一半返回,等会儿又跑到原来的地方继续执行。但一些熟悉python(or其他动态语言)的童鞋都知道这可以做到,答案是用yield语句。其实这里我们要感谢操作系统(OS)为我们做的工作,因为它具有getcontext和swapcontext这些特性,通过系统调用,我们可以把上下文和状态保存起来,切换到其他的上下文,这些特性为coroutine的实现提供了底层的基础。操作系统的Interrupts和Traps机制则为这种实现提供了可能性,因此它看起来可能是下面这样的:
2.理解生成器(generator)
学过生成器和迭代器的同学应该都知道python有yield这个关键字,yield能把一个函数变成一个generator,与return不同,yield在函数中返回值时会保存函数的状态,使下一次调用函数时会从上一次的状态继续执行,即从yield的下一条语句开始执行,这样做有许多好处,比如我们想要生成一个数列,若该数列的存储空间太大,而我们仅仅需要访问前面几个元素,那么yield就派上用场了,它实现了这种一边循环一边计算的机制,节省了存储空间,提高了运行效率。
def func():
def fib(max):
n, a, b = 0, 0, 1
while n < max:
yield b
a, b = b, a + b
n = n + 1
print([x for x in fib(6)][-1])
Coroutine与Generator
有些人会把生成器(generator)和协程(coroutine)的概念混淆,我以前也会这样,不过其实发现,两者的区别还是很大的。
直接上最重要的区别:
- generator总是生成值,一般是迭代的序列
- coroutine关注的是消耗值,是数据(data)的消费者
- coroutine不会与迭代操作关联,而generator会
- coroutine强调协同控制程序流,generator强调保存状态和产生数据
相似的是,它们都是不用return来实现重复调用的函数/对象,都用到了yield(中断/恢复)的方式来实现。
asyncio
asyncio是python 3.4中新增的模块,它提供了一种机制,使得你可以用协程(coroutines)、IO复用(multiplexing I/O)在单线程环境中编写并发模型。
根据官方说明,asyncio模块主要包括了:
- 具有特定系统实现的事件循环(event loop);
- 数据通讯和协议抽象(类似Twisted中的部分);
- TCP,UDP,SSL,子进程管道,延迟调用和其他;
- Future类;
- yield from的支持;
- 同步的支持;
- 提供向线程池转移作业的接口;
下面来看下asyncio的一个例子:
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
当事件循环开始运行时,它会在Task中寻找coroutine来执行调度,因为事件循环注册了print_sum(),因此print_sum()被调用,执行result = await compute(x, y)这条语句(等同于result = yield from compute(x, y)),因为compute()自身就是一个coroutine,因此print_sum()这个协程就会暂时被挂起,compute()被加入到事件循环中,程序流执行compute()中的print语句,打印”Compute %s + %s …”,然后执行了await asyncio.sleep(1.0),因为asyncio.sleep()也是一个coroutine,接着compute()就会被挂起,等待计时器读秒,在这1秒的过程中,事件循环会在队列中查询可以被调度的coroutine,而因为此前print_sum()与compute()都被挂起了,因此事件循环会停下来等待协程的调度,当计时器读秒结束后,程序流便会返回到compute()中执行return语句,结果会返回到print_sum()中的result中,最后打印result,事件队列中没有可以调度的任务了,此时loop.close()把事件队列关闭,程序结
接下来是异步生成器,来看一个例子:
假如我要到一家超市去购买土豆,而超市货架上的土豆数量是有限的:
class Potato:
@classmethod
def make(cls, num, *args, **kws):
potatos = []
for i in range(num):
potatos.append(cls.__new__(cls, *args, **kws))
return potatos
all_potatos = Potato.make(5)
现在我想要买50个土豆,每次从货架上拿走一个土豆放到篮子:
def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
sleep(.1)
else:
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break
def buy_potatos():
bucket = []
for p in take_potatos(50):
bucket.append(p)
对应到代码中,就是迭代一个生成器的模型,显然,当货架上的土豆不够的时候,这时只能够死等,而且在上面例子中等多长时间都不会有结果(因为一切都是同步的),也许可以用多进程和多线程解决,而在现实生活中,更应该像是这样的:
async def take_potatos(num):
count = 0
while True:
if len(all_potatos) == 0:
await ask_for_potato()
potato = all_potatos.pop()
yield potato
count += 1
if count == num:
break
当货架上的土豆没有了之后,我可以询问超市请求需要更多的土豆,这时候需要等待一段时间直到生产者完成生产的过程:
async def ask_for_potato():
await asyncio.sleep(random.random())
all_potatos.extend(Potato.make(random.randint(1, 10)))
当生产者完成和返回之后,这是便能从await挂起的地方继续往下跑,完成消费的过程。而这整一个过程,就是一个异步生成器迭代的流程:
async def buy_potatos():
bucket = []
async for p in take_potatos(50):
bucket.append(p)
print(f'Got potato {id(p)}...')
async for语法表示我们要后面迭代的是一个异步生成器。
def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(buy_potatos())
loop.close()
用asyncio运行这段代码,结果是这样的:
Got potato 4338641384...
Got potato 4338641160...
Got potato 4338614736...
Got potato 4338614680...
Got potato 4338614568...
Got potato 4344861864...
Got potato 4344843456...
Got potato 4344843400...
Got potato 4338641384...
Got potato 4338641160...
...
既然是异步的,在请求之后不一定要死等,而是可以做其他事情。比如除了土豆,我还想买番茄,这时只需要在事件循环中再添加一个过程:
def main():
import asyncio
loop = asyncio.get_event_loop()
res = loop.run_until_complete(asyncio.wait([buy_potatos(), buy_tomatos()]))
loop.close()
再来运行这段代码:
Got potato 4423119312...
Got tomato 4423119368...
Got potato 4429291024...
Got potato 4421640768...
Got tomato 4429331704...
Got tomato 4429331760...
Got tomato 4423119368...
Got potato 4429331760...
Got potato 4429331704...
Got potato 4429346688...
Got potato 4429346072...
Got tomato 4429347360...
...
看下AsyncGenerator的定义,它需要实现aiter和anext两个核心方法,以及asend,athrow,aclose方法。
class AsyncGenerator(AsyncIterator):
__slots__ = ()
async def __anext__(self):
...
@abstractmethod
async def asend(self, value):
...
@abstractmethod
async def athrow(self, typ, val=None, tb=None):
...
async def aclose(self):
...
@classmethod
def __subclasshook__(cls, C):
if cls is AsyncGenerator:
return _check_methods(C, '__aiter__', '__anext__',
'asend', 'athrow', 'aclose')
return NotImplemented
异步生成器是在3.6之后才有的特性,同样的还有异步推导表达式,因此在上面的例子中,也可以写成这样:
bucket = [p async for p in take_potatos(50)]
类似的,还有await表达式:
result = [await fun() for fun in funcs if await condition()]
除了函数之外,类实例的普通方法也能用async语法修饰:
class ThreeTwoOne:
async def begin(self):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return
async def game():
t = ThreeTwoOne()
await t.begin()
print('start')
实例方法的调用同样是返回一个coroutine:
function = ThreeTwoOne.begin
method = function.__get__(ThreeTwoOne, ThreeTwoOne())
import inspect
assert inspect.isfunction(function)
assert inspect.ismethod(method)
assert inspect.iscoroutine(method())
同理还有类方法:
class ThreeTwoOne:
@classmethod
async def begin(cls):
print(3)
await asyncio.sleep(1)
print(2)
await asyncio.sleep(1)
print(1)
await asyncio.sleep(1)
return
async def game():
await ThreeTwoOne.begin()
print('start')
根据PEP 492中,async也可以应用到上下文管理器中,aenter和aexit需要返回一个Awaitable:
class GameContext:
async def __aenter__(self):
print('game loading...')
await asyncio.sleep(1)
async def __aexit__(self, exc_type, exc, tb):
print('game exit...')
await asyncio.sleep(1)
async def game():
async with GameContext():
print('game start...')
await asyncio.sleep(2)
在3.7版本,contextlib中会新增一个asynccontextmanager装饰器来包装一个实现异步协议的上下文管理器:
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
conn = await acquire_db_connection()
try:
yield
finally:
await release_db_connection(conn)
async修饰符也能用在call方法上:
class GameContext:
async def __aenter__(self):
self._started = time()
print('game loading...')
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc, tb):
print('game exit...')
await asyncio.sleep(1)
async def __call__(self, *args, **kws):
if args[0] == 'time':
return time() - self._started
async def game():
async with GameContext() as ctx:
print('game start...')
await asyncio.sleep(2)
print('game time: ', await ctx('time'))
await和yield from
Python3.3的yield from语法可以把生成器的操作委托给另一个生成器,生成器的调用方可以直接与子生成器进行通信:
def sub_gen():
yield 1
yield 2
yield 3
def gen():
return (yield from sub_gen())
def main():
for val in gen():
print(val)
# 1
# 2
# 3
利用这一特性,使用yield from能够编写出类似协程效果的函数调用,在3.5之前,asyncio正是使用@asyncio.coroutine和yield from语法来创建协程:
# https://docs.python.org/3.4/library/asyncio-task.html
import asyncio
@asyncio.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y
@asyncio.coroutine
def print_sum(x, y):
result = yield from compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
然而,用yield from容易在表示协程和生成器中混淆,没有良好的语义性,所以在Python 3.5推出了更新的async/await表达式来作为协程的语法。
因此类似以下的调用是等价的:
async with lock:
...
with (yield from lock):
...
######################
def main():
return (yield from coro())
def main():
return (await coro())
那么,怎么把生成器包装为一个协程对象呢?这时候可以用到types包中的coroutine装饰器(如果使用asyncio做驱动的话,那么也可以使用asyncio的coroutine装饰器),@types.coroutine装饰器会将一个生成器函数包装为协程对象:
import asyncio
import types
@types.coroutine
def compute(x, y):
print("Compute %s + %s ..." % (x, y))
yield from asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
尽管两个函数分别使用了新旧语法,但他们都是协程对象,也分别称作native coroutine以及generator-based coroutine,因此不用担心语法问题。
下面观察一个asyncio中Future的例子:
import asyncio
future = asyncio.Future()
async def coro1():
await asyncio.sleep(1)
future.set_result('data')
async def coro2():
print(await future)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
coro1(),
coro2()
]))
loop.close()
两个协程在在事件循环中,协程coro1在执行第一句后挂起自身切到asyncio.sleep,而协程coro2一直等待future的结果,让出事件循环,计时器结束后coro1执行了第二句设置了future的值,被挂起的coro2恢复执行,打印出future的结果'data'。
future可以被await证明了future对象是一个Awaitable,进入Future类的源码可以看到有一段代码显示了future实现了await协议:
class Future:
...
def __iter__(self):
if not self.done():
self._asyncio_future_blocking = True
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression
当执行await future这行代码时,future中的这段代码就会被执行,首先future检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的Task(任务)等待future完成。
当future执行set_result方法时,会触发以下的代码,设置结果,标记future已经完成:
def set_result(self, result):
...
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
self._schedule_callbacks()
最后future会调度自身的回调函数,触发Task._step()告知Task驱动future从之前挂起的点恢复执行,不难看出,future会执行下面的代码:
class Future:
...
def __iter__(self):
...
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
最终返回结果给调用方。
前面讲了那么多关于asyncio的例子,那么除了asyncio,就没有其他协程库了吗?asyncio作为python的标准库,自然受到很多青睐,但它有时候还是显得太重量了,尤其是提供了许多复杂的轮子和协议,不便于使用。
你可以理解为,asyncio是使用async/await语法开发的协程库,而不是有asyncio才能用async/await,除了asyncio之外,curio和trio是更加轻量级的替代物,而且也更容易使用。
asynccontextmanager使用
从Python 3.7开始,有两种方法可以编写异步上下文管理器。一种就是前面提到的魔法函数的实现,另外一种就是contextlib的另外一个模块asynccontextmanager。通过装饰器的方式实现一个异步上下文管理器
import asyncio
from contextlib import asynccontextmanager
from concurrent.futures.thread import ThreadPoolExecutor
class AsyncFile(object):
def __init__(self, file, loop=None, executor=None):
if not loop:
loop = asyncio.get_running_loop() # 获取当前运行事件循环
if not executor:
executor = ThreadPoolExecutor(10) # 线程池数量10
self.file = file
self.loop = loop
self.executor = executor
self.pending = []
self.result = []
def write(self, string):
"""
实现异步写操作
:param string: 要写的内容
:return:
"""
self.pending.append(
self.loop.run_in_executor(
self.executor, self.file.write, string,
)
)
def read(self, i):
"""
实现异步读操作
:param i:
:return:
"""
self.pending.append(
self.loop.run_in_executor(self.executor, self.file.read, i,)
)
def readlines(self):
self.pending.append(
self.loop.run_in_executor(self.executor, self.file.readlines, )
)
@asynccontextmanager
async def async_open(path, mode="w"):
with open(path, mode=mode) as f:
loop = asyncio.get_running_loop()
file = AsyncFile(f, loop=loop)
try:
yield file
finally:
file.result = await asyncio.gather(*file.pending, loop=loop)
上面的代码通过使用asyncio中run_in_executor运行一个线程,来使得阻塞操作变为非阻塞操作,达到异步非阻塞的目的。
AsyncFile类提供了一些方法,这些方法将用于将write、read和readlines的调用添加到pending列表中。这些任务通过finally块中的事件循环在ThreadPoolExecutor进行调度。
yield 前半段用来表示aenter()
yield 后半段用来表示aexit()
使用finally以后可以保证链接资源等使用完之后能够关闭。
运行异步上下文管理器
如果调用前面示例中的异步上下文管理器,则需要使用关键字async with来进行调用。另外带有async with的语句只能在异步函数中使用。
from asynciodemo.asyncwith import async_open
import asyncio
import tempfile
import os
async def main():
tempdir = tempfile.gettempdir()
path = os.path.join(tempdir, "run.txt")
print(f"临时文件目录:{path}")
async with async_open(path, mode='w') as f:
f.write("公众号: ")
f.write("Python")
f.write("学习")
f.write("开发")
if __name__ == '__main__':
asyncio.run(main())
2.Java相关
1.Java新特性
2.精通并发编程、注解、反射、集合、web编程
3.熟悉网络编程NIO Netty
4.熟悉JVM调优
5. 开源框架,原理源码
熟练使用spring/springmvc/springboot/mybatis等开源框架, 熟悉这些框架的原理、源码