利用Tensorflow的队列多线程读取数据

在tensorflow中,有三种方式输入数据

1. 利用feed_dict送入numpy数组

2. 利用队列从文件中直接读取数据

3. 预加载数据

其中第一种方式很常用,在tensorflow的MNIST训练源码中可以看到,通过feed_dict={},可以将任意数据送入tensor中。

第二种方式相比于第一种,速度更快,可以利用多线程的优势把数据送入队列,再以batch的方式出队,并且在这个过程中可以很方便地对图像进行随机裁剪、翻转、改变对比度等预处理,同时可以选择是否对数据随机打乱,可以说是非常方便。该部分的源码在tensorflow官方的CIFAR-10训练源码中可以看到,但是对于刚学习tensorflow的人来说,比较难以理解,本篇博客就当成我调试完成后写的一篇总结,以防自己再忘记具体细节。

-------------------------

## 读取CIFAR-10数据集##

按照第一种方式的话,CIFAR-10的读取只需要写一段非常简单的代码即可将测试集与训练集中的图像分别读取:

```python

path = 'E:\Dataset\cifar-10\cifar-10-batches-py'

# extract train examples

num_train_examples = 50000

x_train = np.empty((num_train_examples, 32, 32, 3), dtype='uint8')

y_train = np.empty((num_train_examples), dtype='uint8')

for i in range(1, 6):

fpath = os.path.join(path, 'data_batch_' + str(i))

(x_train[(i - 1) * 10000: i * 10000, :, :, :], y_train[(i - 1) * 10000: i * 10000]) = load_and_decode(fpath)

# extract test examples

fpath = os.path.join(path, 'test_batch')

x_test, y_test = load_and_decode(fpath)

return x_train, y_train, x_test, np.array(y_test)

```

其中load_and_decode函数只需要按照CIFAR-10官网给出的方式decode就行,最终返回的x_train是一个[50000, 32, 32, 3]的ndarray,但对于ndarray来说,进行预处理就要麻烦很多,为了取mini-SGD的batch,还自己写了一个类,通过调用train_set.next_batch()函数来取,总而言之就是什么都要自己动手,效率确实不高

但对于第二种方式,读取起来就要麻烦很多,但使用起来,又快又方便

首先,把CIFAR-10的测试集文件读取出来,生成文件名列表

```python

path = 'E:\Dataset\cifar-10\cifar-10-batches-py'

filenames = [os.path.join(path, 'data_batch_%d' % i) for i in range(1, 6)]

```

有了列表以后,利用tf.train.string_input_producer函数生成一个读取队列

```python

filename_queue = tf.train.string_input_producer(filenames)

```

接下来,我们调用read_cifar10函数,得到一幅一幅的图像,该函数的代码如下:

```python

def read_cifar10(filename_queue):

label_bytes = 1

IMAGE_SIZE = 32

CHANNELS = 3

image_bytes = IMAGE_SIZE*IMAGE_SIZE*3

record_bytes = label_bytes+image_bytes

# define a reader

reader = tf.FixedLengthRecordReader(record_bytes)

key, value = reader.read(filename_queue)

record_bytes = tf.decode_raw(value, tf.uint8)

label = tf.strided_slice(record_bytes, [0], [label_bytes])

depth_major = tf.reshape(tf.strided_slice(record_bytes, [label_bytes],

[label_bytes + image_bytes]),

[CHANNELS, IMAGE_SIZE, IMAGE_SIZE])

image = tf.transpose(depth_major, [1, 2, 0])

return image, label

```

第9行,定义一个reader,来读取固定长度的数据,这个固定长度是由CIFAR-10数据集图片的存储格式决定的,1byte的标签加上32 *32 *3长度的图像,3代表RGB三通道,由于图片的是按[channel, height, width]的格式存储的,为了变为常用的[height, width, channel]维度,需要在17行reshape一次图像,最终我们提取出了一副完整的图像与对应的标签

## 对图像进行预处理

我们取出的image与label均为tensor格式,因此预处理将变得非常简单

```python

if not distortion:

IMAGE_SIZE = 32

else:

IMAGE_SIZE = 24

# 随机裁剪为24*24大小

distorted_image = tf.random_crop(tf.cast(image, tf.float32), [IMAGE_SIZE, IMAGE_SIZE, 3])

# 随机水平翻转

distorted_image = tf.image.random_flip_left_right(distorted_image)

# 随机调整亮度

distorted_image = tf.image.random_brightness(distorted_image, max_delta=63)

# 随机调整对比度

distorted_image = tf.image.random_contrast(distorted_image, lower=0.2, upper=1.8)

# 对图像进行白化操作,即像素值转为零均值单位方差

float_image = tf.image.per_image_standardization(distorted_image)

```

distortion是定义的一个输入布尔型变量,默认为True,表示是否对图像进行处理

## 填充队列与随机打乱

调用tf.train.shuffle_batch或tf.train.batch函数,以tf.train.shuffle_batch为例,函数的定义如下:

```python

def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,

num_threads=1, seed=None, enqueue_many=False, shapes=None,

allow_smaller_final_batch=False, shared_name=None, name=None):

```

tensors表示输入的张量(tensor),batch_size表示要输出的batch的大小,capacity表示队列的容量,即大小,min_after_dequeue表示出队操作后队列中的最小元素数量,这个值是要小于队列的capacity的,通过调整min_after_dequeue与capacity两个变量,可以改变数据被随机打乱的程度,num_threads表示使用的线程数,只要取大于1的数,队列的效率就会高很多。

通常情况下,我们只需要输入以上几个变量即可,在CIFAR-10_input.py中,谷歌给出的代码是这样写的:

```python

if shuffle:

images, label_batch = tf.train.shuffle_batch([image, label], batch_size,                                  min_queue_examples+3*batch_size,

min_queue_examples, num_preprocess_threads)

else:

images, label_batch = tf.train.batch([image, label], batch_size,

num_preprocess_threads,

min_queue_examples + 3 * batch_size)

```

min_queue_examples由以下方式得到:

```python

min_fraction_of_examples_in_queue = 0.4

min_queue_examples = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN

*min_fraction_of_examples_in_queue)

```

当然,这些值均可以自己随意设置,

最终得到的images,labels(label_batch),即为shape=[128, 32, 32, 3]的tensor,其中128为默认batch_size。

## 激活队列与处理异常

得到了images和labels两个tensor后,我们便可以把这两个tensor送入graph中进行运算了

```python

# input tensor

img_batch, label_batch = cifar10_input.tesnsor_shuffle_input(batch_size)

# build graph that computes the logits predictions from the inference model

logits, predicts = train.inference(img_batch, keep_prob)

# calculate loss

loss = train.loss(logits, label_batch)

```

定义sess=tf.Session()后,运行sess.run(),然而你会发现并没有输出,程序直接挂起了,仿佛死掉了一样

原因是这样的,虽然我们在数据流图中加入了队列,但只有调用tf.train.start_queue_runners()函数后,数据才会动起来,被负责输入管道的线程填入队列,否则队列将会挂起。

OK,我们调用函数,让队列运行起来

```python

with tf.Session(config=run_config) as sess:

sess.run(init_op) # intialization

queue_runner = tf.train.start_queue_runners(sess)

for i in range(10):

b1, b2 = sess.run([img_batch, label_batch])

print(b1.shape)

```

在这里为了测试,我们取10次输出,看看输出的batch1的维度是否正确

![console_output](E:\blog\1\微信截图_20171219164138.png)

10个batch的维度均为正确的,但是tensorflow却报了错,错误的文字内容如下:

> 2017-12-19 16:40:56.429687: W C:\tf_jenkins\home\workspace\rel-win\M\windows-gpu\PY\36\tensorflow\core\kernels\queue_base.cc:295]  _ 0 _ input_producer: **Skipping cancelled enqueue attempt with queue not closed**

简单地看一下,大致意思是说我们的队列里还有数据,但是程序结束了,抛出了异常,因此,我们还需要定义一个Coordinator,也就是协调器来处理异常

Coordinator有3个主要方法:

1. tf.train.Coordinator.should_stop() 如果线程应该停止,返回True

2. tf.train.Coordinator.request_stop() 请求停止线程

3. tf.train.Coordinator.join() 等待直到指定线程停止

首先,定义协调器

```python

coord = tf.train.Coordinator()

```

将协调器应用于QueueRunner

```python

queue_runner = tf.train.start_queue_runners(sess, coord=coord)

```

结束数据的训练或测试后,关闭线程

```python

coord.request_stop()

coord.join(queue_runner)

```

最终的sess代码段如下:

```python

coord = tf.train.Coordinator()

with tf.Session(config=run_config) as sess:

sess.run(init_op)

queue_runner = tf.train.start_queue_runners(sess, coord=coord)

for i in range(10):

b1, b2 = sess.run([img_batch, label_batch])

print(b1.shape)

coord.request_stop()

coord.join(queue_runner)

```

得到的输出结果为:

![console_output](E:\blog\1\微信截图_20171219165409.png)

完美解决,利用img_batch与label_batch,把tensor送入graph中,就可以享受tensorflow带来的训练乐趣了

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,013评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,205评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,370评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,168评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,153评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,954评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,271评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,916评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,382评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,877评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,989评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,624评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,209评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,199评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,418评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,401评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,700评论 2 345

推荐阅读更多精彩内容