python之concurrent.futures — 启动并行任务

注本文摘自https://docs.python.org/3/library/concurrent.futures.html

1.概论

concurrent.futures 模块提供了一个高水平的接口用于异步执行调用。
异步执行可以使用线程实现,使用ThreadPoolExecutor,或者独立的进程,使用ProcessPoolExecutor 实现。两者都实现了相同的接口, 都是由抽象 Executor 类定义的.

2. Executor对象

class concurrent.futures.Executor
一个提供执行异步调用方法的抽象类。它不应该直接使用,而是通过它的具体子类。

2.1 submit(fn, *args, **kwargs)

调度可调用的fn,作为fn(args kwargs)执行,并返回一个表示可调用的执行的Future对象。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

2.2 map(func, *iterables, timeout=None, chunksize=1)

与map(func,*iterables)相对应的异步执行的版本,并且对func的几个调用可以并发地执行。调用Executor.map()指定的timeout时间后,如果_next_()调用后结果不可用,迭代器将会抛出concurrent.futures.TimeoutError异常。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果调用引发异常,那么当从迭代器中检索值时将会抛出异常。当使用ProcessPoolExecutor时,该方法将iterables分成若干块,并将其作为单独的任务提交到池中。通过将块大小设置为一个正整数,可以指定这些块的(近似)大小。对于非常长的迭代器,使用一个较大的值来进行块大小可以显著地提高性能,而不是默认的1。对于ThreadPoolExecutor,chunksize参数没有影响。

2.3 shutdown(wait=True)

告诉执行程序,当当前正在执行的Futures结束执行时,它应该释放它正在使用的任何资源。在shutdown之后调用 Executor.submit() 和Executor.map()将会报 RuntimeError.

如果wait等于True,那么这个方法将不会返回,直到所有的待完成的Futures执行完,并且与执行程序关联的资源已经被释放。如果wait等于False,那么该方法将立即返回,与执行器相关联的资源将在所有等待的Futures执行时被释放。不管wait的值是什么,整个Python程序都不会退出,直到所有的待完成的Futures都被执行完成。

你可以避免显示的调用该方法通过使用with 语句。当然了,后台使用的wait参数是True.

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

3.ThreadPoolExecutor

ThreadPoolExecutor是一个Executor的子类,它使用线程池来异步执行调用。
当一个Future关联的调用等待其他Future的结果时,就可能出现死锁。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')

Executor子类,使用max_workers规格的线程池来执行异步调用。
例子:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

4.ProcessPoolExecutor

ProcessPoolExecutor 类是Executor 的一个子类,使用进程池执行异步调用。
ProcessPoolExecutor使用 multiprocessing 模块,该模块允许它避开全局解释器锁,但也意味着只能执行和返回可执行的对象。

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

5. Future Objects

Future类封装了可调用的异步执行.Future 实例通过 Executor.submit()方法创建。

5.1 cancel()

试图取消调用。如果调用当前正在执行,并且不能被取消,那么该方法将返回False,否则调用将被取消,方法将返回True。

5.2 cancelled()

如果成功取消调用,返回True。

5.3 running()

如果调用当前正在执行并且不能被取消,返回True。

5.4 done()

如果调用成功地取消或结束了,返回True。

5.5 result(timeout=None)

返回调用返回的值。如果调用还没有完成,那么这个方法将等待超时秒。如果调用在超时秒内没有完成,那么就会有一个concurrent.Futures.TimeoutError将报出。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。

5.6 exception(timeout=None)

返回调用抛出的异常,如果调用还未完成,该方法会等待timeout指定的时长,如果该时长后调用还未完成,就会报出超时错误
concurrent.futures.TimeoutError。timeout可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。如果futures在完成之前被取消了,那么 CancelledError 将会报出。
如果调用完成并且无异常报出,返回None.

5.7 add_done_callback(fn)

将可调用fn捆绑到future上,当Future被取消或者结束运行,fn作为future的唯一参数将会被调用。如果future已经运行完成或者取消,fn将会被立即调用。

6.Module Functions

6.1 concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待fs提供的 Future 实例(possibly created by different Executor instances) 运行结束。返回一个命名的2元集合,分表代表已完成的和未完成的futures.
timeout 可用于控制等待的最大时长. timeout 可以是一个整形或者浮点型数值,如果timeout不指定或者为None,等待时间无限。
return_when 表明什么时候函数应该返回。它的值必须是一下值之一:

FIRST_COMPLETED :函数在任何future结束或者取消的时候返回。
FIRST_EXCEPTION :函数在任何future因为异常结束的时候返回,如果没有future报错,效果等于ALL_COMPLETED.
ALL_COMPLETED :函数在所有future结束后才会返回。

6.2 concurrent.futures.as_completed(fs, timeout=None)

def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    fs = set(fs)
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        yield from finished

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), len(fs)))

            waiter.event.wait(wait_timeout)

            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)

    finally:
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)

配合 for 使用可以循环得到已经完成的 Future.

import concurrent.futures
# from concurrent.futures import ThreadPoolExecutor
def wait_on_future():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.、
    for future in concurrent.futures.as_completed((f,)):
        print(future.result())

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
def main_thread(rlt):
    with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(get_rlt_from_allfile, rlt[i]): rlt[i] for i in range(0,len(rlt))}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                pass
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容