在前面的博客中介绍了线程的用法,每次使用都要创建线程,启动线程,有没有什么办法简单操作呢。
python3.2引入的concurrent.future模块中有ThreadPoolExecutor和ProcessPoolExecutor两个类,这两个类内部维护着线程/进程池,以及要执行的任务队列,使得操作变得非常简单,不需要关心任何实现细节
来看一个简单的例子
#!/usr/bin/env python3.6
from concurrent.futures import ThreadPoolExecutor
import requests
import os
DEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "download")
BASE_URL = "http://flupy.org/data/flags"
CC_LIST = ("CN", "US", "JP", "EG")
if not os.path.exists(DEST_DIR):
os.mkdir(DEST_DIR)
def get_img(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
response = requests.get(url)
return response.content
def save_img(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as f:
f.write(img)
def download_one(cc):
img = get_img(cc)
save_img(img, cc.lower() + ".gif")
return cc
def download_many(cc_list):
works = len(cc_list)
with ThreadPoolExecutor(works) as exector: # 使用with来管理ThreadPoolExecutor
# map方法和内置的map方法类似,不过exector的map方法会并发调用,返回一个由返回的值构成的生成器
response = exector.map(download_one, cc_list)
return len(list(response))
if __name__ == "__main__":
download_many(CC_LIST)
Future
concurrent.futures和asyncio中的Future类的作用相同,****都表示可能己经完成或尚未完成的延迟计算****
Future封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果后可以获取结果
使用exector.submit()
方法提交执行的函数并获取一个Future,而不是直接创建,传入的参数是一个可调用的对象;获取的Future对象有一个done()
方法,判断该Future是否己完成, add_one_callback()
设置回调函数, result()
来获取Future的结果。as_completed()
传一个Future列表,在Future都完成之后返回一个迭代器
使用submit()方法试试看
def download_many(cc_list):
with ThreadPoolExecutor(max_workers=5) as exector:
future_list = []
for cc in cc_list:
# 使用submit提交执行的函数到线程池中,并返回futer对象(非阻塞)
future = exector.submit(download_one, cc)
future_list.append(future)
print(cc, future)
result = []
# as_completed方法传入一个Future迭代器,然后在Future对象运行结束之后yield Future
for future in futures.as_completed(future_list):
# 通过result()方法获取结果
res = future.result()
print(res, future)
result.append(res)
return len(result)
>>>
CN <Future at 0x7f80d32f5400 state=running>
US <Future at 0x7f80d330c320 state=running>
JP <Future at 0x7f80d330c8d0 state=running>
EG <Future at 0x7f80d330ce10 state=running>
JP <Future at 0x7f80d330c8d0 state=finished returned str>
CN <Future at 0x7f80d32f5400 state=finished returned str>
EG <Future at 0x7f80d330ce10 state=finished returned str>
US <Future at 0x7f80d330c320 state=finished returned str>
ProcessPoolExecutor的使用方法是一样的,唯一需要注意的区别是传入的max_workers这个参数对于ProcessPoolExecutor是可选的,在不使用的情况下默认值是os.cpu_count()的返回值(cpu的数量)
exector.submit()和futures.as_completed()这个组合比exector.map()更灵活,submit()可以处理不同的调用函数和参数,而map只能处理同一个可调用对象。
wait()阻塞主线程,直到所有task都完成。