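Greenlets are lightweight coroutines. All greenlets run inside the main program's OS process, but they are scheduled cooperatively.
Only one greenlet runs at any given time.
This differs from the multiprocessing and threading libraries, which start OS processes and POSIX threads that the operating system schedules preemptively.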
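Synchronous and Asynchronous Execution
The core idea of concurrency is that a larger task can be broken into subtasks that are scheduled to run simultaneously or asynchronously, instead of one at a time (synchronously). A switch between two subtasks is called a context switch.
In gevent, context switches happen by yielding. In the example below, each function yields explicitly by calling gevent.sleep(0).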
import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)  # yield control to the other greenlets
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)  # yield control back to foo
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

Output:
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
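The real power of gevent shows when it is used for network and I/O-bound functions, which can be scheduled cooperatively. gevent makes sure your network libraries yield implicitly whenever they would otherwise block.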
import time

import gevent
from gevent import select

start = time.time()

def tic():
    return 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Waits in select() for 2 seconds, but gevent's select yields to other greenlets meanwhile
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr2():
    # Same as gr1: the 2-second wait does not block the other greenlets
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr3():
    print("Hey, let's do some stuff while the greenlets poll, %s" % tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

Output:
Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey, let's do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds
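Another scheduling example, using a task whose duration is random (non-deterministic), so the asynchronous run finishes the tasks in an unpredictable order: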
import random

import gevent

def task(pid):
    """
    Some non-deterministic task: sleeps for a random 0, 1 or 2 ms.
    """
    gevent.sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)

def synchronous():
    # Tasks run one after another in the main greenlet
    for i in range(10):
        task(i)

def asynchronous():
    # Tasks run concurrently, each in its own greenlet
    greenlets = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(greenlets)

print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()

Output (the asynchronous order varies between runs):
Synchronous:
Task 0 done
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 1 done
Task 5 done
Task 6 done
Task 2 done
Task 4 done
Task 7 done
Task 8 done
Task 9 done
Task 0 done
Task 3 done
An example of fetching data asynchronously:
from gevent import monkey
monkey.patch_socket()  # patch before importing anything that uses sockets

import json
import urllib.request

import gevent

def fetch(pid):
    response = urllib.request.urlopen('http://json-time.appspot.com/time.json')
    result = response.read()
    json_result = json.loads(result)
    dt = json_result['datetime']
    print('Process %s: %s' % (pid, dt))
    return dt

def synchronous():
    for i in range(1, 10):
        fetch(i)

def asynchronous():
    greenlets = [gevent.spawn(fetch, i) for i in range(1, 10)]
    gevent.joinall(greenlets)

print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
Determinism
Spawning Greenlets
Greenlet State
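A greenlet can fail and exit, for example when it raises an exception or cannot obtain a resource it needs. Its state can be inspected with:
started: whether the greenlet has been started
ready(): whether the greenlet has halted
successful(): whether the greenlet has halted without raising an exception
value: the value returned by the greenlet
exception: the uncaught exception raised inside the greenlet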
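A minimal sketch of these state attributes, assuming two throwaway functions (win and fail are hypothetical names). gevent.joinall does not re-raise the failing greenlet's exception by default; the hub prints its traceback to stderr and stores the exception on the greenlet instead.

```python
import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

# Both greenlets are scheduled but have not run yet
print(winner.started, loser.started)

# joinall waits for both; the exception in `loser` is recorded, not raised here
gevent.joinall([winner, loser])

print(winner.ready(), loser.ready())            # both have halted
print(winner.successful(), loser.successful())  # only winner succeeded
print(winner.value)                             # return value of win()
print(loser.value)                              # None, since fail() never returned
print(repr(loser.exception))                    # the exception raised inside fail()
```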
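Program Shutdown
Greenlets that do not yield when the main program receives SIGQUIT can keep the process alive longer than expected and leave a zombie process behind. Catch the signal in the main program and explicitly kill the running greenlets before exiting.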
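import signal

import gevent

def run_forever():
    gevent.sleep(1000)

if __name__ == '__main__':
    thread = gevent.spawn(run_forever)
    # On SIGQUIT, kill the greenlet so join() returns and the process exits.
    # gevent.signal_handler is the gevent >= 1.5 name; older releases call it gevent.signal.
    gevent.signal_handler(signal.SIGQUIT, thread.kill)
    thread.join()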
Timeouts
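Monkey Patching
Monkey patching replaces the blocking functions in the standard library (socket, select, time.sleep and so on) with gevent's cooperative versions, so existing code yields to other greenlets instead of blocking the whole process.
Data Structures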
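A minimal sketch of the effect, assuming only time.sleep is patched (monkey.patch_time()). The sleeper function is a hypothetical stand-in for ordinary blocking code: three greenlets each call time.sleep(0.3), and because the patched sleep yields, they wait concurrently, so the total is roughly 0.3 s rather than 0.9 s.

```python
# Patch first, before other code relies on the blocking time.sleep
from gevent import monkey
monkey.patch_time()

import time

import gevent

def sleeper():
    # After patch_time(), this is gevent.sleep and yields to other greenlets
    time.sleep(0.3)

start = time.time()
gevent.joinall([gevent.spawn(sleeper) for _ in range(3)])
elapsed = time.time() - start
print('3 sleepers finished in %.1f seconds' % elapsed)
```

Without the patch, the same code would block the only OS thread in each call and take about 0.9 seconds.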
Event
An asynchronous signal used for communication between greenlets.
wait set clear
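A small sketch of wait(), set() and clear(), assuming one hypothetical setter greenlet that does some work and two waiter greenlets that block until it signals. The log list only records the order of events.

```python
import gevent
from gevent.event import Event

evt = Event()
log = []

def setter():
    log.append('setter: working')
    gevent.sleep(0.1)  # simulate some work; waiters stay blocked meanwhile
    log.append('setter: done')
    evt.set()          # wake up every greenlet blocked in evt.wait()

def waiter(name):
    log.append('%s: waiting' % name)
    evt.wait()         # blocks this greenlet only, until evt.set() is called
    log.append('%s: woke up' % name)

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter, 'a'),
    gevent.spawn(waiter, 'b'),
])

evt.clear()  # reset the flag so later wait() calls block again
print('\n'.join(log))
```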
AsyncResult
get set clear
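AsyncResult works like an Event that also carries a value: set(value) wakes the waiters, and get() returns that value. A minimal sketch, assuming hypothetical producer and consumer greenlets:

```python
import gevent
from gevent.event import AsyncResult

result = AsyncResult()

def producer():
    gevent.sleep(0.1)    # pretend to compute something
    result.set('Hello!')  # store the value and wake every greenlet blocked in get()

def consumer():
    # Blocks this greenlet until producer calls set(), then returns the value
    return result.get(timeout=5)

p = gevent.spawn(producer)
c = gevent.spawn(consumer)
gevent.joinall([p, c])
print(c.value)
```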
Queues
Groups and Pools
Locks and Semaphores
Thread Locals
Subprocess
Actors
import gevent
from gevent.queue import Queue

class Actor(gevent.Greenlet):
    def __init__(self):
        gevent.Greenlet.__init__(self)
        self.inbox = Queue()  # messages sent to this actor

    def receive(self, message):
        """
        Define in your subclass.
        """
        raise NotImplementedError()

    def _run(self):
        # Process messages one at a time until receive() sets running to False
        self.running = True
        while self.running:
            message = self.inbox.get()  # blocks this greenlet until a message arrives
            self.receive(message)
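A usage sketch of the Actor class above: two actors pass a counter back and forth through their inboxes. The Player class, LIMIT and the None stop message are hypothetical choices made so the exchange terminates; without a stop message the partner would block in inbox.get() forever.

```python
import gevent
from gevent.queue import Queue

class Actor(gevent.Greenlet):
    def __init__(self):
        gevent.Greenlet.__init__(self)
        self.inbox = Queue()

    def receive(self, message):
        raise NotImplementedError()

    def _run(self):
        self.running = True
        while self.running:
            message = self.inbox.get()
            self.receive(message)

LIMIT = 4  # hypothetical: stop once the counter reaches this value
log = []

class Player(Actor):
    def __init__(self, label):
        Actor.__init__(self)
        self.label = label
        self.partner = None  # set after both players exist

    def receive(self, message):
        if message is None:  # stop message from the partner
            self.running = False
            return
        log.append('%s %d' % (self.label, message))
        if message >= LIMIT:
            self.partner.inbox.put(None)  # tell the partner to stop too
            self.running = False
        else:
            self.partner.inbox.put(message + 1)

ping = Player('ping')
pong = Player('pong')
ping.partner, pong.partner = pong, ping
ping.start()
pong.start()
ping.inbox.put(0)  # kick off the exchange
gevent.joinall([ping, pong])
print(log)
```

Each actor owns its state and only communicates through messages, so no locks are needed.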
ZeroMQ
ZeroMQ works like a concurrency framework: its send and recv operations are non-blocking.
Simple Servers
from gevent.server import StreamServer

def handle(socket, address):
    # Runs in a new greenlet for every incoming connection
    socket.sendall(b"Hello from a telnet!\n")
    for i in range(5):
        socket.sendall(str(i).encode() + b'\n')
    socket.close()

server = StreamServer(('127.0.0.1', 5000), handle)
server.serve_forever()
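To try the server without telnet, a sketch can run it and a client in the same process. This assumes port 0 (the OS picks a free port, read back via server.server_port) and uses server.start(), which returns immediately, instead of the blocking serve_forever(). The client uses gevent.socket so its reads yield to the server greenlet instead of blocking the process.

```python
import gevent
from gevent import socket
from gevent.server import StreamServer

def handle(sock, address):
    sock.sendall(b"Hello from a telnet!\n")
    for i in range(5):
        sock.sendall(str(i).encode() + b'\n')
    sock.close()

server = StreamServer(('127.0.0.1', 0), handle)  # port 0: let the OS choose
server.start()  # non-blocking, unlike serve_forever()

def client(port):
    conn = socket.create_connection(('127.0.0.1', port))
    chunks = []
    while True:
        data = conn.recv(1024)  # yields to the server greenlet while waiting
        if not data:            # empty bytes: the server closed the connection
            break
        chunks.append(data)
    conn.close()
    return b''.join(chunks)

g = gevent.spawn(client, server.server_port)
g.join()
server.stop()
received = g.value
print(received.decode())
```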