This article walks through converting a CPU-intensive, long-running algorithm task from a single-machine program into a distributed one with Ray, with two goals: lowering the load on any single machine and speeding up the job as a whole.
1.Setup environment
1.1 Miniconda
Python already ships with the package manager pip, so why do we need conda at all, and how do miniconda and conda relate? This post covers the background:
http://blog.sina.com.cn/s/blog_8a122dcf0102x9vn.html
To install miniconda, follow:
https://developers.google.com/earth-engine/python_install-conda
Mirrors inside China to speed up conda:
conda config --add channels https://mirrors.ustc.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.ustc.edu.cn/anaconda/pkgs/main/
conda config --set show_channel_urls yes
Create a conda environment dedicated to ray:
conda create -n ray python=3.7
conda activate ray
# If the Python version is not the one you need
conda uninstall python
conda install python=3.7
# Make the ray environment the default after login
printf '\n# add path to conda\nexport PATH="$HOME/miniconda3/bin:$PATH"\n' >> ~/.bashrc
echo 'source activate ray' >> ~/.bashrc
1.2 Ray
pip install -U ray
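A quick smoke test for the installation (just a sketch; the reported resources depend on your machine):

import ray

ray.init()                        # starts a local single-node instance
print(ray.available_resources())  # e.g. {'CPU': 8.0, 'memory': ...}
ray.shutdown()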
1.3 Pandas
pip install -U pandas
1.4 Facebook Faiss
# CPU version only
conda install faiss-cpu -c pytorch
# GPU version
conda install faiss-gpu cudatoolkit=8.0 -c pytorch # For CUDA8
conda install faiss-gpu cudatoolkit=9.0 -c pytorch # For CUDA9
conda install faiss-gpu cudatoolkit=10.0 -c pytorch # For CUDA10
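To verify the Faiss install, a minimal exact nearest-neighbor search on random vectors (a sketch only; the dimensions and sizes are arbitrary):

import numpy as np
import faiss

d = 64                                              # vector dimension
xb = np.random.random((1000, d)).astype('float32')  # database vectors
xq = np.random.random((5, d)).astype('float32')     # query vectors

index = faiss.IndexFlatL2(d)           # exact L2 index
index.add(xb)
distances, ids = index.search(xq, 10)  # top-10 neighbors for each query
print(ids.shape)                       # (5, 10)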
1.5 Ray cluster
# start head
ray start --head --redis-port=6379
# add worker
ray start --address='xxxxxxxx:6379' --redis-password='5241590000000000'
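Once the workers have joined, you can confirm the cluster from Python; for the Ray version used here (~0.8.x), address='auto' together with the redis password connects to the running head node:

import ray

# Connect to the running cluster instead of starting a new local instance.
ray.init(address='auto', redis_password='5241590000000000')
print(ray.nodes())              # one entry per node that has joined
print(ray.cluster_resources())  # aggregate CPU / memory across all nodes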
2.Rewrite the Python script
faiss_query_actor_pool.py
import sys

import pandas as pd
import ray
from ray.util import ActorPool

import faiss_index  # project module wrapping the Faiss index (see sketch below)
import load         # project module for loading embedding files

# Note: ray.init() here starts a fresh local instance; to run on the cluster
# from section 1.5, pass address='auto' (plus the redis password) instead.
ray.init(include_webui=False)

# Reserve 1.5 GiB per actor so the scheduler does not oversubscribe memory.
@ray.remote(memory=1500 * 1024 * 1024)
class FaissQuery(object):
    def __init__(self, goods_embed):
        self.TOPN = 1000
        self.index = faiss_index.FaissIndex(goods_embed)

    def search(self, rows):
        # Slow operation: batched nearest-neighbor search in the Faiss index.
        # toVectors / extractResult are project helpers defined elsewhere.
        rs = self.index.search_by_vectors(toVectors(rows), self.TOPN)
        return extractResult(rs)

if __name__ == "__main__":
    goods_file = sys.argv[1]
    query_file = sys.argv[2]
    output_file = sys.argv[3]
    BATCH_SIZE = int(sys.argv[4])
    POOL_SIZE = int(sys.argv[5])

    goods_embed = load.load_embedding(goods_file)
    print('Load goods ' + str(len(goods_embed)))
    query_embed = pd.read_csv(query_file, sep='\t', header=None,
                              names=['query', 'embed']).to_dict("records")
    print('Load queries ' + str(len(query_embed)))
    print(f'BATCH_SIZE={BATCH_SIZE}, RAY_ACTOR_POOL_SIZE={POOL_SIZE}')

    actorPool = ActorPool([FaissQuery.remote(goods_embed) for _ in range(POOL_SIZE)])

    index = 0
    while index < len(query_embed):
        rows = query_embed[index : index + BATCH_SIZE]
        # Pass the batch through the task value `v`; referencing `rows` inside
        # the lambda would capture the loop variable and rerun the last batch.
        actorPool.submit(lambda a, v: a.search.remote(v), rows)
        index += BATCH_SIZE

    with open(output_file, "a") as f:
        # get_next() returns results in submission order, so the output file
        # keeps the original order of the query batches.
        while actorPool.has_next():
            f.write(actorPool.get_next())
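The script assumes two project-specific modules, load and faiss_index, plus the toVectors / extractResult helpers, none of which are shown here. Below is a minimal sketch of what they could look like, assuming a tab-separated input with an id column and a comma-separated vector column, and an inner-product Faiss index; the original implementation may well differ:

# helpers -- hypothetical sketch, not the original project code
import numpy as np
import pandas as pd
import faiss

def load_embedding(path):
    # Assumed layout: id <tab> "0.1,0.2,..." per line.
    df = pd.read_csv(path, sep='\t', header=None, names=['id', 'embed'])
    return df.to_dict('records')

def toVectors(rows):
    # Convert the 'embed' strings of a batch into a float32 matrix for Faiss.
    return np.array([[float(x) for x in r['embed'].split(',')] for r in rows],
                    dtype='float32')

def extractResult(rs):
    # Flatten (scores, ids) into one tab-separated line per query vector.
    scores, ids = rs
    return '\n'.join('\t'.join(map(str, row)) for row in ids) + '\n'

class FaissIndex(object):
    def __init__(self, goods_embed):
        vectors = toVectors(goods_embed)
        self.index = faiss.IndexFlatIP(vectors.shape[1])  # exact inner-product index
        self.index.add(vectors)

    def search_by_vectors(self, vectors, topn):
        # Returns (scores, ids), each an array of shape (len(vectors), topn).
        return self.index.search(vectors, topn)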
3.Testing
As the number of actors grows, the wall-clock time drops: 21.7s with one actor, 11.8s with three (~1.8x speedup), and 8.7s with six (~2.5x).
$ time python3 faiss_query_actor_pool.py goods.csv quries.csv result 1000 1
Load goods 10000
Load queries 10000
BATCH_SIZE=1000, RAY_ACTOR_POOL_SIZE=1
real 0m21.716s
user 0m4.979s
sys 0m1.908s
$ time python3 faiss_query_actor_pool.py goods.csv quries.csv result 1000 3
Load goods 10000
Load queries 10000
BATCH_SIZE=1000, RAY_ACTOR_POOL_SIZE=3
real 0m11.756s
user 0m4.168s
sys 0m1.539s
$ time python3 faiss_query_actor_pool.py goods.csv quries.csv result 1000 6
2020-06-18 17:01:27,406 INFO resource_spec.py:212 -- Starting Ray with 12.84 GiB memory available for workers and up to 6.44 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
Load goods 10000
Load queries 10000
BATCH_SIZE=1000, RAY_ACTOR_POOL_SIZE=6
real 0m8.698s
user 0m4.129s
sys 0m1.414s