asyncio模块:asyncio是Python用于解决异步IO编程的整套解决方案。
- 高并发编程三个要素:事件循环+ IO多路复用 + 回调函数(驱动生成器,即协程);
- 包括各种特定系统实现的模块化事件循环(select、poll、epoll);
- 传输和协议抽象;
- 对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持;
- 模仿
futures
模块但适用于事件循环使用的Future类; - 基于
yield from
的协议和任务,可以顺序的方式编写并发代码; - 必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转义到线程池;
- 模仿
threading
模块中的同步原语、可以用在单线程的协程之间; - 关键词
async
定义协程,await
异步调用。
基于asyncio的框架:tornado(实现可直接部署的web服务器)、gevent、twisted(scrapy,django channels),使用这些框架必须有对应的异步驱动支持(如tornado中使用pymysql则不能异步)。
使用asyncio
- 单线程,所有的函数调用都在loop中执行(如执行耗时操作则会等待完成后才执行下一个);
- 使用
await asyncio.sleep(2)
与time.sleep(2)
的区别是前者会立即返回一个Future对象,下次循环的时候判断这个是否已经过2s,而不是阻塞等待。
import asyncio
import time
# 定义异步函数
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
# time.sleep(5) # 协程是单线程执行,time.sleep是同步阻塞接口,不应在协程中实现
print("end get url")
start_time = time.time()
loop = asyncio.get_event_loop() # 创建事件循环(单线程:所有的函数调用都在loop中执行)
tasks = [
get_html("http://www.imooc.com") for i in range(10)
]
loop.run_until_complete(asyncio.wait(tasks)) # 类似join,等待协程执行完成才往下执行
print(time.time() - start_time)
获取协程返回值
import asyncio
import time
from functools import partial
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
return "ywh"
def callback(url, future):
print(url)
print("send email to ywh")
start_time = time.time()
loop = asyncio.get_event_loop()
# get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
task = loop.create_task(get_html("http://www.imooc.com")) # 创建一个任务,返回Future对象
task.add_done_callback(
partial(callback, "http://www.imooc.com") # 把callback包装可接收参数的偏函数
) # 执行get_html,完成后调用回调函数callback,最后再返回get_html的结果
loop.run_until_complete(task)
print(task.result()) # 获取Future对象的结果
wait与gather:gather是更高层次的封装,可以将task分组管理;
import asyncio
import time
async def get_html(url):
print("start get url")
await asyncio.sleep(2)
print("end get url")
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
tasks = [get_html("http://www.imooc.com") for i in range(10)]
# loop.run_until_complete(asyncio.gather(*tasks))
# print(time.time() - start_time)
group1 = [get_html("http://projectsedu.com") for i in range(2)]
group2 = [get_html("http://www.imooc.com") for i in range(2)]
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
group2.cancel()
loop.run_until_complete(asyncio.gather(group1, group2))
print(time.time() - start_time)
协程的取消,嵌套
import asyncio
loop = asyncio.get_event_loop()
loop.run_forever() # 一直运行不会停止
loop.run_until_complete() # 运行指定协程后停止
取消future(task)
import asyncio
import time
async def get_html(sleep_times):
print("waiting")
await asyncio.sleep(sleep_times)
print("done after {}s".format(sleep_times))
tasks = [get_html(2), get_html(3), get_html(4)] # 模拟三个执行时长不同的任务
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e: # 人为制造取消信号:捕捉键盘ctrl + c异常
all_tasks = asyncio.Task.all_tasks() # 不需要传入loop:自动从events.get_event_loop中获取loop,并获取loop中的所有task
for task in all_tasks: # 获取所有tasks
print(task.cancel()) # 取消task,返回取消结果
loop.stop()
loop.run_forever() # loop调用stop后必须重新调用run_forever,否则会抛出异常
finally:
loop.close()
嵌套协程的调度过程(task -> print_sum -> compute):
import asyncio
# print_sum协程嵌套await_compute协程
async def compute(x, y):
print("Compute %s + %s ..." %(x, y))
await asyncio.sleep(1.0) # 3
return x + y # 4
async def print_sum(x, y):
result = await compute(x, y) # 2
print("%s + %s = %s" %(x, y, result)) # 5
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2)) # 1
loop.close()
- 创建loop和需要提交到loop的task(通过task驱动协程执行),执行;
- 协程中的await相当于yield from,在task和compute子协程之间建立一个通道,此时进入compute调度,print_sum暂停;
- 子协程compute中的await表示暂停,不经过pring_sum、直接返回给task再返回给loop,等待1s;
- 1s过后,task经通道询问compute,compute计算好结果值会抛出异常(StopIteration)并返回计算结果,compute协程标记完成;
- print_sum捕捉到compute的异常、提取结果值,最后会把异常抛出给task,print_sum协程标记完成。
call_at,call_soon,call_later,call_soon_threadsafe
import asyncio
def callback(sleep_times, loop):
print("success time {}".format(loop.time()))
def stoploop(loop):
loop.stop()
loop = asyncio.get_event_loop()
now = loop.time()
loop.call_at(now + 2, callback, 2, loop) # 当前时间的2s后执行
loop.call_at(now + 1, callback, 1, loop)
loop.call_at(now + 3, callback, 3, loop)
# loop.call_soon(stoploop, loop) # 退出循环
loop.call_soon(callback, 4, loop) # 立刻执行(在队列中等待到下一个循环即执行),另外还有call_later
# 处理共享变量的线程安全问题可以使用call_soon_threadsafe
loop.run_forever()
集成线程池
在协程中集成阻塞IO:对于阻塞的库和接口(如pymysql),协程中要使用多线程。
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
import time
from urllib.parse import urlparse
def get_url(url):
url = urlparse(url)
host = url.netloc
path = url.path
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# client.setblocking(False)
client.connect((host, 80))
client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
data = b""
while True:
d = client.recv(1024)
if d:
data += d
else:
break
data = data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
client.close()
start_time = time.time()
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)
tasks = []
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
task = loop.run_in_executor(executor, get_url, url) # 指定线程池,将阻塞IO操作放入loop中,不影响系统运行
# 把线程中的future包装成协程的future
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print("last time:{}".format(time.time() - start_time))
模拟Http请求
requests是同步的http请求模块,在asyncio不能达到异步的效果,而asyncio本身没有提供http协议的接口,可以使用aiohttp:
import time
import asyncio
import socket
from urllib.parse import urlparse
async def get_url(url):
url = urlparse(url)
host = url.netloc
path = url.path
if path == "":
path = "/"
# 建立socket连接比较费时,使用await
reader, writer = await asyncio.open_connection(host, 80)
writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
all_lines = []
async for raw_line in reader: # 异步化的for循环
data = raw_line.decode("utf8")
all_lines.append(data)
html = "\n".join(all_lines)
return html
async def main():
tasks = []
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
tasks.append(asyncio.ensure_future(get_url(url)))
for task in asyncio.as_completed(tasks):
result = await task
print(result)
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print('last time:{}'.format(time.time() - start_time))
future和task
见asyncio源码
同步与通信
import aiohttp
import asyncio
from asyncio import Lock, Queue # await,区别于多线程的Queue(阻塞)
cache = {}
queue = [] # 协程是单线程,因此使用list、dict就可以实现通信,而不会有线程安全问题
lock = Lock()
async def get_stuff(url):
# 如果没有同步机制,此处可能会被两个协程都对同一个url发起请求(耗时、被反爬)
async with lock: # 加锁,避免parse_stuff和use_stuff同时执行这部分代码
# 由__await__、__aenter__实现,等价于await lock.acquire()、lock.release()
if url in cache:
return cache[url]
stuff = await aiohttp.request('GET', url)
cache[url] = stuff
return stuff
async def parse_stuff():
stuff = await get_stuff()
async def use_stuff():
stuff = await get_stuff()
tasks = [parse_stuff(), use_stuff()] # 当协程函数中没有await,则会按加入tasks顺序执行
实例:基于Asyncio协程的高并发爬虫
import re
import asyncio
import aiohttp
import aiomysql
from pyquery import PyQuery
# https://www.lfd.uci.edu/
stop_flag = False
start_url = "http://www.jobbole.com/"
waitting_urls = []
seen_urls = set()
sem = asyncio.Semaphore(3)
async def fetch(url, session):
"""
发送http请求
:param url:
:return:
"""
async with sem: # 并发度控制
await asyncio.sleep(1) # 爬取速度控制
try:
async with session.get(url) as resp:
print('url statis: {0}'.format(resp.status))
if resp.status in [200, 201]:
data = await resp.text()
return data
except Exception as e:
print(e)
def extract_urls(html):
"""
从请求页面中获取下次要请求url
:param html:
:return:
"""
urls = []
pq = PyQuery(html)
for link in pq.items('a'):
url = link.attr('href')
if url and url.startswith('http') and url not in seen_urls:
urls.append(url)
waitting_urls.append(url)
return urls
async def article_handler(url, session, pool):
"""
获取文章详情并解析入库
:param url:
:param session:
:return:
"""
html = await fetch(url, session)
seen_urls.add(url)
extract_urls(html)
pq = PyQuery(html)
title = pq('title').text() # 省略其他字段
print(title)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("sql")
insert_sql = """
INSERT INTO xxx
"""
print(cur.description)
await cur.execute(insert_sql)
async def init_urls(url, session):
"""
解析页面,
:param url:
:param session:
:return:
"""
html = await fetch(url, session)
seen_urls.add(url)
extract_urls(html)
async def consumer(pool, session):
# async with aiohttp.ClientSession() as session: # 发送http请求需要的session
while not stop_flag:
if len(waitting_urls) == 0:
await asyncio.sleep(0.5)
continue
url = waitting_urls.pop()
print('start get url: ' + url)
# 详情页协程,解析页面内容、入库
if re.match('http://.*?jobbole.com/\d+/', url):
if url not in seen_urls:
asyncio.ensure_future(article_handler(url, session, pool))
# 非详情页协程,进一步提取出详情页的url
else:
if url not in seen_urls:
asyncio.ensure_future(init_urls(url, session))
async def main(loop):
# 等待Mysql连接池建立
pool = await aiomysql.create_pool(
host='', port='', user='', password='', db='mysql', loop=loop, charset='utf8', autocommit=True
)
async with aiohttp.ClientSession() as session: # 发送http请求需要的session
html = await fetch(start_url, session)
seen_urls.add(start_url)
extract_urls(html)
# consumer协程从url获取,动态向asyncio提交article_handler和init_urls协程
asyncio.ensure_future(consumer(pool, session))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
asyncio.ensure_future(main(loop))
loop.run_forever()