机器翻译
数据预处理
将数据集清洗、转化为神经网络的输入mini-batch。
- 去除一些在标准字符集以外的特殊字符;把大写转换成小写;如果单词后紧跟标点符号,则在它们之间加一个空格。
分词
把数据集中字符形式的句子转换成单词组成的列表
建立词典
把单词组成的列表转换成单词id组成的列表。
利用到d2l包里的Vocab类(之前写过)。Vocab类的__init__方法建立语料库中每个单词与它的id的双向映射。
载入数据集
要保证每个batch里所有句子的输入长度是一样的,因为一个batch使用的是同一个时序大小的RNN,所以需要对句子进行padding:规定句子最长的长度max_len,如果比他长就截断,如果短就补足。
将padding完的句子转换成对应的单词id列表,给目标语言的句子加上开始和结束符以便翻译。生成一个数据生成器train_iter,每次为神经网络送入一个batch的样本。
- python知识点:
魔法函数__getitem__()
:重写类的该成员函数,可以使对象支持以下标的方式(即中括号内)获取值。- 其它一些魔法函数:
__init__()
:用于初始化;
__len__()
:返回一个int值用于表示长度
- 其它一些魔法函数:
Encoder-Decoder模型
encoder:输入到隐藏状态
decoder:隐藏状态到输出
- encoder的输出作为decoder的初始隐藏状态(类似于)的输入。
- 常应用于输入序列和输出序列的长度是可变的情况,如可以应用在对话系统、生成式任务中。
class Encoder(nn.Module):
def __init__(self, **kwargs):
super(Encoder, self).__init__(**kwargs)
def forward(self, X, *args):
raise NotImplementedError
#raise:显示地引发异常。一旦执行了raise语句,raise后面的语句将不能执行。
class Decoder(nn.Module):
def __init__(self, **kwargs):
super(Decoder, self).__init__(**kwargs)
def init_state(self, enc_outputs, *args):
raise NotImplementedError
def forward(self, X, state):
raise NotImplementedError
class EncoderDecoder(nn.Module):
def __init__(self, encoder, decoder, **kwargs):
super(EncoderDecoder, self).__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
def forward(self, enc_X, dec_X, *args):
#encoder的输出作为decoder的初始隐藏状态(类似于H(-1))的输入
enc_outputs = self.encoder(enc_X, *args)
dec_state = self.decoder.init_state(enc_outputs, *args)
return self.decoder(dec_X, dec_state)
- python知识点:
raise
:当程序出现错误,python会自动引发异常,也可以通过raise显示地引发异常。一旦执行了raise语句,raise后面的语句将不能执行。
Sequence to Sequence模型
利用到encoder-decoder模型。encoder,decoder各自是一个循环神经网络。更准确的说decoder是一个生成式的语言模型。注意,一般的语言模型会初始化成0,但这里decoder的会初始化成encoder的输出。
模型
训练模型
- 在训练的时候,我们知道要被翻译成的句子是什么,我们就是一步步用我们已知的正确翻译句子预测出每个单词,直到预测到结束符<eos>,然后用预测出来的单词构成的句子和ground truth句子作比较算出loss值。
预测模型
- 在预测的时候,在decoder中,要将前一个循环神经网络的输出作为后一个循环神经网络的输入,直到预测到结束符<eos>。
具体结构
- decoder相比encoder多了全连接的dense层,它的作用是把每个循环神经单元的输出通过该层映射到整个单词表里,并选出最高得分的单词,作为当前循环神经单元的输出。
代码实现
Encoder
class Seq2SeqEncoder(d2l.Encoder):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
super(Seq2SeqEncoder, self).__init__(**kwargs)
self.num_hiddens=num_hiddens
self.num_layers=num_layers
self.embedding = nn.Embedding(vocab_size, embed_size) #给每个单词附上对应的词向量(可以是下载好的预训练的词向量,也可以是当场训练的)
self.rnn = nn.LSTM(embed_size,num_hiddens, num_layers, dropout=dropout)
def begin_state(self, batch_size, device):
return [torch.zeros(size=(self.num_layers, batch_size, self.num_hiddens), device=device),
torch.zeros(size=(self.num_layers, batch_size, self.num_hiddens), device=device)]
def forward(self, X, *args):
X = self.embedding(X) # X shape: (batch_size, seq_len, embed_size)
#X.transpose:
X = X.transpose(0, 1) # RNN needs first axes to be time
# state = self.begin_state(X.shape[1], device=X.device)
out, state = self.rnn(X)
# The shape of out is (seq_len, batch_size, num_hiddens).
# state contains the hidden state and the memory cell
# of the last time step, the shape is (num_layers, batch_size, num_hiddens)
return out, state
Decoder
class Seq2SeqDecoder(d2l.Decoder):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
super(Seq2SeqDecoder, self).__init__(**kwargs)
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.LSTM(embed_size,num_hiddens, num_layers, dropout=dropout)
self.dense = nn.Linear(num_hiddens,vocab_size) #相比Encoder多出了dense层
def init_state(self, enc_outputs, *args):
return enc_outputs[1] #相当于上面forward函数返回的state,包含记忆细胞和隐层状态
def forward(self, X, state):
X = self.embedding(X).transpose(0, 1)
out, state = self.rnn(X, state)
#在decoder里要用out生成每个时间步的单词
# Make the batch to be the first dimension to simplify loss computation.
out = self.dense(out).transpose(0, 1)
return out, state
损失函数
- 注意:只计算句子有效部分的损失值,padding的部分不能算。
def SequenceMask(X, X_len,value=0):
#X是一个batch的损失,X_len是它这个batch里每个句子的有效长度,value是给无效部分的loss填充的值
maxlen = X.size(1)
mask = torch.arange(maxlen)[None, :].to(X_len.device) < X_len[:, None] #要放到同一个图上才能计算(跟下边训练部分的train_ch7函数中的部分放到一个device里)
X[~mask]=value
return X
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss): #损失函数要改写nn.CrossEntropyLoss函数,其实就是要加上SequenceMask
# pred shape: (batch_size, seq_len, vocab_size)
# label shape: (batch_size, seq_len)
# valid_length shape: (batch_size, )
def forward(self, pred, label, valid_length):
# the sample weights shape should be (batch_size, seq_len)
weights = torch.ones_like(label)
weights = SequenceMask(weights, valid_length).float() #这样操作完之后1代表有效位,0代表padding位
self.reduction='none'
output=super(MaskedSoftmaxCELoss, self).forward(pred.transpose(1,2), label)
return (output*weights).mean(dim=1) #第1维是每个单词,对第1维取平均
训练
- 如果device用GPU,参与到反向传播计算的tensor和结构都要放在同一个device里(包括上面计算损失函数时用到的mask)。
def train_ch7(model, data_iter, lr, num_epochs, device): # Saved in d2l
#model指整个Sequence2Sequence结构
#如果device用GPU,参与到反向传播计算的tensor和结构都要放在同一个device里
model.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
loss = MaskedSoftmaxCELoss()
tic = time.time()
for epoch in range(1, num_epochs+1):
l_sum, num_tokens_sum = 0.0, 0.0 #l_sum是该epoch的loss总和,num_tokens_sum是单词数量总和。用l_sum/num_tokens_sum算出的loss是比较有参考性的
for batch in data_iter:
optimizer.zero_grad()
X, X_vlen, Y, Y_vlen = [x.to(device) for x in batch]
Y_input, Y_label, Y_vlen = Y[:,:-1], Y[:,1:], Y_vlen-1
#Y_input只包含bos和word部分,不包含eos,因为它是decoder的输入
#Y_label是decoder生成的正确的ground truth,因此不需要bos部分了,只包含word和eos(因为是从第一个单词开始生成,直到eos结束)
#所以有效长度也要减1
Y_hat, _ = model(X, Y_input, X_vlen, Y_vlen)
l = loss(Y_hat, Y_label, Y_vlen).sum()
l.backward()
with torch.no_grad(): #梯度裁剪
d2l.grad_clipping_nn(model, 5, device)
num_tokens = Y_vlen.sum().item()
optimizer.step()
l_sum += l.sum().item()
num_tokens_sum += num_tokens
if epoch % 50 == 0:
print("epoch {0:4d},loss {1:.3f}, time {2:.1f} sec".format(
epoch, (l_sum/num_tokens_sum), time.time()-tic))
tic = time.time()
测试
def translate_ch7(model, src_sentence, src_vocab, tgt_vocab, max_len, device):
src_tokens = src_vocab[src_sentence.lower().split(' ')] #小写化,分词,得到id列表
src_len = len(src_tokens)
if src_len < max_len:
src_tokens += [src_vocab.pad] * (max_len - src_len)
enc_X = torch.tensor(src_tokens, device=device)
enc_valid_length = torch.tensor([src_len], device=device)
# use expand_dim to add the batch_size dimension.
enc_outputs = model.encoder(enc_X.unsqueeze(dim=0), enc_valid_length)
#unsqueeze(dim=0):在第0维增加一个维度(要增加一个batch_size的维度)
dec_state = model.decoder.init_state(enc_outputs, enc_valid_length)
dec_X = torch.tensor([tgt_vocab.bos], device=device).unsqueeze(dim=0)
predict_tokens = []
for _ in range(max_len):
Y, dec_state = model.decoder(dec_X, dec_state)
# The token with highest score is used as the next time step input.
dec_X = Y.argmax(dim=2)
py = dec_X.squeeze(dim=0).int().item()
if py == tgt_vocab.eos:
break
predict_tokens.append(py)
return ' '.join(tgt_vocab.to_tokens(predict_tokens))
- pytorch知识点:
torch.unsqueeze(dim)
:在维度dim处增加一个维度。
Beam Search(集束搜索)
-
简单贪心算法:对于每一个时间步,都找到单词表里得分最高的单词作为当前时间步的输出,再将这个输出作为下一个时间步的输入,再找下一个时间步得分最高的单词……
- 问题:只考虑了当前的最优解,没有考虑到句子的前后连贯性。
-
维特比算法:把所有的单词组合全试一遍,找到一个整体分数最高的句子。
- 问题:搜索空间太大。
集束搜索:结合了上述两种算法,是维特比算法的贪心形式。它使用beam size参数来限制在每一步保留下来的可能性词的数量。
集束搜索示意图:
注意力机制与seq2seq模型
注意力机制
普通的seq2seq结构存在着一些问题,尤其是RNN机制在实际中存在长程梯度消失的问题,对于较长的句子,我们很难寄希望于将输入的序列转化为定长的向量而保存所有的有效信息,所以随着所需翻译句子的长度的增加,这种结构的效果会显著下降。
与此同时,解码的目标词语可能只与原输入的部分词语有关,而并不是与所有的输入有关。例如,当把“Hello world”翻译成“Bonjour le monde”时,“Hello”映射成“Bonjour”,“world”映射成“monde”。在普通seq2seq模型中,解码器只能隐式地从编码器的最终状态中选择相应的信息。然而,注意力机制可以将这种选择过程显式地建模。
注意力机制框架
Attention 是一种通用的带权池化方法,输入由两部分构成:询问(query)和键值对(key-value pairs)。. Query , attention layer得到输出与value的维度一致 . 对于一个query来说,attention layer 会与每一个key计算注意力分数并进行权重的归一化,输出的向量则是value的加权求和,而每个key计算的权重与value一一对应。
为了计算输出,我们首先假设有一个函数 用于计算query和key的相似性,然后可以计算所有的 attention scores by
我们使用 softmax函数 获得注意力权重:
最终的输出就是value的加权求和:
不同的attetion layer的区别在于score函数的选择。
Softmax屏蔽
由于对于attention机制,我们只需考虑句子的原始部分,并不需要考虑句子padding的部分,所以我们要在softmax运算时进行屏蔽操作。因为softmax函数的输入是attention scores,由softmax函数的定义可知,为了使padding元素在经过softmax运算后对应的项变成0,attention scores中对应的项应该设置为。在实际的编码中,可以将其设置为足够小的数,如-1e6(意思是)。
-
python知识点:
对数组下标中None
的理解:
在numpy中,None
用于增加一个维度,它的效果和numpy.newaxis()是一样的,None是它的别名。
pytorch的tensor中也可以用None
实现类似的功能 。
代码示例:import numpy as np a=np.array([[11,12,13,14],[21,22,23,24],[31,32,33,34],[41,42,43,44]]) print('0维为None:') print(a[None,0:4]) print('1维为None:') print(a[0:4,None]) ''' 输出结果: 0维为None: [[[11 12 13 14] [21 22 23 24] [31 32 33 34] [41 42 43 44]]] 1维为None: [[[11 12 13 14]] [[21 22 23 24]] [[31 32 33 34]] [[41 42 43 44]]] '''
-
python知识点:
x.repeat(repeats,axis=None)
:x属于numpy的ndarray,repeats为重复次数,axis为重复的维度。该函数用于重复数组中的元素。
代码示例:x = np.array([1, 2, 3]) x.repeat(4, axis=0) ''' array([1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]) ''' x = np.array([[1, 2, 3], [4, 5, 6]]) x.repeat(4, axis=0) ''' array([[1, 2, 3], [1, 2, 3], [1, 2, 3], [1, 2, 3], [4, 5, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]]) '''
实现Softmax屏蔽的代码
方法:将句子padding后的长度sequence_len做arange(),创建一个从0到sequence_len-1的列表,将这个列表中的每个元素与每句话的实际长度值做比较,如果元素值大于等于该句实际长度值则为True,表示该位置的元素是padding的部分;否则为False,这样得到一个与输入X同形状的mask矩阵。利用这个mask矩阵作为标记,对于所有为True的元素,将X中同位置的元素置为。然后再将X送入Softmax函数。
def SequenceMask(X, X_len,value=-1e6): #X是输入向量,size=batch_size*sequnce_len;X_len是有效句子长度,size=batch_size
maxlen = X.size(1)
#print(X.size(),torch.arange((maxlen),dtype=torch.float)[None, :],'\n',X_len[:, None] )
mask = torch.arange((maxlen),dtype=torch.float)[None, :] >= X_len[:, None]
#上面一行[None,:],[:, None]的作用是增加一个维度,增加的位置就是None所在的那个维度。
#此处的>=号还体现了python的广播机制
#扩展维度的目的就是让上边两项能够做比较,得到的mask能表明句子中哪些元素属于有效长度,哪些元素是padding的
#mask的size和X一样
#print(torch.arange((maxlen),dtype=torch.float)[None, :])
#print(X_len[:, None])
#print(mask)
X[mask]=value #在padding位置上填充上负无穷的值
return X
def masked_softmax(X, valid_length):
# X: 3-D tensor, valid_length: 1-D or 2-D tensor
softmax = nn.Softmax(dim=-1)
if valid_length is None:
return softmax(X)
else:
shape = X.shape
if valid_length.dim() == 1:
try:
#shape[1]是sequnce_len,原来输入的valid_length是针对每句话来说的
#比如[2,3]表示第一句话有效长度是2,第二句话有效长度是3。
#这里repeat是要把列表中每个元素重复sequnce_len遍,这样便于一句话中每个元素与修改后的valid_length列表中的元素一一对应
valid_length = torch.FloatTensor(valid_length.numpy().repeat(shape[1], axis=0))#[2,2,3,3]
except:
valid_length = torch.FloatTensor(valid_length.cpu().numpy().repeat(shape[1], axis=0))#[2,2,3,3]
else:
valid_length = valid_length.reshape((-1,))
# fill masked elements with a large negative, whose exp is 0
X = SequenceMask(X.reshape((-1, shape[-1])), valid_length)
return softmax(X).reshape(shape)
点积注意力
点积注意力假设query和keys有相同的维度, 即 . 通过计算query和key转置的乘积来计算attention score,通常还会除去 减少计算出来的score对维度𝑑的依赖性,如下
假设 有 个query, 有 个keys. 我们可以通过矩阵运算的方式计算所有 个score:
此外,它支持作为正则化随机删除一些注意力权重。
超出2维矩阵的乘法
和 是维度分别为 和的张量,进行 次二维矩阵乘法后得到 , 维度为 。
-
pytorch知识点:
torch.bmm(batch1, batch2, out=None) → Tensor
:计算多维乘法(叉乘)
代码示例:torch.bmm(torch.ones((2,1,3), dtype = torch.float), torch.ones((2,3,2), dtype = torch.float)) ''' 结果: tensor([[[3., 3.]], [[3., 3.]]]) '''
实现代码
class DotProductAttention(nn.Module):
def __init__(self, dropout, **kwargs):
super(DotProductAttention, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
# query: (batch_size, #queries, d)
# key: (batch_size, #kv_pairs, d)
# value: (batch_size, #kv_pairs, dim_v)
# valid_length: either (batch_size, ) or (batch_size, xx)
def forward(self, query, key, value, valid_length=None):
d = query.shape[-1]
# set transpose_b=True to swap the last two dimensions of key
scores = torch.bmm(query, key.transpose(1,2)) / math.sqrt(d)
attention_weights = self.dropout(masked_softmax(scores, valid_length))
print("attention_weight\n",attention_weights)
return torch.bmm(attention_weights, value)
多层感知机注意力
在多层感知器中,我们首先将 query 和 keys 投影到 。为了更具体,我们将可以学习的参数做如下映射
, , and . 将score函数定义
然后将key 和 value 在特征的维度上合并(concatenate),然后送至 a single hidden layer perceptron 这层中 hidden layer 为 ℎ,输出的size为 1 ,隐层激活函数为tanh,无偏置。
实现代码
class MLPAttention(nn.Module):
def __init__(self, units,ipt_dim,dropout, **kwargs):
super(MLPAttention, self).__init__(**kwargs)
# Use flatten=True to keep query's and key's 3-D shapes.
self.W_k = nn.Linear(ipt_dim, units, bias=False)
self.W_q = nn.Linear(ipt_dim, units, bias=False)
self.v = nn.Linear(units, 1, bias=False)
self.dropout = nn.Dropout(dropout)
def forward(self, query, key, value, valid_length):
query, key = self.W_k(query), self.W_q(key)
#print("size",query.size(),key.size())
# expand query to (batch_size, #querys, 1, units), and key to
# (batch_size, 1, #kv_pairs, units). Then plus them with broadcast.
features = query.unsqueeze(2) + key.unsqueeze(1)
#print("features:",features.size()) #--------------开启
scores = self.v(features).squeeze(-1)
attention_weights = self.dropout(masked_softmax(scores, valid_length))
return torch.bmm(attention_weights, value)
引入注意力机制的Seq2Seq模型
可将注意机制添加到sequence to sequence 模型中,以显式地使用权重聚合states。将decoder的时刻的隐藏状态作为query,encoder的每个时间步的输出作为key和value进行attention聚合。Attention model的输出当作成上下文信息context vector,并与解码器输入拼接起来一起送到解码器。
带有注意机制的seq2seq的编码器与之前章节中的Seq2SeqEncoder相同。在此处我们只关注解码器。
解码器
-
我们添加了一个MLP注意层(MLPAttention),它的隐藏大小与解码器中的LSTM层相同。然后我们通过从编码器传递三个参数来初始化解码器的状态:
- 编码器输出的各个状态:被用于attention layer的memory部分,有相同的key和values。
- 编码器最后一个时间步的隐藏状态:被用于初始化解码器的hidden state。
- 编码器的有效长度:通过它来排除掉编码器输出中的padding部分对attention层的影响。
-
pytorch知识点:
permute(dims)
:将tensor的维度换位。>>> x = torch.randn(2, 3, 5) >>> x.size() torch.Size([2, 3, 5]) >>> x.permute(2, 0, 1).size() torch.Size([5, 2, 3])
解码器实现代码
class Seq2SeqAttentionDecoder(d2l.Decoder):
def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
dropout=0, **kwargs):
super(Seq2SeqAttentionDecoder, self).__init__(**kwargs)
self.attention_cell = MLPAttention(num_hiddens,num_hiddens, dropout)
#上边那句的MLPAttention是上边我们刚刚实现的多层感知机attention
self.embedding = nn.Embedding(vocab_size, embed_size)
self.rnn = nn.LSTM(embed_size+ num_hiddens,num_hiddens, num_layers, dropout=dropout)
self.dense = nn.Linear(num_hiddens,vocab_size)
def init_state(self, enc_outputs, enc_valid_len, *args):
outputs, hidden_state = enc_outputs
# print("first:",outputs.size(),hidden_state[0].size(),hidden_state[1].size())
# Transpose outputs to (batch_size, seq_len, hidden_size)
return (outputs.permute(1,0,-1), hidden_state, enc_valid_len) #permute():维度换位
#outputs.swapaxes(0, 1)
def forward(self, X, state):
enc_outputs, hidden_state, enc_valid_len = state
#("X.size",X.size())
X = self.embedding(X).transpose(0,1)
# print("Xembeding.size2",X.size())
outputs = []
for l, x in enumerate(X):
# print(f"\n{l}-th token")
# print("x.first.size()",x.size())
# query shape: (batch_size, 1, hidden_size)
# select hidden state of the last rnn layer as query
query = hidden_state[0][-1].unsqueeze(1) # np.expand_dims(hidden_state[0][-1], axis=1)
# context has same shape as query
# print("query enc_outputs, enc_outputs:\n",query.size(), enc_outputs.size(), enc_outputs.size())
context = self.attention_cell(query, enc_outputs, enc_outputs, enc_valid_len)
# Concatenate on the feature dimension
# print("context.size:",context.size())
x = torch.cat((context, x.unsqueeze(1)), dim=-1)
# Reshape x to (1, batch_size, embed_size+hidden_size)
# print("rnn",x.size(), len(hidden_state))
out, hidden_state = self.rnn(x.transpose(0,1), hidden_state)
outputs.append(out)
outputs = self.dense(torch.cat(outputs, dim=0))
return outputs.transpose(0, 1), [enc_outputs, hidden_state,
enc_valid_len]
Transformer
Transformer模型利用attention机制实现了并行化捕捉序列依赖,并且同时处理序列的每个位置的tokens,上述优势使得Transformer模型在性能优异的同时大大减少了训练时间。
多头注意力层
自注意力模型:序列的每一个元素对应的key,value,query是完全一致的。
多头注意力层包含个并行的自注意力层,每一个这种层被成为一个head。对每个头来说,在进行注意力计算之前,我们会将query、key和value用三个现行层进行映射,这个注意力头的输出将会被拼接之后输入最后一个线性层进行整合。
假设query,key和value的维度分别是、和。那么对于每一个头,我们可以训练相应的模型权重、和,以得到每个头的输出:
这里的attention可以是任意的attention function,比如前一节介绍的dot-product attention以及MLP attention。之后我们将所有head对应的输出拼接起来,送入最后一个线性层进行整合,这个层的权重可以表示为
接下来我们就可以来实现多头注意力了,假设我们有h个头,隐藏层权重 与query,key,value的维度一致。除此之外,因为多头注意力层保持输入与输出张量的维度不变,所以输出feature的维度也设置为 。
代码实现
-
python知识点:
numpy.tile()
:将原矩阵横向、纵向地复制。>>>x = np.array([[1, 2], [3, 4]]) >>>np.tile(x, (3, 4)) [[1 2 1 2 1 2 1 2] [3 4 3 4 3 4 3 4] [1 2 1 2 1 2 1 2] [3 4 3 4 3 4 3 4] [1 2 1 2 1 2 1 2] [3 4 3 4 3 4 3 4]]
多头注意力层实现代码:
class MultiHeadAttention(nn.Module):
def __init__(self, input_size, hidden_size, num_heads, dropout, **kwargs):
super(MultiHeadAttention, self).__init__(**kwargs)
self.num_heads = num_heads
self.attention = DotProductAttention(dropout) #点积注意力层
self.W_q = nn.Linear(input_size, hidden_size, bias=False)
self.W_k = nn.Linear(input_size, hidden_size, bias=False)
self.W_v = nn.Linear(input_size, hidden_size, bias=False)
self.W_o = nn.Linear(hidden_size, hidden_size, bias=False)
def forward(self, query, key, value, valid_length):
# query, key, and value shape: (batch_size, seq_len, dim),
# where seq_len is the length of input sequence
# valid_length shape is either (batch_size, )
# or (batch_size, seq_len).
# Project and transpose query, key, and value from
# (batch_size, seq_len, hidden_size * num_heads) to
# (batch_size * num_heads, seq_len, hidden_size).
query = transpose_qkv(self.W_q(query), self.num_heads)
key = transpose_qkv(self.W_k(key), self.num_heads)
value = transpose_qkv(self.W_v(value), self.num_heads)
if valid_length is not None:
# Copy valid_length by num_heads times
device = valid_length.device
valid_length = valid_length.cpu().numpy() if valid_length.is_cuda else valid_length.numpy()
if valid_length.ndim == 1:
valid_length = torch.FloatTensor(np.tile(valid_length, self.num_heads))
else:
valid_length = torch.FloatTensor(np.tile(valid_length, (self.num_heads,1)))
#np.tile():将原矩阵横向、纵向地复制
valid_length = valid_length.to(device)
output = self.attention(query, key, value, valid_length) #这里是带有h个head的总输出
output_concat = transpose_output(output, self.num_heads)
return self.W_o(output_concat)
def transpose_qkv(X, num_heads):
# Original X shape: (batch_size, seq_len, hidden_size * num_heads),
# -1 means inferring its value, after first reshape, X shape:
# (batch_size, seq_len, num_heads, hidden_size)
X = X.view(X.shape[0], X.shape[1], num_heads, -1)
# After transpose, X shape: (batch_size, num_heads, seq_len, hidden_size)
X = X.transpose(2, 1).contiguous()
# Merge the first two dimensions. Use reverse=True to infer shape from
# right to left.
# output shape: (batch_size * num_heads, seq_len, hidden_size)
output = X.view(-1, X.shape[2], X.shape[3])
return output
# Saved in the d2l package for later use
def transpose_output(X, num_heads):
#将X的size从[batch_size*h,sequence_len,hidden_size]变成[batch_size,sequence_len,h*hidden_size]
# A reversed version of transpose_qkv
X = X.view(-1, num_heads, X.shape[1], X.shape[2])
X = X.transpose(2, 1).contiguous()
return X.view(X.shape[0], X.shape[1], -1)
基于位置的前馈网络
Transformer 模块另一个非常重要的部分就是基于位置的前馈网络(FFN),它接受一个形状为(batch_size,seq_length, feature_size)的三维张量。Position-wise FFN由两个全连接层组成,他们作用在最后一维(即feature_size)上。因为序列的每个位置的状态都会被单独地更新,所以我们称他为position-wise,这等效于一个1x1的卷积。与多头注意力层相似,FFN层同样只会对最后一维的大小进行改变。
代码实现
# Save to the d2l package.
class PositionWiseFFN(nn.Module):
def __init__(self, input_size, ffn_hidden_size, hidden_size_out, **kwargs):
super(PositionWiseFFN, self).__init__(**kwargs)
self.ffn_1 = nn.Linear(input_size, ffn_hidden_size)
self.ffn_2 = nn.Linear(ffn_hidden_size, hidden_size_out)
def forward(self, X):
return self.ffn_2(F.relu(self.ffn_1(X)))
Add and Norm
除了上面两个模块之外,Transformer还有一个重要的相加归一化层,它可以平滑地整合输入和其他层的输出,因此我们在每个多头注意力层和FFN层后面都添加一个含残差连接的Layer Norm层。这里Layer Norm与Batch Norm很相似,唯一的区别在于Batch Norm是对于batch size这个维度进行计算均值和方差的,而Layer Norm则是对最后一维进行计算。层归一化可以防止层内的数值变化过大,从而有利于加快训练速度并且提高泛化性能。
代码实现
# Save to the d2l package.
class AddNorm(nn.Module):
def __init__(self, hidden_size, dropout, **kwargs):
super(AddNorm, self).__init__(**kwargs)
self.dropout = nn.Dropout(dropout)
self.norm = nn.LayerNorm(hidden_size)
def forward(self, X, Y):
return self.norm(self.dropout(Y) + X)
位置编码
与循环神经网络不同,无论是多头注意力网络还是前馈神经网络都是独立地对每个位置的元素进行更新,这种特性帮助我们实现了高效的并行,却丢失了重要的序列顺序的信息。为了更好的捕捉序列信息,Transformer模型引入了位置编码去保持输入序列元素的位置。
假设输入序列的嵌入表示 , 序列长度为嵌入向量维度为,则其位置编码为 ,输出的向量就是二者相加 。
位置编码是一个二维的矩阵,i对应着序列中的顺序,j对应其embedding vector内部的维度索引。我们可以通过以下等式计算位置编码:
代码实现
class PositionalEncoding(nn.Module):
def __init__(self, embedding_size, dropout, max_len=1000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(dropout)
self.P = np.zeros((1, max_len, embedding_size))
X = np.arange(0, max_len).reshape(-1, 1) / np.power(
10000, np.arange(0, embedding_size, 2)/embedding_size)
self.P[:, :, 0::2] = np.sin(X)
self.P[:, :, 1::2] = np.cos(X)
self.P = torch.FloatTensor(self.P)
def forward(self, X):
if X.is_cuda and not self.P.is_cuda:
self.P = self.P.cuda()
X = X + self.P[:, :X.shape[1], :]
return self.dropout(X)
编码器
编码器包含一个多头注意力层,一个position-wise FFN,和两个 Add and Norm层。对于attention模型以及FFN模型,我们的输出维度都是与embedding维度一致的,这也是由于残差连接天生的特性导致的,因为我们要将前一层的输出与原始输入相加并归一化。
实现代码
class EncoderBlock(nn.Module):
def __init__(self, embedding_size, ffn_hidden_size, num_heads,
dropout, **kwargs): #embedding_size就是输入维度
super(EncoderBlock, self).__init__(**kwargs)
self.attention = MultiHeadAttention(embedding_size, embedding_size, num_heads, dropout)
self.addnorm_1 = AddNorm(embedding_size, dropout)
self.ffn = PositionWiseFFN(embedding_size, ffn_hidden_size, embedding_size)
self.addnorm_2 = AddNorm(embedding_size, dropout)
def forward(self, X, valid_length):
Y = self.addnorm_1(X, self.attention(X, X, X, valid_length))
return self.addnorm_2(Y, self.ffn(Y))
class TransformerEncoder(d2l.Encoder):
def __init__(self, vocab_size, embedding_size, ffn_hidden_size,
num_heads, num_layers, dropout, **kwargs):
super(TransformerEncoder, self).__init__(**kwargs)
self.embedding_size = embedding_size
self.embed = nn.Embedding(vocab_size, embedding_size)
self.pos_encoding = PositionalEncoding(embedding_size, dropout)
self.blks = nn.ModuleList()
for i in range(num_layers):
self.blks.append(
EncoderBlock(embedding_size, ffn_hidden_size,
num_heads, dropout))
def forward(self, X, valid_length, *args):
X = self.pos_encoding(self.embed(X) * math.sqrt(self.embedding_size))
for blk in self.blks:
X = blk(X, valid_length)
return X
解码器
Transformer 模型的解码器与编码器结构类似,然而,除了之前介绍的几个模块之外,编码器部分有另一个子模块。该模块也是多头注意力层,接受编码器的输出作为key和value,decoder的状态作为query。(Decoder 部分的第二个注意力层不是自注意力,key-value来自编码器而query来自解码器)。与编码器部分相类似,解码器同样是使用了add and norm机制,用残差和层归一化将各个子层的输出相连。
仔细来讲,在第t个时间步,当前输入是query,那么self attention接受了第t步以及前t-1步的所有输入。在训练时,由于第t位置的输入可以观测到全部的序列,这与预测阶段的情形项矛盾,所以我们要通过将第t个时间步所对应的可观测长度设置为t,以消除不需要看到的未来的信息(因为attention机制可以让我们观察到整个序列的情况)。
实现代码
class DecoderBlock(nn.Module):
def __init__(self, embedding_size, ffn_hidden_size, num_heads,dropout,i,**kwargs):
super(DecoderBlock, self).__init__(**kwargs)
self.i = i
self.attention_1 = MultiHeadAttention(embedding_size, embedding_size, num_heads, dropout)
self.addnorm_1 = AddNorm(embedding_size, dropout)
self.attention_2 = MultiHeadAttention(embedding_size, embedding_size, num_heads, dropout)
self.addnorm_2 = AddNorm(embedding_size, dropout)
self.ffn = PositionWiseFFN(embedding_size, ffn_hidden_size, embedding_size)
self.addnorm_3 = AddNorm(embedding_size, dropout)
def forward(self, X, state):
enc_outputs, enc_valid_length = state[0], state[1]
# state[2][self.i] stores all the previous t-1 query state of layer-i
# len(state[2]) = num_layers
# If training:
# state[2] is useless.
# If predicting:
# In the t-th timestep:
# state[2][self.i].shape = (batch_size, t-1, hidden_size)
# Demo:
# love dogs ! [EOS]
# | | | |
# Transformer
# Decoder
# | | | |
# I love dogs !
if state[2][self.i] is None:
key_values = X
else:
# shape of key_values = (batch_size, t, hidden_size)
key_values = torch.cat((state[2][self.i], X), dim=1)
state[2][self.i] = key_values
if self.training:
batch_size, seq_len, _ = X.shape
# Shape: (batch_size, seq_len), the values in the j-th column are j+1
#valid_length的作用:因为attention机制可以让我们观察到整个序列的情况,
#训练时必须要限制当前位置所能观测到的序列长度,只能让它观测到前面的词,不能观测到后面的词(因为我们要预测的就是后面的词)
#valid_length就是对可观测序列的限制
valid_length = torch.FloatTensor(np.tile(np.arange(1, seq_len+1), (batch_size, 1)))
valid_length = valid_length.to(X.device)
else:
valid_length = None
X2 = self.attention_1(X, key_values, key_values, valid_length)
Y = self.addnorm_1(X, X2)
Y2 = self.attention_2(Y, enc_outputs, enc_outputs, enc_valid_length)
Z = self.addnorm_2(Y, Y2)
return self.addnorm_3(Z, self.ffn(Z)), state
class TransformerDecoder(d2l.Decoder):
def __init__(self, vocab_size, embedding_size, ffn_hidden_size,
num_heads, num_layers, dropout, **kwargs):
super(TransformerDecoder, self).__init__(**kwargs)
self.embedding_size = embedding_size
self.num_layers = num_layers
self.embed = nn.Embedding(vocab_size, embedding_size)
self.pos_encoding = PositionalEncoding(embedding_size, dropout)
self.blks = nn.ModuleList()
for i in range(num_layers):
self.blks.append(
DecoderBlock(embedding_size, ffn_hidden_size, num_heads,
dropout, i))
self.dense = nn.Linear(embedding_size, vocab_size)
def init_state(self, enc_outputs, enc_valid_length, *args):
return [enc_outputs, enc_valid_length, [None]*self.num_layers]
def forward(self, X, state):
X = self.pos_encoding(self.embed(X) * math.sqrt(self.embedding_size))
for blk in self.blks:
X, state = blk(X, state)
return self.dense(X), state