当有多个非相关任务需要处理时,并行能大大提高处理速度。这里简要介绍python的multiprocessing
模块。
各种方法是否是阻塞的可参考 python语法——使用Pool实现多进程并行。
map() 会使进程阻塞,即通过 map() 开启的多进程都结束之后,这个函数才会有返回结果,否则主进程会一直等待,不会往下进行 。
map_async() 为非阻塞,即通过 map_async() 开启多进程之后,立刻会返回结果,主进程会继续往下执行。注意:如果后面调用了 join() 函数,则不管之前用的是 map 还是 map_async,主进程都会等待,直到进程池中所有进程执行完毕,才会继续往下执行。
简单多进程编写
当我们任务数量确定而且比较少的时候,可以手动为每个任务指定一个进程来运行。
import multiprocessing as mp
def f(a):
print(a)
if __name__ == '__main__':
# 这里有三个任务,手动指定3个进程
p1 = mp.Process(target=f, args=(1,))
p2 = mp.Process(target=f, args=(2,))
p3 = mp.Process(target=f, args=(3,))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
# 输出
1
2
3
使用进程池来处理多任务
当我们任务比较多而且不确定数量(又或者想使得代码更简洁)的时候可以使用进程池Pool
来编写多进程。
import multiprocessing as mp
def f(a):
return a
if __name__ == '__main__':
pool = mp.Pool()
res = pool.map(f, (1, 2, 3, 4))
print(res)
# 输出
[1, 2, 3, 4]
可以看到,进程池不仅可以方便的处理多进程,还将各个进程的的处理结果储存了在一个列表中。Pool
默认使用计算机所有cpu核来进行运算,也可使用Pool(process=4)
来指定并行的进程数。
另一个可以储存结果的函数是apply_async()
,但是这个函数只支持传入一个参数,也即运行一个任务,运行多个任务需要多次指定。此外,他返回的结果是一个类似生成器的东西,需要通过get
函数取出来。
import multiprocessing as mp
def f(a):
return a
if __name__ == '__main__':
pool = mp.Pool()
res = [pool.apply_async(f, (i,)) for i in range(4)]
print([r.get() for r in res])
# 输出
[0, 1, 2, 3]
多参数函数的多进程
上面处理任务的函数都只有一个参数,在实际情况这种情况是很少的,一般我们的函数都需要传入多个参数。python2中未能支持传多个参数,python3.3后则有starmap
来支持传入多参数。
import multiprocessing as mp
def f(a, b):
return (a, b)
if __name__ == '__main__':
pool = mp.Pool()
res = pool.starmap(f, ((1, 2), ('a', 'b')))
print(res)
# 输出
[(1, 2), ('a', 'b')]
另一个传多个参数的方法是通过python中的魔术方法*args
:
import multiprocessing as mp
def f(*l):
a, b = l[0] # 为什么这里是l[0]而不是l?因为这里传入的l是一个嵌套元组((a,b),)。
return (a, b)
if __name__ == '__main__':
pool = mp.Pool()
res = pool.map(f, ((1, 2), ('a', 'b')))
print(res)
# 输出
[(1, 2), ('a', 'b')]
亦或使用**kwargs
:
import multiprocessing as mp
def f(*l):
d = l[0] # 这里l为({'a': 1, 'b': 2},)
return (d['a'], d['b'])
if __name__ == '__main__':
pool = mp.Pool()
res = pool.map(f, ({'a': 1, 'b': 2}, {'a': 3, 'b': 4}))
print(res)
# 输出
[(1, 2), (3, 4)]
注:需要注意的是map
和starmap
都是主进程阻塞的,无论是否调用join()
函数。
共享内存和进程锁
一般情况下,各个进程中的数据变量是无法发生交流的,但我们可以通过使用Value
数据存储在一个共享的内存表中。
import multiprocessing as mp
import time
def job(v, num):
for _ in range(5):
time.sleep(0.1) # 暂停0.1秒,让输出效果更明显
v.value += num # v.value获取共享变量值
print(v.value)
def multicore():
v = mp.Value('i', 0) # 定义共享变量
p1 = mp.Process(target=job, args=(v, 1))
p2 = mp.Process(target=job, args=(v, 3)) # 设定不同的number看如何抢夺内存
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()
# 输出
1
5
9
13
17
4
8
12
16
20
在上面的代码中,我们定义了一个共享变量v,两个进程都可以对它进行操作。 在job()中我们想让v每隔0.1秒输出一次累加num的结果,但是在两个进程p1和p2 中设定了不同的累加值。很明显可以看到他们发生了冲突。
为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。
import multiprocessing as mp
def job(v, num, l):
l.acquire() # 锁住
for _ in range(5):
time.sleep(0.1)
v.value += num # 获取共享内存
print(v.value)
l.release() # 释放
def multicore():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入
p2 = mp.Process(target=job, args=(v, 3, l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()
# 输出
3
6
9
12
15
16
17
18
19
20
需要主义的是上面可能仍然会发生冲突——p1先执行还是p2先执行的问题。为了解决这个问题我们可以在start
,join
中决定他们的顺序。
import multiprocessing as mp
def job(v, num, l):
l.acquire() # 锁住
for _ in range(5):
time.sleep(0.1)
v.value += num # 获取共享内存
print(v.value)
l.release() # 释放
def multicore():
l = mp.Lock() # 定义一个进程锁
v = mp.Value('i', 0) # 定义共享内存
p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入
p2 = mp.Process(target=job, args=(v, 3, l))
p1.start()
p1.join()
p2.start()
p2.join()
if __name__ == '__main__':
multicore()
# 输出
1
2
3
4
5
8
11
14
17
20
对于start和join方法:
- start方法表示进程开始运行;
- join表示主进程阻塞,等待子进程完成;
关于守护进程
1、守护进程会在主进程代码执行结束后就终止,不论子进程是否执行完;
2、守护进程内无法再开启子进程,否则抛出异常。若想子进程可继续开启子进程,则需将主进程设置为非守护进程,即deamon=False
。若重写继承Process类的话则可参考 Python Multiprocessing 将进程设置为非守护进程;
如果我们有两个任务需要
并发
执行,那么开一个主进程和一个子进程分别去执行就ok了;
如果子进程的任务在主进程任务结束后就没有存在的必要了,那么该子进程应该在开启前就被设置成守护进程;
主进程代码运行结束,守护进程随即终止;
参考
Python多进程编程
多进程和守护进程 (推荐)