python异步编程小抄

基础

异步调用的原理,是在单个线程中通过切换任务(就像单线程的函数切换,负担很小,性能很好)来达到并发的效果。相较于线程这种比较重的并发方式,异步调用可以大大提升I/O密集型任务的执行效率,达到非常高的并发量。python中异步调用的最小单位是协程,随着异步编程在python3各个版本中不断的迭代,使用异步编程变得越来越简单,因此,我们有必要好好掌握python中异步编程相关的知识。

两个关键字

async:用来声明一个协程函数(async def),与普通函数不同的是,调用协程函数并不会返回其运行的结果,而是返回一个协程对象(coroutine)。协程对象需要注册到事件循环(event loop)中才能被调度执行到。

await:用来异步等待一个协程的返回,只能在协程函数里使用await时意味着将函数控制权交还给event loop。举个例子,当我们在g()函数内部遇到await f()时,就会将暂时挂起g()的执行直到 f()返回,与此同时,将CPU的执行权让出给event loop中的其他函数继续执行。

awaitable:就像for关键字用于iterable对象, await关键字用于awaitable对象。最常见的两个awaitable对象就是原生的协程对象以及使用asyncio.create_task()方法创建的asyncio.Task对象。值得注意的是,你并不总是需要await一个Task如果你不需要取消或者等待协程运行的结果的话。

例子

Fluent Python 2nd Edition(神作,忍不住再次安利)这本书的示例代码为例:

import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4 


async def probe(domain: str) -> tuple[str, bool]: 
    loop = asyncio.get_running_loop() 
    try:
        await loop.getaddrinfo(domain, None) 
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

async def main() -> None: 
     names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) 
     domains = (f'{name}.dev'.lower() for name in names) 
     coros = [probe(domain) for domain in domains] 
     for coro in asyncio.as_completed(coros): 
     domain, found = await coro 
     mark = '+' if found else ' '
     print(f'{mark} {domain}')
    
    
if __name__ == '__main__':
    asyncio.run(main()) 

作者是这么描述协程的工作流程的,注意加粗部分:

Using the syntax await loop.getaddrinfo(…) avoids blocking because await suspends the current coroutine object—for example, probe('if.dev'). A new coroutine object is created, getaddrinfo('if.dev', None), it starts the low-level addrinfo query and yields control back to the event loop, which can drive other pending coroutine objects, such as probe('or.dev'). When the event loop gets a response for the getaddrinfo('if.dev', None) query, that specific coroutine object resumes and returns control back to the probe('if.dev')—which was suspended at await—and can now handle a possible exception and return the result tuple.

这里注意一下英文中SuspendPending的差异:

Suspend:一个事情已经开始了, 不过现在要停了(可能是暂时地)。 Classes have been suspended for the holidays.

Pending:一个事情还没开始, 因为还在等其他东西。 This project is still pending for approval.

常用对象

Future:

Future对象是用来模仿concurrent.futures包中的Future对象的,除了一小部分API有差异外,他们的API基本上兼容。Future对象代表一个任务的结果,注意这里的结果可以是未执行的结果或者时一个执行异常。源代码中是这样描述这个对象的:

class Future(object):
    """
    This class is *almost* compatible with concurrent.futures.Future.
    
        Differences:
    
        - result() and exception() do not take a timeout argument and
          raise an exception when the future isn't done yet.
    
        - Callbacks registered with add_done_callback() are always called
          via the event loop's call_soon_threadsafe().
    
        - This class is not compatible with the wait() and as_completed()
          methods in the concurrent.futures package.
    """

Task:

一个和Future对象类似的协程对象,非线程安全,查看源代码可以看到TaskFuture的子类,因此 Future对象不一定是一个Task对象, 但Task对象一定是个Future对象。

class Task(Future):
    """ A coroutine wrapped in a Future. """

Task对象在创建时就会注册到事件循环中。

EventLoop:

管理和分配不同Task的执行,Task需要注册到EventLoo以后才能被调度执行到。你可以把它看成是某个监控着协程空闲、可执行等运行状态,并且能根据某个协程等待的事件变为可执行时唤醒这些空闲的协程的While True的循环Loop是可插拔(替换)的,也就是说,你可以自己实现一个事件循环来代替的默认的事件循环,比如Linux系统上非常著名的uvloop,使用下面代码即可替换EventLoop实现:

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

uvloop是基于libuv库(nodejs)使用Cython编写的,拥有比自带的event loop更高的性能,遗憾的是你只能在*nixpython3.5+的环境中使用它。

常用方法

asyncio.run()

在python3.7中引入的新方法,会自动创建event loop并且执行run_until_complete,同时在执行结束时会自动关闭event loop,在引入该方法之前,你可能需要使用如下代码来执行一个简单的协程:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

如果你需要对未完成的任务执行cancel()方法,那么还需要在另外写一些代码来处理它们。而asyncio.run()方法将这些代码范式进行了封装,使得调用协程变得不需要再写这些模板式的代码了。

asyncio.gather(*aws, loop=None, return_exceptions=False)

这个方法将协程(准确的说是awaitable对象,因此也可以是future对象)集合统一放到一个future对象里面,并且将协程的结果统一在一个列表中返回。如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

asyncio.ensure_futureasyncio.create_task

asyncio.ensure_future虽然名字里带了future,但通常它返回的对象是一个Task对象(除非传入的obj对象本身就是一个Future对象),这是一个有点反直觉且经常容易混淆的点,看下面的例子:

import asyncio


async def foo():
    print("before foo await")
    await asyncio.sleep(1)
    print("after foo await")
    return "foo"


async def bar():
    print("before bar await")
    await asyncio.sleep(1)
    print("after bar await")
    return "bar"


async def popo():
    print("before popo await")
    await asyncio.sleep(1)
    print("after popo await")
    return "popo"


async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)
    # Set *value* as a result of *fut* Future.
    fut.set_result(value)


async def main():
    print("running main")
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())
    fut1 = asyncio.ensure_future(popo())
    loop = asyncio.get_running_loop()
    fut2 = loop.create_future()
    loop.create_task(
        set_after(fut2, 1, '... world'))
    print(isinstance(task1, asyncio.Future))
    print(isinstance(fut1, asyncio.Task))
    print(isinstance(fut2, asyncio.Task))
    print(isinstance(fut2, asyncio.Future))
    await task1
    await task2
    await fut1
    await fut2
    print("exiting main")


asyncio.run(main())

输出如下, 注意第三行和第四行的输出:

running main
True
True
False
True
before foo await
before bar await
before popo await
after foo await
after popo await
after bar await
exiting main

因此,python 3.7 及之后版本都推荐使用asyncio.create_task方法,这个方法限制了传入的对象必须是一个协程对象。

asyncio.get_running_loopasyncio.get_event_loop

asyncio.get_running_loop函数是在Python 3.7中添加,在协程内部使用以便获取运行着的事件循环的函数,当事件循环不存在时,这个函数可能会返回RuntimeError。它的实现相较于asyncio.get_event_loop (可能会按需开始一个新的事件循环)更加简单和快速.

其他常用类

Asyncio.Queue

对于并发编程,经常需要使用队列来将负载分配到多个任务上,比如经典的生产者-消费者模式,asyncio包同样提了Queue对象来满足这类需求,参考官方的代码示例:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

其中的几个方法需要单独说明一下:

put_nowait(item): 将item放到队列中,非阻塞性操作,当队列满时,将抛出QueueFull的异常;

put(item):将item放到队列中,阻塞性操作, 当队列满时,将一直等待直到有空位;

get_nowait(): 从队列中获取一个item,非阻塞性操作,当队列为空时,将抛出QueueEmpty的异常。

get(): 从队列中获取一个item,阻塞性操作,当队列为空时,将一直等待直到有item可用;

task_done(): 该方法通常由消费者处理,用来表示从队列获取的任务已经完成。对于每一个通过get()从队列获取的任务,调用该方法会告知队列任务处理完成;

join(): 阻塞直到队列中的所有item都已经被处理过。每个item添加到队列中时,未完成任务的计数就会增加;当这些任务调用task_done()时,这个计数就会减少;当未完成任务计数为0时,join()将不再阻塞。

Asyncio.Semaphore

异步编程处理IO密集型的任务时具有很好的性能,但有时我们也会希望限制一下并发量,这时候就可以使用信号量来达到这个目的。基本用法可以参考官方的代码示例:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容