以下介绍几种多进程方法。(未完待更新)
multiprocessing.Pool() 多进程
import os
import time
import pandas as pd
from multiprocessing import Pool
# =============================================================================
# pool.apply_async(func_child, args=(i,), callback=func_callback)
# func_child: 需要作为子进程运行的函数
# args: 函数 func_child 所需参数
# callback:回调功能。指定回调函数,此例子中利用回调函数保存数据
# =============================================================================
# 子进程要执行的代码
def run_proc(idx):
if idx%5 == 0:
time.sleep(0.2) # 若 idx 是 5 的 倍数则休眠 0.2s
df = pd.DataFrame([{"idx": idx}])
print(idx, os.getpid())
return df
# 回调函数 callback_to_csv 的 参数 x 自动获取子进程 run_proc return 的消息
def callback_to_csv(x):
"""自动获取子进程消息结果存入 csv"""
if os.path.exists("res.csv"):
x.to_csv("res.csv", index=False, header=False, mode="a")
else:
x.to_csv("res.csv", index=False, header=True)
if __name__=='__main__':
pool = Pool() # 创建实例,进程数默认CPU核数,若调整可 Pool(4) 则设置为 4 进程
for idx in range(100): #len(data)
pool.apply_async(run_proc, (idx,), callback=callback_to_csv)
pool.close() # join() 前需要 close(), close() 之后无法添加新进场
pool.join() # 等待所有子进程执行完毕
print("结束")
注意:
若在编辑器里选中或全选代码片段运行,会发现进程被阻塞,原因是
multiprocessing
模块在交互模式下是不支持的。解决办法是代码保存为py
文件,例如a.py
。在IDE
编辑器中以整个文件来运行,通常是F5
;或者我们在cmd
中 直接python a.py
即可正常执行子程序。经测试在
Spyder
中以整个文件运行,结果正常,只是子进程run_proc
中的print
打印无效,if __name__=='__main__':
下的print
打印有效。目前用到的
logging
日志模块类TimedRotatingFileHandler
不支持多进程,导致日志无法正常打印,IO
读写错误和子进程功能失效等莫名其妙等原因,暂时还未找到解决方案,后续将优化日志模块。Pool()
的进程皆是相互独立的,因此打印及输出的结果有可能是杂乱无章,没有顺序的。如果业务上对输出的结果顺序很重视,可在输出结果上带有一个id
号,用于后期补偿在结果数据上以id
进行排序。
multiprocessing.Process() 多进程
......
os.fork() (限于Unix/Linux/Mac平台)
......