从hello world入手,熟悉一下ray开发
示例一:求平方(remote funtions)
Ray可以非常容易的将一个funtion变成分布式,只需要在funtion添加一个@ray.remote
注解:
import ray
@ray.remote
def square(x):
return x * x
if __name__ == '__main__':
for i in range(4):
future = square.remote(i)
print(ray.get(future))
# prints [0, 1, 4, 9]
调用分布式方法时,只需要调用方法名.remote()
即可,如future = square.remote(i)
(这个调用是异步的)。使用ray.get()
即可获取调用结果,如ray.get(future)
(这个调用是同步的)
示例二:计算圆周率π(remote actors)
本示例将展示如何将class定义为remote actors。根据莱布尼兹公式,π可以这样计算:
根据这个公式,我们可以连续计算10万个值来得到一个π的近似值。那么我们完全将这10万个值平均分成10份并行计算,再将10个结果合计就是我们要的近似值。这样我的效率就增加10倍。代码实现如下:
import ray
@ray.remote
class Worker(object):
def __init__(self):
# 计算结果
self.r = 0.0
def calculatePiFor(self, start, elements):
# 计算从start开始,连续elements个的和
for i in range(start, start + elements):
self.r += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
return self.r
def result(self):
# 返回计算结果
return self.r
if __name__ == '__main__':
# 启动Ray,如果想使用已存在的Ray集群,请使用
# ray.init("ray://<head_node_host>:10001")
ray.init()
futures = []
# 将任务切分为10个子任务并行计算
for i in range(10):
# 初始化子任务(Actor)
worker = Worker.remote()
# 每个子任务计算10000个值,这是异步的,not block
future = worker.calculatePiFor.remote(i * 10000, 10000)
futures.append(future)
# 等待所有子任务结果
results = ray.get(futures)
# 计算最终结果
print(sum(results))
总结
示例一展示了如何编写无状态的并行计算代码,示例二则展示了如何利用Ray进行有状态并行计算。它们展示了最常见的2种并行计算场景。