参考:https://thief.one/2017/02/20/Python%E5%8D%8F%E7%A8%8B/
http://blog.gusibi.com/post/python-coroutine-1-yield/
1 概念
协程(Coroutine),又称微线程。
- 协程作用
在执行函数A时,可以随时终端,去执行函数B,然后中断继续执行函数A(可以自由切换)。不同于函数调用(没有调用语句)。 - 协程优势
1)执行效率极高,因为子程序切换(函数)不是线程切换,由程序自身控制,没有切换线程的开销。所以与多线程相比,线程的数量越多,协程性能的优势越明显。
2)不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在控制共享资源时也不需要加锁,因此执行效率高很多。
3)协程可以处理IO密集型程序的效率问题,但是处理CPU密集型不是它的长处,如要充分发挥CPU利用率可以结合多进程+协程。
2 Python2.x协程
python2.x协程应用:
- yield
- gevent
2.1 gevent
gevent是第三方库,通过greenlet实现协程,其基本思想:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
2.1.1 install
pip install gevent
2.1.2 gevent usage
- monkey可以使一些阻塞的模块变得不阻塞,机制:遇到IO操作则自动切换,手动切换可以用gevent.sleep(0)
- gevent.spawn 启动协程,参数为函数名称,参数名称
- gevent.joinall 停止协程
2.1.3 示例
一个爬虫的例子:
#! -*- coding:utf-8 -*-
import gevent
from gevent import monkey;monkey.patch_all()
import urllib2
def get_body(i):
print "start",i
urllib2.urlopen("http://cn.bing.com")
print "end",i
tasks=[gevent.spawn(get_body,i) for i in range(3)]
gevent.joinall(tasks)
运行结果:
start 0
start 1
start 2
end 2
end 0
end 1
说明:从结果上来看,执行get_body的顺序应该先是输出”start”,然后执行到urllib2时碰到IO堵塞,则会自动切换运行下一个程序(继续执行get_body输出start),直到urllib2返回结果,再执行end。也就是说,程序没有等待urllib2请求网站返回结果,而是直接先跳过了,等待执行完毕再回来获取返回值。值得一提的是,在此过程中,只有一个线程在执行,因此这与多线程的概念是不一样的。
换成多线程实现:
import threading
import urllib2
def get_body(i):
print "start",i
urllib2.urlopen("http://cn.bing.com")
print "end",i
for i in range(3):
t=threading.Thread(target=get_body,args=(i,))
t.start()
运行结果:
start 0
start 1
start 2
end 1
end 2
end 0
说明:从结果来看,多线程与协程的效果一样,都是达到了IO阻塞时切换的功能。不同的是,多线程切换的是线程(线程间切换),协程切换的是上下文(可以理解为执行的函数)。而切换线程的开销明显是要大于切换上下文的开销,因此当线程越多,协程的效率就越比多线程的高。(猜想多进程的切换开销应该是最大的)
2.2 yield
协程的底层架构是在pep342 中定义,并在python2.5 实现的。
python2.5中,yield关键字可以在表达式中使用,而且生成器API中增加了.send(value)方法。生成器可以调用.send(...)方法发送数据,发送的数据会成为生成器函数中yield表达式的值。
2.2.1 协程生成器的基本行为
首先说明一下,协程有四个状态,可以使用inspect.getgeneratorstate(…)函数确定:
- GEN_CREATED # 等待开始执行
- GEN_RUNNING # 解释器正在执行(只有在多线程应用中才能看到这个状态)
- GEN_SUSPENDED # 在yield表达式处暂停
- GEN_CLOSED # 执行结束
示例代码:
#! -*- coding: utf-8 -*-
import inspect
# 协程使用生成器函数定义:定义体中有yield关键字。
def simple_coroutine():
print('-> coroutine started')
# yield 在表达式中使用;如果协程只需要从客户那里接收数据,yield关键字右边不需要加表达式(yield默认返回None)
x = yield
print('-> coroutine received:', x)
my_coro = simple_coroutine()
my_coro # 和创建生成器的方式一样,调用函数得到生成器对象。
# 协程处于 GEN_CREATED (等待开始状态)
print(inspect.getgeneratorstate(my_coro))
my_coro.send(None)
# 首先要调用next()函数,因为生成器还没有启动,没有在yield语句处暂停,所以开始无法发送数据
# 发送 None 可以达到相同的效果 my_coro.send(None)
next(my_coro)
# 此时协程处于 GEN_SUSPENDED (在yield表达式处暂停)
print(inspect.getgeneratorstate(my_coro))
# 调用这个方法后,协程定义体中的yield表达式会计算出42;现在协程会恢复,一直运行到下一个yield表达式,或者终止。
my_coro.send(42)
print(inspect.getgeneratorstate(my_coro))
运行上述代码,输出结果如下:
GEN_CREATED
-> coroutine started
GEN_SUSPENDED
-> coroutine received: 42
# 这里,控制权流动到协程定义体的尾部,导致生成器像往常一样抛出StopIteration异常
Traceback (most recent call last):
File "/Users/gs/coroutine.py", line 18, in <module>
my_coro.send(42)
StopIteration
send方法的参数会成为暂停yield表达式的值,所以,仅当协程处于暂停状态是才能调用send方法。 如果协程还未激活(GEN_CREATED 状态)要调用next(my_coro) 激活协程,也可以调用my_coro.send(None)
再看一个两个值的协程:
def simple_coro2(a):
print('-> coroutine started: a =', a)
b = yield a
print('-> Received: b =', b)
c = yield a + b
print('-> Received: c =', c)
my_coro2 = simple_coro2(14)
print(inspect.getgeneratorstate(my_coro2))
# 这里inspect.getgeneratorstate(my_coro2) 得到结果为 GEN_CREATED (协程未启动)
next(my_coro2)
# 向前执行到第一个yield 处 打印 “-> coroutine started: a = 14”
# 并且产生值 14 (yield a 执行 等待为b赋值)
print(inspect.getgeneratorstate(my_coro2))
# 这里inspect.getgeneratorstate(my_coro2) 得到结果为 GEN_SUSPENDED (协程处于暂停状态)
my_coro2.send(28)
# 向前执行到第二个yield 处 打印 “-> Received: b = 28”
# 并且产生值 a + b = 42(yield a + b 执行 得到结果42 等待为c赋值)
print(inspect.getgeneratorstate(my_coro2))
# 这里inspect.getgeneratorstate(my_coro2) 得到结果为 GEN_SUSPENDED (协程处于暂停状态)
my_coro2.send(99)
# 把数字99发送给暂停协程,计算yield 表达式,得到99,然后把那个数赋值给c 打印 “-> Received: c = 99”
# 协程终止,抛出StopIteration
运行上述代码,输出结果如下:
GEN_CREATED
-> coroutine started: a = 14
GEN_SUSPENDED
-> Received: b = 28
-> Received: c = 99
Traceback (most recent call last):
File "/Users/gs/coroutine.py", line 37, in <module>
my_coro2.send(99)
StopIteration
simple_coro2 协程的执行过程分为3个阶段,如下图所示:
1)调用next(my_coro2),打印第一个消息,然后执行yield a,产出数字14.
2)调用my_coro2.send(28),把28赋值给b,打印第二个消息,然后执行 yield a + b 产生数字42
3)调用my_coro2.send(99),把99赋值给c,然后打印第三个消息,协程终止。
3 实战
以下是读视频流然后做视频识别的实战案例。
代码如下:
def __video_capture(self, target):
while self.__gate_opened:
origin_frame = self.__video_stream.read()
frame_rgb = cv2.cvtColor(origin_frame, cv2.COLOR_BGR2RGB)
target.send(frame_rgb)
else:
target.close()
@coroutine
def __parse_origin_frame(self, target):
try:
while True:
frame = (yield)
if frame is None:
continue
category_index = ComponentManager.get_category_index()
updated_frame, score, classes,boxes = parse_origin_video_frame(frame, self.__session, self.__detection_graph, category_index)
items = item_detect(score, classes, boxes, category_index)
if items:
valid_frame = Frame()
#print "items: ",
for item in items:
#print item.get_id(), item.get_name(), item.get_score(), item.get_x(), item.get_y(),
valid_frame.add_item(item)
#print " end"
self.__cur_frame_manager.add_frame(valid_frame)
target.send(updated_frame)
except GeneratorExit:
target.close()
@coroutine
def __handle_cv(self):
try:
while True:
frame = (yield)
if frame is None:
continue
output_rgb = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
cv2.imshow(self.__video_name, output_rgb)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
except GeneratorExit:
with self.__lock:
self.__cur_frame_manager.parse_frame()
self.__cur_frame_manager = None