在上一篇中我们介绍了 mpi4py 中非重复非阻塞同步通信模式,下面我们将介绍可重复的非阻塞通信,我们先从可重复的非阻塞标准通信开始。
在实际的计算环境中,经常会在一个内部循环中重复使用相同的数据结构来传递不同的数据,此时将数据结构与发送/接收动作绑定可提高程序的效率,这可以通过可重复的非阻塞通信实现。重复的通信需借助通信请求对象 MPI.Prequest(MPI.Request 的子类对象)进行控制其发起、完成等。这样处理可减少 MPI 环境通信控制器与进程之间进行交互的开销。
可重复的非阻塞通信也有4个模式,即标准、缓冲、就绪和同步模式,这4个通信模式是与阻塞通信,非重复非阻塞通信的4个模式一一对应的。可重复的非阻塞通信的4个模式用到的(MPI.Comm 类)发送方法分别为 Send_init,Bsend_init,Rsend_init 和 Ssend_init,相应的接收方法为 Recv_init,用来管理重复启动通信的操作方法是 MPI.Prequest.Start,MPI.Prequest.Startall。注意:可重复的非阻塞通信只有以大写字母开头的方法,而没有提供与之对应的小写字母开头的方法,因此如果要发送和接收通用的 Python 对象,需要手动进行 pickle 系列化和恢复操作。
我们首先介绍可重复的非阻塞标准通信,其通信方法(MPI.Comm 类的方法)接口如下:
Send_init(self, buf, int dest, int tag=0)
Recv_init(self, buf, int source=ANY_SOURCE, int tag=ANY_TAG)
这些方法调用中的参数是与阻塞标准通信模式的方法调用参数一样的。
需要注意的是,上面虽然给出的是可重复非阻塞的发送和可重复非阻塞的接收方法,但实际上可重复非阻塞发送可与任何接收动作(阻塞接收,非重复非阻塞接收)匹配,反之,可重复非阻塞接收也可与任何发送动作(阻塞发送,非重复非阻塞发送)匹配。
非阻塞的发送和接收都会返回一个 MPI.Prequest 对象,它是 MPI.Request 子类对象,除了可以使用其所继承的 Test*/test*,Wait*/wait*,Cancel 等来等待、测试或是取消通信外,我们还会用到其另外两个方法来启动重复的通信操作,这两个方法是:
Start(self)
Startall(type cls, requests)
下面给出可重复非阻塞标准通信的使用例程:
# Send_init_Recv_init.py
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
count = 10
send_buf = np.arange(count, dtype='i')
recv_buf = np.empty(count, dtype='i')
if rank == 0:
send_req = comm.Send_init(send_buf, dest=1, tag=11)
send_req.Start()
send_req.Wait()
print 'process %d sends %s' % (rank, send_buf)
elif rank == 1:
recv_req = comm.Recv_init(recv_buf, source=0, tag=11)
recv_req.Start()
recv_req.Wait()
print 'process %d receives %s' % (rank, recv_buf)
运行结果如下:
$ mpiexec -n 2 python Send_init_Recv_init.py
process 0 sends [0 1 2 3 4 5 6 7 8 9]
process 1 receives [0 1 2 3 4 5 6 7 8 9]
上面这个例程使用了可重复非阻塞的接收方法 Recv_init 来匹配可重复非阻塞的发送方法 Send_init,这不是必须的,下面给出一个阻塞的接收方法 Recv 来匹配可重复非阻塞的发送方法 Send_init 的例程。
# Send_init_Recv.py
import numpy as np
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
count = 10
send_buf = np.arange(count, dtype='i') + 10 * rank
recv_buf = np.empty((size - 1, count), dtype='i')
if rank != 0:
send_req = comm.Send_init(send_buf, dest=0, tag=rank)
send_req.Start()
send_req.Wait()
print 'process %d sends %s' % (rank, send_buf)
else:
for i in range(size - 1):
comm.Recv(recv_buf[i], source=i+1, tag=i+1)
print 'process %d receives %s' % (rank, recv_buf[i])
运行结果如下:
$ mpiexec -n 3 python Send_init_Recv.py
process 1 sends [10 11 12 13 14 15 16 17 18 19]
process 0 receives [10 11 12 13 14 15 16 17 18 19]
process 2 sends [20 21 22 23 24 25 26 27 28 29]
process 0 receives [20 21 22 23 24 25 26 27 28 29]
上面我们介绍了 mpi4py 中可重复非阻塞标准通信模式,在下一篇中我们将介绍可重复非阻塞缓冲通信。