10.4 Transformer实现#
在前面两节内容中我们分别详细介绍了Transformer模型的原理与多头注意力机制的实现过程,接下来,我们将会一步一步地来详细介绍如何通过 PyTorch框架实现Transformer的整体网络结构, 包括嵌入层、编码器和解码器等等。下面,首先介绍嵌入层的实现过程。
10.4.1 嵌入层实现#
1. Token Embedding 实现
这里首先要实现的是最基础的字符嵌入层,通过PyTorch框架只需要一行代码便可以得到一个字符嵌入层,如下所示:
1 class TokenEmbedding(nn.Module):
2 def __init__(self, vocab_size, emb_size):
3 super(TokenEmbedding, self).__init__()
4 self.embedding = nn.Embedding(vocab_size, emb_size)
5 self.emb_size = emb_size
6 def forward(self, tokens):
7 return self.embedding(tokens.long()) * math.sqrt(self.emb_size)在上述代码中,第4行便是根据指定的词表大小和维度建立一个字符嵌入层。第6~7行是字符嵌入层对应的前向传播过程,即取每个词的索引对应的词向量,其中输入tokens的形状为[len, batch_size],返回结果的形状为[len, batch_size, emb_size]。
2. Positional Embedding实现
在「第10.2.4节 Transformer原理:自注意力、多头注意力与位置编码」内容中我们已经详细介绍了位置编码的作用和原理,同时根据式(10-9)我们还能得到
$$ 1/\left(10000^{\frac{2i}{d_{\text{model}}}}\right)=\exp\left(2i\left(\frac{-\log(10000)}{d_{\text{model}}}\right)\right)\tag{10-11} $$根据式(10-9)和式(10-11),位置编码的实现过程如下所示:
1 class PositionalEncoding(nn.Module):
2 def __init__(self, d_model, dropout=0.1, max_len=5000):
3 super(PositionalEncoding, self).__init__()
4 self.dropout = nn.Dropout(p=dropout)
5 pe = torch.zeros(max_len, d_model)
6 position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
7 div_term = torch.exp(torch.arange(0,d_model,2).float()*(-math.log(10000.0)/d_model))
8 pe[:, 0::2] = torch.sin(position * div_term)
9 pe[:, 1::2] = torch.cos(position * div_term)
10 pe = pe.unsqueeze(1)
11 self.register_buffer('pe', pe)
12 def forward(self, x):
13 x = x + self.pe[:x.size(0), :]
14 return self.dropout(x) 在上述代码中,第1行d_model表示指定模型的维度,max_len表示指定最大的位置长度,需要大于最大句子的长度。第5行用于初始化一个全0的位置矩阵来保存位置信息,即图10-17中括号里的第2个矩阵。第6~9行是用来计算每个维度(每一列)的相关位置信息,其中第8行是计算偶数列的位置信息,第9行是计算奇数列的位置信息。第10行是进行维度扩充形状为[max_len, 1, d_model]。第11行是声明一个不可训练的模型变量,后续可通过以类成员变量的方式进行引用。第12~14行为前向传播计算过程,其中输入x的形状为[x_len, batch_size, emb_size],输出的形状同为[x_len, batch_size, emb_size]。
3. 使用示例
在实现完这个两部分的代码之后,可以通过如下方式进行使用:
1 if __name__ == '__main__':
2 x = torch.tensor([[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]], dtype=torch.long)
3 x = x.transpose(0, 1)
4 token_embedding = TokenEmbedding(vocab_size=11, emb_size=512)
5 x = token_embedding(tokens=x)
6 pos_embedding = PositionalEncoding(d_model=512)
7 x = pos_embedding(x=x)在上述代码中,第2~3行是指定输入,并将batch_size放到第2个维度。第4~5行是字符嵌入的计算过程,输出结果形状为[5, 2, 512]。第6~7行是位置编码的计算过程,输出结果形状同样为[5, 2, 512]。
10.4.2 编码器实现#
根据图10-24可知,Transformer编码器是由多个编码层TransformerEncoderLayer所构成,因此下面我们先实现编码层然后再基于编码层构建得到编码器。
1. 编码层实现
对于一个单独的编码层来说其内部结构即为图10-18的左侧(不包括嵌入层)部分。对于这部分前向传播过程,可以通过如下代码来进行实现:
1 class MyTransformerEncoderLayer(nn.Module):
2 def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
3 super(MyTransformerEncoderLayer, self).__init__()
4 self.self_attn = MyMultiheadAttention(d_model, nhead, dropout=dropout)
5 self.dropout1 = nn.Dropout(dropout)
6 self.norm1 = nn.LayerNorm(d_model)
7 self.linear1 = nn.Linear(d_model, dim_feedforward)
8 self.dropout = nn.Dropout(dropout)
9 self.linear2 = nn.Linear(dim_feedforward, d_model)
10 self.activation = nn.ReLU()
11 self.dropout2 = nn.Dropout(dropout)
12 self.norm2 = nn.LayerNorm(d_model)
13
14 def forward(self, src, src_mask=None, src_key_padding_mask=None):
15 src2 = self.self_attn(src, src, src, src_mask,src_key_padding_mask)[0]
16 src = src + self.dropout1(src2)
17 src = self.norm1(src)
18 src2 = self.activation(self.linear1(src))
19 src2 = self.linear2(self.dropout(src2))
20 src = src + self.dropout2(src2)
21 src = self.norm2(src)
22 return src 在上述代码中,第4行是实例化一个多头注意力层。第5~12行是分别实例化相关线性层、丢弃层或层归一化层。第14行中src表示经过嵌入层后的输出,形状为[src_len,batch_size, d_model],src_mask表示注意力掩码此时为None,src_key_padding_mask为编码器对应的填充掩码,形状为[batch_size, src_len]。第15行是多头注意力机的前向传播计算过程,此时只取返回的第1个结果,形状为[src_len,batch_size,d_model],即多头的线性组合输出。第22行则是最后计算得到的输出,形状为[src_len,batch_size,d_model]。
2. 编码器实现
在实现完一个标准的编码层后便可以基于此来实现堆叠多个编码层,从而得到Transformer中的编码器。对于这部分内容,可以通过如下代码来实现:
1 def _get_clones(module, N):
2 return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
3
4 class MyTransformerEncoder(nn.Module):
5 def __init__(self, encoder_layer, num_layers, norm=None):
6 super(MyTransformerEncoder, self).__init__()
7 self.layers = _get_clones(encoder_layer, num_layers)
8 self.num_layers = num_layers
9 self.norm = norm
10
11 def forward(self, src, mask=None, src_key_padding_mask=None):
12 output = src
13 for mod in self.layers:
14 output = mod(output, mask,src_key_padding_mask)
15 if self.norm is not None:
16 output = self.norm(output)
17 return output 在上述代码中,第1~2行是用来定义一个克隆多个编码层或解码层功能函数。第7行中的encoder_layer是一个实例化的编码层,self.layers中保存的是一个包含有多个编码层的ModuleList。第13~14行便是用来实现多个编码层堆叠起来的效果,并完成整个前向传播的计算过程。第15~17行分别是对多个编码层的输出结果进行层归一化,然后返回最后的结果,形状为[src_len,batch_size,d_model]。
3. 使用示例
在完成编码器的实现过程后便可以将其用于对输入序列进行编码。例如可以仅仅通过一个编码器对输入序列进行编码,然后将最后的输出再输入到一个分类器中进行分类处理,而BERT模型的整体结构便是如此。下面先看一个使用示例。
1 if __name__ == '__main__':
2 src_len, batch_size = 5, 2
3 d_model, tgt_len = 32, 6
4 src, num_head = torch.rand((src_len, batch_size, d_model)), 8
5 src_key_padding_mask = torch.tensor([[False, False, False, True, True],
6 [False, False, False, False, True]])
7 encoder_layer = MyTransformerEncoderLayer(d_model=d_model, nhead=num_head)
8 my_transformer_encoder = MyTransformerEncoder(encoder_layer=encoder_layer,
9 num_layers=2,norm=nn.LayerNorm(d_model))
10 memory = my_transformer_encoder(src=src, mask=None, src_key_padding_mask) 在上述代码中,第4~6行是初始化模型的相关输入。第7行是根据参数实例化一个编码层。第8~9行是根据多个编码层实例化得到一个编码器。第10行便是用来得到整个编码器的前向传播输出结果,此时形状为[5, 2, 32],并且需要注意的是在编码器中不需要掩盖当前时刻之后的位置信息,所以mask=None。
10.4.3 解码器实现#
在介绍完编码器的实现过程后,下面开始介绍解码器的实现过程。Transformer解码器是由多个解码层TransformerEecoderLayer所构成,因此下面我们先实现解码层然后再基于解码层构建得到解码器。
1. 解码层实现
对于一个单独的解码层来说其内部结构即为图10-18的右侧(不包括嵌入层和分类层)部分。对于这部分前向传播过程,可以通过如下代码来进行实现:
1 class MyTransformerDecoderLayer(nn.Module):
2 def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1):
3 super(MyTransformerDecoderLayer, self).__init__()
4 self.self_attn = MyMultiheadAttention(embed_dim=d_model,
5 num_heads=nhead, dropout=dropout)
6 self.multihead_attn = MyMultiheadAttention(embed_dim=d_model,
7 num_heads=nhead, dropout=dropout)
8 self.linear1 = nn.Linear(d_model, dim_feedforward)
9 self.dropout = nn.Dropout(dropout)
10 self.linear2 = nn.Linear(dim_feedforward, d_model)
11 self.norm1 = nn.LayerNorm(d_model)
12 self.norm2 = nn.LayerNorm(d_model)
13 self.norm3 = nn.LayerNorm(d_model)
14 self.dropout1 = nn.Dropout(dropout)
15 self.dropout2 = nn.Dropout(dropout)
16 self.dropout3 = nn.Dropout(dropout)
17 self.activation = nn.ReLU()在上述代码中,第4~5行用来定义图10-18中的掩码多头注意力机制实例化对象。第6~7行是定义图10-18中编码器与解码器交互的多头注意力机制实例化对象。第8~17行是分别实例化相关线性层、丢弃层或层归一化层等。
在完成类MyTransformerDecoderLayer的初始化后,便可以实现整个前向传播计算过程,代码如下所示:
1 def forward(self, tgt, mem, tgt_mask=None, mem_mask=None,
2 tgt_key_padding_mask=None,mem_key_padding_mask=None):
3 tgt2 = self.self_attn(tgt, tgt, tgt, tgt_mask,tgt_key_padding_mask)[0]
4 tgt = tgt + self.dropout1(tgt2)
5 tgt = self.norm1(tgt)
6 tgt2 = self.multihead_attn(tgt,mem,mem,mem_mask,mem_key_padding_mask)[0]
7 tgt = tgt + self.dropout2(tgt2)
8 tgt = self.norm2(tgt)
9 tgt2 = self.activation(self.linear1(tgt))
10 tgt2 = self.linear2(self.dropout(tgt2))
11 tgt = tgt + self.dropout3(tgt2)
12 tgt = self.norm3(tgt)
13 return tgt 在上述代码中,第1~2行里tgt是解码器部分的输入,形状为[tgt_len, batch_ size, d_model];mem是编码部分的输出结果形状为[src_len, batch_size, d_model];tgt_mask用于解码器中掩盖当前位置之后的信息掩码矩阵,形状[tgt_le n, tgt_len];mem_mask是编码器-解码器交互时的注意力掩码,一般为None;tgt_key_padding_mask是解码部分输入序列的填充情况,形状为[batch_size, tgt_len];mem_key_padding_mask是编码器输入部分的填充向量,形状为[batch_size, src_len]。
第3行用来完成图10-18中掩码多头注意力部分的计算过程,其中tgt_mask为注意力掩码矩阵,tgt_key_padding_mask为解码器输入对应的填充掩码。第6行用来完成解码器与编码器之间交互的多头注意力计算过程,其中mem_mask为None,mem_key_padding_mask同样为src_key_padding_mask,用来对编码器的输出进行填充部分的掩盖。第21行为最终计算得到的结果,形状为[tgt_len, batch_size, d_model]。
2. 解码器实现
在实现完一个标准的解码层后便可以基于此来实现堆叠多个解码层,从而得到Transformer中的解码器。对于这部分内容,可以通过如下代码来实现:
1 class MyTransformerDecoder(nn.Module):
2 def __init__(self, decoder_layer, num_layers, norm=None):
3 super(MyTransformerDecoder, self).__init__()
4 self.layers = _get_clones(decoder_layer, num_layers)
5 self.num_layers = num_layers
6 self.norm = norm
7
8 def forward(self, tgt, mem, tgt_mask=None, mem_mask=None,
9 tgt_key_padding_mask=None,mem_key_padding_mask=None):
10 output = tgt
11 for mod in self.layers:
12 output = mod(output, mem, tgt_mask, mem_mask,
13 tgt_key_padding_mask,mem_key_padding_mask)
14 if self.norm is not None:
15 output = self.norm(output)
16 return output 在上述代码中,第4行中的decoder_layer是一个实例化的解码层,self.layers中保存的是一个包含有多个解码层的ModuleList。第11~13行是用来实现多个解码层堆叠起来的效果,并完成整个前向传播的计算过程。第14~16行分别是对多个解码层的输出结果进行层归一化,然后返回最后的结果,形状为[tgt_len,batch_size,d_model]。
10.4.4 Transformer网络实现#
1. 网络定义
在实现完Transformer中各个基础模块后,接着便可以根据图10-18所示的结构来完成Transformer模型的构建。总体来说这部分的代码也相对简单,只需要将上述编码器解码器组合到一起即可。首先定义整个网络结构,具体代码如下所示:
1 class MyTransformer(nn.Module):
2 def __init__(self, d_model=512, nhead=8, num_encoder_layers=6,
3 num_decoder_layers=6, dim_feedforward=2048, dropout=0.1):
4 super(MyTransformer, self).__init__()
5 encoder_layer = MyTransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout)
6 encoder_norm = nn.LayerNorm(d_model)
7 self.encoder = MyTransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)
8 decoder_layer = MyTransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout)
9 decoder_norm = nn.LayerNorm(d_model)
10 self.decoder = MyTransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm)
11 self.d_model = d_model
12 self.nhead = nhead在上述代码中,第1~2行是模型的网络超参数,其中d_model为模型维度,nhead为多头个数,num_encoder_layers为编码层的个数,num_decoder_layers为解码层的个数,dim_feedforward为全连接层中的维度。第5~7行是用来定义编码器部分的网络结构。第8~10行是用来定义解码器部分的网络结构。
2. 前向传播
在定义完类MyTransformer的初始化函数后,便可以继续实现的整个前向传播过程,示例代码如下所示:
1 def forward(self, src, tgt, src_mask=None, tgt_mask=None,
2 mem_mask=None, src_key_padding_mask=None,
3 tgt_key_padding_mask=None, mem_key_padding_mask=None):
4 mem = self.encoder(src, mask=src_mask, src_key_padding_mask=src_key_padding_mask)
5 output = self.decoder(tgt=tgt, mem=mem, tgt_mask=tgt_mask, mem_mask=mem_mask,
6 tgt_key_padding_mask=tgt_key_padding_mask,
7 mem_key_padding_mask=mem_key_padding_mask)
8 return output在上述代码中,第1~3行为模型接收的相关参数,各个参数的信息在上面已经介绍过这里就不再赘述。第4行是编码器的前向传播计算过程。第5~7行是解码器对应的前向传播过程。可以发现,在上述代码中并没有图10-18中右上方的分类层。这里我们实现的是一个通用的编码器-解码器结构,而最后的分类层我们将放到特定下游任务中进行实现,详见「第10.5节 Transformer对联模型:序列生成实战示例」内容。
3. 注意力掩码构建
除了上述模块之外,Transformer 中还有一个需要实现的就是注意力掩码矩阵的生成,示例代码如下所示:
1 def generate_square_subsequent_mask(self, sz):
2 mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
3 mask = mask.float().masked_fill(mask == 0, float('-inf'))
4 mask = mask.masked_fill(mask == 1, float(0.0))
5 return mask 在上述代码中,第1行sz是指定掩码矩阵的大小。第2行torch.triu的作用是返回矩阵的上三角部分,其余部分置为0并进行转置,这样便得到了一个下三角全为1,其余部分为0的方阵。第3~4行是先将0的位置全部替换为负无穷,然后再将1的位置替换为0。
4. 使用示例
在实现完整个Transformer网络结构以后,可以通过如下方式进行使用:
1 if __name__ == '__main__':
2 src_len, batch_size = 5, 2
3 d_model, tgt_len = 32, 6
4 src, num_head = torch.rand((src_len, batch_size, d_model)), 8
5 src_key_padding_mask = torch.tensor([[False, False, False, True, True],
6 [False, False, False, False, True]])
7 tgt = torch.rand((tgt_len, batch_size, d_model))
8 tgt_key_padding_mask = torch.tensor([[False, False, False, True, True, True],
9 [False, False, False, False, True, True]])
10 my_transformer = MyTransformer(d_model=d_model, nhead=num_head, num_encoder_layers=6,
11 num_decoder_layers=6, dim_feedforward=500)
12 tgt_mask = my_transformer.generate_square_subsequent_mask(tgt_len)
13 out = my_transformer(src=src, tgt=tgt, tgt_mask=tgt_mask,
14 src_key_padding_mask=src_key_padding_mask,
15 tgt_key_padding_mask=tgt_key_padding_mask,
16 mem_key_padding_mask=src_key_padding_mask)
17 print(out.shape)在上述代码中,第2~3行是指定模型相关超参数。第4~6行分别生成编码器的输入和填充掩码。第7~9行是分别生成解码器对应的输入和填充掩码。第10~11行是根据模型参数实例化一个Transformer模型。第12行是根据目标序列长度生成注意力掩码矩阵。第13~17行则是根据各部分输入计算得到整个模型的输出,返回结果形状为[tgt_len,batch_size,d_model]。
10.4.5 小结#
在本节内容中,我们首先介绍了Transformer中嵌入层的实现,包括字符嵌入和位置编码两部分;然后分别详细介绍了编码器和解码器的实现过程,并且提到BERT模型的核心思想其实就是基于Transformer中的编码器构建而来;最后基于编码器和解码器介绍了如何构建整个完整的Transformer模型。在下一节内容中,我们将从会介绍如何基于Transformer模型来构建一个对联生成模型。