python多进程(multiprocessing)归纳

序言

为了让程序更快更高更强,每一代的程序员真的都费尽心思,python作为一种动态语言/脚本语言,一直被某些崇尚C、java的程序员们诟病的一般是其运行速度,嫌弃它很慢。一方面相较于静态语言,其每行都要即时编译又要即时赋予数据类型的方式的确会限制它的速度,但另一方面从写程序的角度而言,由于其简洁的语法和便于理解的写法也极大的提高了写的速度,所以现在也就变成了一个快速部署上手的“胶水语言”。

但即使是python爱好者,一旦遇到大的数据量的时候,就不禁开始嫌弃它的速度,不断地寻找方法去加快再加快,成为了永恒的问题。

除了优化代码逻辑以外,但这个其实也是很多人最不想做的,毕竟需要了解的东西很多也很底层,例如可变与不可变容器哈希散列序列化等等,都是些可知不可知的东西,所以都很多人都处于半懂不懂的状态。。。回到正题,这里,我们来总结和给一些例子,罗列一下几个常用的python的多线程的模块以及其用法和需要注意的细节。

提到的模块

  1. threading
  2. multiprocessing
  3. concurrent.futures
  4. queue
  5. subprocess
  6. os

线程?进程?协程?

开篇我还是在老生常谈一下,线程与进程的概念,这真的是一个被讲烂了的一个概念,而且还疯狂的被抄来吵去,整个国内网上都是些晦涩难懂的东西。这里我简单解释一下。

线程(thread),进程(process)


进程与线程关系图

首先,从父子关系来说。进程最大,线程从属于进程,进程之间无法share memory,因为它们各有各的allocated chunk of memory (heap),但是同属于某个进程的线程间可以share memory,因为它们共享single allocated heap。由于线程之间切换时,需要CPU分块的关/开,所以开销较大,当处理IO密集时,就会很慢,所以后来就出现了协程

协程(coroutine):协程的作用,是在执行函数A时,可以随时中断,去执行函数B,然后中断继续执行函数A(可以自由切换)。但这一过程并不是一次调用(call),由程序自身控制切换,不需要让CPU进行切片。这一整个过程看似像多线程,然而协程只有一个线程执行。
协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序

这里由于讲的是多线程,我们就不细讲协程的问题。。。之后可能会写(想了解的人可以参考这一篇python中异步(async)IO

平行(Parallel)?并行(Concurrent)?

单核电脑可以做 并行/并发计算,但不能做平行计算。
所以并行(Concurrent)其实是一种电脑的手段,通过一些快速切换达到一种多个任务利用overlap的时间进行操作为啥可以overlap,因为有些任务需要等待,此时就是别的任务计算的好时机,但平行(Parallel)是真正意义上的同时计算。
所以说平行(Parallel)必须发生在多核的计算机上。

汇总示意图

如果把上述的概念放一起。。就会变成这样

Async IO in Python: A Complete Walkthrough

来自Async IO in Python: A Complete Walkthrough

模块间的对比

  • threading,多线程的模块,会因为python底层的全局解释器锁GIL而低效(甚至多线程比单线程还慢)

需要注意的是,GIL只会影响CPU密集型的任务
需要注意的是,GIL只会影响CPU密集型的任务
需要注意的是,GIL只会影响CPU密集型的任务
重要的事情说三遍,不要什么速度慢的锅都丢给GIL。

  • multiprocessing,多进程的模块,每个进程有独立的GIL,绕过了threading低效的原因,但也同样带来了线程间数据通讯和同步的困难,进程间无法share memory(复习上面进程与线程的差异。)。
  • os,本质还是调用系统,让系统去执行一个进程。但很难管理和调取结果,跟使用shell脚本类似
  • subprocess,同样与os是一样的,但是由于属于os的一种wrapper,所以借口更全面丰富一点。相较于使用os.system,我会更喜欢使用subprocess.check_callsubprocess.check_putput

不同的任务用不同的思想

俗话说得好,因地制宜、因材施教。不同类型的任务会有不同的解决方法,这里我对每种任务可能只提出一种解决方法吧吧吧吧。。。(说不定就嫌弃然后写了几种)
怎么分任务呢?
从需求上去分别是,CPU密集型还是I/O密集型
从代码/实际实现上去看,分别是一样参数分批执行,还是每个动态的参数
从结果来看,需不需要获取结果

下面会再提到以上几点。

PS:对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。(下面用time.sleep达到IO的效果)

开始写代码了!

写多线程的代码之前,大家总会有点疑问。我拿什么来测试,ok,所以一开始,我们先定义些mock data/function来便于后面的测试和评估。

mock data/function

# 来自 https://code.tutsplus.com/articles/introduction-to-parallel-and-concurrent-programming-in-python--cms-28612
import os
import time
import threading
import multiprocessing
def IO_task():
    """ Do nothing, wait for a timer to expire """
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    time.sleep(1)
def CPU_task():
    """ Do some computations """
    print("PID: %s, Process Name: %s, Thread Name: %s" % (
        os.getpid(),
        multiprocessing.current_process().name,
        threading.current_thread().name)
    )
    x = 0
    while x < 10000000:
        x += 1

以上设计两个函数,一个主要是进行time.sleep的操作,模拟等待。另一个主要是进行一些真的需要计算并且很久的任务。其实最主要的就是print函数中,获取进程,线程各自id的操作。这样才能透明清晰的知道当前在哪,在干啥

如何写多线程(basic)

写之前,一定要再次重复一些话,要理解这是在干嘛。否则只会confuse再后面的操作。
那,是在做什么呢? 一言蔽之:分发
不论是线程?进程?协程?,程序员能做的都是将一个任务(worker函数) 给一个中央调度(scheduler)去进行调度,我们不做底层的事情,所以这里做的事情最主要的就是分发。

## Run tasks serially
start_time = time.time()
for _ in range(NUM_WORKERS):
    only_sleep()
end_time = time.time()
 
print("Serial time=", end_time - start_time)
 
# Run tasks using threads
start_time = time.time()
threads = [threading.Thread(target=only_sleep) for _ in range(NUM_WORKERS)]
[thread.start() for thread in threads]
[thread.join() for thread in threads]
end_time = time.time()
 
print("Threads time=", end_time - start_time)
 
# Run tasks using processes
start_time = time.time()
processes = [multiprocessing.Process(target=only_sleep()) for _ in range(NUM_WORKERS)]
[process.start() for process in processes]
[process.join() for process in processes]
end_time = time.time()
 
print("Parallel time=", end_time - start_time)

PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
PID: 95726, Process Name: MainProcess, Thread Name: MainThread
Serial time= 4.018089056015015
PID: 95726, Process Name: MainProcess, Thread Name: Thread-1
PID: 95726, Process Name: MainProcess, Thread Name: Thread-2
PID: 95726, Process Name: MainProcess, Thread Name: Thread-3
PID: 95726, Process Name: MainProcess, Thread Name: Thread-4
Threads time= 1.0047411918640137
PID: 95728, Process Name: Process-1, Thread Name: MainThread
PID: 95729, Process Name: Process-2, Thread Name: MainThread
PID: 95730, Process Name: Process-3, Thread Name: MainThread
PID: 95731, Process Name: Process-4, Thread Name: MainThread
Parallel time= 1.014023780822754

值得注意的一点是,也是因为我写这篇文章时用的是win10,所以也就注意到了python中multiprocessing在不同的系统上的策略的差异。像是以上代码,预期中,multiprocessing下的IOtasks应该耗时接近1秒左右,在并行,但事实上,并不是,猜测是win下难以启动process,从而导致根本执行不起来。

从以上的代码来看,其实写一个多线程真的很简单。

  • 写一个函数
  • 初始化一个process或者一个thread
  • 启动
    就可以了。。其实就可以大大的节省时间以及利用好自己电脑的多线程,但真正的问题还在后面~~

如何写好多线程(advanced)

如何写好多线程这一部分,我们从几个问题出发,加展示code

获取并行计算后的结果?
from multiprocessing import Pool
import multiprocessing as mp
import time
def IO_tasks(value):
    print("Process Name: %s, calculating %s" % (
        mp.current_process().name,value))
    time.sleep(1)
    value = value **2
    return value
# 这里稍微修改一下之前的IO tasks
# 用map直接获取一个list的返回结果
start_time = time.time()
with Pool(processes=4) as pool:
# 初始化一个线程池
    result = pool.map(IO_tasks, range(100))
    # 使用一个线程池的实例,利用其进行分发任务和对应的参数
end_time = time.time()
print("Parallel time=", end_time - start_time)
# 用imap获取一个iterator,然后再手动write out,这样不耗内存
with Pool(processes=4) as pool:
# 初始化一个线程池
    for i in pool.imap(IO_tasks, range(10)):
        print(i)
    # 使用一个线程池的实例,利用其进行分发任务和对应的参数
end_time = time.time()
print("Parallel time=", end_time - start_time)

以上代码可以获取到结果,甚至也可以非常方便的使用map来将不同的参数给到同一个任务(worker)上。
但有个问题,返回的结果是乱序(unorder)的
如果有人仔细的阅读multiprocessing的文档(例如我),就会发现其中Pool的实例下其实有很多种类似于map的方法,其中就有map map_async imap_unordered imap,熟练python的同学可能一下子就看出其中的区别,例如i肯定就是iterator的意思,async就是同步的意思。

pool的几种分发方式比较

map会将传给它的iterable转化成list并进行分块(chunks),然后将这些chunks分发给每个process。
所以它高内存消耗,但会稍微快一点
imap默认情况下不会将其转化并分块,只会1个1个的传给process。
所以对于大的iterable的任务,会比`map`慢一点,而且它的顺序也是正确的!!!
imap_unorder类似于imap
但它的顺序会是乱序,对IO密集型来说与imap在速度上无显著差异
map_async,有点类似于map,但有个显著的差异,它需要等待所有任务结束才会返回结果,要不是完整的结果,要不没有结果。。。。
好处是它的顺序跟你input的顺序是一致的
apply其实是apply_async().get()的实现,执行get的话会等到该任务返回结果。
所以很慢。。。甚至类似于线性叠加的任务
apply_async是单独的执行一个任务,但会返回一个AsyncResult,这个result可以获取结果(阻塞),也可以暂时不获取(不阻塞)。
略无用。。。

如何通讯?

其实说呢,最好不要通讯,通讯的开销真的很大,至少对于multiprocessing来说是这样的,都是不同的进程,原来理论上就是不可以通讯的,当然由于启动一个进程完全由python控制,可以在初始化导入一定的对象,这样可以做到类似于通讯的功能。

但如果强行要通讯呢。。。
multiprocessing提供了两种可以在进程间交换的object。example

  • Queue 这个属于queue.Queue的copy,通过put做到类似于list.append的inplace添加的操作,然后用get在别的线程中获取
  • Pipe()这就跟它的名字很像了,有个管子,可以从父连接,传递到子连接

除此之外,还有一种类似于线程间的同步的Lock的操作。example. 这里就不讲怎么用Lock了

当然,上面几种都不属于shared memory的东西,仅仅是长得像。multiprocessing中提供一些数据类型,可以达到shared memory的操作。example

为什么用Queue?什么时候用?
Queue有个好处就是,它会等待,直到有东西出现。例如,你准备了一堆任务A,它会输出到QueueB,然后之前你已经初始化了一堆嗷嗷待哺的进程C,他们需要再QueueB中get东西并计算,这时,直到A完成并写入到了QueueB,进程C们才会开始跑,之前则一直在等待。具体例子可以看以下,是一个很好的producer-consumer的模型。(来自multiprocessing的doc)

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()
潜在的问题?

multiprocessing由于是多进程,每个进程其实都有一个自己独立的放数据的区块,所以如果你传进去的参数很大,这也就意味着它会拷贝多次这个很大的数据!!!! 死亡操作警告
然后我们会自然而然的想到python中用来节省内存的 序列对象 generator,不知道的童鞋自己面壁吧。但今天我在使用generator时,遇到了个神奇的情况,代码每个地方都在使用生成器,并且处理完以后也没有收集起来。但是我电脑的内存却随着迭代次数的增加而疯狂上升直到死机。。。。
这时,我在stackoverflow上看到了一个问题Memory usage steadily growing for multiprocessing.Pool.imap_unordered(感谢谷歌)。在这个问题的解答中,我们找到了原因。

mutlprocessing的imap会初始化一个thread去获取传递的generator,并且还会储存下来???然后导致这个thread的内存占用随着时间增长逐渐爆炸。。。解决方法也同样在该问题中。使用threading下的 Semaphore class进行操作,使得该线程及时释放掉资源。从而解决了问题。

最后的最后,讲一下死亡操作。DeadLock

DeadLock这个东西吧,听名字就很牛批,反正就是自己把自己搞死了的既视感。简单来说,就是两个进程都在等对方释放资源。(天呀,好感人。。并不

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

multiprocessing中如果使用queue真的很容易遇到deadlock的情况,因为如果使用了p.join,那么这个进程就不会结束,直到这个queue的东西被消耗了。
解决的方法也很简单。。简单的调换最后两行或者干脆直接去掉.join

写的好累。。。我要摸鱼了。。。不想写了。。。
告辞
有问题可以评论我,应该会回的,毕竟可能明天还要写个协程的

reference

  1. https://en.wikipedia.org/wiki/Scripting_language
  2. Concurrent Execution
  3. 平行(parallel)计算与并行(concurrent)计算
  4. python进程、线程、协程
  5. concurrency和parallelism的区别
  6. python GIL 全局锁
  7. python3 cookbook chapter 9 并发编程
  8. wiki 协程
  9. python 协程 (nMask's Blog)
  10. imap/imap_unordered和 map/map_async的差异
  11. python中异步(async)IO
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • 一. 操作系统概念 操作系统位于底层硬件与应用软件之间的一层.工作方式: 向下管理硬件,向上提供接口.操作系统进行...
    月亮是我踢弯得阅读 5,952评论 3 28
  • 多进程 要让python程序实现多进程,我们先了解操作系统的相关知识。 Unix、Linux操作系统提供了一个fo...
    蓓蓓的万能男友阅读 591评论 0 1
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,525评论 0 5
  • 写在前面的话 代码中的# > 表示的是输出结果 输入 使用input()函数 用法 注意input函数输出的均是字...
    FlyingLittlePG阅读 2,730评论 0 8
  • 线程 操作系统线程理论 线程概念的引入背景 进程 之前我们已经了解了操作系统中进程的概念,程序并不能单独运行,只有...
    go以恒阅读 1,632评论 0 6