https://zhuanlan.zhihu.com/p/27238630
https://www.jianshu.com/p/686b6cecc5ea
https://zhuanlan.zhihu.com/p/98084488
一、tf中的Queue,QueueRunner与Coordinate
https://zhuanlan.zhihu.com/p/31361295
https://www.jianshu.com/p/d063804fb272
其实概念只有三个:
- Queue是TF队列和缓存机制的实现
- QueueRunner是TF中对操作Queue的线程的封装
- Coordinator是TF中用来协调线程运行的工具
一句话概括流程就是:
- Queue->(构建图阶段)创建队列;
- QueueRunner->(构建图阶段)创建线程进行入队操作;
- tf.train.start_queue_runners()->(执行图阶段)填充队列;
- tf.train.Coordinator() 在线程出错时关闭之。
1.1 Queue
根据实现的方式不同,分成具体的几种类型,例如:
- tf.FIFOQueue 按入列顺序出列的队列
- tf.RandomShuffleQueue 随机顺序出列的队列
队列机制的TensorFlow中实现多线程数据输入的基础与核心。
import tensorflow as tf
#创建一个先进先出队列,指定队列最多可以保存两个元素,并指定类型为整数
q = tf.FIFOQueue(2, "int32")
#使用enqueue_many函数来初始化队列中的元素。和变量初始化类似,在使用的队列之前
#需要明确的调用这个初始化过程
init =q.enqueue_many(([0,10],))
#使用Dequeue函数将队列中个的第一个元素出队列。这个元素的值被储存在变量x中
x = q.dequeue()
#将得到的值加1
y = x+1
#将加1后的值在重新加入队列
q_inc = q.enqueue([y])
with tf.Session() as sess:
#运行初始化队列的操作
init.run()
for _ in range(5):
#运行q_inc将执行数据出队列,出队的元素+1、重新加入队列的整个过程
v, - = sess.run([x, q_inc])
#打印出队列元素的取值
print(v)
1.2 QueueRunner
Tensorflow的计算主要在使用CPU/GPU和内存,而数据读取涉及磁盘操作,速度远低于前者操作。因此通常会使用多个线程读取数据,然后使用一个线程消费数据。QueueRunner就是来管理这些读写队列的线程的。
QueueRunner需要与Queue一起使用(这名字已经注定了它和Queue脱不开干系),但并不一定必须使用Coordinator。
import tensorflow as tf
#声明一个先进先出的队列(FIFQUEUE),队列最多100个元素,类型为实数
queue = tf.FIFOQueue(100, "float")
#定义队列的入队操作
enqueue_op = queue.enqueue([tf.random_normal([1]) ])
#使用tf.train.QueueRunner来创建多个线程运行队列的入队操作
#tf.train.QueueRunner的第一个参数给出了被操作的队列。[enqueue_op]*5
#表示需要5个线程,每个线程中运行的是enqueue_op操作
qr = tf.train.QueueRunner(queue, [enqueue_op]*5)
#将定义的QueueRunner加入TensorFlow计算图上指定的集合
#tf.train.add_queue_runner函数没有指定的集合
#则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。
#下面的函数就是刚刚定义的qr加入默认的tf.GraphKey.QUEUE_RUNNERS集合
tf.train.add_queue_runner(qr)
#定义出队操作
out_tensor = queue.dequeue()
with tf.Session() as sess:
#使用tf.train.Coordinator来协同启动的线程
coord = tf.train.Coordinator()
#使用tf.train.QueueRunner时,需要明确调用tf.train.start_queue_runners来启动所有线程。
#否则因为没有线程运行入队操作,当调用出队操作时程序会一直的等待入队操作被运行。
#tf.train.start_queue_runners函数会默认启动
#tf.GraphKeys.QUEUE_RUNNERS集合中所有的QueueRunner。
#因为这个函数只是支持启动指定集合的QueueRunner,
#所以一般来说tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个集合
thread= tf.train.start_queue_runners(sess = sess, coord = coord)
#获取队列的取值
for _ in range(3):print(sess.run(out_tensor)[0])
# 使用tf.train.Coordinator来停止所有的线程
coord.request_stop()
coord.join(threads)
"以上将启动5个线程来执行队列入队的操作,其中每一个线程都是将随机数写入队列。于是在每次运行出队操作时,可以得到一个随机数"
使用Queue与QueueRunner有三种方式:
-
上面使用了自定义创建
tf.train.QueueRunner
配合tf.train.start_queue_runners
启动。- 自定义的QueueRunner一定要手动调用
tf.train.add_queue_runner
将其加入到tf.GraphKeys.QUEUE_RUNNERS
中,这样之后tf.train.start_queue_runners
才能去启动collection中的QueueRunner。
- 自定义的QueueRunner一定要手动调用
-
或者,可以自定义创建
tf.train.QueueRunner
,session中手动调用QueueRunner.create_threads()
方法创建线程,运行入队的op。create_threads():Create threads to run the enqueue ops for the given session.
-
使用tf封装好的
tf.train.string_input_producer()
等方法,该方法自动创建相应的Queue,并会自动调用QueueRunner
进行封装,并添加进collection。之后调用tf.train.start_queue_runners
启动即可。Returns:
A queue with the output strings. AQueueRunner
for the Queue
is added to the currentGraph
'sQUEUE_RUNNER
collection.
二、输入文件队列
tensorflow使用文件名队列+内存队列双队列的形式读入文件,可以很好地管理epoch。
https://zhuanlan.zhihu.com/p/27238630
对于文件名队列,我们使用tf.train.string_input_producer函数。这个函数需要传入一个文件名list,系统会自动将它转为一个文件名队列。
def string_input_producer(string_tensor,
num_epochs=None,
shuffle=True,
seed=None,
capacity=32,
shared_name=None,
name=None,
cancel_op=None):
"""Output strings (e.g. filenames) to a queue for an input pipeline.
Note: if `num_epochs` is not `None`, this function creates local counter
`epochs`. Use `local_variables_initializer()` to initialize local variables.
Returns:
A queue with the output strings. A `QueueRunner` for the Queue
is added to the current `Graph`'s `QUEUE_RUNNER` collection.
其中tf.train.string_input_producer
重要的参数:
- num_epochs:代表了原始数据集的输入在文件名(当然不一定得是文件名)队列中的重复次数,若为None则默认无限循环。若设置5则表示数据集只会重复输入5次,若继续想取出数据则报错
OutOfRange
error。若不为None,session中必须先调用local_variables_initializer()
初始化局部变量,即这个epoch计数器,局部变量代表此变量不会被Saver持久化,只是本次运行临时的变量。 - shuffle:shuffle是指在一个epoch内文件的顺序是否被打乱。
- capacity:Queue的容量,默认32。
在tensorflow中,内存队列不需要我们自己建立,我们只需要使用reader对象从文件名队列中读取数据就可以了,具体实现可以参考下面的实战代码。
2.1 输入文件队列代码实战
假设所有的输入数据都已经整理成了TFRecord格式。
虽然一个TFRecord文件中可以储存多个训练样例,但是当训练数据数量较大时,可以将数据分成多个TFRecord文件来提高处理效率
tf.train.match_filenames_once
函数获取一个正则表达式的所有文件,本质这个函数就是返回一个符合条件的local variable本地变量。因此同样需要local_variables_initializer()
初始化局部变量。
tf.train.string_input_producer
函数会使用初始化时提供的文件列表创建一个输入队列
- 设置shuffle参数,支持随机打乱文件列表出队的顺序,shuffle为True,文件在加入队列之前会被打乱顺序,所以出队的顺序也是随机的。(随机打乱文件顺序以及即入输入队列的过程会跑在单独的线程)
- 输入队列会将队列中的文件均匀地分给不同的线程,不会出现处理重复的现象
- 当一个输入队列中的所有文件都被处理完后,它会将初始化时提供的文件列表中的文件全部重新加入队列
- 设置num_epochs参数来限制记载初始文件列表的最大轮数。所有都使用后,继续读取新文件则会,ERORR:OutOfRange的错误
生成TFRecords代码:
import tensorflow as tf
#创建TFRecord文件的帮助函数
def _int64_feature(value):
return tf.train.Feature(int64_list = tf.train.Int64List(value = [value]))
#模拟海量数据情况下将写入不同的文件。num_shards定义写入多少文件
#instances_per_shard定义每个文件中有多少个数据
num_shards = 2
instances_per_shard = 2
for i in range(num_shards):
#以0000n-of-0000m的后缀区分,其中m表示多少个文件,n表示编号
filename = ('/path/to/data.tfrecords-%.5d-of-%.5d' % (i, num_shards))
writer = tf.python_io.TFRecordWriter(filename)
#将数据分装成Example结构并写入TFRecord文件
for j in range (instances_per_shard):
writer = =tf.python_io.TFRecordWriter(filename)
#将数据封装成Example结构并写入TFRecord文件
for j in range(instance_per_shard):
#Example结构仅包含当前样例属于第几个文件以及是第几个样本
example = tf.train.Example(features = tf.train.Feactures(feature={
'i ': _int64_feature(i),
'j ': _int64_feature(j)
}))
writer.write(example.SerializeToString())
writer.close()
#指定目录下生成两个文件00000和00001
使用文件名队列,来读取内容:
import tensorflow as tf
#使用tf.train.match_input_producer函数创建输入队列,输入队列中的文件列表为
#tf.train.match_filename_once函数获取的文件列表。
#这里的shuffle设置为False来避免打乱文件顺序,但在实际中一般设置为TRUE
filename_queue = tf.train.string_input_producer(files, shuffle = False)
#
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
serialized_example,
features ={
' i ':tf.FixedLenFeature([] , tf.int64),
' j ':tf.FixedLenFeature([] , tf.int64),
}
)
with tf.Session() as sess:
#虽然在本段程序中并没有声明任何向量,但是使用tf.train.match_filenames_once函数时
#需要初始化一些变量。这里string_input_producer没有指定epoch_num因此不需要局部变量。
tf.local_variable_initializer().run()
print(sess.run(files))
#声明tf.train.Coordinator类协同不同程序,并启动线程
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)
#多次执行获取数据的操作
for i in range(6):
print(sess.run([features['i'], features['j']]))
coord.request_stop()
coord.join(threads)
#在不打乱文件列表下,会依次读出样例数据中的每一个样例