Python:多进程同步共享全局变量(锁,计数器,原子布尔)

摘要:Python,多进程

多进程变量同步的场景和方法

场景:在使用Python多进程并行时需要在进程间共享变量,这些共享的变量可以更好地控制和把握任务执行的情况,比如查看任务进度,提前停止任务等
方法:在多线程中变量共享在主线程中定义变量,在每个子线程中使用global关键字拿到变量,再配合threading.RLock()在对变量操作时拿到和释放锁(acquirerelease)即可,但是在多进程中,变量是放在不同子进程的数据区中,每个进程都是独立的地址空间,所以用一般的方法是不能共享变量的,multiprocessing模块提供了ArrayManagerValue类来定义共享变量,能够实现进程间共享数字,字符串,列表,字典,实例对象的变量共享


共享整数变量

对于整数的多进程共享是常用的场景,比如使用多进程并行任务,需要记录执行日志记录任务进度,实例代码如下

import multiprocessing
from multiprocessing import Pool, Lock, Value

from utils.logger_utils import logging_

LOGGER = logging_("predict_main", os.path.join(ROOT_PATH, "./logs/details.log"))
lock = Lock()
Counter = Value('i', 0)
ENT_LIST = list(set([line.strip().replace("(", "(").replace(")", ")") for line in open(os.path.join(BASIC_PATH, "data/ent_name_predict.txt"), "r", encoding="utf8").readlines()]))
TOTAL = len(ENT_LIST)

def get_one_res(data):
    global TOTAL, lock, Counter
    res = {}
    try:
        ent_name = data
        res = get_feature(ent_name, PREDICT_DATE)
        res["updatedate"] = PREDICT_DATE
        res["uid"] = get_md5(formatted_ent(ent_name))
    except Exception as e:
        LOGGER.error(data + ":错误:" + e.args[0])
    finally:
        with lock:
            Counter.value += 1
        LOGGER.info("执行完成:(%d / %d) 进程号: %d --------------- %s", Counter.value, TOTAL, os.getpid(), data)
    return res


if __name__ == '__main__':
    pool = Pool(int(get_string("process_num")))
    res = pool.map(get_one_res, ENT_LIST)
    LOGGER.info("全部执行完成,关闭进程池")
    pool.close()
    pool.join()

运行查看执行日志

2021-11-18 15:19:15 [predict_main] INFO [42] 执行完成:(1 / 1400) 进程号: 15 --------------- 深圳顺亚投资有限公司
2021-11-18 15:19:16 [predict_main] INFO [42] 执行完成:(2 / 1400) 进程号: 25 --------------- 芜湖新扬投资合伙企业(有限合伙)
2021-11-18 15:19:18 [predict_main] INFO [42] 执行完成:(3 / 1400) 进程号: 24 --------------- 保定隆瑞房地产开发有限公司
2021-11-18 15:19:19 [predict_main] INFO [42] 执行完成:(4 / 1400) 进程号: 11 --------------- 云南俊发凯丰房地产开发有限公司

在全局定义锁和计数器,Value('i', 0)代表定义的共享变量是int类型初始值是0,如果要定义double变量则使用Value('d', 0),相当于java里面的原子变量,在执行函数中调用with上下文在实行完任务后调用Counter.value += 1实现计数+1,最后在进程池中调用执行方法,每个并行的任务在执行完毕会调用锁进行计数器+1,同一时刻只有一个子进程拿到锁实现进程同步,如果不采用锁的方式,在日志中计数器会乱序,但是最终总的值相等


共享布尔变量

这种情况在全局中记录一个布尔变量,每次执行任务前拿到变量判断是否与预期一致,如果执行报错修改变量状态,多用于子进程中任务报错提前结束全部任务全部退出,代码如下

from multiprocessing import Pool, Lock, Manager
from ctypes import c_bool
import os

lock = Lock()
ERROR = Manager().Value(c_bool, False)


def run(fn):
    global tests_count, lock, ERROR
    if not ERROR.value:
        try:
            print('执行任务. PID: %d ' % (os.getpid()))
            1 / 0
        except Exception as e:
            with lock:
                ERROR.value = True
    else:
        print("子进程报错,任务结束")


if __name__ == "__main__":
    pool = Pool(10)
    # 80个任务,会运行run()80次,每次传入xrange数组一个元素
    pool.map(run, list(range(80)))
    pool.close()
    pool.join()

查看执行输出

执行任务. PID: 27374 
子进程报错,任务结束
子进程报错,任务结束
子进程报错,任务结束
...
Process finished with exit code 0

初始化一个共享变量为布尔类型为False,每个进程在执行前先拿到共享变量判断是否为False,是则执行任务,否则直接跳过执行。初始化布尔变量使用Manager类实例化后调用Value方法,c_bool是Ctypes下的数据类型,相关类型如下

另一种是在主进程中判断共享变量,调用map_async使得主进程不被子进程阻塞,主进程判断全局变量如果不符合预期直接退出,调用terminate终止线程池

from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
ERROR = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)


def run(fn):
    global tests_count, lock, ERROR
    try:
        time.sleep(2)
        1 / 0
    except:
        with lock:
            ERROR.value = True
    finally:
        with lock:
            COUNTER.value += 1
            print('执行任务(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid()))


if __name__ == "__main__":
    pool = Pool(10)
    pool.map_async(run, list(range(80)))
    pool.close()
    print("主进程判断...")
    while COUNTER.value != len(list(range(80))):
        time.sleep(1)
        if ERROR.value:
            print("子进程报错,主进程提前退出")
            pool.terminate()
            break
    pool.join()

输出如下,每隔1秒中检查全局变量ERROR,如果变为True主进程终止进程池

主进程判断...
执行任务(1 / 80). PID: 4168 
执行任务(2 / 80). PID: 4169 
执行任务(3 / 80). PID: 4177 
执行任务(4 / 80). PID: 4171 
执行任务(5 / 80). PID: 4173 
执行任务(6 / 80). PID: 4182 
执行任务(7 / 80). PID: 4183 
执行任务(8 / 80). PID: 4174 
执行任务(9 / 80). PID: 4179 
执行任务(10 / 80). PID: 4181 
子进程报错,主进程提前退出

Process finished with exit code 0

一个实用的例子是多进程找一个列表中符合要求第一个值,如果找到则退出多进程

from multiprocessing import Pool, Lock, Manager, Value
from ctypes import c_bool
import os
import time

lock = Lock()
FOUND = Manager().Value(c_bool, False)
COUNTER = Value('i', 0)


def run(fn):
    global tests_count, lock, ERROR
    try:
        time.sleep(2)
        res = fn + 1
        if res == 10:
            print("结果是:{}".format(fn))
            with lock:
                FOUND.value = True
            return fn
    except Exception as e:
        print(e)
    finally:
        with lock:
            COUNTER.value += 1
            print('执行任务(%d / %d). PID: %d ' % (COUNTER.value, 80, os.getpid()))


if __name__ == "__main__":
    t1 = time.time()
    pool = Pool(10)
    pool.map_async(run, list(range(80)))
    pool.close()
    print("主进程判断...")
    while COUNTER.value != len(list(range(80))):
        time.sleep(1)
        if FOUND.value:
            print("已找到结果")
            pool.terminate()
            break
    pool.join()
    t2 = time.time()
    print(t2 - t1)

共享字典和数组变量

使用Manager近创建,Manager().dict()Manager().list(),测试代码如下

from multiprocessing.pool import Pool
from multiprocessing import Manager, Lock
import time
import datetime

LOCK = Lock()
DICT = Manager().dict()
LIST = Manager().list()


def job(ent):
    with LOCK:
        if len(LIST) < 5:
            time.sleep(1)
            LIST.append(ent)
        else:
            if len(LIST) and ent <= len(LIST) - 1:
                LIST.pop(ent)
        print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
              "LIST:{}".format(LIST))


def job2(ent):
    if len(LIST) < 5:
        time.sleep(1)
        LIST.append(ent)
    else:
        if len(LIST) and ent <= len(LIST) - 1:
            LIST.pop(ent)
    print("dt:{}".format(datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")), "ent:{}".format(ent),
          "LIST:{}".format(LIST))


if __name__ == '__main__':
    pool = Pool(10)
    pool.map(job2, list(range(10)) * 2)
    pool.close()
    pool.join()

执行job输出如下,输出结果和单进程单线程的结果一致,按照顺序每个进程在锁外排队

dt:2021-11-20 22:01:28 ent:0 LIST:[0]
dt:2021-11-20 22:01:29 ent:1 LIST:[0, 1]
dt:2021-11-20 22:01:30 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:01:31 ent:3 LIST:[0, 1, 2, 3]
dt:2021-11-20 22:01:32 ent:4 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:5 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:6 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:7 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:8 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:9 LIST:[0, 1, 2, 3, 4]
dt:2021-11-20 22:01:32 ent:0 LIST:[1, 2, 3, 4]
dt:2021-11-20 22:01:33 ent:1 LIST:[1, 2, 3, 4, 1]
dt:2021-11-20 22:01:33 ent:2 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:34 ent:3 LIST:[1, 2, 4, 1, 3]
dt:2021-11-20 22:01:34 ent:4 LIST:[1, 2, 4, 1]
dt:2021-11-20 22:01:35 ent:5 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:6 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:7 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:8 LIST:[1, 2, 4, 1, 5]
dt:2021-11-20 22:01:35 ent:9 LIST:[1, 2, 4, 1, 5]

Process finished with exit code 0

如果执行job2则不控制共享变量的同步则完全失控,报错pop index out of range,原因是在对变量操作时有其他进程也在操作得到意想不到的结果

dt:2021-11-20 22:03:02 ent:0 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:1 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:2 LIST:[0, 1, 2]
dt:2021-11-20 22:03:02 ent:3 LIST:[0, 1, 2, 3, 5]
dt:2021-11-20 22:03:02 ent:5 LIST:[0, 1, 2, 3, 5, 4, 6]
dt:2021-11-20 22:03:02 ent:4 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8]
dt:2021-11-20 22:03:02 ent:6 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
dt:2021-11-20 22:03:02 ent:9 LIST:[0, 1, 2, 3, 5, 4, 6, 9, 8, 7]
...
    raise self._value
IndexError: pop index out of range

共享自定义对象

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容