Python 借助 asyncio 实现并发编程

asyncio 基础

创建协程

使用 async 关键字创建 coroutine

async def coroutine_add_one(number: int) -> int:
    return number + 1


def add_one(number: int) -> int:
    return number + 1


function_result = add_one(1)
coroutine_result = coroutine_add_one(1)

print(
    f'Function result is {function_result} and the type is {type(function_result)}')
# => Function result is 2 and the type is <class 'int'>

print(
    f'Coroutine result is {coroutine_result} and the type is {type(coroutine_result)}')
# => Coroutine result is <coroutine object coroutine_add_one at 0x7f9f495f20a0> and the type is <class 'coroutine'>
# => sys:1: RuntimeWarning: coroutine 'coroutine_add_one' was never awaited

创建 coroutine 和创建普通的函数一样直接,唯一的区别在于使用 async def 而不是 def
当我们直接调用协程 coroutine_add_one 时,传入的参数并没有被加 1 然后返回计算结果,我们只是得到了一个 coroutine object
即我们只是创建了一个能够在之后的某个时间运行的 coroutine 对象,为了运行它,我们总是需要显式地将其放入 event loop 中。最简单的方式就是使用 asyncio.run 函数。

运行 coroutine

import asyncio


async def coroutine_add_one(number: int) -> int:
    return number + 1


result = asyncio.run(coroutine_add_one(1))
print(result)
# => 2

asyncio.run 是 asyncio 应用程序的入口。

使用 await 关键字暂停执行
asyncio 的真正用处,在于能够在一个长时间运行的操作过程中,暂停执行,从而令 event loop 有机会处理其他任务。“暂停”的动作通过 await 关键字触发。await 后面通常紧跟着一个对 coroutine (更严谨地说,一个 awaitable 对象)的调用。

import asyncio


async def add_one(number: int) -> int:
    return number + 1


async def main() -> None:
    one_plus_one = await add_one(1)
    two_plus_one = await add_one(2)
    print(one_plus_one)
    # => 2
    print(two_plus_one)
    # => 3


asyncio.run(main())

首先 await 对协程 add_one(1) 的调用,此时父协程(即 main())被暂停,add_one(1) 执行并获取结果(2),main() 协程恢复执行,将结果赋值给 one_plus_one;同样地,对协程 add_one(2)await 也会导致 main() 被暂停和恢复。

await

sleep

前面的例子只是为了介绍协程的基本语法,并没有涉及任何 long-running 操作,因而也没有享受到 asyncio 在并发方面的作用。我们可以借助 asyncio.sleep 函数模拟 web API 请求或者数据库查询等长时间运行的操作,asyncio.sleep 能够令某个协程“睡眠”指定的时间(秒)。
asyncio.sleep 本身就是一个协程,因而当我们在某个协程中 await asyncio.sleep 时,其他部分代码就得到了执行的机会。

sleep 实现 delay 函数

# util.py
import asyncio


async def delay(delay_seconds: int) -> int:
    print(f'sleeping for {delay_seconds} second(s)')
    await asyncio.sleep(delay_seconds)
    print(f'finished sleeping for {delay_seconds} second(s)')
    return delay_seconds

运行两个协程

import asyncio
from util import delay


async def add_one(number: int) -> int:
    return number + 1


async def hello_world_message() -> str:
    await delay(1)
    return 'Hello Wrold!'


async def main() -> None:
    message = await hello_world_message()
    one_plus_one = await add_one(1)
    print(one_plus_one)
    print(message)
    # => sleeping for 1 second(s)
    # => finished sleeping for 1 second(s)
    # => 2
    # => Hello Wrold!


asyncio.run(main())

运行上面的代码,先是等待 1 秒钟,之后才是两个函数调用的结果被打印出来。我们本来希望看到的是,两个协程并发地执行,add_one(1) 的结果直接被输出,并不需要等待 hello_world_message() 中的 sleep 结束。
实际上 await 会暂停其所在的协程(这里是 main),并且不会执行当前协程中的任何其他代码,直到 await 表达式获得一个结果。hello_world_message 需要 1 秒钟才能返回结果,因而 main 协程也会被暂停 1 秒钟。排在它后面的 add_one(1) 在暂停结束后执行并返回结果。

execution flow

上面的代码和同步、顺序执行的代码没有表现出任何区别。为了实现并发,我们需要引入一个新的概念 task

tasks

Task 是对协程的一种包装,能够将一个协程调度至 event loop 并争取尽快执行。这种调度是以一种非阻塞的方式发生的,即 task 被创建后会立即返回,不必等待其运行结束,从而我们能够有机会执行其他代码。

并发地执行多个 task

import asyncio
from util import delay


async def hello_every_second():
    for i in range(2):
        await asyncio.sleep(1)
        print("I'm running other code while I'm waiting!")


async def main():
    first_delay = asyncio.create_task(delay(3))
    second_delay = asyncio.create_task(delay(3))
    await hello_every_second()
    await first_delay
    await second_delay


asyncio.run(main())
# => sleeping for 3 second(s)
# => sleeping for 3 second(s)
# => I'm running other code while I'm waiting!
# => I'm running other code while I'm waiting!
# => finished sleeping for 3 second(s)
# => finished sleeping for 3 second(s)

上述代码创建了 2 个 task,每个都需要 3 秒钟才能执行完毕。两次对 create_task 的调用都会立即返回。由于 task 调度的原则是尽快执行,当后面的 await 代码刷新了一次 event loop 之后,前面创建的 2 个 task 会立即被执行(非阻塞)。
两个 delay task 在 sleep 过程中,应用是闲置的,我们得以有机会运行其他代码。协程 hello_every_second 每隔 1 秒输出一条消息。整个应用总的运行时间大约是 3 秒,即大约等于耗时最长的异步任务的时间,而不是像顺序执行的程序那样,等于多个任务运行时间的总和。

execution flow

协程和任务的陷阱

将一些长时间运行的任务并发的执行,能够带来很大程度上的性能提升。因而我们会倾向于在应用的任何地方使用协程和 task。事实上,仅仅将函数用 async 修饰,将其封装进 task,并不总是带来性能上的提升。甚至有些情况下还会降低程序的效率。
最主要的情形有两种,一个是在不借助多进程的情况下,尝试在 task 或协程中运行 CPU-bound 代码;另一种是在不借助多线程的情况下调用阻塞式 I/O-bound API

CPU 密集型任务

有时候我们需要一些函数执行 CPU 密集型的任务,比如对一个很大的字典执行循环或者数学计算。为了提升效率,我们会想着将它们放置在单独的 task 中运行。然而现实是,asyncio 使用单线程并发模型,我们依然会受到单个线程和 GIL 的限制

计算协程运行时间

# util.py
import asyncio
import functools
import time
from typing import Callable, Any


def async_timed():
    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapped(*args, **kwargs) -> Any:
            print(f'Starting {func} with {args} {kwargs}')
            start = time.time()
            try:
                return await func(*args, **kwargs)
            finally:
                end = time.time()
                total = end - start
                print(f'finished {func} in {total:.4f} second(s)')
        return wrapped
    return wrapper


@async_timed()
async def delay(delay_seconds: int) -> int:
    print(f'sleeping for {delay_seconds} second(s)')
    await asyncio.sleep(delay_seconds)
    print(f'finished sleeping for {delay_seconds} second(s)')
    return delay_seconds

运行 CPU-bound 代码

import asyncio
from util import delay, async_timed


@async_timed()
async def cpu_bound_work() -> int:
    counter = 0
    for i in range(100000000):
        counter = counter + 1
    return counter


@async_timed()
async def main():
    task_one = asyncio.create_task(cpu_bound_work())
    task_two = asyncio.create_task(cpu_bound_work())
    delay_task = asyncio.create_task(delay(4))
    await task_one
    await task_two
    await delay_task


asyncio.run(main())
# => Starting <function main at 0x7f2d6b85bc70> with () {}
# => Starting <function cpu_bound_work at 0x7f2d6c2bba30> with () {}
# => finished <function cpu_bound_work at 0x7f2d6c2bba30> in 2.7423 second(s)
# => Starting <function cpu_bound_work at 0x7f2d6c2bba30> with () {}
# => finished <function cpu_bound_work at 0x7f2d6c2bba30> in 2.7430 second(s)
# => Starting <function delay at 0x7f2d6b85a0e0> with (4,) {}
# => sleeping for 4 second(s)
# => finished sleeping for 4 second(s)
# => finished <function delay at 0x7f2d6b85a0e0> in 4.0048 second(s)
# => finished <function main at 0x7f2d6b85bc70> in 9.4903 second(s)

上述代码创建了 3 个 task,但实际执行时依然是顺序的而非并发的,耗费的时间并没有变少。两个 CPU-bound task 是依次执行的,甚至 delay_task 也并没有与其他两个任务呈现并发性。原因在于我们先创建了两个 CPU-bound 任务,这两个任务会阻塞 event loop,阻止其调度执行任何其他任务。
因此,总的运行时间等于两个 CPU-bound 任务执行完毕的时间加上 delay 任务运行的 4 秒。即 asyncio 并没有为 CPU-bound 的任务带来任何性能上的提升。
假如我们需要在执行 CPU-bound 任务的同时仍使用 async 语法,就必须借助多进程,告诉 asyncio 在 process pool 中执行任务。

阻塞式 API

我们也会倾向于使用现有的库执行 I/O-bound 操作,再将其封装进协程。然而,这会引起与 CPU-bound 操作同样的问题。因为这些 API 会阻塞 main 线程。
当我们在协程内部调用一个阻塞的 API,我们会阻塞 event loop 线程本身,线程被阻塞请求占据,导致 event loop 无法调度任何其他协程和任务。阻塞式 API 请求包括 requests 库和 time.sleep 等。通常来说,任何执行 I/O 操作且不是协程的函数,以及执行 CPU 密集型任务的函数,都可以认为是阻塞的

协程内部调用阻塞式 API

import asyncio
import requests
from util import async_timed


@async_timed()
async def get_example_status() -> int:
    return requests.get('http://www.example.com').status_code


@async_timed()
async def main():
    task_1 = asyncio.create_task(get_example_status())
    task_2 = asyncio.create_task(get_example_status())
    task_3 = asyncio.create_task(get_example_status())
    await task_1
    await task_2
    await task_3


asyncio.run(main())
# => Starting <function main at 0x7f4335080790> with () {}
# => Starting <function get_example_status at 0x7f4335186170> with () {}
# => finished <function get_example_status at 0x7f4335186170> in 0.5144 second(s)
# => Starting <function get_example_status at 0x7f4335186170> with () {}
# => finished <function get_example_status at 0x7f4335186170> in 0.5163 second(s)
# => Starting <function get_example_status at 0x7f4335186170> with () {}
# => finished <function get_example_status at 0x7f4335186170> in 0.5177 second(s)
# => finished <function main at 0x7f4335080790> in 1.5488 second(s)

main 协程运行的总时间基本上等于所有 task 运行的时间之和。即我们并没有获取到任何并发上的收益。原因在于 requests 库是阻塞的,任何调用都会阻塞当前线程,而 asyncio 只有一个线程,在阻塞调用结束之前,线程中的 event loop 没有机会以异步的形式运行任何任务。
当你使用的库并没有返回协程,你并没有在自己的协程中使用 await 关键字,很大可能你就是在进行阻塞的函数调用。当前我们使用的大多数 API 都是阻塞的,并不支持与 asyncio 开箱即用。
要想体验到 asyncio 带来的异步和并发特性,就必须使用原生支持协程和非阻塞 socket 的库,比如 aiohttp。或者你坚持使用 requests 库,同时又需要 async 语法,就必须显式地告诉 asyncio 使用多线程的方式,通过 thread pool executor 执行阻塞调用。

借助支持协程的库 aiohttp 实现并发

import asyncio
from aiohttp import ClientSession
from util import async_timed


@async_timed()
async def get_example_status() -> int:
    session = ClientSession()
    resp = await session.get('http://example.com')
    await session.close()
    return resp.status


@async_timed()
async def main():
    task_1 = asyncio.create_task(get_example_status())
    task_2 = asyncio.create_task(get_example_status())
    task_3 = asyncio.create_task(get_example_status())
    await task_1
    await task_2
    await task_3


asyncio.run(main())
# => Starting <function main at 0x7fd9f90b6a70> with () {}
# => Starting <function get_example_status at 0x7fd9f90b63b0> with () {}
# => Starting <function get_example_status at 0x7fd9f90b63b0> with () {}
# => Starting <function get_example_status at 0x7fd9f90b63b0> with () {}
# => finished <function get_example_status at 0x7fd9f90b63b0> in 0.5191 second(s)
# => finished <function get_example_status at 0x7fd9f90b63b0> in 0.5191 second(s)
# => finished <function get_example_status at 0x7fd9f90b63b0> in 0.5191 second(s)
# => finished <function main at 0x7fd9f90b6a70> in 0.5196 second(s)

可以看到所有 task 执行的总时间,基本上只比一个 task 运行的时间多一点点。此时的程序是并发执行的。

取消任务

取消任务

每个 task 对象都有一个 cancel 方法可以帮助我们随时终止该任务。当我们 await 取消的任务时,会报出 CancelledError 异常。
比如我们调度执行某个任务,又不希望该任务运行的时间超过 5 秒:

import asyncio
from asyncio import CancelledError
from util import delay


async def main():
    long_task = asyncio.create_task(delay(10))

    seconds_elapsed = 0

    while not long_task.done():
        print('Task not finished, checking again in a second.')
        await asyncio.sleep(1)
        seconds_elapsed = seconds_elapsed + 1
        if seconds_elapsed == 5:
            long_task.cancel()

    try:
        await long_task
    except CancelledError:
        print('Our task was cancelled')


asyncio.run(main())
# => Task not finished, checking again in a second.
# => Starting <function delay at 0x7fdb383ae0e0> with (10,) {}
# => sleeping for 10 second(s)
# => Task not finished, checking again in a second.
# => Task not finished, checking again in a second.
# => Task not finished, checking again in a second.
# => Task not finished, checking again in a second.
# => Task not finished, checking again in a second.
# => finished <function delay at 0x7fdb383ae0e0> in 5.0079 second(s)
# => Our task was cancelled

需要注意的是,CancelledError 只会在 await 语句处抛出,调用 cancel 方法并不会神奇地强行关闭正在运行的任务,只有你刚好遇到 await 时任务才会被终止,不然就等待下一个 await

使用 wait_for 设置超时时间
每隔一段时间手动进行检查,以确定是否取消某个任务,并不算一种简单的处理方式。asyncio 提供了一个 wait_for 函数,它接收一个协程或者任务,以及超时的秒数作为参数,返回一个协程对象。
若任务运行超时,一个 TimeoutException 就会被抛出,任务自动被终止。

import asyncio
from util import delay


async def main():
    delay_task = asyncio.create_task(delay(2))
    try:
        result = await asyncio.wait_for(delay_task, timeout=1)
        print(result)
    except asyncio.exceptions.TimeoutError:
        print('Got a timeout')
        print(f'Was the task cancelled? {delay_task.cancelled()}')

asyncio.run(main())
# => Starting <function delay at 0x7f71e18160e0> with (2,) {}
# => sleeping for 2 second(s)
# => finished <function delay at 0x7f71e18160e0> in 1.0016 second(s)
# => Got a timeout
# => Was the task cancelled? True

asyncio.shield
在另外一些情况下,我们有可能并不希望直接取消某个超时的任务,而是当任务运行时间过长时,提醒用户这个情况,但是并不执行任何 cancel 操作。
shield 可以帮助我们实现这样的功能。

from util import delay


async def main():
    task = asyncio.create_task(delay(10))

    try:
        result = await asyncio.wait_for(asyncio.shield(task), 5)
        print(result)
    except asyncio.exceptions.TimeoutError:
        print("Task took longer than five seconds, it will finish soon!")
        result = await task
        print(result)


asyncio.run(main())
# => Starting <function delay at 0x7ff344d120e0> with (10,) {}
# => sleeping for 10 second(s)
# => Task took longer than five seconds, it will finish soon!
# => finished sleeping for 10 second(s)
# => finished <function delay at 0x7ff344d120e0> in 10.0063 second(s)
# => 10

参考资料

Python Concurrency with asyncio

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容