上一篇:生产者消费者问题
在之前的文章中我们一般只演示了两个线程的情况,在实际中我们要管理多个线程的时候就需要用到线程池。使用线程池管理线程能够使主线程可以获得某一线程的状态以及返回值,当一个线程完成的时候主线程就能立知道。
这里我们使用的线程池类是ThreadPoolExecutor
,它在concurrent.futures
下。concurrent.futures
中还包括了ProcessPoolExecutor
进程池对象,这个包的设计让多线程和多进程的接口一致。
下面是一个例子:
from concurrent.futures import ThreadPoolExecutor
import time
def do_something(name, sec):
print('Start doing %s' % name)
time.sleep(sec)
print('%s completed' % name)
return name
executor = ThreadPoolExecutor(max_workers=2)
task = executor.submit(do_something, 'A', 2)
print(task.done())
print(task.result())
print(task.done())
运行结果:
Start doing A
False
A completed
A
True
首先需要实例化一个线程池对象,ThreadPoolExecutor
类包含一个参数max_workers
,表示最大同时运行的线程个数。线程池中可以田间任意多个线程,但是同时能运行的个数为max_workers
,其他线程需要等当前正在运行的max_workers
个线程运行完成才能运行。线程池对象的submit
方法可传入一个函数句柄及它的参数,参数依次排列。一旦调用submit
方法,线程就已经开始执行或即将执行,并返回一个Future
对象。可调用Future
对象的done
方法查看线程是否执行完成,该方法非阻塞。还可以调用result
方法获得线程的返回值,该方法阻塞直到线程结束得到返回值。
如果线程过多,可采用下面的写法:
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
...
all_task = [executor.submit(do_something, 'task_%d' %i, random.uniform(2,6)) for i in range(10)]
for future in as_completed(all_task):
data = future.result()
print(data)
这里的as_completed
是一个生成器,它会生成已经完成的线程的future
对象。先执行完成的线程的future
对象会先被生成,直到所有线程结束,最后一个线程的future
对象被生成。从结果来看,由于每次的线程切换不同,执行结果也不同。
另外还可以用ThreadPoolExecutor
对象的map
方法查询线程是否执行完成:
for data in executor.map(do_something, ['task_%d' %i for i in range(10)], [random.uniform(2,6) for i in range(10)]):
print(data)
和之前的as_completed
方法不同,map
生成器是按照参数的顺序返回的,但是线程执行依然是无序的。而且map
返回的是线程的返回值,不是Future
对象。在实践中最常用的还是第一种方法。
concurrent.futures
还提供了wait
方法,用于阻塞主线程。其用法是:
from concurrent.futures import wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
wait(fs=all_task, return_when=ALL_COMPLETED)
第一参数fs
是需要等待的线程列表,还有一个可选参数是return_when
,即停止阻塞的条件,默认是ALL_COMPLETED
,即所有线程完成。除此之外还包括:FIRST_COMPLETED
(第一个线程执行完成后)、FIRST_EXCEPTION
(在子线程中第一次出现抛出错误后)。