这两天仔细看了下分布式pytorch的文档…起因是跑分布式pytorch的时候,在training的最后一个epoch挂掉了
首先在最后一个epoch/batch挂掉的原因,肯定不会是模型的问题,因为前面几个epoch很稳定。仔细排查了下,发现是有的worker还在训练,有的worker却结束退出了,所以还在训练的worker一直等不到其他worker的回应,所以出错了。
非常简单粗暴地,用DP+all_reduce,手动地把梯度聚合的方式从同步改成了异步,就好了。
但是仔细想了想,又觉得不太对,这不应该是会出错的地方。分布式的pytorch也算是比较成熟的方案了,分布式的环境也是一个成熟的应用了,要么是pytorch+分布式出错了,要么就是我的用法不对……
分布式机器学习的训练方式
- 模型并行
- 数据并行
顾名思义,模型并行,是模型拆分成多个模块,并行的去计算各自的梯度。用的是同一份数据,模型模型各自算各自的。数据并行则是把数据拆成多份,不同的机器上用同一份模型去计算不同的数据,每次计算之后,机器们就同步一下各自的梯度。
Distribution In Pytorch
在pytorch中,关于分布式计算,主要有三个大的组件
- Distributed Data Parallel(DDP)
- RPC-Based Distributed Training(RPC)
- Collective Communication(c10d)
一般常用的分布式训练都是选择数据并行,如果模型太大放到gpu里面装不下,就只能考虑模型并行了。所以在pytorch中,常用的分布式训练,都是使用DDP这一套就够了。RPC和c10d就逐渐底层,主要是为了满足一些定制化的需要,比如pytorch里面默认的分布式架构是ring型的,但是你想要用parameter server的训练框架,那就可以用后面两种组件搭配DDP去完成模型的分布式训练。可以看出来DDP确实是非常精妙的,可以参考pytorch这边发表的关于DDP的论文(PyTorch Distributed: Experiences on Accelerating Data Parallel Training)。我只用了最普适的DDP方法,所以接下来对DDP的使用方法做一些归纳。
DDP的使用方法
最开始,pytorch数据并行这块,用的模块是DataParallel(DP),这个方法有几个缺点:
- just 1 processing and multi-thread。进程线程和cpu的关系我有点忘了,但是反正就是效率会低一些(doge,and 回头补充)并且python原生存在的问题是python有CIL锁,所以很难真正意义上实现多processing。
- 并且在分布式的环境中,DP包裹的模型,需要手动地去调用all_reduce来更新模型参数。(顺便pytorch中有几种梯度聚合的方式,详情可以参见文档)
- 且由于DP的1 processing模式,在多GPU的时候,会存在各个GPU负载不均衡的问题。
出于这几点原因,pytorch做了相应的优化,于是有了DDP。DDP和DP有很大的不同:
- DDP是multi processing的,解决了python中CIL锁争抢的问题。并且做了许多优化处理,来解决之前DP中存在的负载不均衡的问题。
- DDP不需要手动去update 参数梯度了,DDP内部做了相应的广播处理(是同步更新参数的)
- DDP可以搭配RPC或者c10d的一些方法去完成更定制化的分布式训练
DDP的使用也比较简单:(详情参考:https://pytorch.org/docs/master/generated/torch.nn.parallel.DistributedDataParallel.html)
1. torch.distributed.init_process_group(backend='nccl', world_size=N, init_method='...')
#做init操作,初始化分布式的环境,告知一共有多少的机器并行训练,用的啥后端框架做消息传递
2. model = DistributedDataParallel(model, device_ids=[i], output_device=i)
#把模型裹上DDP
一些相关不相关的distrbutedxxx
一个是DistributedSampler。可以说这个是直接解决我最初的bug…导致部分worker提前training结束的原因就是每个机器上的batch数量不相同,我用的分布式环境是sagemaker,它能够支持一种方式就是把输入根据机器数量,切成近似均匀的N等份输入,这样每个机器就不需要下载全量的数据做训练了,只要拿到1/N总数的数据,可以大大减少机器负载和download时间。但是这直接导致的就是job failed…
于是认命还是用回DistributedSampler(DS)(可以说没有特殊情况的输入,都应该用DistributedSampler)它可以根据init时指定的world_size,对dataset做shuffle和split,保证每个机器上拿到的数据是不重复的,并且是均匀的,每个机器上的batch数量一定是一样的。
顺便提一嘴,如果在sagemaker已经等分过数据的情况下,再调用DS,就会让每个机器拿到的数据是 1/(N*N)的大小了…
还有一个是DistributedOptimizer(DO)。这个数据并行的时候实际应该用不到,DO在初始化的时候,是要指定reff格式的输入参数。直白一点就是你可以指定模型的某些个参数,然后变成reff格式,指定给DO,这样优化器就会只更新分布式环境里的这几个参数了。所以一般DO是在模型并行的时候需要用的,因为数据并行的时候,每个环境下是独立拥有优化器的。
最后鸣谢pytorch document,写得真好,不愧大佬:https://pytorch.org/tutorials/beginner/dist_overview.html