理解Python进程Process

Demo代码和引用知识点都参考自<a href="http://mp.weixin.qq.com/s/Km-HAY9Jbg5hbuTl5Xj8dw">《理解Python并发编程一篇就够了 - 进程篇》--董伟明</a>或作者个人公众号Python之美, 《Python Cookbook》和<a href="http://www.liaoxuefeng.com/">廖雪峰Python3教程</a>。

基本使用

运用多进程时,将方法放在main()中,否则会出现异常警告。

Process() 基本使用:与Thread()类似。

Pool() 基本使用:

其中map方法用起来和内置的map函数一样,却有多进程的支持。

from multiprocessing import Pool
pool = Pool(2)
pool.map(fib, [35] * 2)

multiprocessing.dummy 模块:

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

对于以上部分知识点,没有实际运用过,只是单纯了解并编写Demo进行了练习,理解没有很透彻。

# -*- coding: utf-8 -*-
from multiprocessing import Process, Pool
from multiprocessing.dummy import Pool as DummyPool
import time
import datetime

def log_time(methond_name):
    def decorator(f):
        def wrapper(*args, **kwargs):
            start_time = time.time()
            res = f(*args, **kwargs)
            end_time = time.time()
            print('%s cost %ss' % (methond_name, (end_time - start_time)))
            return res
        return wrapper
    return decorator

def fib(n):
    if n <=2 :
        return 1
    return fib(n-1) + fib(n-2)

@log_time('single_process')
def single_process():
    fib(33)
    fib(33)

@log_time('multi_process')
def multi_process():
    jobs = []
    for _ in range(2):
        p = Process(target=fib, args=(33, ))
        p.start()
        jobs.append(p)
    for j in jobs:
        j.join()


@log_time('pool_process')
def pool_process():
    pool = Pool(2)
    pool.map(fib, [33]*2)


@log_time('dummy_pool')
def dummy_pool():
    pool = DummyPool(2)
    pool.map(fib, [33]*2)


if __name__ == '__main__':
    single_process()
    multi_process()
    pool_process()
    dummy_pool()

基于Pipe的parmap

理解稍有困难。


队列

实现生产消费者模型,一个队列存放任务,一个队列存放结果。
multiprocessing模块下也有Queue,但不提供task_done()join()方法。故利用Queue存放结果,JoinableQueue() 来存放任务。

仿照的Demo,一个消费者进程和一个生产者进程:

# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue, JoinableQueue
import time
import random

def double(n):
    return n * 2 

def producer(name, task_q):
    while 1:
        n = random.random()
        if n > 0.8:  # 大于0.8时跳出
            task_q.put(None)
            print('%s break.' % name)
            break
        print('%s produce %s.' % (name, n))
        task_q.put((double, n))
    

def consumer(name, task_q, result_q):
    while 1:
        task = task_q.get()
        if task is None:
            print('%s break.' % name)
            break
        func, arg = task
        res = func(arg)
        time.sleep(0.5)  # 阻塞
        task_q.task_done()
        result_q.put(res)
        print('%s consume %s, result %s' % (name, arg, res))

def run():
    task_q = JoinableQueue()
    result_q = Queue()
    processes = []
    p1 = Process(name='p1', target=producer, args=('p1', task_q))
    c1 = Process(name='c1', target=consumer, args=('c1', task_q, result_q))
    p1.start()
    c1.start()
    processes.append(p1)
    processes.append(c1)

    # join()阻塞主进程
    for p in processes:
        p.join()

    # 子进程结束后,输出result中的值
    while 1:
        if result_q.empty():
            break
        result = result_q.get()
        print('result is: %s' % result)

if __name__ == '__main__':
    run()

如果存在多个consumer()进程,只会有一个consumer()进程能取出None并break,其他的则会在task_q.get()一直挂起,尝试在consumer()方法中添加超时退出。

import queue

def consumer(name, task_q, result_q):
    while 1:
        try:
            task = task_q.get(1)  # 1s
        except queue.Empty:
            print('%s time out, break.' % name)
        if task is None:
            print('%s break.' % name)
            break
        func, arg = task
        res = func(arg)
        time.sleep(0.5)  # 阻塞
        task_q.task_done()
        result_q.put(res)
        print('%s consume %s, result %s' % (name, arg, res))

共享内存

利用sharedctypes中的Array, Value来共享内存。
下例为仿照。

# -*- coding: utf-8 -*-

from pprint import pprint

# 共享内存
from multiprocessing import sharedctypes, Process, Lock
from ctypes import Structure, c_bool, c_double

pprint(sharedctypes.typecode_to_type)

lock = Lock()


class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]  # _fields_


def modify(n, b, s, arr, A):
    n.value **= 2
    b.value = True
    s.value = s.value.upper()
    arr[0] = 10
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':

    n = sharedctypes.Value('i', 7)
    b = sharedctypes.Value(c_bool, False, lock=False)
    s = sharedctypes.Array('c', b'hello world', lock=lock)  # bytes
    arr = sharedctypes.Array('i', range(5), lock=True)
    A = sharedctypes.Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock)
    p = Process(target=modify, args=(n, b, s, arr, A))
    p.start()
    p.join()
    print(n.value)
    print(b.value)
    print(s.value)
    print(arr[:])
    print([(a.x, a.y) for a in A])

实际项目中利用Value来监测子进程的任务状态, 并通过memcached来存储更新删除。

# -*- coding: utf-8 -*-

from multiprocessing import Process, Value
import time
import datetime
import random


FINISHED = 3
FAILED = 4
INPROCESS = 2
WAITING = 1

def execute_method(status, process):
    time.sleep(1)
    status.value = INPROCESS  # test
    time.sleep(1)
    status.value = FINISHED  # test
    time.sleep(0.5)

def run(execute_code):
    status = Value('i', WAITING )
    process = Value('f', 0.0)
    # mem_cache.set('%s_status' % execute_code, status.value, 0)
    # mem_cache.set('%s_process' % execute_code, process .value, 0)
    p = Process(target=execute_method, args=(status, process))
    p.start()
    start_time = datetime.datetime.now()
    while True:
        print(status.value)
        now_time = datetime.datetime.now()
        if (now_time - start_time).seconds > 30:  # 超过30sbreak
            # mem_cache.delete('%s_status' % execute_code)
            # mem_cache.delete('%s_process' % execute_code)
            print('execute failed')
            p.terminate()
            break
        if status.value == 3:
            # mem_cache.delete('%s_status' % execute_code)
            # mem_cache.delete('%s_process' % execute_code)
            print('end execute')
            break
        else:
            # mem_cache.set('%s_status' % execute_code, status.value, 0)
            # mem_cache.set('%s_process' % execute_code, process .value, 0)
            print('waiting or executing')
        time.sleep(0.5)
    p.join()

服务进程

下例为仿照博客中的服务进程的例子,简单的展示了Manager的常见的共享方式。

一个multiprocessing.Manager对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。 常见的共享方式有以下几种:

  1. Namespace。创建一个可分享的命名空间。
  2. Value/Array。和上面共享ctypes对象的方式一样。
    dict/list。创建一个可分享的
  3. dict/list,支持对应数据结构的方法。
  4. Condition/Event/Lock/Queue/Semaphore。创建一个可分享的对应同步原语的对象。
# -*- coding: utf-8 -*-
from multiprocessing import Manager, Process

def modify(ns, lproxy, dproxy):
    ns.name = 'new_name'
    lproxy.append('new_value')
    dproxy['new'] = 'new_value'

def run():
    # 数据准备
    manager = Manager()
    ns = manager.Namespace()
    ns.name = 'origin_name'
    lproxy = manager.list()
    lproxy.append('origin_value')
    dproxy = manager.dict()
    dproxy['origin'] = 'origin_value'
    
    # 子进程
    p = Process(target=modify, args=(ns, lproxy, dproxy))
    p.start()
    print(p.pid)
    p.join()

    print('ns.name: %s' % ns.name)
    print('lproxy: %s' % lproxy)
    print('dproxy: %s' % dproxy)

if __name__ == '__main__':
    run()

上例主要是展示了Manager中的共享对象类型和代理,查看源码知是通过register()方法。

multiprocessing/managers.py:

#
# Definition of SyncManager
#

class SyncManager(BaseManager):
    '''
    Subclass of `BaseManager` which supports a number of shared object types.

    The types registered are those intended for the synchronization
    of threads, plus `dict`, `list` and `Namespace`.

    The `multiprocessing.Manager()` function creates started instances of
    this class.
    '''

SyncManager.register('Queue', queue.Queue)
SyncManager.register('JoinableQueue', queue.Queue)
SyncManager.register('Event', threading.Event, EventProxy)
SyncManager.register('Lock', threading.Lock, AcquirerProxy)
SyncManager.register('RLock', threading.RLock, AcquirerProxy)
SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy)
SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
                     AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
SyncManager.register('Array', Array, ArrayProxy)
SyncManager.register('Namespace', Namespace, NamespaceProxy)

# types returned by methods of PoolProxy
SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False)
SyncManager.register('AsyncResult', create_method=False)

除了在子进程中,还可利用Manager()来在不同进程间通信,如下面的分布式进程简单实现。


分布进程

和上例的主要区别是,非子进程间进行通信。

manager_server.py:

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 8080
authkey = b'python'

shared_list = []

class ServerManager(BaseManager):
    pass

ServerManager.register('get_list', callable=lambda: shared_list)
server_manager = ServerManager(address=(host, port), authkey=authkey)
server = server_manager.get_server()
server.serve_forever()

manager_client.py

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 8080
authkey = b'python'

class ClientManager(BaseManager):
    pass

ClientManager.register('get_list')
client_manager = ClientManager(address=(host, port), authkey=authkey)
client_manager.connect()

l = client_manager.get_list()
print(l)

l.append('new_value')
print(l)

运行多次后,shared_list中会不断添加new_value

仿照廖雪峰教程上的分布式进程加以适当修改。

manager_server.py:

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager
from multiprocessing import Condition, Value
import queue

host = '127.0.0.1'
port = 8080
authkey = b'python'


task_q = queue.Queue(10)
result_q = queue.Queue(20)
cond = Condition()
done = Value('i', 0)

def double(n):
    return n * 2

class ServerManager(BaseManager):
    pass

ServerManager.register('get_task_queue', callable=lambda: task_q)
ServerManager.register('get_result_queue', callable=lambda: result_q)
ServerManager.register('get_cond', callable=lambda: cond)
ServerManager.register('get_done', callable=lambda: done)
ServerManager.register('get_double', callable=double)

server_manager = ServerManager(address=(host, port), authkey=authkey)
server = server_manager.get_server()

print('start server')
server.serve_forever()


manager_producer.py:

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager
import random
import time

host = '127.0.0.1'
port = 8080
authkey = b'python'

class ProducerManager(BaseManager):
    pass

ProducerManager.register('get_task_queue')
ProducerManager.register('get_cond')
ProducerManager.register('get_done')
producer_manager = ProducerManager(address=(host, port), authkey=authkey)

producer_manager.connect()
task_q  = producer_manager.get_task_queue()
cond = producer_manager.get_cond()
# done = producer_manager.get_done()
count = 20  # 最多有20个任务

while count > 0:
    if cond.acquire():
        if not task_q.full():
            n = random.randint(0, 10)
            task_q.put(n)
            print("Producer:deliver one, now tasks:%s" % task_q.qsize())
            cond.notify()
            count -= 1
            time.sleep(0.5)
        else:
            print("Producer:already full, stop deliver, now tasks:%s" % task_q.qsize())
            cond.wait() 
        cond.release()
# done.value = 1
print('Producer break')

manager_consumer.py:

# -*- coding: utf-8 -*-

from multiprocessing.managers import BaseManager

host = '127.0.0.1'
port = 8080
authkey = b'python'

class ConsumerManager(BaseManager):
    pass

ConsumerManager.register('get_task_queue')
ConsumerManager.register('get_result_queue')
ConsumerManager.register('get_cond')
# ConsumerManager.register('get_done')
ConsumerManager.register('get_double')

consumer_manager = ConsumerManager(address=(host, port), authkey=authkey)
consumer_manager.connect()

task_q = consumer_manager.get_task_queue()
result_q = consumer_manager.get_result_queue()
cond = consumer_manager.get_cond()
# done = consumer_manager.get_done()

while 1:
    if result_q.full():
        print('result queue is full')
        break
    if cond.acquire():
        if not task_q.empty():
            arg = task_q.get()
            res = consumer_manager.get_double(arg)
            print("Consumer:consume one, now tasks:%s" % task_q.qsize())
            result_q.put(res)
            cond.notify()
        else:
            print("Consumer:only 0, stop consume, products")
            cond.wait()
        cond.release()

while 1:
    if result_q.empty():
        break
    result = result_q.get()
    print('result is: %s' % result)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • @(python)[笔记] 目录 一、什么是进程 1.1 进程的概念 进程的概念起源于操作系统,是操作系统最核心的...
    CaiGuangyin阅读 1,247评论 0 9
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 今天我要在我的粉圈得瑟我妈腌的咸鸭蛋。 这位退休后打理了楼顶菜园,现阶段各种蔬菜,几只鸡鸭,生机勃勃。...
    fionacy阅读 150评论 0 1
  • 刚网上闲逛,发现了一则2009年百度问答里挺火热的问题,即“如何将QQ下载到桌面上”。 这不禁让我想起了自己第一次...
    化浊阅读 361评论 0 0
  • 精确率、召回率、F1 精确率 = TP / (TP + FP),表示返回的正例中真正例所占的比例;召回率 = TP...
    贰拾贰画生阅读 7,327评论 0 7