如何在TensorFlow中高效使用数据集
处理并使用数据集是深度学习任务非常重要的组成部分。在本文中,作者 Francesco Zuppichini 将教你使用 TensorFlow 的内建管道向模型传递数据的方法,从此远离「feed-dict」。本文内容已更新至最新的 TensorFlow 1.5 版本。
经常使用神经网络框架的人都会知道,feed-dict 是向 TensorFlow 传递信息最慢的方式,应该尽量避免使用。向模型提供数据的正确方式是使用输入管道,这样才能保证 GPU 在工作时永远无需等待新的数据。
幸运的是,TensorFlow 拥有一个名为 Dataset 的内建 API,它可以让我们的工作更加简单。在本教程中,我们将介绍搭建内建管道,让数据高效传递给模型的方法。
本文将解释 Dataset 的基本原理,包含大多数常用案例。
概述
使用 Dataset 需要遵循三个步骤:
载入数据:为数据创建一个数据集实例。
创建一个迭代器:通过使用创建的数据集构建一个迭代器来对数据集进行迭代。
使用数据:通过使用创建的迭代器,我们可以找到可传输给模型的数据集元素。
载入数据
我们首先需要一些可以放入数据集的数据。
从 Numpy 导入
这是一种常见情况:我们拥有一个 numpy 数组,想把它传递给 TensorFlow。
# create a random vector of shape (100,2)
x = np.random.sample((100,2))
# make a dataset from a numpy array
dataset = tf.data.Dataset.from_tensor_slices(x)
我们当然也可以传递多个 numpy 数组,一个典型的例子是:当我们已有被分配多个特征和标签的数据时……
features, labels = (np.random.sample((100,2)), np.random.sample((100,1)))
dataset = tf.data.Dataset.from_tensor_slices((features,labels))
从张量导入
当然,我们也可以从张量中初始化自己的数据集。
# using a tensor
dataset = tf.data.Dataset.from_tensor_slices(tf.random_uniform([100, 2]))
从占位符导入
当我们希望动态地修改 Dataset 中的数据时,这就会很有用,稍后会有详述。
x = tf.placeholder(tf.float32, shape=[None,2])
dataset = tf.data.Dataset.from_tensor_slices(x)
从生成器导入
我们还可以从生成器中初始化 Dataset,这种方式在拥有不同长度的元素的数组时有意义(例如一个序列)。
sequence = np.array([[1],[2,3],[3,4]])
def generator():
for el in sequence:
yield el
dataset = tf.data.Dataset().from_generator(generator,
output_types=tf.float32,
output_shapes=[tf.float32])
在这种情况下,你还需要告诉 Dataset 数据的类型和形状以创建正确的张量。
创建迭代器
我们已经学会创建数据集了,但如何从中获取数据呢?我们必须使用迭代器(Iterator),它会帮助我们遍历数据集中的内容并找到真值。有四种类型的迭代器。
One Shot 迭代器
这是最简单的迭代器,使用第一个示例:
x = np.random.sample((100,2))
# make a dataset from a numpy array
dataset = tf.data.Dataset.from_tensor_slices(x)
# create the iterator
iter = dataset.make_one_shot_iterator()
随后你需要调用 get_next() 来获取包含这些数据的张量
...
# create the iterator
iter = dataset.make_one_shot_iterator()
el = iter.get_next()
我们可以运行 el 来查看它们的值。
with tf.Session() as sess:
print(sess.run(el)) # output: [ 0.42116176 0.40666069]
可初始化的迭代器
如果我们想要创建一个动态的数据集,在其中可以实时更改数据源,我们可以用占位符创建一个数据集。随后我们可以使用通常的 feed-dict 机制来初始化占位符。这一过程可用「可初始化迭代器(initializable iterator)」来完成。使用上一节中的第三个例子:
# using a placeholder
x = tf.placeholder(tf.float32, shape=[None,2])
dataset = tf.data.Dataset.from_tensor_slices(x)
data = np.random.sample((100,2))
iter = dataset.make_initializable_iterator() # create the iterator
el = iter.get_next()
with tf.Session() as sess:
# feed the placeholder with data
sess.run(iter.initializer, feed_dict={ x: data })
print(sess.run(el)) # output [ 0.52374458 0.71968478]
这次我们调用 make_initializable_iterator。然后,我们在 sess 中运行 initializer 操作,以传递数据,这种情况下数据是随机的 numpy 数组。
假设现在我们有了训练数据集和测试数据集,那么常见的代码如下:
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.array([[1,2]]), np.array([[0]]))
然后,我们训练该模型,并在测试数据集上对其进行测试,测试可以通过训练后再次初始化迭代器来完成。
# initializable iterator to switch between dataset
EPOCHS = 10
x, y = tf.placeholder(tf.float32, shape=[None,2]), tf.placeholder(tf.float32, shape=[None,1])
dataset = tf.data.Dataset.from_tensor_slices((x, y))
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.array([[1,2]]), np.array([[0]]))
iter = dataset.make_initializable_iterator()
features, labels = iter.get_next()
with tf.Session() as sess:
# initialise iterator with train data
sess.run(iter.initializer, feed_dict={ x: train_data[0], y: train_data[1]})
for _ in range(EPOCHS):
sess.run([features, labels])
# switch to test data
sess.run(iter.initializer, feed_dict={ x: test_data[0], y: test_data[1]})
print(sess.run([features, labels]))
可重新初始化的迭代器
这个概念和之前的类似,即在数据之间动态地转换。但并不是将新数据馈送到相同的数据集,而是在数据集之间转换。如前,我们需要一个训练集和一个测试集。
# making fake data using numpy
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.random.sample((10,2)), np.random.sample((10,1)))
我们可以创建两个数据集:
# create two datasets, one for training and one for test
train_dataset = tf.data.Dataset.from_tensor_slices(train_data)
test_dataset = tf.data.Dataset.from_tensor_slices(test_data)
接下来是要展示的技巧,即创建一个通用的迭代器:
# create a iterator of the correct shape and type
iter = tf.data.Iterator.from_structure(train_dataset.output_types,
train_dataset.output_shapes)
以及两个初始化运算:
# create the initialisation operations
train_init_op = iter.make_initializer(train_dataset)
test_init_op = iter.make_initializer(test_dataset)
和之前一样,我们得到了下一个元素:
features, labels = iter.get_next()
现在,我们可以直接使用会话运行这两个初始化运算。总结起来我们得到:
# Reinitializable iterator to switch between Datasets
EPOCHS = 10
# making fake data using numpy
train_data = (np.random.sample((100,2)), np.random.sample((100,1)))
test_data = (np.random.sample((10,2)), np.random.sample((10,1)))
# create two datasets, one for training and one for test
train_dataset = tf.data.Dataset.from_tensor_slices(train_data)
test_dataset = tf.data.Dataset.from_tensor_slices(test_data)
# create a iterator of the correct shape and type
iter = tf.data.Iterator.from_structure(train_dataset.output_types,
train_dataset.output_shapes)
features, labels = iter.get_next()
# create the initialisation operations
train_init_op = iter.make_initializer(train_dataset)
test_init_op = iter.make_initializer(test_dataset)
with tf.Session() as sess:
sess.run(train_init_op) # switch to train dataset
for _ in range(EPOCHS):
sess.run([features, labels])
sess.run(test_init_op) # switch to val dataset
print(sess.run([features, labels]))
可馈送的迭代器
老实说,我并不认为这个有什么用。基本上,它是用迭代器之间的转换取代了数据集之间的转换,从而得到如一个来自 make_one_shot_iterator() 的迭代器,以及一个来自 make_initializable_iterator() 的迭代器。
使用数据
在前述例子中,我们利用会话输出 Dataset 中下一个元素的值。
...
next_el = iter.get_next()
...
print(sess.run(next_el)) # will output the current element
为了将数据传递给模型,我们只需要传递从 get_next() 生成的张量。在下面的代码中,我们有一个包含了两个 numpy 数组的 Dataset,这里用了和第一节一样的例子。注意,我们需要将.random.sample 封装到另一个 numpy 数组,以增加一个维度,从而将数据进行分批。
# using two numpy arrays
features, labels = (np.array([np.random.sample((100,2))]),
np.array([np.random.sample((100,1))]))
dataset = tf.data.Dataset.from_tensor_slices((features,labels)).repeat().batch(BATCH_SIZE)
然后,和往常一样,我们创建一个迭代器:
iter = dataset.make_one_shot_iterator()
x, y = iter.get_next()
创建一个模型,即一个简单的神经网络:
# make a simple model
net = tf.layers.dense(x, 8) # pass the first value from iter.get_next() as input
net = tf.layers.dense(net, 8)
prediction = tf.layers.dense(net, 1)
loss = tf.losses.mean_squared_error(prediction, y) # pass the second value from iter.get_net() as label
train_op = tf.train.AdamOptimizer().minimize(loss)
我们直接使用来自 iter.get_next() 的张量作为第一层的输入和损失函数的标签。总结起来我们得到:
# make a simple model
EPOCHS = 10
BATCH_SIZE = 16
# using two numpy arrays
features, labels = (np.array([np.random.sample((100,2))]),
np.array([np.random.sample((100,1))]))
dataset = tf.data.Dataset.from_tensor_slices((features,labels)).repeat().batch(BATCH_SIZE)
iter = dataset.make_one_shot_iterator()
x, y = iter.get_next()
# make a simple model
net = tf.layers.dense(x, 8, activation=tf.tanh) # pass the first value from iter.get_next() as input
net = tf.layers.dense(net, 8, activation=tf.tanh)
prediction = tf.layers.dense(net, 1, activation=tf.tanh)
loss = tf.losses.mean_squared_error(prediction, y) # pass the second value from iter.get_net() as label
train_op = tf.train.AdamOptimizer().minimize(loss)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
for i in range(EPOCHS):
_, loss_value = sess.run([train_op, loss])
print("Iter: {}, Loss: {:.4f}".format(i, loss_value))
输出:
Iter: 0, Loss: 0.1328
Iter: 1, Loss: 0.1312
Iter: 2, Loss: 0.1296
Iter: 3, Loss: 0.1281
Iter: 4, Loss: 0.1267
Iter: 5, Loss: 0.1254
Iter: 6, Loss: 0.1242
Iter: 7, Loss: 0.1231
Iter: 8, Loss: 0.1220
Iter: 9, Loss: 0.1210
一些有用的技巧
数据分批
通常数据分批是一件令人痛苦的事情,但通过 Dataset API,我们可以利用 batch(BATCH_SIZE) 方法自动地将数据集按设定的批量大小进行分批。默认批量大小为 1。在下面的示例代码中,我们使用的批量大小为 4。
# BATCHING
BATCH_SIZE = 4
x = np.random.sample((100,2))
# make a dataset from a numpy array
dataset = tf.data.Dataset.from_tensor_slices(x).batch(BATCH_SIZE)
iter = dataset.make_one_shot_iterator()
el = iter.get_next()
with tf.Session() as sess:
print(sess.run(el))
输出:
[[ 0.65686128 0.99373963]
[ 0.69690451 0.32446826]
[ 0.57148422 0.68688242]
[ 0.20335116 0.82473219]]
repeat
使用.repeat(),我们可以指定数据集被迭代的次数。如果不传输任何参数,循环将永久进行。通常来说,永久运行循环和在标准循环中直接控制 epoch 的数量可以得到不错的结果。
shuffle
我们可以利用 shuffle() 进行数据集 shuffle,默认是在每一个 epoch 中将数据集 shuffle 一次。记住:数据集 shuffle 是避免过拟合的重要方法。
我们还可以设置参数 buffer_size,下一个元素将从该固定大小的缓存中均匀地选取。例如:
# BATCHING
BATCH_SIZE = 4
x = np.array([[1],[2],[3],[4]])
# make a dataset from a numpy array
dataset = tf.data.Dataset.from_tensor_slices(x)
dataset = dataset.shuffle(buffer_size=100)
dataset = dataset.batch(BATCH_SIZE)
iter = dataset.make_one_shot_iterator()
el = iter.get_next()
with tf.Session() as sess:
print(sess.run(el))
第一次运行的输出:
[[4]
[2]
[3]
[1]]
第二次运行的输出:
[[3]
[1]
[2]
[4]]
这样,数据集 shuffle 就完成了。你还可以设置 seed 参数。
MAP
你可以使用 map 方法对数据集中的所有成员应用定制化函数。下列示例中,我们把每个元素乘 2:
# MAP
x = np.array([[1],[2],[3],[4]])
# make a dataset from a numpy array
dataset = tf.data.Dataset.from_tensor_slices(x)
dataset = dataset.map(lambda x: x*2)
iter = dataset.make_one_shot_iterator()
el = iter.get_next()
with tf.Session() as sess:
# this will run forever
for _ in range(len(x)):
print(sess.run(el))
输出:
[2]
[4]
[6]
[8]
其他资源
TensorFlow 数据集教程:https://www.tensorflow.org/programmers_guide/datasets
数据集文档:https://www.tensorflow.org/api_docs/python/tf/data/Dataset
结论
该数据集 API 使我们快速、稳健地创建优化输入流程来训练、评估和测试我们的模型。本文中,我们了解了很多可以常见操作。
原文链接:https://towardsdatascience.com/how-to-use-dataset-in-tensorflow-c758ef9e4428