基础
异步调用的原理,是在单个线程中通过切换任务(就像单线程的函数切换,负担很小,性能很好)来达到并发的效果。相较于线程这种比较重的并发方式,异步调用可以大大提升I/O密集型任务的执行效率,达到非常高的并发量。python中异步调用的最小单位是协程,随着异步编程在python3各个版本中不断的迭代,使用异步编程变得越来越简单,因此,我们有必要好好掌握python中异步编程相关的知识。
两个关键字
async
:用来声明一个协程函数(async def
),与普通函数不同的是,调用协程函数并不会返回其运行的结果,而是返回一个协程对象(coroutine)。协程对象需要注册到事件循环(event loop)中才能被调度执行到。
await
:用来异步等待一个协程的返回,只能在协程函数里使用。await
时意味着将函数控制权交还给event loop。举个例子,当我们在g()
函数内部遇到await f()
时,就会将暂时挂起g()
的执行直到 f()
返回,与此同时,将CPU的执行权让出给event loop中的其他函数继续执行。
awaitable
:就像for
关键字用于iterable
对象, await
关键字用于awaitable
对象。最常见的两个awaitable
对象就是原生的协程对象以及使用asyncio.create_task()
方法创建的asyncio.Task
对象。值得注意的是,你并不总是需要await
一个Task
如果你不需要取消或者等待协程运行的结果的话。
例子
以Fluent Python 2nd Edition
(神作,忍不住再次安利)这本书的示例代码为例:
import asyncio
import socket
from keyword import kwlist
MAX_KEYWORD_LEN = 4
async def probe(domain: str) -> tuple[str, bool]:
loop = asyncio.get_running_loop()
try:
await loop.getaddrinfo(domain, None)
except socket.gaierror:
return (domain, False)
return (domain, True)
async def main() -> None:
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
domains = (f'{name}.dev'.lower() for name in names)
coros = [probe(domain) for domain in domains]
for coro in asyncio.as_completed(coros):
domain, found = await coro
mark = '+' if found else ' '
print(f'{mark} {domain}')
if __name__ == '__main__':
asyncio.run(main())
作者是这么描述协程的工作流程的,注意加粗部分:
Using the syntax await loop.getaddrinfo(…)
avoids blocking because await suspends the current coroutine object—for example, probe('if.dev')
. A new coroutine object is created, getaddrinfo('if.dev', None)
, it starts the low-level addrinfo query and yields control back to the event loop, which can drive other pending coroutine objects, such as probe('or.dev')
. When the event loop gets a response for the getaddrinfo('if.dev', None)
query, that specific coroutine object resumes and returns control back to the probe('if.dev')
—which was suspended at await—and can now handle a possible exception and return the result tuple.
这里注意一下英文中
Suspend
和Pending
的差异:Suspend:一个事情已经开始了, 不过现在要停了(可能是暂时地)。 Classes have been suspended for the holidays.
Pending:一个事情还没开始, 因为还在等其他东西。 This project is still pending for approval.
常用对象
Future:
Future
对象是用来模仿concurrent.futures
包中的Future
对象的,除了一小部分API有差异外,他们的API基本上兼容。Future
对象代表一个任务的结果,注意这里的结果可以是未执行的结果或者时一个执行异常。源代码中是这样描述这个对象的:
class Future(object):
"""
This class is *almost* compatible with concurrent.futures.Future.
Differences:
- result() and exception() do not take a timeout argument and
raise an exception when the future isn't done yet.
- Callbacks registered with add_done_callback() are always called
via the event loop's call_soon_threadsafe().
- This class is not compatible with the wait() and as_completed()
methods in the concurrent.futures package.
"""
Task:
一个和Future对象类似的协程对象,非线程安全,查看源代码可以看到Task
是Future
的子类,因此 Future
对象不一定是一个Task
对象, 但Task
对象一定是个Future
对象。
class Task(Future):
""" A coroutine wrapped in a Future. """
Task对象在创建时就会注册到事件循环中。
EventLoop:
管理和分配不同Task
的执行,Task
需要注册到EventLoo
以后才能被调度执行到。你可以把它看成是某个监控着协程空闲、可执行等运行状态,并且能根据某个协程等待的事件变为可执行时唤醒这些空闲的协程的While True
的循环。Loop
是可插拔(替换)的,也就是说,你可以自己实现一个事件循环来代替的默认的事件循环,比如Linux
系统上非常著名的uvloop
,使用下面代码即可替换EventLoop
实现:
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
uvloop是基于libuv库(nodejs)使用Cython编写的,拥有比自带的event loop更高的性能,遗憾的是你只能在*nix 和 python3.5+的环境中使用它。
常用方法
asyncio.run()
在python3.7中引入的新方法,会自动创建event loop并且执行run_until_complete,同时在执行结束时会自动关闭event loop,在引入该方法之前,你可能需要使用如下代码来执行一个简单的协程:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
如果你需要对未完成的任务执行cancel()
方法,那么还需要在另外写一些代码来处理它们。而asyncio.run()
方法将这些代码范式进行了封装,使得调用协程变得不需要再写这些模板式的代码了。
asyncio.gather(*aws, loop=None, return_exceptions=False)
这个方法将协程(准确的说是awaitable对象,因此也可以是future对象)集合统一放到一个future对象里面,并且将协程的结果统一在一个列表中返回。如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws
中可等待对象的顺序一致。
asyncio.ensure_future
和 asyncio.create_task
asyncio.ensure_future
虽然名字里带了future,但通常它返回的对象是一个Task
对象(除非传入的obj
对象本身就是一个Future对象),这是一个有点反直觉且经常容易混淆的点,看下面的例子:
import asyncio
async def foo():
print("before foo await")
await asyncio.sleep(1)
print("after foo await")
return "foo"
async def bar():
print("before bar await")
await asyncio.sleep(1)
print("after bar await")
return "bar"
async def popo():
print("before popo await")
await asyncio.sleep(1)
print("after popo await")
return "popo"
async def set_after(fut, delay, value):
# Sleep for *delay* seconds.
await asyncio.sleep(delay)
# Set *value* as a result of *fut* Future.
fut.set_result(value)
async def main():
print("running main")
task1 = asyncio.create_task(foo())
task2 = asyncio.create_task(bar())
fut1 = asyncio.ensure_future(popo())
loop = asyncio.get_running_loop()
fut2 = loop.create_future()
loop.create_task(
set_after(fut2, 1, '... world'))
print(isinstance(task1, asyncio.Future))
print(isinstance(fut1, asyncio.Task))
print(isinstance(fut2, asyncio.Task))
print(isinstance(fut2, asyncio.Future))
await task1
await task2
await fut1
await fut2
print("exiting main")
asyncio.run(main())
输出如下, 注意第三行和第四行的输出:
running main
True
True
False
True
before foo await
before bar await
before popo await
after foo await
after popo await
after bar await
exiting main
因此,python 3.7 及之后版本都推荐使用asyncio.create_task
方法,这个方法限制了传入的对象必须是一个协程对象。
asyncio.get_running_loop
和asyncio.get_event_loop
asyncio.get_running_loop
函数是在Python 3.7中添加,在协程内部使用以便获取运行着的事件循环的函数,当事件循环不存在时,这个函数可能会返回RuntimeError
。它的实现相较于asyncio.get_event_loop
(可能会按需开始一个新的事件循环)更加简单和快速.
其他常用类
Asyncio.Queue
对于并发编程,经常需要使用队列来将负载分配到多个任务上,比如经典的生产者-消费者模式,asyncio
包同样提了Queue
对象来满足这类需求,参考官方的代码示例:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
其中的几个方法需要单独说明一下:
put_nowait(item)
: 将item放到队列中,非阻塞性操作,当队列满时,将抛出QueueFull
的异常;
put(item)
:将item放到队列中,阻塞性操作, 当队列满时,将一直等待直到有空位;
get_nowait()
: 从队列中获取一个item,非阻塞性操作,当队列为空时,将抛出QueueEmpty
的异常。
get()
: 从队列中获取一个item,阻塞性操作,当队列为空时,将一直等待直到有item可用;
task_done()
: 该方法通常由消费者处理,用来表示从队列获取的任务已经完成。对于每一个通过get()
从队列获取的任务,调用该方法会告知队列任务处理完成;
join()
: 阻塞直到队列中的所有item都已经被处理过。每个item添加到队列中时,未完成任务的计数就会增加;当这些任务调用task_done()
时,这个计数就会减少;当未完成任务计数为0时,join()
将不再阻塞。
Asyncio.Semaphore
异步编程处理IO密集型的任务时具有很好的性能,但有时我们也会希望限制一下并发量,这时候就可以使用信号量来达到这个目的。基本用法可以参考官方的代码示例:
sem = asyncio.Semaphore(10)
# ... later
async with sem:
# work with shared resource