在上一篇中我们介绍了 caput 中的 memh5 模块,下面我们将介绍一个建立在 mpi4py 基础之上的有用工具 mpipool,允许我们非常方便地将一个函数并行地应用到一系列数据上。
下载与安装 mpipool
前往 https://github.com/adrn/mpipool 下载 mpipool。可以直接下载 .zip 格式的压缩包,并解压,或者使用 git 将该软件 clone 到本地:
$ git clone https://github.com/adrn/mpipool.git
进入软件顶层目录,使用以下命令进行安装:
$ python setup.py install [--user]
使用 mpipool
mpipool 软件包提供了 MPIPool 类,其定义及主要方法接口如下:
class MPIPool(object)
MPIPool 类。
__init__(self, comm=None, debug=False, loadbalance=False)
MPIPool 类初始化函数。参数 comm
如果非 None,则应该是一个有效的 mpi4py 通信子对象,否则使用 MPI.COMM_WORLD,debug
如果为 True,会输出很多执行过程信息,loadbalance
如果为 True,当 task 的数目比 worker 进程的数目多时,会首先给每个 worker 进程分配一个 task,然后只要某个 worker 进程完成了其 task,则会给其分配一个新的 task,直到所有的 task 都被分配完为止;如果为 False,则会将所有的 task 按照一种循环的方式分配给各个 worker 进程,此时所有 task 完成的时间会由计算最慢的那一个 worker 进程决定。
is_master(self)
master 进程(rank 为 0)会返回 True,worker 进程(所有其它进程)会返回 False。
wait(self)
worker 进程调用该方法后会在一个循环中等待 master 进程发送过来的指令并完成相应的动作。具体来说,worker 进程如果收到 master 进程发送过来的一个函数,则会用该函数替换其自身将要作用到 task 上的函数,如果收到一个 task,则会将函数作用到该 task上得到计算结果,如果收到一个结束消息,则会跳出循环结束该函数的调用。master 进程调用该方法则会出错。
map(self, function, tasks, callback=None)
一般由 master 进程调用,将一个函数 function
并行地应用到一系列数据 tasks
上,结果会按顺序组成一个列表返回,callback
如果非 None 则应该是一个接受一个参数的函数,在返回结果列表之前会在结果列表的每个元素上调用该函数。该方法在内部会首先将 function
发送给每个 worker,然后将 tasks
中的元素 task 按照一定的方式(取决于初始化时 loadbalance
的值) 分配给各个 worker 进程,每个 worker 进程收到一个 task 后就会将 function
作用到 task 上并将其返回值发送给 master 进程,master 进程收集到所有结果后组成一个列表返回。
close(self)
master 进程给所有 worker 进程发送一个结束消息使其退出 wait 中的循环。
另外 mpipool 软件包中定义了 MPIPoolException 异常,MPIPool 类的 map 方法在计算错误时会抛出该异常。
例程
下面给出简单的使用例程。
# mpipool_demo.py
import sys
import numpy as np
from mpipool import MPIPool
# define the function that will be applied to tasks
def worker(task):
x, y = task
return x**2 + 2*y
# create the pool
pool = MPIPool()
# only run map() on the master process, all other processes wait for their work
if not pool.is_master():
pool.wait()
# worker processes exit after they have done their work
sys.exit(0)
# the following code is executed by the master process only
# create some random input data
x = np.random.uniform(size=10)
y = np.random.uniform(size=10)
tasks = list(zip(x, y))
# crate a callback function
def cb(x):
print x
# map the function worker to tasks
# and execute them parallel by processes other than the master
results = pool.map(worker, tasks, callback=cb)
# close the pool
pool.close()
print 'results:', results
运行结果如下:
$ mpiexec -n 4 python mpipool_demo.py
1.57878266266
1.3217051898
1.01841188873
0.105959488365
0.468227329147
1.43413524955
2.36051906039
2.75875975664
0.343832407757
2.16220071718
results: [1.5787826626560353, 1.3217051897978418, 1.0184118887282427, 0.10595948836453307, 0.4682273291469893, 1.4341352495458153, 2.3605190603946116, 2.758759756642663, 0.34383240775747825, 2.1622007171816713]
mpipool 类定义了 __enter__() 和 __exit__() 方法,所以它也是一个上下文管理器,可以使用类似下面的 with 语句,在退出该 with 语句时会自动调用 MPIPool 类的 close() 方法。在上一个例程中,我们让worker 进程完成工作后就退出,当然也可以保留这些进程以执行后面的语句,如下面的例程所示:
# mpipool_demo1.py
import numpy as np
from mpipool import MPIPool
# define the function that will be applied to tasks
def worker(task):
x,y = task
return 5*x + y**2
with MPIPool() as pool:
# only run map() on the master process, all other processes wait for their work
if not pool.is_master():
pool.wait()
else:
# the following code is executed by the master process only
# create some random input data
x = np.random.uniform(size=10)
y = np.random.uniform(size=10)
tasks = list(zip(x, y))
# crate a callback function
def cb(x):
print x
# map the function worker to tasks
# and execute them parallel by processes other than the master
results = pool.map(worker, tasks, callback=cb)
print 'Done!'
运行结果如下:
$ mpiexec -n 4 python mpipool_demo3.py
1.44172730818
1.17103049432
1.11841437626
3.94003914985
4.40300079589
4.07541181493
4.48560962925
5.16898092718
2.26724097853
3.01897745495
Done!
Done!
Done!
Done!
更通用的 pool
以上介绍的 mpipool 软件包只包含 MPIPool,在另一个软件包 schwimmbad 中提供了若干个 pool 工具,包括我们已经介绍了的 MPIPool(实现有些不同,但用法几乎一致),此外还包括 SerialPool,MultiPool 和 JoblibPool。SerialPool 仅仅只是普通的 Python map 函数的一个类包装,MultiPool 是用 Python 的 multiprocessing 模块实现的一个并行 pool 工具,允许利用单台机器的多个处理器完成并行的 map 运算,JoblibPool 则是利用 joblib.Parallel 模块实现的一个并行 pool 工具,而 MPIPool 则是利用 MPI 实现的一个可以在多节点集群或者超级计算机上运行的分布式并行 pool。这些 pool 工具都提供了统一的使用接口,因此可以很容易地由一个转换到另一个。这些 pool 工具的使用详见其文档,在此不作更多的介绍,感兴趣或者有需要的读者可以前往 https://github.com/adrn/schwimmbad 下载并安装使用。
以上我们介绍了一个非常实用的工具 mpipool,在下一篇中我们将简要地介绍并行分布式线性代数运算工具 ScaLAPACK,然后我们会介绍在 python 中使用 ScaLAPACK 的工具 scalapy。