- 本篇博文就是手写数字识别的一个升级版本,天天手写数字都厌烦了,索性在网上找了个有趣的例程唐诗生成.
- 本博文是学完RNN的一个小练习,读懂全部程序你会对LSTM有更深的理解.
参考:
......
先看结果:
'''
求得能名及八鹦,把盘犹舒热龙鳞。
赞娟从荀绕苔寝,唯有泉声细洞房。
求耕遐老数三秦,岂汝百骢滤自能。
关朝酒别词堪愁,愁向长云寄白头。
注农满映云满嫌,此翩逼时朝方清。
'''
网络流程:
- 收集唐诗数据-->>点击下载
- 分析数据-->>生成词向量
- 生成get_batch函数
- 批量对诗进行训练
- 确保精度 / 保存模型
- 生成唐诗(隐含诗)
特殊说明:
生成词向量会略去很少的生僻字
数据缺失进行填充
批量训练对长短不齐的诗进行补全,以长的为标准
生成诗的过程使用了随机性
random
,如果不适用随机性那么每次生成的诗都一样.源程序对概率大的诗进行了保留大概率,下图简单说明以下:
对于生成五言和七言诗有缺陷,得一直达到要求才可以返回
if len(sentence) == 2 + 2 * type:
sentence += u'\n'
poem += sentence
flag = False
代码:
#这里就不放代码了,那么多代码,我不信你在微信上面看完!!!还是点击后面的阅读全文进行观察吧~~
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import collections
import numpy as np
import tensorflow as tf
import os
#os.environ["CUDA_VISIBLE_DEVICES"] = "0"#设置GPU为gtx1060
'''
author: log16
Data: 2017/5/4
'''
# -------------------------------数据预处理---------------------------#
poetry_file = os.getcwd()+'/poetry.txt'
# 诗集
poetrys = []
with open(poetry_file, "r") as f:
for line in f:
try:
#line = line.decode('UTF-8')
line = line.strip(u'\n')
title, content = line.strip(u' ').split(u':')
content = content.replace(u' ', u'')
if u'_' in content or u'(' in content or u'(' in content or u'《' in content or u'[' in content:
continue
if len(content) < 5 or len(content) > 79:
continue
content = u'[' + content + u']'
poetrys.append(content)
except Exception as e:
pass
# 按诗的字数排序
poetrys = sorted(poetrys, key=lambda line: len(line))
print('唐诗总数: ', len(poetrys))
# 统计每个字出现次数
all_words = []
for poetry in poetrys:
all_words += [word for word in poetry]
counter = collections.Counter(all_words)
count_pairs = sorted(counter.items(), key=lambda x: -x[1])
words, _ = zip(*count_pairs)
# 取前多少个常用字
words = words[:len(words)] + (' ',)
# 每个字映射为一个数字ID
word_num_map = dict(zip(words, range(len(words))))
#word_num_map = sorted(word_num_map.items(),key=lambda x:x[1],reverse=True)
# 把诗转换为向量形式
to_num = lambda word: word_num_map.get(word, len(words))
poetrys_vector = [list(map(to_num, poetry)) for poetry in poetrys]
# [[314, 3199, 367, 1556, 26, 179, 680, 0, 3199, 41, 506, 40, 151, 4, 98, 1],
# [339, 3, 133, 31, 302, 653, 512, 0, 37, 148, 294, 25, 54, 833, 3, 1, 965, 1315, 377, 1700, 562, 21, 37, 0, 2, 1253, 21, 36, 264, 877, 809, 1]
# ....]
# 每次取64首诗进行训练
batch_size = 64
n_chunk = len(poetrys_vector) // batch_size
#利用序列对原始数据进行随机化的抽取batch和enpoch
class DataSet(object):
def __init__(self, data_size):
self._data_size = data_size
self._epochs_completed = 0#epoch次数 = n*batch//data_size
self._index_in_epoch = 0#batch数量 = n*batch
self._data_index = np.arange(data_size)#下标索引
def next_batch(self, batch_size):
start = self._index_in_epoch
if start + batch_size > self._data_size:
np.random.shuffle(self._data_index)#打乱数据索引
self._epochs_completed = self._epochs_completed + 1
self._index_in_epoch = batch_size
full_batch_features, full_batch_labels = self.data_batch(0, batch_size)
return full_batch_features, full_batch_labels
else:
self._index_in_epoch += batch_size
end = self._index_in_epoch
full_batch_features, full_batch_labels = self.data_batch(start, end)
if self._index_in_epoch == self._data_size:
self._index_in_epoch = 0
self._epochs_completed = self._epochs_completed + 1
np.random.shuffle(self._data_index)
return full_batch_features, full_batch_labels
def data_batch(self, start, end):
batches = []
for i in range(start, end):
batches.append(poetrys_vector[self._data_index[i]])
length = max(map(len, batches))#求取batch的最大一个的长度
#------以最长的一个数据为基础,其它的用" "空格去补全
xdata = np.full((end - start, length), word_num_map[' '], np.int32)
for row in range(end - start):
xdata[row, :len(batches[row])] = batches[row]
ydata = np.copy(xdata)
ydata[:, :-1] = xdata[:, 1:]#标签对应着xdata的下一个序列
return xdata, ydata
# ---------------------------------------RNN--------------------------------------#
input_data = tf.placeholder(tf.int32, [batch_size, None])
output_targets = tf.placeholder(tf.int32, [batch_size, None])
# 定义RNN
def neural_network(model='lstm', rnn_size=128, num_layers=2):
if model == 'rnn':
cell_fun = tf.nn.rnn_cell.BasicRNNCell
elif model == 'gru':
cell_fun = tf.nn.rnn_cell.GRUCell
elif model == 'lstm':
cell_fun = tf.nn.rnn_cell.BasicLSTMCell
cell = cell_fun(rnn_size, state_is_tuple=True)#创建一个LSTM单元
cell = tf.nn.rnn_cell.MultiRNNCell([cell] * num_layers, state_is_tuple=True)#LSTM层数
initial_state = cell.zero_state(batch_size, tf.float32)
with tf.variable_scope('rnnlm'):
softmax_w = tf.get_variable("softmax_w", [rnn_size, len(words)])
softmax_b = tf.get_variable("softmax_b", [len(words)])
with tf.device("/gpu:0"):
#这里会在文章单独解释
embedding = tf.get_variable("embedding", [len(words), rnn_size])
inputs = tf.nn.embedding_lookup(embedding, input_data)
outputs, last_state = tf.nn.dynamic_rnn(cell, inputs, initial_state=initial_state, scope='rnnlm')
output = tf.reshape(outputs, [-1, rnn_size])
logits = tf.matmul(output, softmax_w) + softmax_b
probs = tf.nn.softmax(logits)
return logits, last_state, probs, cell, initial_state
def load_model(sess, saver, ckpt_path):
latest_ckpt = tf.train.latest_checkpoint(ckpt_path)#得到最后一次保存的模型
if latest_ckpt:
print('resume from', latest_ckpt)
saver.restore(sess, latest_ckpt)
return int(latest_ckpt[latest_ckpt.rindex('-') + 1:])
else:
print('building model from scratch')
sess.run(tf.global_variables_initializer())
return -1
# 训练
def train_neural_network():
logits, last_state, _, _, _ = neural_network()
targets = tf.reshape(output_targets, [-1])
loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example([logits], [targets], [tf.ones_like(targets, dtype=tf.float32)],
len(words))
cost = tf.reduce_mean(loss)
learning_rate = tf.Variable(0.0, trainable=False)
tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(cost, tvars), 5)#防止梯度爆炸,在其中设置的一个参考
# optimizer = tf.train.GradientDescentOptimizer(learning_rate)
optimizer = tf.train.AdamOptimizer(learning_rate)
#因为这里不是直接优化损失函数,优化的是梯度值,所以得更新梯度
train_op = optimizer.apply_gradients(zip(grads, tvars))#对梯度进行更新
trainds = DataSet(len(poetrys_vector))
x, y = trainds.next_batch(batch_size)
#GPU设置为按需增长,且最大占用90%
config = tf.ConfigProto(allow_soft_placement=True)
gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.9)
config.gpu_options.allow_growth = True
with tf.Session(config=config) as sess:
with tf.device('/gpu:0'):#使用os模块设置GPU
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver(tf.all_variables())
last_epoch = load_model(sess, saver, 'model/')
for epoch in range(last_epoch + 1, 100):
sess.run(tf.assign(learning_rate, 0.002 * (0.97 ** epoch)))#不断更新学习率
# sess.run(tf.assign(learning_rate, 0.01))
all_loss = 0.0
for batche in range(n_chunk):
x, y = trainds.next_batch(batch_size)
train_loss, _, _ = sess.run([cost, last_state, train_op],
feed_dict={input_data: x, output_targets: y})
all_loss = all_loss + train_loss
if batche % 50 == 1:
# print(epoch, batche, 0.01,train_loss)
print(epoch, batche, 0.002 * (0.97 ** epoch), train_loss)
saver.save(sess, 'model/poetry.module', global_step=epoch)
print(epoch, ' Loss: ', all_loss * 1.0 / n_chunk)
train_neural_network()
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import collections
import numpy as np
import tensorflow as tf
'''
This one will produce a poetry
author: log16
Date: 2017/5/4
'''
# -------------------------------数据预处理---------------------------#
poetry_file = 'poetry.txt'
# 诗集
poetrys = []
with open(poetry_file, "r") as f:
for line in f:
try:
#line = line.decode('UTF-8')
line = line.strip(u'\n')
title, content = line.strip(u' ').split(u':')
content = content.replace(u' ', u'')
if u'_' in content or u'(' in content or u'(' in content or u'《' in content or u'[' in content:
continue
if len(content) < 5 or len(content) > 79:
continue
content = u'[' + content + u']'
poetrys.append(content)
except Exception as e:
pass
# 按诗的字数排序
poetrys = sorted(poetrys, key=lambda line: len(line))
print('唐诗总数: ', len(poetrys))
# 统计每个字出现次数
all_words = []
for poetry in poetrys:
all_words += [word for word in poetry]
counter = collections.Counter(all_words)
count_pairs = sorted(counter.items(), key=lambda x: -x[1])
words, _ = zip(*count_pairs)
# 取前多少个常用字
words = words[:len(words)] + (' ',)
# 每个字映射为一个数字ID
word_num_map = dict(zip(words, range(len(words))))
# 把诗转换为向量形式,参考TensorFlow练习1
to_num = lambda word: word_num_map.get(word, len(words))
poetrys_vector = [list(map(to_num, poetry)) for poetry in poetrys]
# [[314, 3199, 367, 1556, 26, 179, 680, 0, 3199, 41, 506, 40, 151, 4, 98, 1],
# [339, 3, 133, 31, 302, 653, 512, 0, 37, 148, 294, 25, 54, 833, 3, 1, 965, 1315, 377, 1700, 562, 21, 37, 0, 2, 1253, 21, 36, 264, 877, 809, 1]
# ....]
# 每次取64首诗进行训练
batch_size = 1
n_chunk = len(poetrys_vector) // batch_size
class DataSet(object):
def __init__(self, data_size):
self._data_size = data_size
self._epochs_completed = 0
self._index_in_epoch = 0
self._data_index = np.arange(data_size)
def next_batch(self, batch_size):
start = self._index_in_epoch
if start + batch_size > self._data_size:
np.random.shuffle(self._data_index)
self._epochs_completed = self._epochs_completed + 1
self._index_in_epoch = batch_size
full_batch_features, full_batch_labels = self.data_batch(0, batch_size)
return full_batch_features, full_batch_labels
else:
self._index_in_epoch += batch_size
end = self._index_in_epoch
full_batch_features, full_batch_labels = self.data_batch(start, end)
if self._index_in_epoch == self._data_size:
self._index_in_epoch = 0
self._epochs_completed = self._epochs_completed + 1
np.random.shuffle(self._data_index)
return full_batch_features, full_batch_labels
def data_batch(self, start, end):
batches = []
for i in range(start, end):
batches.append(poetrys_vector[self._data_index[i]])
length = max(map(len, batches))
xdata = np.full((end - start, length), word_num_map[' '], np.int32)
for row in range(end - start):
xdata[row, :len(batches[row])] = batches[row]
ydata = np.copy(xdata)
ydata[:, :-1] = xdata[:, 1:]
return xdata, ydata
# ---------------------------------------RNN--------------------------------------#
input_data = tf.placeholder(tf.int32, [batch_size, None])
output_targets = tf.placeholder(tf.int32, [batch_size, None])
# 定义RNN
def neural_network(model='lstm', rnn_size=128, num_layers=2):
if model == 'rnn':
cell_fun = tf.nn.rnn_cell.BasicRNNCell
elif model == 'gru':
cell_fun = tf.nn.rnn_cell.GRUCell
elif model == 'lstm':
cell_fun = tf.nn.rnn_cell.BasicLSTMCell
cell = cell_fun(rnn_size, state_is_tuple=True)
cell = tf.nn.rnn_cell.MultiRNNCell([cell] * num_layers, state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)
with tf.variable_scope('rnnlm'):
softmax_w = tf.get_variable("softmax_w", [rnn_size, len(words)])
softmax_b = tf.get_variable("softmax_b", [len(words)])
with tf.device("/cpu:0"):
embedding = tf.get_variable("embedding", [len(words), rnn_size])
inputs = tf.nn.embedding_lookup(embedding, input_data)
outputs, last_state = tf.nn.dynamic_rnn(cell, inputs, initial_state=initial_state, scope='rnnlm')
output = tf.reshape(outputs, [-1, rnn_size])
logits = tf.matmul(output, softmax_w) + softmax_b
probs = tf.nn.softmax(logits)
return logits, last_state, probs, cell, initial_state
# -------------------------------生成古诗---------------------------------#
# 使用训练完成的模型
def gen_poetry():
def to_word(weights):
t = np.cumsum(weights)
test = sorted(t)
s = np.sum(weights)
sample = int(np.searchsorted(t, np.random.rand(1) * s))
return words[sample]
_, last_state, probs, cell, initial_state = neural_network()
Session_config = tf.ConfigProto(allow_soft_placement=True)
Session_config.gpu_options.allow_growth = True
with tf.Session(config=Session_config) as sess:
with tf.device('/gpu:0'):
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver(tf.all_variables())
saver.restore(sess, 'model/poetry.module-99')
state_ = sess.run(cell.zero_state(1, tf.float32))
x = np.array([list(map(word_num_map.get, '['))])
[probs_, state_] = sess.run([probs, last_state], feed_dict={input_data: x, initial_state: state_})
#word = to_word(probs_)
word = words[np.argmax(probs_)]
poem = ''
while word != ']':
poem += word
x = np.zeros((1, 1))
x[0, 0] = word_num_map[word]
[probs_, state_] = sess.run([probs, last_state], feed_dict={input_data: x, initial_state: state_})
word = to_word(probs_)
#word = words[np.argmax(probs_)]
return poem
print(gen_poetry())
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import collections
import numpy as np
import tensorflow as tf
'''
This one will produce a poetry with heads.
author: log16
Data: 2017/5/4
'''
# -------------------------------数据预处理---------------------------#
poetry_file = 'poetry.txt'
# 诗集
poetrys = []
with open(poetry_file, "r") as f:
for line in f:
try:
#line = line.decode('UTF-8')
line = line.strip(u'\n')
title, content = line.strip(u' ').split(u':')
content = content.replace(u' ', u'')
if u'_' in content or u'(' in content or u'(' in content or u'《' in content or u'[' in content:
continue
if len(content) < 5 or len(content) > 79:
continue
content = u'[' + content + u']'
poetrys.append(content)
except Exception as e:
pass
# 按诗的字数排序
poetrys = sorted(poetrys, key=lambda line: len(line))
print('唐诗总数: ', len(poetrys))
# 统计每个字出现次数
all_words = []
for poetry in poetrys:
all_words += [word for word in poetry]
counter = collections.Counter(all_words)
count_pairs = sorted(counter.items(), key=lambda x: -x[1])
words, _ = zip(*count_pairs)
# 取前多少个常用字
words = words[:len(words)] + (' ',)
# 每个字映射为一个数字ID
word_num_map = dict(zip(words, range(len(words))))
# 把诗转换为向量形式,参考TensorFlow练习1
to_num = lambda word: word_num_map.get(word, len(words))
poetrys_vector = [list(map(to_num, poetry)) for poetry in poetrys]
# [[314, 3199, 367, 1556, 26, 179, 680, 0, 3199, 41, 506, 40, 151, 4, 98, 1],
# [339, 3, 133, 31, 302, 653, 512, 0, 37, 148, 294, 25, 54, 833, 3, 1, 965, 1315, 377, 1700, 562, 21, 37, 0, 2, 1253, 21, 36, 264, 877, 809, 1]
# ....]
# 每次取64首诗进行训练
batch_size = 1
n_chunk = len(poetrys_vector) // batch_size
class DataSet(object):
def __init__(self, data_size):
self._data_size = data_size
self._epochs_completed = 0
self._index_in_epoch = 0
self._data_index = np.arange(data_size)
def next_batch(self, batch_size):
start = self._index_in_epoch
if start + batch_size > self._data_size:
np.random.shuffle(self._data_index)
self._epochs_completed = self._epochs_completed + 1
self._index_in_epoch = batch_size
full_batch_features, full_batch_labels = self.data_batch(0, batch_size)
return full_batch_features, full_batch_labels
else:
self._index_in_epoch += batch_size
end = self._index_in_epoch
full_batch_features, full_batch_labels = self.data_batch(start, end)
if self._index_in_epoch == self._data_size:
self._index_in_epoch = 0
self._epochs_completed = self._epochs_completed + 1
np.random.shuffle(self._data_index)
return full_batch_features, full_batch_labels
def data_batch(self, start, end):
batches = []
for i in range(start, end):
batches.append(poetrys_vector[self._data_index[i]])
length = max(map(len, batches))
xdata = np.full((end - start, length), word_num_map[' '], np.int32)
for row in range(end - start):
xdata[row, :len(batches[row])] = batches[row]
ydata = np.copy(xdata)
ydata[:, :-1] = xdata[:, 1:]
return xdata, ydata
# ---------------------------------------RNN--------------------------------------#
input_data = tf.placeholder(tf.int32, [batch_size, None])
output_targets = tf.placeholder(tf.int32, [batch_size, None])
# 定义RNN
def neural_network(model='lstm', rnn_size=128, num_layers=2):
if model == 'rnn':
cell_fun = tf.nn.rnn_cell.BasicRNNCell
elif model == 'gru':
cell_fun = tf.nn.rnn_cell.GRUCell
elif model == 'lstm':
cell_fun = tf.nn.rnn_cell.BasicLSTMCell
cell = cell_fun(rnn_size, state_is_tuple=True)
cell = tf.nn.rnn_cell.MultiRNNCell([cell] * num_layers, state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)
with tf.variable_scope('rnnlm'):
softmax_w = tf.get_variable("softmax_w", [rnn_size, len(words)])
softmax_b = tf.get_variable("softmax_b", [len(words)])
with tf.device("/cpu:0"):
embedding = tf.get_variable("embedding", [len(words), rnn_size])
inputs = tf.nn.embedding_lookup(embedding, input_data)
outputs, last_state = tf.nn.dynamic_rnn(cell, inputs, initial_state=initial_state, scope='rnnlm')
output = tf.reshape(outputs, [-1, rnn_size])
logits = tf.matmul(output, softmax_w) + softmax_b
probs = tf.nn.softmax(logits)
return logits, last_state, probs, cell, initial_state
# -------------------------------生成古诗---------------------------------#
# 使用训练完成的模型
def gen_head_poetry(heads, type):
if type != 5 and type != 7:
print
'The second para has to be 5 or 7!'
return
def to_word(weights):
t = np.cumsum(weights)
s = np.sum(weights)
sample = int(np.searchsorted(t, np.random.rand(1) * s))
return words[sample]
_, last_state, probs, cell, initial_state = neural_network()
Session_config = tf.ConfigProto(allow_soft_placement=True)
Session_config.gpu_options.allow_growth = True
with tf.Session(config=Session_config) as sess:
with tf.device('/gpu:0'):
sess.run(tf.global_variables_initializer())
saver = tf.train.Saver(tf.global_variables())
saver.restore(sess, 'model/poetry.module-99')
poem = ''
for head in heads:
flag = True
while flag:
state_ = sess.run(cell.zero_state(1, tf.float32))
x = np.array([list(map(word_num_map.get, u'['))])
[probs_, state_] = sess.run([probs, last_state], feed_dict={input_data: x, initial_state: state_})
sentence = head
x = np.zeros((1, 1))
x[0, 0] = word_num_map[sentence]
[probs_, state_] = sess.run([probs, last_state], feed_dict={input_data: x, initial_state: state_})
word = to_word(probs_)
sentence += word
while word != u'。':
x = np.zeros((1, 1))
x[0, 0] = word_num_map[word]
[probs_, state_] = sess.run([probs, last_state],
feed_dict={input_data: x, initial_state: state_})
word = to_word(probs_)
sentence += word
if len(sentence) == 2 + 2 * type:
sentence += u'\n'
poem += sentence
flag = False
return poem
print(gen_head_poetry(u'求赞求关注', 7))