Running the regression code
Code: https://github.com/mari-linhares/tensorflow-maml/blob/master/maml.ipynb
I typed this code out by hand on my own machine and reproduced the regression comparison experiment from the paper.
Below is a record of my understanding of the code.
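All of the snippets below assume the usual imports. A minimal sketch (assuming TensorFlow 2.x; the exact import cell in the notebook may differ slightly, but the alias keras_backend matches how the backend is referenced later):

import random
import time

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
import tensorflow.keras.backend as keras_backend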
1. A class that generates data points on a sine curve
class SinusoidGenerator():
    '''
    Amplitude in [0.1, 5.0]
    Phase in [0, π]
    '''
    def __init__(self, K=10, amplitude=None, phase=None):
        '''
        K: number of data points sampled from the sine curve
        amplitude: amplitude, any value in [0.1, 5.0]
        phase: phase, any value in [0, π]
        '''
        self.K = K
        self.amplitude = amplitude if amplitude else np.random.uniform(0.1, 5.0)
        self.phase = phase if phase else np.random.uniform(0, np.pi)
        self.sampled_points = None
        self.x = self._sample_x()

    def _sample_x(self):
        # x values are sampled uniformly from [-5, 5]
        return np.random.uniform(-5, 5, self.K)

    def f(self, x):
        '''
        Evaluate the sine curve at x
        '''
        return self.amplitude * np.sin(x - self.phase)

    def batch(self, x=None, force_new=False):
        '''
        Return K data points (x, y) from this sine curve
        x: x values; if None, the stored x values are used
        force_new: if True, resample new x values instead of using the stored ones
        '''
        if x is None:
            if force_new:
                x = self._sample_x()
            else:
                x = self.x
        y = self.f(x)
        return x[:, None], y[:, None]

    def equally_spaced_samples(self, K=None):
        '''
        Return K equally spaced data points on this sine curve
        '''
        if K is None:
            K = self.K
        return self.batch(x=np.linspace(-5, 5, K))
def plot(data, *args, **kwargs):
    x, y = data
    return plt.plot(x, y, *args, **kwargs)

# Visualize three sine curves, each drawn from 100 equally spaced points
for _ in range(3):
    plt.title('Sinusoid examples')
    plot(SinusoidGenerator(K=100).equally_spaced_samples())
plt.show()
2. A helper that builds the training and test sets
def generate_dataset(K, train_size=20000, test_size=10):
    def _generate_dataset(size):
        return [SinusoidGenerator(K=K) for _ in range(size)]
    return _generate_dataset(train_size), _generate_dataset(test_size)

# The training set has 20000 tasks, i.e. 20000 sine curves with 10 sampled points each;
# the test set has 10 tasks, each also with 10 points
train_ds, test_ds = generate_dataset(K=10)
3. The regression model (the "base model" mentioned in the previous section)
class SineModel(keras.Model):
    '''
    The regressor is a neural network with two hidden layers of size 40 and ReLU nonlinearities
    '''
    def __init__(self):
        super().__init__()
        self.hidden1 = keras.layers.Dense(40, input_shape=(1,))
        self.hidden2 = keras.layers.Dense(40)
        self.out = keras.layers.Dense(1)

    def forward(self, x):
        x = keras.activations.relu(self.hidden1(x))
        x = keras.activations.relu(self.hidden2(x))
        x = self.out(x)
        return x
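A quick sanity check (my own minimal sketch, not part of the original notebook): the first forward pass builds the layer weights, and the model returns one prediction per input point.

model = SineModel()
x, y = SinusoidGenerator(K=10).batch()         # x and y have shape (10, 1)
pred = model.forward(tf.convert_to_tensor(x))  # the first call creates the weights
print(pred.shape)                              # expected: (10, 1)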
4. Helper functions for loss computation and data formatting
def np_to_tensor(list_of_numpy_objs):
    return (tf.convert_to_tensor(obj) for obj in list_of_numpy_objs)

def copy_model(model, x):
    '''
    x: an example input; it is used to run a forward pass so that the
       graph's weights are created as variables before copying.
    '''
    copied_model = SineModel()
    copied_model.forward(tf.convert_to_tensor(x))
    # copy the trained weight values into the new model
    copied_model.set_weights(model.get_weights())
    return copied_model

def loss_function(pred_y, y):
    # Mean squared error (as in the MAML paper); keras_backend.mean(x) returns
    # a tensor holding the mean of the elements of x
    return keras_backend.mean(keras.losses.mean_squared_error(y, pred_y))

def compute_loss(model, x, y, loss_fn=loss_function):
    # model predictions after the two hidden layers and the output layer
    logits = model.forward(x)
    # y is the ground truth; mse is the mean squared error over the K points
    mse = loss_fn(y, logits)
    return mse, logits

def apply_gradients(optimizer, gradients, variables):
    # Update the weights of the model
    optimizer.apply_gradients(zip(gradients, variables))
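For reference, the regression loss in the MAML paper is the squared error over a task's sampled points; loss_function above takes the mean over the K points rather than the sum, which only rescales the gradient by a constant factor:

    \mathcal{L}_{\mathcal{T}_i}(f_\phi) = \sum_{(x^{(j)}, y^{(j)}) \sim \mathcal{T}_i} \left\lVert f_\phi(x^{(j)}) - y^{(j)} \right\rVert_2^2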
5. The main event: the MAML training code
The implementation follows the pseudocode from the original paper; the Step comments in the code refer to the corresponding lines of that pseudocode.
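In equations, the loop below implements the paper's two updates, where α is lr_inner and the meta step size β is handled by the Adam optimizer; since the task batch size here is 1, the sum over tasks reduces to a single task per meta-update.

Inner (adaptation) update, Steps 5 and 6:
    \theta_i' = \theta - \alpha \nabla_{\theta} \mathcal{L}_{\mathcal{T}_i}(f_{\theta})

Meta-update, Step 8:
    \theta \leftarrow \theta - \beta \nabla_{\theta} \sum_{\mathcal{T}_i} \mathcal{L}_{\mathcal{T}_i}(f_{\theta_i'})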
# Inner learning rate 0.01; the meta-model is optimized with Adam (as in the MAML paper)
def train_maml(model, epochs, dataset, lr_inner=0.01, batch_size=1, log_steps=1000):
    optimizer = keras.optimizers.Adam()
    # Step 2: instead of checking for convergence, we train for a fixed number of epochs
    for _ in range(epochs):
        total_loss = 0
        losses = []
        start = time.time()
        # Steps 3 and 4
        # random.sample over the whole dataset acts as a shuffle of the tasks
        for i, t in enumerate(random.sample(dataset, len(dataset))):
            x, y = np_to_tensor(t.batch())
            model.forward(x)  # run forward pass to initialize the weights
            with tf.GradientTape() as test_tape:
                # test_tape.watch(model.trainable_variables)
                # Step 5
                with tf.GradientTape() as train_tape:
                    train_loss, _ = compute_loss(model, x, y)
                # Step 6
                gradients = train_tape.gradient(train_loss, model.trainable_variables)
                k = 0
                model_copy = copy_model(model, x)
                # model.layers is a flattened list of the model's three Dense layers
                for j in range(len(model_copy.layers)):
                    # each layer holds a kernel and a bias; tf.subtract / tf.multiply
                    # implement the manual inner SGD step
                    model_copy.layers[j].kernel = tf.subtract(model.layers[j].kernel,
                                tf.multiply(lr_inner, gradients[k]))
                    model_copy.layers[j].bias = tf.subtract(model.layers[j].bias,
                                tf.multiply(lr_inner, gradients[k+1]))
                    k += 2
                # Step 8: compute the loss of the adapted model
                test_loss, logits = compute_loss(model_copy, x, y)
            # Step 8: update the meta-model's parameters
            gradients = test_tape.gradient(test_loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))
            # Logs
            total_loss += test_loss
            loss = total_loss / (i+1.0)
            losses.append(loss)
            if i % log_steps == 0 and i > 0:
                print('Step {}: loss = {}, Time to run {} steps = {}'.format(i, loss, log_steps, time.time() - start))
                start = time.time()
        plt.plot(losses)
        plt.show()
6. Training with the hyperparameters from the original paper, K=10 and K=5
Start training
# K = 10
maml = SineModel()
# The training set has 20000 sine curves, each providing 10 sampled points
train_maml(maml, 1, train_ds)

# K = 5
maml_five = SineModel()
train_ds_five, test_ds_five = generate_dataset(K=5)
train_maml(maml_five, 1, train_ds_five)
Test code
# (x, y) are the 10 fine-tuning points; (x_test, y_test) are 100 evaluation points
def eval_sine_test(model, optimizer, x, y, x_test, y_test, num_steps=(0, 1, 10)):
    '''
    Evaluate how the model fits the curve while being fine-tuned for up to max(num_steps) steps.
    Args:
        x, y: points used to fine-tune the model parameters
        x_test, y_test: points used to evaluate the fit
        num_steps: the update counts at which results are logged
    '''
    fit_res = []
    tensor_x_test, tensor_y_test = np_to_tensor((x_test, y_test))
    # At test time the model is updated 10 times; the fit after 0, 1 and 10 updates is recorded
    # If 0 is in num_steps, log the loss before any training
    if 0 in num_steps:
        loss, logits = compute_loss(model, tensor_x_test, tensor_y_test)
        fit_res.append((0, logits, loss))
    # step runs from 1 to max(num_steps)
    for step in range(1, np.max(num_steps) + 1):
        # this is the fine-tuning step
        train_batch(x, y, model, optimizer)
        # logits: the model's predictions on the test points
        loss, logits = compute_loss(model, tensor_x_test, tensor_y_test)
        # record the results for step = 1 and step = 10
        if step in num_steps:
            fit_res.append((step, logits, loss))
    return fit_res
def eval_sinewave_for_test(model, sinusoid_generator=None, num_steps=(0, 1, 10), lr=0.01, plot=True):
    '''
    model: Already trained model.
    sinusoid_generator: A SinusoidGenerator instance.
    num_steps: Number of training steps to be logged.
    lr: Learning rate used for training on the test data.
    plot: If True, plots how the curves are fitted along `num_steps`.
    Returns:
        The fit results: a list containing (step, logits, loss) for every
        step in `num_steps`.
    '''
    if sinusoid_generator is None:
        sinusoid_generator = SinusoidGenerator(K=10)
    # generate 100 equally spaced samples for plotting
    x_test, y_test = sinusoid_generator.equally_spaced_samples(100)
    # batch used for training: 10 points from the same sine curve as above
    x, y = sinusoid_generator.batch()
    # copy the model so we can reuse the same trained model multiple times
    copied_model = copy_model(model, x)
    # use SGD for this part of training, as described in the paper
    optimizer = keras.optimizers.SGD(learning_rate=lr)
    # run training and log fit results
    fit_res = eval_sine_test(copied_model, optimizer, x, y, x_test, y_test, num_steps)
    # plot
    train, = plt.plot(x, y, '^')
    ground_truth, = plt.plot(x_test, y_test)
    plots = [train, ground_truth]
    legend = ['Training Points', 'True Function']
    for n, res, loss in fit_res:
        cur, = plt.plot(x_test, res[:, 0], '--')
        plots.append(cur)
        legend.append(f'After {n} Steps')
    plt.legend(plots, legend)
    plt.ylim(-5, 5)
    plt.xlim(-6, 6)
    if plot:
        plt.show()
    return fit_res
def compute_gradients(model, x, y, loss_fn=loss_function):
    with tf.GradientTape() as tape:
        loss, _ = compute_loss(model, x, y, loss_fn)
    # tape.gradient(loss, vars) differentiates the loss with respect to
    # model.trainable_variables, i.e. the kernels and biases of the Dense layers
    return tape.gradient(loss, model.trainable_variables), loss
def train_batch(x, y, model, optimizer):
    tensor_x, tensor_y = np_to_tensor((x, y))
    gradients, loss = compute_gradients(model, tensor_x, tensor_y)
    apply_gradients(optimizer, gradients, model.trainable_variables)
    return loss
Start testing
# K = 10: test_ds contains 10 sine curves with 10 points each;
# np.random.randint picks one index in [0, 10), i.e. one curve at random
for index in np.random.randint(0, len(test_ds), size=1):
    eval_sinewave_for_test(maml, test_ds[index])

# K = 5
for index in np.random.randint(0, len(test_ds_five), size=1):
    eval_sinewave_for_test(maml_five, test_ds_five[index])
At this point the MAML model's training and testing are done. Next comes its comparison target, the pretrained model.
7. The original paper compares the MAML model with a pretrained model
The pretrained model is simply a model trained directly on the training set; its parameters are then fine-tuned on the test tasks. So the first step is to train an ordinary regression model.
Training the regression model
def train_model(dataset, epochs=1, lr=0.01, log_steps=1000):
    model = SineModel()
    optimizer = keras.optimizers.Adam(learning_rate=lr)
    for epoch in range(epochs):
        losses = []
        total_loss = 0
        start = time.time()
        for i, sinusoid_generator in enumerate(dataset):
            x, y = sinusoid_generator.batch()
            loss = train_batch(x, y, model, optimizer)
            total_loss += loss
            curr_loss = total_loss / (i + 1.0)
            losses.append(curr_loss)
            if i % log_steps == 0 and i > 0:
                print('Step {}: loss = {}, Time to run {} steps = {:.2f} seconds'.format(
                    i, curr_loss, log_steps, time.time() - start))
                start = time.time()
        plt.plot(losses)
        plt.title('Loss Vs Time steps')
        plt.show()
    return model

# Start training
# K = 10, learning rate 0.02
neural_net = train_model(train_ds, lr=0.02)
# Following the paper's parameters, K = 5 uses learning rate 0.01
neural_net_five = train_model(train_ds_five)
The parameter update procedure of the pretrained model (illustrated in Hung-yi Lee's course).
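In equations, the pretrained baseline keeps a single parameter vector φ and updates it directly on each training task with an ordinary gradient step (here via Adam in train_model), roughly

    \phi \leftarrow \phi - \eta \nabla_{\phi} \mathcal{L}_{\mathcal{T}_i}(f_{\phi})

with no inner/outer loop; φ is only adapted to a new task at test time, exactly as in eval_sinewave_for_test above.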
Start testing
# K = 10
for index in np.random.randint(0, len(test_ds), size=1):
    eval_sinewave_for_test(neural_net, test_ds[index])

# K = 5
for index in np.random.randint(0, len(test_ds_five), size=1):
    eval_sinewave_for_test(neural_net_five, test_ds_five[index])
8. Loss comparison
# Compare the models' losses after different numbers of parameter updates
def compare_maml_and_neural_net(maml, neural_net, sinusoid_generator, num_steps=list(range(10)),
                                intermediate_plot=True, marker='x', linestyle='--'):
    '''
    Args:
        maml: An already trained MAML.
        neural_net: An already trained neural net.
        num_steps: Number of steps to be logged.
        intermediate_plot: If True, plots intermediate plots from
            `eval_sinewave_for_test`.
        marker: Marker used for plotting.
        linestyle: Line style used for plotting.
    '''
    if intermediate_plot:
        print('MAML')
    # MAML has learned an initialization; starting from it, simulate the fine-tuning process
    fit_maml = eval_sinewave_for_test(maml, sinusoid_generator, plot=intermediate_plot)
    if intermediate_plot:
        print('Neural Net')
    fit_neural_net = eval_sinewave_for_test(neural_net, sinusoid_generator, plot=intermediate_plot)
    # the oracle serves as the reference baseline: its loss is 0
    fit_oracle = []
    for i in (0, 1, 10):
        fit_oracle.append((i, 0, 0))
    fit_res = {'MAML': fit_maml, 'Neural Net': fit_neural_net, 'oracle': fit_oracle}
    legend = []
    for name in fit_res:
        x = []
        y = []
        for n, _, loss in fit_res[name]:
            x.append(n)
            y.append(loss)
        plt.plot(x, y, marker=marker, linestyle=linestyle)
        plt.xticks(num_steps)
        legend.append(name)
    plt.legend(legend)
    plt.show()
index = np.random.choice(range(len(test_ds)))
compare_maml_and_neural_net(maml, neural_net, test_ds[index])