TensorFlow10:TensorFlow计算加速

在前面的章节中介绍了使用TensorFlow实现各种深度学习的算法。然而要将深度学习应用到实际问题中,一个非常大的问题在于训练深度学习模型需要的计算量太大。比如要将前面介绍的Inception-v3模型在单机上训练到78%的正确率需要将近半年的时间,这样的训练速度是完全无法应用到实际生产中的。为了加速训练过程,本章将介绍如何通过TensorFlow利用GPU或/和分布式计算进行模型训练。
1.首先,我们将介绍如何在TensorFlow中使用单个GPU进行计算加速,也将介绍TensorFlow会话(tf.Session)时的一些常用参数。通过这些参数可以使调试更加方便而且程序的可扩展性更好。然而,在很多情况下,单个GPU的加速效率无法满足训练大型深度学习模型的计算量需求,这时将需要利用更多的计算资源。
2.为了同时利用多个GPU或者多台机器,我们将介绍深度学习模型的并行方式。在这一节中也将给出具体的TensorFlow样例程序来使用多GPU训练模型,并比较并行化效率提升的比率。
3.最后我们将介绍分布式TensorFlow,以及如何通过分布式TensorFlow训练深度学习模型。在这一节中将给出具体的TensorFlow样例程序来实现不同的分布式深度学习训练模式。虽然TensorFlow可以支持分布式深度学习模型训练,但是它并不提供集群创建、管理等功能。
4.为了更方便地使用分布式TensorFlow,我们将介绍才云科技基于Kubernetes容器云平台搭建的分布式TensorFlow系统。

1.TensorFlow使用GPU

TensorFlow程序可以通过tf.device函数来指定运行每一个操作的设备,这个设备可以是本地的CPU或者GPU,也可以是某一台远程的服务器。但在本节中只关心本地的设备。TensorFlow会给每一个可用的设备一个名称,tf.device函数可以通过设备的名称来指定执行运算的设备。比如CPU在TensorFlow中的名称为/cpu:0.在默认情况下,即使机器有多个CPU,TensorFlow也不会区分它们,所有的CPU都使用/cpu:0作为名称。而一台机器上不同GPU的名称是不同的,第n个GPU在TensorFlow中的名称为/gpu:n.比如第一个GPU的名称为/gpu:0,第二个GPU名称为/gpu:1,以此类推。
TensorFlow提供了一个快捷的方式来查看运行每一个运算的设备。在生成会话时,可以通过设置log_device_placement参数来打印运行每一个运算的设备。下面的程序展示了如何使用log_device_placement这个参数:

import tensorflow as tf
a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a')
b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b')
c = a + b
# 通过log_device_placement参数来输出运行每一个运算的设备
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
print(sess.run(c))

运行代码,输出:
[2. 4. 6.]
在以上代码中,TensorFlow程序生成会话时加入了参数log_device_placement=True,所以程序会将每一个操作的设备输出到屏幕。于是除了可以看到最后的计算结果之外,还可以看到类似“add:/job:localhost/replica:0/task:0/cpu:0”这样的输出。这些输出显示了执行每一个运算的设备。比如加法操作add是通过CPU来运行的,因为它的设备名称中包含了/cpu:0
在配置好GPU环境的TensorFlow中,如果操作没有明确地指定运行设备,那么TensorFlow会优先选择GPU。比如将以上代码在亚马逊的实例上运行时,会得到以下运行结果:


在亚马逊实例运行的结果

从上面的输出可以看到在配置好GPU环境的TensorFlow中,TensorFlow会优先将运算放置在GPU上。不过,尽管亚马逊实例有4个GPU,在默认环境下,TensorFlow只会将运算优先放到/gpu:0上。于是可以看见在上面的程序中,所有的运算都被放在了/gpu:0上。如果需要将某些运算放到不同的GPU或者CPU上,就需要通过tf.device来手工指定。以下程序给出了一个通过tf.device来手工指定运行设备的样例:

# 通过tf.device将运算指定到特定的设备上
with tf.device('/cpu:0'):
    a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a')
    b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b')
with tf.device('/gpu:1'):
    c = a + b
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
print(sess.run(c))

在AWS g2.8xlarge实例上运行上述代码可以得到以下结果:




在以上代码中可以看到生成常量a和b的操作被加载到了CPU上,而加法操作被放到了第二个GPU"/gpu:1"上。在TensorFlow中,不是所有的操作都可以被放在GPU上,如果强行将无法放在GPU上的操作指定到GPU上,那么程序将会报错。以下代码给出了一个报错的样例:



不同版本的TensorFlow对GPU的支持不一样,如果程序中全部使用强制指定设备的方式会降低程序的可移植性。在TensorFlow的kernel中定义了哪些操作可以跑在GPU上。比如可以在variable_ops.cc程序中找到以下定义:

在这段定义中可以看到GPU只在部分数据类型上支持tf.Variable操作。如果在TensorFlow代码库中搜索调用这段代码的宏TF_CALL_GPU_NUMBER_TYPES,可以发现在GPU上,tf.Variable操作只支持实数型(float16,float32和double)的参数。而在报错的样例代码中给定的参数是整数型的,所以不支持在GPU上运行。为避免这个问题,TensorFlow在生成会话时可以指定allow_soft_placement参数。当allow_soft_placement参数设置为True时,如果运算无法由GPU执行,那么TensorFlow会自动将它放到CPU上执行。以下代码给出了一个使用allow_soft_placement参数的样例:


虽然GPU可以加速TensorFlow的计算,但一般来说不会把所有的操作全部放在GPU上。一个比较好的实践是将密集型的运算放在GPU上,而把其他操作放到CPU上。GPU是机器中相对独立的资源,将计算放入或者转出GPU都需要额外的时间。而且GPU需要将计算时用到的数据从内存复制到GPU设备上,这也需要额外的时间。TensorFlow可以自动完成这些操作而不需要用户特别处理,但为了提高程序运行的速度,用户也需要尽量将相关的运算放在同一个设备上。

2.深度学习训练并行模式

TensorFlow可以很容易地利用单个GPU加速深度学习模型的训练过程,但要利用更多的GPU或者机器,需要了解如何并行化地训练深度学习模型。常用的并行化深度学习模型训练方式有两种,同步模式和异步模式。本节中将介绍这两种模式的工作方式及其优劣。
为了帮助我们理解这两种训练模式,本节首先简单回顾一下如何训练深度学习模型。下图展示了深度学习模型的训练流程图:


深度学习模型训练流程图

深度学习模型的训练是一个迭代的过程。在每一轮迭代中,前向传播算法会根据当前参数的取值计算出在一小部分训练数据上的预测值,然后反向传播算法再根据损失函数计算参数的梯度并更新参数。在并行化地训练深度学习模型时,不同设备(GPU或CPU)可以在不同训练数据上运行这个迭代的过程,而不同并行模式的区别在于不同的参数更新方式。
下图展示了异步模式的训练流程图:


异步模式深度学习模型训练流程图

从上图可以看到,每一轮迭代时,不同设备会读取参数最新的取值,但因为不同设备读取参数取值的时间不一样,所以得到的值也有可能不一样。根据当前参数的取值和随机获取的一小部分训练数据,不同设备各自运行反向传播的过程并独立地更新参数。可以简单地认为异步模式就是单机模式复制了多份,每一份使用不同的训练数据进行训练。在异步模式下,不同设备之间是完全独立的。
然而使用异步模式训练的深度学习模型可能无法达到较优的训练效果。下图中给出了一个具体的样例来说明异步模式的问题:
异步模式训练深度学习模型存在的问题示意图

如上图,其中黑色曲线展示了模型的损失函数,黑色小球表示了在t0时刻参数所对应的损失函数的大小。假设两个设备d0和d1在时间t0同时读取了参数的取值,那么设备d0和d1计算出来的梯度都会将小球向左移动。假设在时间t1设备d0已经完成了反向传播的计算并更新了参数,修改后的参数处于图中晓慧球的位置。然而这时的设备d1并不知道参数已经被更新了,所以在时间t2时,设备d1会继续将小球向左移动,使得小球的位置达到如图中白球的地方。从图中可以看到,当参数被调整到小白球的位置,将无法达到最优点。
为了避免更新不同步的问题,可以使用同步模式。在同步模式下,所有的设备同时读取参数的取值,并且当反方向传播算法完成之后同步更新参数的取值。单个设备不会单独对参数进行更新,而会等待所有设备都完成反向传播之后再统一更新参数。
下图展示了同步模式的训练过程:


同步模式深度学习模型训练流程图

从上图可以看到,每一轮迭代时,不同设备首先统一读取当前参数的取值,并随机获取一小部分数据。然后在不同设备上运行反向传播过程得到在各自训练数据上参数的梯度。注意虽然所有设备使用的参数是一致的,但是因为训练数据不同,所以得到参数的梯度就可能不一样。当所有设备完成反向传播的计算之后,需要计算出不同设备上参数梯度的平均值,最后再根据平均值对参数进行更新。
同步模式解决了异步模式中存在的参数更新问题,然而同步模式的效率却低于异步模式。在同步模式下,每一轮迭代都需要设备统一开始、统一结束。如果设备的运行速度不一致,那么每一轮训练都需要等待最慢的设备结束才能开始更新参数,于是很多时间将被花在等待上。虽然理论上异步模式存在缺陷,但是因为训练深度学习模型时使用的随机梯度下降本身就是梯度下降的一个近似解法,而且即使是梯度下降也无法保证达到全局最优值,所以在实际应用中,在相同时间内,使用异步模式训练的模型不一定比同步模式差。所以这两种训练模式在实践中都有非常广泛的应用。

3.多GPU并行

在第二节中介绍了常用的分布式深度学习模型训练模式。这一节将给出具体的TensorFlow代码,在一台机器的多个GPU上并行训练深度学习模型。因为一般来说一台机器上的多个GPU性能相似,所以在这种设置下会更多地采用同步模式训练深度学习模型。下面将给出具体的代码,在多GPU上训练深度学习模型解决MNIST问题。本节的样例代码将沿用之前解决MNIST问题这一章的代码框架,并且使用mnist_inference.py程序来完成神经网络的前向传播过程。以下代码给出了新的神经网络训练程序mnist_multi_gpu_train.py:

# coding=utf-8
from datetime import datetime
import os
import time

import tensorflow as tf
import mnist_inference

# 定义训练神经网络时需要用到的参数。
BATCH_SIZE = 100 
LEARNING_RATE_BASE = 0.001
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 1000
MOVING_AVERAGE_DECAY = 0.99 
N_GPU = 4

# 定义日志和模型输出的路径。
MODEL_SAVE_PATH = "logs_and_models/"
MODEL_NAME = "model.ckpt"
DATA_PATH = "output.tfrecords" 

# 定义输入队列得到训练数据,具体细节可以参考第七章。
def get_input():
    filename_queue = tf.train.string_input_producer([DATA_PATH]) 
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)

    # 定义数据解析格式。
    features = tf.parse_single_example(
        serialized_example,
        features={
            'image_raw': tf.FixedLenFeature([], tf.string),
            'pixels': tf.FixedLenFeature([], tf.int64),
            'label': tf.FixedLenFeature([], tf.int64),
        })

    # 解析图片和标签信息。
    decoded_image = tf.decode_raw(features['image_raw'], tf.uint8)
    reshaped_image = tf.reshape(decoded_image, [784])
    retyped_image = tf.cast(reshaped_image, tf.float32)
    label = tf.cast(features['label'], tf.int32)
    
    # 定义输入队列并返回。
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * BATCH_SIZE
    return tf.train.shuffle_batch(
        [retyped_image, label], 
        batch_size=BATCH_SIZE, 
        capacity=capacity, 
        min_after_dequeue=min_after_dequeue)

# 定义损失函数。
def get_loss(x, y_, regularizer, scope, reuse_variables=None):
    with tf.variable_scope(tf.get_variable_scope(), reuse=reuse_variables):
        y = mnist_inference.inference(x, regularizer)
    cross_entropy = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(y, y_))
    regularization_loss = tf.add_n(tf.get_collection('losses', scope))
    loss = cross_entropy + regularization_loss
    return loss

# 计算每一个变量梯度的平均值。
def average_gradients(tower_grads):
    average_grads = []

    # 枚举所有的变量和变量在不同GPU上计算得出的梯度。
    for grad_and_vars in zip(*tower_grads):
        # 计算所有GPU上的梯度平均值。
        grads = []
        for g, _ in grad_and_vars:
            expanded_g = tf.expand_dims(g, 0)
            grads.append(expanded_g)
        grad = tf.concat(grads, 0)
        grad = tf.reduce_mean(grad, 0)

        v = grad_and_vars[0][1]
        grad_and_var = (grad, v)
        # 将变量和它的平均梯度对应起来。
        average_grads.append(grad_and_var)
    # 返回所有变量的平均梯度,这个将被用于变量的更新。
    return average_grads

# 主训练过程。
def main(argv=None): 
    # 将简单的运算放在CPU上,只有神经网络的训练过程放在GPU上。
    with tf.Graph().as_default(), tf.device('/cpu:0'):
        
        # 定义基本的训练过程
        x, y_ = get_input()
        regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
        
        global_step = tf.get_variable('global_step', [], initializer=tf.constant_initializer(0), trainable=False)
        learning_rate = tf.train.exponential_decay(
            LEARNING_RATE_BASE, global_step, 60000 / BATCH_SIZE, LEARNING_RATE_DECAY)       
        
        opt = tf.train.GradientDescentOptimizer(learning_rate)
        
        tower_grads = []
        reuse_variables = False
        # 将神经网络的优化过程跑在不同的GPU上。
        for i in range(N_GPU):
            # 将优化过程指定在一个GPU上。
            with tf.device('/gpu:%d' % i):
                with tf.name_scope('GPU_%d' % i) as scope:
                    cur_loss = get_loss(x, y_, regularizer, scope, reuse_variables)
                    reuse_variables = True
                    grads = opt.compute_gradients(cur_loss)
                    tower_grads.append(grads)
        
        # 计算变量的平均梯度。
        grads = average_gradients(tower_grads)
        for grad, var in grads:
            if grad is not None:
                tf.histogram_summary('gradients_on_average/%s' % var.op.name, grad)

        # 使用平均梯度更新参数。
        apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)
        for var in tf.trainable_variables():
            tf.histogram_summary(var.op.name, var)

        # 计算变量的滑动平均值。
        variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
        variables_to_average = (tf.trainable_variables() +tf.moving_average_variables())
        variables_averages_op = variable_averages.apply(variables_to_average)
        # 每一轮迭代需要更新变量的取值并更新变量的滑动平均值。
        train_op = tf.group(apply_gradient_op, variables_averages_op)

        saver = tf.train.Saver(tf.all_variables())
        summary_op = tf.merge_all_summaries()        
        init = tf.initialize_all_variables()
        with tf.Session(config=tf.ConfigProto(
                allow_soft_placement=True, log_device_placement=True)) as sess:
            # 初始化所有变量并启动队列。
            init.run()
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess, coord=coord)
            summary_writer = tf.train.SummaryWriter(MODEL_SAVE_PATH, sess.graph)

            for step in range(TRAINING_STEPS):
                # 执行神经网络训练操作,并记录训练操作的运行时间。
                start_time = time.time()
                _, loss_value = sess.run([train_op, cur_loss])
                duration = time.time() - start_time
                
                # 每隔一段时间数据当前的训练进度,并统计训练速度。
                if step != 0 and step % 10 == 0:
                    # 计算使用过的训练数据个数。
                    num_examples_per_step = BATCH_SIZE * N_GPU
                    examples_per_sec = num_examples_per_step / duration
                    sec_per_batch = duration / N_GPU
    
                    # 输出训练信息。
                    format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f sec/batch)')
                    print (format_str % (datetime.now(), step, loss_value, examples_per_sec, sec_per_batch))
                    
                    # 通过TensorBoard可视化训练过程。
                    summary = sess.run(summary_op)
                    summary_writer.add_summary(summary, step)
    
                # 每隔一段时间保存当前的模型。
                if step % 1000 == 0 or (step + 1) == TRAINING_STEPS:
                    checkpoint_path = os.path.join(MODEL_SAVE_PATH, MODEL_NAME)
                    saver.save(sess, checkpoint_path, global_step=step)
        
            coord.request_stop()
            coord.join(threads)
        
if __name__ == '__main__':
    tf.app.run()

在AWS的g2.8xlarge实例上运行以上程序可以得到类似下面的结果:


新的神经网络训练程序运行结果

在AWS的g2.8xlarge实例上运行以上代码可以同时使用4个GPU训练神经网络。下图显示了运行样例代码时不同GPU的使用情况:


在AWS的g2.8xlarge实例上运行MNST样例程序时GPU的使用情况

因为在前面定义的神经网络规模比较小,所以上图中显示的GPU使用率不高。如果训练大型的神经网络模型,TensorFlow将会占满所有用到的GPU。

通过调整参数N_GPU,可以实验同步模式下随着GPU个数的增加训练速度的加速比率。下图展示了在给出的MNIST样例代码上,训练速度随着GPU数量增加的变化趋势:


训练速度随着GPU数量增加的变化趋势

从上图中可以看出,当使用两个GPU时,模型的训练速度是使用一个GPU的1.92倍。也就是说当GPU数量较小时,训练速度基本可以随着GPU的数量线性增长。
下图展示了当GPU数量增多时,训练速度随着GPU数量增加的加速比:


训练速度随着GPU增加的变化趋势

从上图可以看出,当GPU数量增加时,虽然加速比不再是线性,但TensorFlow仍然可以通过增加GPU数量有效地加速深度学习模型的训练过程。

4.分布式TensorFlow

通过多GPU并行的方式可以达到很好地加速效果。然而一台机器上能够安装的GPU有限,要进一步提升深度学习模型的训练速度,就需要将TensorFlow分布式运行在多台机器上。本节将介绍如何编写以及运行分布式TensorFlow程序。
1.首先在第一小节中将介绍分布式TensorFlow的工作原理,并给出最简单的分布式TensorFlow样例程序。在这一小节中也将介绍不同的TensorFlow分布式方式。
2.第二小节中将给出两个完整的TensorFlow样例程序来同步或者异步地训练深度学习模型。
3.最后将总结目前生态分布式TensorFlow中的不足,并介绍才云科技(Caicloud.io)提供的分布式TensorFlow解决方案(TensorFlow as a Service, TaaS).

4.1分布式TensorFlow原理

在第二节中介绍了分布式TensorFlow训练深度学习模型的理论。本小节将具体介绍如何使用TensorFlow在分布式集群中训练深度学习模型。以下代码展示了如何创建一个最简单的TensorFlow集群:

import tensorflow as tf
c = tf.constant("Hello, distributed Tensorflow!")
# 创建一个本地TensorFlow集群
server = tf.train.Server.create_local_server()
# 在集群上创建一个会话
sess = tf.Session(server.target)
print(sess.run(c))

运行代码,结果如下:
b'Hello, distributed Tensorflow!'
在以上代码中,首先通过tf.train.Server.create_local_server函数在本地建立了一个只有一台机器的TensorFlow集群。然后在该集群上生成了一个会话,并通过生成的会话将运算运行在创建的TensorFlow集群上。虽然这只是一个单机集群,但它大致反映了TensorFlow集群的工作流程。TensorFlow集群通过一系列的任务(tasks)来执行TensorFlow计算图中的运算。一般来说,不同任务跑在不同机器上。最主要的例外是使用GPU时,不同任务可以使用同一台机器上的不同GPU。TensorFlow集群中的任务也会被聚合成工作(jobs),每个工作可以包含一个或者多个任务。比如在训练深度学习模型时,一台运行反向传播的机器是一个任务,而所有运行反向传播机器的集合是一种工作。
上面的样例代码是只有一个任务的集群。当一个TensorFlow集群有多个任务时,需要使用tf.train.ClusterSpec来指定运行每一个任务的机器。比如以下代码展示了在本地运行有两个任务的TensorFlow集群。第一个任务的代码如下:

import tensorflow as tf
# 创建两个集群
c = tf.constant("Hello from server1!")
# 生成一个有两个任务的集群,一个任务跑在本地2222端口,另外一个跑在本地2223端口
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
# 通过上面生成的集群配置成server
# job_name和task_index:指定当前所启动的任务。
server = tf.train.Server(cluster, job_name="local", task_index=0)
# server.target:生成会话来使用TensorFlow集群中的资源。
# log_device_placement:可以看到执行每一个操作的任务
sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True)) 
print(sess.run(c))
server.join()

# 第二个任务的代码
import tensorflow as tf
c = tf.constant("Hello from server2!")
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)
sess = tf.Session(server.target, config=tf.ConfigProto(log_device_placement=True)) 
print(sess.run(c))
server.join()

启动第一个任务后,可以得到类似下面的输出:


从第一个任务的输出中可以看到,当只启动第一个任务时,程序会停下来等待第二个任务启动,而且持续输出:fail to connect to...
当第二个任务启动后,可以看到从第一个任务中会输出Hello from server1!的结果。第二个任务的输出如下:


值得注意的是第二个任务中定义的计算也被放在了设备/job:/local/replica:0/task/cpu:0上。也就是说这个计算将由第一个任务来执行。从上面这个样例可以看到,通过tf.train.Server.target生成的会话可以统一管理整个TensorFlow集群中的资源。
和使用多GPU类似,TensorFlow支持通过tf.device来指定操作运行在哪个任务上。比如将第二个任务中定义计算的语句改为以下代码,就可以看到这个计算将被调度到/job:local/replica:0/task:1/cpu:0上面。

with tf.device("/job:local/task:1")
        c = tf.constant("Hello from server2!")

在上面的样例中只定义了一个工作“local”。但一般在训练深度学习模型时,会定义两个工作。一个工作专门负责存储、获取以及更新变量的取值,这个工作所包含的任务统称为参数服务器(parameter server,ps)。另外一个工作负责运行反向传播算法来获取参数梯度,这个工作所包含的任务统称为服务器(worker)。下面给出了一个比较常见的用于训练深度学习模型的TensorFlow集群配置方法:


TensorFlow集群配置方法

使用分布式TensorFlow训练深度学习模型一般有两种方式。一种方式叫做计算图内分布式(in-graph replication)。使用这种分布式训练方式时,所有的任务都会使用一个TensorFlow计算图中的变量(也就是深度学习模型中的参数),而只是将计算部分发布到不同的计算服务器上。上一节中给出的代码也实现了参数的同步更新。然而因为计算图内分布式需要有一个中心节点来生成这个计算图并分配计算任务,所以当数据量太大时,这个中心节点容易造成性能瓶颈。
另外一种分布式TensorFlow训练深度学习模型的方式叫计算图之间分布式(between-graph replication)。使用这种分布式方式时,在每一个计算服务器上都会创建一个独立的TensorFlow计算图,但不同计算图中的相同参数需要以一种固定的方式放到同一个参数服务器上。TensorFlow提供了tf.train.replica_device_setter函数来帮助完成这一个过程,在下一小节将给出具体的样例。因为每个计算服务器的TensorFlow计算图是独立的,所以这种方式的并行度要更高。但在计算图之间分布式下进行参数的同步更新比较困难。为了解决这个问题,TensorFlow提供了tf.train.SyncReplicasOptimizer函数来帮助实现参数的同步更新。这让计算图之间分布式方式被更加广泛地使用。下一小节将给出使用计算图之间分布式的样例程序来实现异步模式和同步模式的并行化深度学习模型训练过程。

4.2分布式TensorFlow模型训练

本小节将给出两个样例程序分别实现使用计算图之间分布式(Between-graph replication)完成分布式深度学习模型训练的异步更新和同步更新。第一部分将给出使用计算图之间分布式实现异步更新的TensorFlow程序。这一部分也会给出具体的命令行将该程序分布式的运行在一个参数服务器和两个计算服务器上,并通过TensorBoard可视化在第一个计算服务器上的TensorFlow计算图。第二部分将给出计算图之间分布式实现同步参数更新的TensorFlow程序。同步参数更新的代码大部分和异步更新相似,所以在这一部分中将重点介绍它们之间的不同之处。

异步模式样例程序

下面的样例代码将扔采用MNIST数字识别问题的那一章给出的模式,并复用mnist_inference.py程序中定义的前向传播算法,以下代码实现了异步模式的分布式神经网络训练过程:

# coding=utf-8
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

import mnist_inference

# 配置神经网络的参数。
BATCH_SIZE = 100
LEARNING_RATE_BASE = 0.01
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 1000
MOVING_AVERAGE_DECAY = 0.99

# 模型保存的路径和文件名。
MODEL_SAVE_PATH = "logs/log_async"
DATA_PATH = "../../datasets/MNIST_data"

FLAGS = tf.app.flags.FLAGS

# 指定当前程序是参数服务器还是计算服务器。
tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
# 指定集群中的参数服务器地址。
tf.app.flags.DEFINE_string(
    'ps_hosts', ' tf-ps0:2222,tf-ps1:1111',
    'Comma-separated list of hostname:port for the parameter server jobs. e.g. "tf-ps0:2222,tf-ps1:1111" ')
# 指定集群中的计算服务器地址。
tf.app.flags.DEFINE_string(
    'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',
    'Comma-separated list of hostname:port for the worker jobs. e.g. "tf-worker0:2222,tf-worker1:1111" ')
# 指定当前程序的任务ID。
tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')

# 定义TensorFlow的计算图,并返回每一轮迭代时需要运行的操作。
def build_model(x, y_, is_chief):
    regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
    # 通过和5.5节给出的mnist_inference.py代码计算神经网络前向传播的结果。
    y = mnist_inference.inference(x, regularizer)
    global_step = tf.Variable(0, trainable=False)

    # 计算损失函数并定义反向传播过程。
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
    cross_entropy_mean = tf.reduce_mean(cross_entropy)
    loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
    learning_rate = tf.train.exponential_decay(
        LEARNING_RATE_BASE, global_step, 60000 / BATCH_SIZE, LEARNING_RATE_DECAY)
    train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step=global_step)

    # 定义每一轮迭代需要运行的操作。
    if is_chief:
        # 计算变量的滑动平均值。   
        variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
        variables_averages_op = variable_averages.apply(tf.trainable_variables())
        with tf.control_dependencies([variables_averages_op, train_op]):
            train_op = tf.no_op()
    return global_step, loss, train_op


def main(argv=None):
    # 解析flags并通过tf.train.ClusterSpec配置TensorFlow集群。
    ps_hosts = FLAGS.ps_hosts.split(',')
    worker_hosts = FLAGS.worker_hosts.split(',')
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    # 通过tf.train.ClusterSpec以及当前任务创建tf.train.Server。
    server = tf.train.Server(cluster, job_name = FLAGS.job_name, task_index=FLAGS.task_id)

    # 参数服务器只需要管理TensorFlow中的变量,不需要执行训练的过程。server.join()会
    # 一致停在这条语句上。
    if FLAGS.job_name == 'ps':
        with tf.device("/cpu:0"):
            server.join()

    # 定义计算服务器需要运行的操作。
    is_chief = (FLAGS.task_id == 0)
    mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)

    device_setter = tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_id, cluster=cluster)
    with tf.device(device_setter):

        # 定义输入并得到每一轮迭代需要运行的操作。
        x = tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input')
        y_ = tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input')
        global_step, loss, train_op = build_model(x, y_, is_chief)

        # 定义用于保存模型的saver。
        saver = tf.train.Saver()
        # 定义日志输出操作。
        summary_op = tf.summary.merge_all()
        # 定义变量初始化操作。
        init_op = tf.global_variables_initializer()
        # 通过tf.train.Supervisor管理训练深度学习模型时的通用功能。
        sv = tf.train.Supervisor(
            is_chief=is_chief,
            logdir=MODEL_SAVE_PATH,
            init_op=init_op,
            summary_op=summary_op,
            saver=saver,
            global_step=global_step,
            save_model_secs=60,
            save_summaries_secs=60)

        sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
        # 通过tf.train.Supervisor生成会话。
        sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)

        step = 0
        start_time = time.time()

        # 执行迭代过程。
        while not sv.should_stop():
            xs, ys = mnist.train.next_batch(BATCH_SIZE)
            _, loss_value, global_step_value = sess.run([train_op, loss, global_step], feed_dict={x: xs, y_: ys})
            if global_step_value >= TRAINING_STEPS: break

           # 每隔一段时间输出训练信息。
            if step > 0 and step % 100 == 0:
                duration = time.time() - start_time
                sec_per_batch = duration / global_step_value
                format_str = "After %d training steps (%d global steps), loss on training batch is %g.  (%.3f sec/batch)"
                print format_str % (step, global_step_value, loss_value, sec_per_batch)
            step += 1
    sv.stop()

if __name__ == "__main__":
    tf.app.run()

假设上面代码的文件名为dist_tf_mnist_async.py,那么要启动一个拥有一个参数服务器、两个计算服务器的集群,需要先在运行参数服务器的机器上启动以下命令:


运行参数服务器的机器上启动的命令

然后在运行第一个计算服务器的机器上启动以下命令:


运行第一个计算服务器的机器上启动的命令

最后在运行第二个计算服务器的机器上启动以下命令:
运行第二个计算服务器的机器上启动的命令

在启动第一个计算服务器之后,这个计算服务器就会尝试连接其他的服务器(包括计算服务器和参数服务器)。如果其他服务器还没有启动,则被启动的计算服务器会报连接出错的问题。下面展示了一个出错信息:


连接出错信息

不过这不会影响TensorFlow集群的启动。当TensorFlow集群中所有服务器都被启动之后,每一个计算服务器将不再报错。在TensorFlow集群完全启动之后,训练过程将被执行。
下图展示了第一个计算服务器的TensorFlow计算图:
通过TensorBoard可视化的分布式TensorFlow计算图

从上图可以看出,神经网络中定义的参数被放在了参数服务器上(图中灰色节点),而反向传播的计算过程则放在了当前的计算服务器上(图中的深灰色节点)
在计算服务器训练神经网络的过程中,第一个计算服务器会输出类似下面的信息:

第二个计算服务器会输出类似下面的信息:


从输出的信息中可以看到,在第二个计算服务器启动之前,第一个服务器已经运行了很多轮迭代了。在异步模式下,即使有计算服务器没有正常工作,参数更新的过程仍可继续,而且全局的迭代轮数时所有计算服务器迭代轮数的和。

同步模式样例程序

和异步模式类似,下面给出的代码同样是基于MNIST数字识别问题那一章给出的框架。该代码实现了同步模式的分布式神经网络训练过程:

# coding=utf-8
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

import mnist_inference

# 配置神经网络的参数。
BATCH_SIZE = 100 
LEARNING_RATE_BASE = 0.8
LEARNING_RATE_DECAY = 0.99
REGULARAZTION_RATE = 0.0001
TRAINING_STEPS = 10000
MOVING_AVERAGE_DECAY = 0.99 
MODEL_SAVE_PATH = "logs/log_sync"
DATA_PATH = "../../datasets/MNIST_data"


# 和异步模式类似的设置flags。
FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_string('job_name', 'worker', ' "ps" or "worker" ')
tf.app.flags.DEFINE_string(
    'ps_hosts', ' tf-ps0:2222,tf-ps1:1111',
    'Comma-separated list of hostname:port for the parameter server jobs. e.g. "tf-ps0:2222,tf-ps1:1111" ')
tf.app.flags.DEFINE_string(
    'worker_hosts', ' tf-worker0:2222,tf-worker1:1111',
'Comma-separated list of hostname:port for the worker jobs. e.g. "tf-worker0:2222,tf-worker1:1111" ')
tf.app.flags.DEFINE_integer('task_id', 0, 'Task ID of the worker/replica running the training.')

# 和异步模式类似的定义TensorFlow的计算图。唯一的区别在于使用
# tf.train.SyncReplicasOptimizer函数处理同步更新。
def build_model(x, y_, n_workers, is_chief):
    regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)
    y = mnist_inference.inference(x, regularizer)
    global_step = tf.Variable(0, trainable=False)

    variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
    variables_averages_op = variable_averages.apply(tf.trainable_variables())

    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_, 1))
    cross_entropy_mean = tf.reduce_mean(cross_entropy)
    loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
    learning_rate = tf.train.exponential_decay(
        LEARNING_RATE_BASE, global_step, 60000 / BATCH_SIZE, LEARNING_RATE_DECAY)
   
    # 通过tf.train.SyncReplicasOptimizer函数实现同步更新。
    opt = tf.train.SyncReplicasOptimizer(
        tf.train.GradientDescentOptimizer(learning_rate),
        replicas_to_aggregate=n_workers,
        total_num_replicas=n_workers)

    train_op = opt.minimize(loss, global_step=global_step)     
    if is_chief:
        variable_averages = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
        variables_averages_op = variable_averages.apply(tf.trainable_variables())
        with tf.control_dependencies([variables_averages_op, train_op]):
            train_op = tf.no_op()

    return global_step, loss, train_op, opt

def main(argv=None): 
    # 和异步模式类似的创建TensorFlow集群。
    ps_hosts = FLAGS.ps_hosts.split(',')
    worker_hosts = FLAGS.worker_hosts.split(',')
    print ('PS hosts are: %s' % ps_hosts)
    print ('Worker hosts are: %s' % worker_hosts)
    n_workers = len(worker_hosts)

    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    server = tf.train.Server(
        cluster, job_name = FLAGS.job_name, task_index=FLAGS.task_id)

    if FLAGS.job_name == 'ps':
        with tf.device("/cpu:0"):
            server.join()

    is_chief = (FLAGS.task_id == 0)
    mnist = input_data.read_data_sets(DATA_PATH, one_hot=True)
   
    with tf.device(tf.train.replica_device_setter(
            worker_device="/job:worker/task:%d" % FLAGS.task_id, cluster=cluster)):
        x = tf.placeholder(tf.float32, [None, mnist_inference.INPUT_NODE], name='x-input')
        y_ = tf.placeholder(tf.float32, [None, mnist_inference.OUTPUT_NODE], name='y-input')
        global_step, loss, train_op, opt = build_model(x, y_, n_workers, is_chief)
        # 和异步模式类似的声明一些辅助函数。
        saver = tf.train.Saver()
        summary_op = tf.summary.merge_all()
        init_op = tf.global_variables_initializer()

        # 在同步模式下,主计算服务器需要协调不同计算服务器计算得到的参数梯度并最终更新参数。
        # 这需要主计算服务器完成一些额外的初始化工作。
        if is_chief:
            # 获取协调不同计算服务器的队列。在更新参数之前,主计算服务器需要先启动这些队列。
            chief_queue_runner = opt.get_chief_queue_runner()
            # 初始化同步更新队列的操作。
            init_tokens_op = opt.get_init_tokens_op(0)
     
        # 和异步模式类似的声明tf.train.Supervisor。
        sv = tf.train.Supervisor(is_chief=is_chief,
                                logdir=MODEL_SAVE_PATH,
                                init_op=init_op,
                                summary_op=summary_op,
                                saver = saver,
                                global_step=global_step,
                                save_model_secs=60,
                                save_summaries_secs=60)
        sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=False)
        sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)        

        # 在主计算服务器上启动协调同步更新的队列并执行初始化操作。
        if is_chief:
            sv.start_queue_runners(sess, [chief_queue_runner])
            sess.run(init_tokens_op)
     
        # 和异步模式类似的运行迭代的训练过程。
        step = 0
        start_time = time.time()
        while not sv.should_stop():
            xs, ys = mnist.train.next_batch(BATCH_SIZE)
            _, loss_value, global_step_value = sess.run([train_op, loss, global_step], feed_dict={x: xs, y_: ys})
            if global_step_value >= TRAINING_STEPS: break

            if step > 0 and step % 100 == 0:
                duration = time.time() - start_time
                sec_per_batch = duration / (global_step_value * n_workers)
                format_str = "After %d training steps (%d global steps), loss on training batch is %g.  (%.3f sec/batch)"
                print format_str % (step, global_step_value, loss_value, sec_per_batch)
            step += 1
    sv.stop()
       
if __name__ == "__main__":
    tf.app.run()

和异步模式类似,在不同机器上运行以上代码就可以启动TensorFlow集群。但和异步模式不同的是,当第一台计算服务器初始化完毕之后,它并不能直接更新参数。这是因为在程序中要求每一次参数更新都需要来自两个计算服务器的梯度。在第一个计算服务器上,可以看到与下面类似的输出:


第一台计算服务器的输出

第二个计算服务器的输出如下:


第二胎计算服务器的输出

在第一个计算服务器的第一行输出中可以看到,前100轮迭代平均速度为0.176 (sec/batch),要远远慢于最后的平均速度0.042(sec/batch)。这是因为在第一轮迭代开始之前,第一个计算服务器需要等待第二个计算服务器执行初始化的过程,于是导致前100轮迭代的平均速度是最慢的。这也反映了同步更新的一个问题。当一个计算服务器被卡住时,其他所有的计算服务器都需要等待这个最慢的计算服务器。
为了解决这个问题,可以调整tf.train.SyncReplicasOptimizer函数中的replicas_to_aggreate参数。当replicas_to_aggreate小于计算服务器总数时,每一轮迭代就不需要手机所有的梯度,从而避免被最慢的计算服务器卡住。TensorFlow也支持通过调整同步队列初始化操作tf.train.SyncReplicasOptimizer.get_init_tokens_op中的参数来控制对不同计算服务器之间的同步要求。当提供给初始化函数get_init_tokens_op的参数大于0时,TensorFlow支持多次使用由同一个计算服务器得到的梯度,于是也可以缓解计算服务器性能瓶颈的问题。
4.3使用Caicloud运行分布式TensorFlow

从上一小节中给出的样例程序可以看出,每次运行分布式TensorFlow都需要登录不同的机器来启动集群。这使用起来非常不方便。当需要使用100台机器运行分布式TensorFlow时,需要手动登录到每一台机器并启动TensorFlow服务,这个过程十分繁琐。而且,当某个服务器上的程序死掉之后,TensorFlow并不能自动重启,这给监控工作带来了巨大的难题。如果类比TensorFlow与Hadoop,可以发现TensorFlow只实现了相当于Hadoop中MapReduce的计算框架,而没有提供类似Yarn的集群管理工具以及HDFS的存储系统。为了降低分布式TensorFlow的使用门槛,才云科技基于Kubernetes容器云平台提供了一个分布式TensorFlow平台TensorFlow as Service(Taas)。本节中将大致介绍如何使用Caicloud提供的Taas平台运行分布式TensorFlow。
从上一小节中给出的代码可以看出,编写分布式TensorFlow程序需要制定很多与模型训练无关的代码来完成TensorFlow集群的设置工作。为了降低分布式TensorFlow的学习成本。Caicloud的TensorFlow as a Service(Taas)平台首先对TensorFlow集群进行了更高层的封装,屏蔽了其中与模型训练无关的底层细节。其次,Taas平台结合了谷歌开源的容器云平台管理工具Kubernetes来实现对分布式TensorFlow任务的管理和监控,并支持通过UI设置分布式TensorFlow任务的节点个数、是否使用GPU等信息。
Caicloud的Taas平台提供了一个抽象基类CaicloudDistTensorflowBase,该类封装了分布式TensorFlow集群的配置与启动、模型参数共享与更新逻辑处理、计算节点之间的协同交互以及训练得到的模型和日志的保存等与模型训练过程无关的操作。用户只需要继承该基类,并实现与模型训练相关的函数即可。其代码结构如下:



用户继承的类需要选择性地实现CaicloudDistTensorflowBase基类中的4个函数,它们分别是build_model/get_init_fn/train和after_train。build_model给出了定义计算图的接口。在这个函数中,用户需要处理输入数据、定义深度学习模型以及定义训练模型的过程。get_init_fn函数中可以定义在会话(tf.Session)生成之后需要额外完成的初始化工作,当没有特殊的初始化操作时,用户可以不用定义这个函数。这个函数可以完成模型预加载的过程。train函数中定义的是每一轮训练中需要运行的操作。TaaS会自动完成迭代的过程,但用户需要定义每一轮迭代中需要执行的操作。最后在训练结束之后,用户可以通过定义after_train来评测以及保存最后得到的模型。下面将通过TaaS平台实现分布式TensorFlow训练过程来解决MNIST问题。
代码就不写,直接粘贴了:







将以上代码提交到Caicloud的TaaS平台之后,可以看到类似下图所示的监控页面。在监控页面中,TaaS提供了对资源利用率、训练进度、训练模式、程序日志以及TensorBoard等多种信息的综合展示,用户可以更好地了解训练的进度和状态。因为篇幅所限,对TaaS感兴趣的读者可以在Caicloud的官方网站caicloud.io上找到更多信息和教程。


Caicloud提供的TaaS分布式TensorFlow任务详情页面
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容