1、concurrent.futures
concurrent.futures
模块提供了一个用于异步执行callables的高级接口。
这里面有三个重要的类。
-
concurrent.futures.Executor
一个抽象类,提供异步执行调用的方法。它不应该直接使用,而是通过其具体的子类。
它里面有几个重要的函数在子类ThreadPoolExecutor
、ProcessPoolExecutor
中得以实现:Executor.submit(fn,* args,** kwargs )
Executor.map(func,* iterables,timeout = None,chunksize = 1 )
func
:需要异步执行的函数*iterables
:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。timeout
:设置每次异步操作的超时时间
Executor.shutdown(wait = True )
-
ThreadPoolExecutor
ThreadPoolExecutor
是一个Executor
子类,它使用一个线程池来异步执行调用 -
ProcessPoolExecutor
ProcessPoolExecutor
类是Executor使用的过程池异步执行调用子类。class concurrent.futures.ProcessPoolExecutor(max_workers = None,
mp_context = None,initializer = None,initargs =())Executor使用最多max_workers进程池异步执行调用的子类。
如果max_workers是None或者没有给出,将默认为机器上的处理器数量。
2、应用场景
下面这段资料来自廖雪峰的python关于线程与进程的介绍。
如果你不幸拥有一个多核CPU,你肯定在想,多核应该可以同时执行多个线程。
如果写一个死循环的话,会出现什么情况呢?
打开Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以监控某个进程的CPU使用率。
我们可以监控到一个死循环线程会100%占用一个CPU。
如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。
要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。
试试用Python写个死循环:
import threading, multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有102%,也就是仅使用了一核。
但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?
因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。
结论就是:python不可能用多线程实现多任务并行,但是多进程就不存在这个问题。
举个栗子,下面是一天时间的解压归档日志:
[appl@SZVM-EXlOIT-1-243 test1]$ pwd
/opt/appl/test1
appl@SZVM-EXlOIT-1-243 test1]$ ll
总用量 684224
-rw-r----- 1 appl fspfappl 5243679 3月 13 00:20 access.log.2019-03-13-00.0
-rw-r----- 1 appl fspfappl 5248039 3月 13 00:53 access.log.2019-03-13-00.1
-rw-r----- 1 appl fspfappl 721806 3月 13 00:59 access.log.2019-03-13-00.2
-rw-r----- 1 appl fspfappl 5243914 3月 13 01:54 access.log.2019-03-13-01.0
-rw-r----- 1 appl fspfappl 387323 3月 13 01:59 access.log.2019-03-13-01.1
-rw-r----- 1 appl fspfappl 4510087 3月 13 02:59 access.log.2019-03-13-02.0
ProcessPoolExecutor
的map函数
可以在单机下利用多核资源去处理数据,这在数据量不是很大的情况下耗时和分布式计算相差无几,并且python的编程比使用spark job,hadoop mapreduce编程要简单的多。
参考代码:
import re
from pandas import DataFrame
import concurrent.futures
import glob
regrex = re.compile(r".*?(/ktb/[a-zA-Z\_\/]*?)\d*? +?.*")
def parse_file(file):
access_list = []
with open(file) as f:
for line in f:
result = regrex.match(line)
if result:
access_list.append(result.group(1))
return access_list
fileList = glob.glob('/opt/appl/test1/*')
access_lists = []
with concurrent.futures.ProcessPoolExecutor() as executor:
for access_list in executor.map(parse_file,fileList):
access_lists.extend(access_list)
df = DataFrame({'access':access_lists})