使用 Transformer 和 Keras 进行神经机器翻译

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

本教程演示了如何创建和训练一个 序列到序列 Transformer 模型,将 葡萄牙语翻译成英语。Transformer 最初由 Vaswani 等人 (2017) 在 "Attention is all you need" 中提出。

Transformer 是深度神经网络,它用 自注意力 替换 CNN 和 RNN。自注意力使 Transformer 能够轻松地在输入序列之间传输信息。

Google AI 博客文章 中所述

用于机器翻译的神经网络通常包含一个编码器,用于读取输入句子并生成其表示。然后,解码器在参考编码器生成的表示时,逐词生成输出句子。Transformer 从为每个词生成初始表示或嵌入开始... 然后,使用自注意力,它从所有其他词中聚合信息,生成每个词的新表示,该表示由整个上下文(由填充的球体表示)提供信息。然后,此步骤在所有词的并行中重复多次,依次生成新的表示。

Applying the Transformer to machine translation

图 1:将 Transformer 应用于机器翻译。来源:Google AI 博客

这需要消化很多内容,本教程的目标是将其分解成易于理解的部分。在本教程中,您将

  • 准备数据。
  • 实现必要的组件
    • 位置嵌入。
    • 注意力层。
    • 编码器和解码器。
  • 构建和训练 Transformer。
  • 生成翻译。
  • 导出模型。

为了充分利用本教程,了解 文本生成的基础知识 和注意力机制会有所帮助。

Transformer 是一种序列到序列的编码器-解码器模型,类似于 带注意力机制的 NMT 教程 中的模型。单层 Transformer 需要更多代码来编写,但与该编码器-解码器 RNN 模型几乎相同。唯一的区别是 RNN 层被自注意力层替换。本教程构建了一个 4 层 Transformer,它更大、更强大,但从根本上来说并不比它更复杂。

RNN+注意力模型 1 层 Transformer

在训练完本笔记本中的模型后,您将能够输入一个葡萄牙语句子并返回其英语翻译。

Attention heatmap

图 2:可视化注意力权重,您可以在本教程结束时生成这些权重。

为什么 Transformer 很重要

  • Transformer 在对顺序数据(例如自然语言)进行建模方面表现出色。
  • 循环神经网络 (RNN) 不同,Transformer 是可并行的。这使得它们在 GPU 和 TPU 等硬件上效率更高。主要原因是 Transformer 用注意力替换了递归,并且计算可以同时进行。层输出可以并行计算,而不是像 RNN 那样串行计算。
  • RNN(例如 seq2seq,2014)或 卷积神经网络 (CNN)(例如 ByteNet)不同,Transformer 能够捕获数据中输入或输出序列中远距离位置之间的远距离或长距离上下文和依赖关系。因此,可以学习更长的连接。注意力允许每个位置在每一层访问整个输入,而在 RNN 和 CNN 中,信息需要经过许多处理步骤才能移动很长距离,这使得学习变得更加困难。
  • Transformer 对数据中的时间/空间关系没有假设。这非常适合处理一组对象(例如,星际争霸单位)。

Encoder self-attention distribution for the word it from the 5th to the 6th layer of a Transformer trained on English-to-French translation

图 3:在英语到法语翻译(八个注意力头之一)上训练的 Transformer 的第 5 层到第 6 层中“it”一词的编码器自注意力分布。来源:Google AI 博客

设置

首先安装 TensorFlow Datasets 用于加载数据集,以及 TensorFlow Text 用于文本预处理。

# Install the most re version of TensorFlow to use the improved
# masking support for `tf.keras.layers.MultiHeadAttention`.
apt install --allow-change-held-packages libcudnn8=8.1.0.77-1+cuda11.2
pip uninstall -y -q tensorflow keras tensorflow-estimator tensorflow-text
pip install protobuf~=3.20.3
pip install -q tensorflow_datasets
pip install -q -U tensorflow-text tensorflow

导入必要的模块

import logging
import time

import numpy as np
import matplotlib.pyplot as plt

import tensorflow_datasets as tfds
import tensorflow as tf

import tensorflow_text

数据处理

本节将从 本教程 中下载数据集和子词分词器,然后将其封装在 tf.data.Dataset 中用于训练。

测试数据集

# Create training and validation set batches.
train_batches = make_batches(train_examples)
val_batches = make_batches(val_examples)

生成的 tf.data.Dataset 对象已为使用 Keras 进行训练做好准备。Keras Model.fit 训练需要 (inputs, labels) 对。该 inputs 是分词后的葡萄牙语和英语序列对,(pt, en)。该 labels 是偏移 1 的相同英语序列。这种偏移是为了确保在每个位置输入 en 序列时,label 都是下一个词元。

底部是输入,顶部是标签。

这与 文本生成教程 相同,只是这里有额外的输入“上下文”(葡萄牙语序列),模型会“以它为条件”。

这种设置被称为“教师强迫”,因为无论模型在每个时间步的输出是什么,它都会在下一个时间步获得真实值作为输入。这是一种简单高效的训练文本生成模型的方法。它效率很高,因为您不需要按顺序运行模型,不同序列位置的输出可以并行计算。

您可能期望 input, output 对只是 Portuguese, English 序列。给定葡萄牙语序列,模型会尝试生成英语序列。

可以训练一个以这种方式工作的模型。您需要编写推理循环并将模型的输出传递回输入。它速度较慢(时间步无法并行运行),并且学习难度更大(模型必须在获得开头正确之前才能获得句尾正确),但它可以提供更稳定的模型,因为模型必须学习在训练期间纠正自己的错误。

for (pt, en), en_labels in train_batches.take(1):
  break

print(pt.shape)
print(en.shape)
print(en_labels.shape)

enen_labels 是相同的,只是偏移 1

print(en[0][:10])
print(en_labels[0][:10])

定义组件

Transformer 内部有很多内容。需要记住的重要事项是

  1. 它遵循与标准序列到序列模型(包含编码器和解码器)相同的通用模式。
  2. 如果您逐步完成它,所有内容都会变得有意义。
原始 Transformer 图 4 层 Transformer 的表示

在您完成本教程的过程中,将解释这两个图中的每个组件。

嵌入和位置编码层

编码器和解码器的输入都使用相同的嵌入和位置编码逻辑。

嵌入和位置编码层

给定一个词元序列,输入词元(葡萄牙语)和目标词元(英语)都必须使用 tf.keras.layers.Embedding 层转换为向量。

模型中使用的注意力层将它们的输入视为一组向量,没有顺序。由于模型不包含任何循环层或卷积层,因此它需要某种方法来识别词序,否则它会将输入序列视为 词袋 实例,how are youhow you areyou how are 等等,都是无法区分的。

Transformer 会向嵌入向量添加“位置编码”。它使用不同频率(跨序列)的一组正弦和余弦。根据定义,相邻元素将具有相似的位置编码。

原始论文使用以下公式来计算位置编码

\[\Large{PE_{(pos, 2i)} = \sin(pos / 10000^{2i / d_{model} })} \]

\[\Large{PE_{(pos, 2i+1)} = \cos(pos / 10000^{2i / d_{model} })} \]

def positional_encoding(length, depth):
  depth = depth/2

  positions = np.arange(length)[:, np.newaxis]     # (seq, 1)
  depths = np.arange(depth)[np.newaxis, :]/depth   # (1, depth)

  angle_rates = 1 / (10000**depths)         # (1, depth)
  angle_rads = positions * angle_rates      # (pos, depth)

  pos_encoding = np.concatenate(
      [np.sin(angle_rads), np.cos(angle_rads)],
      axis=-1) 

  return tf.cast(pos_encoding, dtype=tf.float32)

位置编码函数是正弦和余弦的堆叠,它们根据其在嵌入向量深度中的位置以不同的频率振动。它们在位置轴上振动。

根据定义,这些向量在位置轴上与相邻向量很好地对齐。在下面,位置编码向量被归一化,并且来自位置 1000 的向量通过点积与所有其他向量进行比较

因此,使用它来创建一个 PositionEmbedding 层,该层会查找词元的嵌入向量并添加位置向量

class PositionalEmbedding(tf.keras.layers.Layer):
  def __init__(self, vocab_size, d_model):
    super().__init__()
    self.d_model = d_model
    self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True) 
    self.pos_encoding = positional_encoding(length=2048, depth=d_model)

  def compute_mask(self, *args, **kwargs):
    return self.embedding.compute_mask(*args, **kwargs)

  def call(self, x):
    length = tf.shape(x)[1]
    x = self.embedding(x)
    # This factor sets the relative scale of the embedding and positonal_encoding.
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    x = x + self.pos_encoding[tf.newaxis, :length, :]
    return x
embed_pt = PositionalEmbedding(vocab_size=tokenizers.pt.get_vocab_size().numpy(), d_model=512)
embed_en = PositionalEmbedding(vocab_size=tokenizers.en.get_vocab_size().numpy(), d_model=512)

pt_emb = embed_pt(pt)
en_emb = embed_en(en)
en_emb._keras_mask

添加和归一化

添加和归一化

这些“添加和归一化”块散布在整个模型中。每个块都会加入一个残差连接,并将结果通过 LayerNormalization 层运行。

组织代码的最简单方法是围绕这些残差块。以下部分将为每个块定义自定义层类。

包含残差“添加和归一化”块是为了确保训练效率。残差连接为梯度提供了一条直接路径(并确保向量被注意力层 **更新** 而不是 **替换**),而归一化则保持输出的合理范围。

基本注意力层

注意力层在整个模型中使用。这些层都相同,只是注意力的配置方式不同。每个层都包含一个 layers.MultiHeadAttention、一个 layers.LayerNormalization 和一个 layers.Add

基本注意力层

要实现这些注意力层,请从一个简单的基类开始,该类只包含组件层。每个用例都将作为子类实现。以这种方式编写代码会多写一些代码,但它可以使意图清晰。

class BaseAttention(tf.keras.layers.Layer):
  def __init__(self, **kwargs):
    super().__init__()
    self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
    self.layernorm = tf.keras.layers.LayerNormalization()
    self.add = tf.keras.layers.Add()

注意力复习

在您深入了解每个用例的具体细节之前,这里快速复习一下注意力的工作原理

基本注意力层

有两个输入

  1. 查询序列;正在处理的序列;执行注意力的序列(底部)。
  2. 上下文序列;正在关注的序列(左侧)。

输出与查询序列具有相同的形状。

常见的比较是,此操作类似于字典查找。一个模糊可微分向量化的字典查找。

这是一个普通的 Python 字典,有 3 个键和 3 个值,传递给单个查询。

d = {'color': 'blue', 'age': 22, 'type': 'pickup'}
result = d['color']
  • 您要查找的是 query
  • 字典包含的信息类型是 key
  • 该信息是 value

当您在普通字典中查找 query 时,字典会找到匹配的 key,并返回其关联的 valuequery 可能会匹配 key,也可能不匹配。您可以想象一个模糊字典,其中键不必完全匹配。如果您在上面的字典中查找 d["species"],您可能希望它返回 "pickup",因为这是与查询最匹配的值。

注意力层执行类似的模糊查找,但它不仅查找最佳键。它根据 query 与每个 key 的匹配程度来组合 value

它是如何工作的?在注意力层中,querykeyvalue 都是向量。注意力层不是进行哈希查找,而是组合 querykey 向量以确定它们匹配程度,即“注意力分数”。该层返回所有 value 的加权平均值,权重由“注意力分数”决定。

查询序列中的每个位置都提供一个 query 向量。上下文序列充当字典。上下文序列中的每个位置都提供一个 keyvalue 向量。输入向量不会直接使用,layers.MultiHeadAttention 层包含 layers.Dense 层,用于在使用输入向量之前对其进行投影。

交叉注意力层

Transformer 的核心是交叉注意力层。该层连接编码器和解码器。该层是模型中最直接的注意力使用方式,它执行与 带注意力机制的 NMT 教程 中的注意力块相同的任务。

交叉注意力层

要实现此功能,您需要将目标序列 x 作为 query,并将 context 序列作为 key/value 传递给 mha 层。

class CrossAttention(BaseAttention):
  def call(self, x, context):
    attn_output, attn_scores = self.mha(
        query=x,
        key=context,
        value=context,
        return_attention_scores=True)

    # Cache the attention scores for plotting later.
    self.last_attn_scores = attn_scores

    x = self.add([x, attn_output])
    x = self.layernorm(x)

    return x

下面的漫画展示了信息如何流经该层。各列代表对上下文序列的加权求和。

为简便起见,未显示残差连接。

交叉注意力层

输出长度是 query 序列的长度,而不是上下文 key/value 序列的长度。

该图进一步简化,如下所示。无需绘制整个“注意力权重”矩阵。关键在于,每个 query 位置都可以看到上下文中的所有 key/value 对,但查询之间不会交换信息。

每个查询都可以看到整个上下文。

在示例输入上运行测试

sample_ca = CrossAttention(num_heads=2, key_dim=512)

print(pt_emb.shape)
print(en_emb.shape)
print(sample_ca(en_emb, pt_emb).shape)

全局自注意力层

该层负责处理上下文序列,并在其长度上传播信息。

全局自注意力层

由于上下文序列在生成翻译时是固定的,因此允许信息双向流动。

在 Transformer 和自注意力机制出现之前,模型通常使用 RNN 或 CNN 来执行此任务。

双向 RNN 和 CNN

RNN 和 CNN 存在局限性。

  • RNN 允许信息流经整个序列,但它需要经过许多处理步骤才能到达那里(限制梯度流)。这些 RNN 步骤必须按顺序执行,因此 RNN 难以利用现代并行设备。
  • 在 CNN 中,每个位置都可以并行处理,但它只提供有限的感受野。感受野仅随 CNN 层数线性增长。您需要堆叠多个卷积层才能在序列中传输信息(Wavenet 通过使用扩张卷积来减少此问题)。

另一方面,全局自注意力层允许每个序列元素直接访问其他每个序列元素,只需很少的操作,并且所有输出都可以并行计算。

要实现此层,您只需将目标序列 x 作为 queryvalue 参数传递给 mha 层。

class GlobalSelfAttention(BaseAttention):
  def call(self, x):
    attn_output = self.mha(
        query=x,
        value=x,
        key=x)
    x = self.add([x, attn_output])
    x = self.layernorm(x)
    return x
sample_gsa = GlobalSelfAttention(num_heads=2, key_dim=512)

print(pt_emb.shape)
print(sample_gsa(pt_emb).shape)

保持与之前相同的风格,您可以这样绘制它。

全局自注意力层

同样,为了清晰起见,省略了残差连接。

更紧凑、更准确的绘制方式如下所示。

全局自注意力层

因果自注意力层

该层与全局自注意力层执行类似的工作,用于输出序列。

因果自注意力层

这需要与编码器的全局自注意力层进行不同的处理。

文本生成教程带注意力机制的 NMT 教程一样,Transformer 是一个“自回归”模型:它们一次生成一个标记,并将该输出反馈给输入。为了使此模型高效,这些模型确保每个序列元素的输出仅取决于之前的序列元素;这些模型是“因果”的。

单向 RNN 本质上是因果的。要创建因果卷积,您只需填充输入并移动输出以使其正确对齐(使用 layers.Conv1D(padding='causal'))。

因果 RNN 和 CNN

因果模型在两个方面效率很高。

  1. 在训练中,它允许您在仅执行一次模型的情况下计算输出序列中每个位置的损失。
  2. 在推理过程中,对于每个新生成的标记,您只需要计算其输出,之前序列元素的输出可以重复使用。
    • 对于 RNN,您只需要 RNN 状态来考虑之前的计算(将 return_state=True 传递给 RNN 层的构造函数)。
    • 对于 CNN,您需要遵循 Fast Wavenet 的方法。

要构建因果自注意力层,您需要在计算注意力分数和对注意力 value 求和时使用适当的掩码。

如果您在调用 MultiHeadAttention 层时将 use_causal_mask = True 传递给它,则会自动处理此问题。

class CausalSelfAttention(BaseAttention):
  def call(self, x):
    attn_output = self.mha(
        query=x,
        value=x,
        key=x,
        use_causal_mask = True)
    x = self.add([x, attn_output])
    x = self.layernorm(x)
    return x

因果掩码确保每个位置只能访问其之前的那些位置。

因果自注意力层

同样,为了清晰起见,省略了残差连接。

该层的更紧凑表示如下所示。

因果自注意力层

测试该层

sample_csa = CausalSelfAttention(num_heads=2, key_dim=512)

print(en_emb.shape)
print(sample_csa(en_emb).shape)

早期序列元素的输出不依赖于后面的元素,因此在应用该层之前修剪元素的顺序无关紧要。

out1 = sample_csa(embed_en(en[:, :3])) 
out2 = sample_csa(embed_en(en))[:, :3]

tf.reduce_max(abs(out1 - out2)).numpy()

前馈网络

Transformer 还包括此逐点前馈网络,位于编码器和解码器中。

前馈网络

该网络由两个线性层(tf.keras.layers.Dense)组成,中间有一个 ReLU 激活函数,以及一个 dropout 层。与注意力层一样,此处的代码还包括残差连接和归一化。

class FeedForward(tf.keras.layers.Layer):
  def __init__(self, d_model, dff, dropout_rate=0.1):
    super().__init__()
    self.seq = tf.keras.Sequential([
      tf.keras.layers.Dense(dff, activation='relu'),
      tf.keras.layers.Dense(d_model),
      tf.keras.layers.Dropout(dropout_rate)
    ])
    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    x = self.add([x, self.seq(x)])
    x = self.layer_norm(x) 
    return x

测试该层,输出与输入具有相同的形状。

sample_ffn = FeedForward(512, 2048)

print(en_emb.shape)
print(sample_ffn(en_emb).shape)

编码器层

编码器包含 N 个编码器层的堆栈。其中每个 EncoderLayer 包含一个 GlobalSelfAttention 层和一个 FeedForward 层。

编码器层

以下是 EncoderLayer 的定义。

class EncoderLayer(tf.keras.layers.Layer):
  def __init__(self,*, d_model, num_heads, dff, dropout_rate=0.1):
    super().__init__()

    self.self_attention = GlobalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    self.ffn = FeedForward(d_model, dff)

  def call(self, x):
    x = self.self_attention(x)
    x = self.ffn(x)
    return x

快速测试一下,输出将与输入具有相同的形状。

sample_encoder_layer = EncoderLayer(d_model=512, num_heads=8, dff=2048)

print(pt_emb.shape)
print(sample_encoder_layer(pt_emb).shape)

编码器

接下来构建编码器。

编码器

编码器由以下部分组成:

  • 输入端的 PositionalEmbedding 层。
  • EncoderLayer 层的堆栈。
class Encoder(tf.keras.layers.Layer):
  def __init__(self, *, num_layers, d_model, num_heads,
               dff, vocab_size, dropout_rate=0.1):
    super().__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(
        vocab_size=vocab_size, d_model=d_model)

    self.enc_layers = [
        EncoderLayer(d_model=d_model,
                     num_heads=num_heads,
                     dff=dff,
                     dropout_rate=dropout_rate)
        for _ in range(num_layers)]
    self.dropout = tf.keras.layers.Dropout(dropout_rate)

  def call(self, x):
    # `x` is token-IDs shape: (batch, seq_len)
    x = self.pos_embedding(x)  # Shape `(batch_size, seq_len, d_model)`.

    # Add dropout.
    x = self.dropout(x)

    for i in range(self.num_layers):
      x = self.enc_layers[i](x)

    return x  # Shape `(batch_size, seq_len, d_model)`.

测试编码器

# Instantiate the encoder.
sample_encoder = Encoder(num_layers=4,
                         d_model=512,
                         num_heads=8,
                         dff=2048,
                         vocab_size=8500)

sample_encoder_output = sample_encoder(pt, training=False)

# Print the shape.
print(pt.shape)
print(sample_encoder_output.shape)  # Shape `(batch_size, input_seq_len, d_model)`.

解码器层

解码器的堆栈稍微复杂一些,每个 DecoderLayer 包含一个 CausalSelfAttention 层、一个 CrossAttention 层和一个 FeedForward 层。

解码器层
class DecoderLayer(tf.keras.layers.Layer):
  def __init__(self,
               *,
               d_model,
               num_heads,
               dff,
               dropout_rate=0.1):
    super(DecoderLayer, self).__init__()

    self.causal_self_attention = CausalSelfAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    self.cross_attention = CrossAttention(
        num_heads=num_heads,
        key_dim=d_model,
        dropout=dropout_rate)

    self.ffn = FeedForward(d_model, dff)

  def call(self, x, context):
    x = self.causal_self_attention(x=x)
    x = self.cross_attention(x=x, context=context)

    # Cache the last attention scores for plotting later
    self.last_attn_scores = self.cross_attention.last_attn_scores

    x = self.ffn(x)  # Shape `(batch_size, seq_len, d_model)`.
    return x

测试解码器层

sample_decoder_layer = DecoderLayer(d_model=512, num_heads=8, dff=2048)

sample_decoder_layer_output = sample_decoder_layer(
    x=en_emb, context=pt_emb)

print(en_emb.shape)
print(pt_emb.shape)
print(sample_decoder_layer_output.shape)  # `(batch_size, seq_len, d_model)`

解码器

Encoder 类似,Decoder 由一个 PositionalEmbedding 层和一个 DecoderLayer 层的堆栈组成。

嵌入和位置编码层

通过扩展 tf.keras.layers.Layer 来定义解码器。

class Decoder(tf.keras.layers.Layer):
  def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size,
               dropout_rate=0.1):
    super(Decoder, self).__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                             d_model=d_model)
    self.dropout = tf.keras.layers.Dropout(dropout_rate)
    self.dec_layers = [
        DecoderLayer(d_model=d_model, num_heads=num_heads,
                     dff=dff, dropout_rate=dropout_rate)
        for _ in range(num_layers)]

    self.last_attn_scores = None

  def call(self, x, context):
    # `x` is token-IDs shape (batch, target_seq_len)
    x = self.pos_embedding(x)  # (batch_size, target_seq_len, d_model)

    x = self.dropout(x)

    for i in range(self.num_layers):
      x  = self.dec_layers[i](x, context)

    self.last_attn_scores = self.dec_layers[-1].last_attn_scores

    # The shape of x is (batch_size, target_seq_len, d_model).
    return x

测试解码器

# Instantiate the decoder.
sample_decoder = Decoder(num_layers=4,
                         d_model=512,
                         num_heads=8,
                         dff=2048,
                         vocab_size=8000)

output = sample_decoder(
    x=en,
    context=pt_emb)

# Print the shapes.
print(en.shape)
print(pt_emb.shape)
print(output.shape)
sample_decoder.last_attn_scores.shape  # (batch, heads, target_seq, input_seq)

创建了 Transformer 编码器和解码器后,现在该构建 Transformer 模型并对其进行训练了。

Transformer

现在您有了 EncoderDecoder。要完成 Transformer 模型,您需要将它们组合在一起,并添加一个最终的线性(Dense)层,该层将每个位置的最终向量转换为输出标记概率。

解码器的输出是此最终线性层的输入。

Transformer

一个编码器和解码器中都只有一层的 TransformerRNN+注意力教程 中的模型几乎完全相同。多层 Transformer 具有更多层,但本质上执行的是相同的工作。

1 层 Transformer 一个 4 层 Transformer
RNN+注意力模型

通过扩展 tf.keras.Model 来创建 Transformer

class Transformer(tf.keras.Model):
  def __init__(self, *, num_layers, d_model, num_heads, dff,
               input_vocab_size, target_vocab_size, dropout_rate=0.1):
    super().__init__()
    self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                           num_heads=num_heads, dff=dff,
                           vocab_size=input_vocab_size,
                           dropout_rate=dropout_rate)

    self.decoder = Decoder(num_layers=num_layers, d_model=d_model,
                           num_heads=num_heads, dff=dff,
                           vocab_size=target_vocab_size,
                           dropout_rate=dropout_rate)

    self.final_layer = tf.keras.layers.Dense(target_vocab_size)

  def call(self, inputs):
    # To use a Keras model with `.fit` you must pass all your inputs in the
    # first argument.
    context, x  = inputs

    context = self.encoder(context)  # (batch_size, context_len, d_model)

    x = self.decoder(x, context)  # (batch_size, target_len, d_model)

    # Final linear layer output.
    logits = self.final_layer(x)  # (batch_size, target_len, target_vocab_size)

    try:
      # Drop the keras mask, so it doesn't scale the losses/metrics.
      # b/250038731
      del logits._keras_mask
    except AttributeError:
      pass

    # Return the final output and the attention weights.
    return logits

超参数

为了使此示例保持较小规模且速度相对较快,层数(num_layers)、嵌入的维度(d_model)和 FeedForward 层的内部维度(dff)都已减少。

原始 Transformer 论文中描述的基本模型使用 num_layers=6d_model=512dff=2048

自注意力头的数量保持不变(num_heads=8)。

num_layers = 4
d_model = 128
dff = 512
num_heads = 8
dropout_rate = 0.1

试一试

实例化 Transformer 模型

transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=tokenizers.pt.get_vocab_size().numpy(),
    target_vocab_size=tokenizers.en.get_vocab_size().numpy(),
    dropout_rate=dropout_rate)

测试它

output = transformer((pt, en))

print(en.shape)
print(pt.shape)
print(output.shape)
attn_scores = transformer.decoder.dec_layers[-1].last_attn_scores
print(attn_scores.shape)  # (batch, heads, target_seq, input_seq)

打印模型的摘要

transformer.summary()

训练

现在该准备模型并开始对其进行训练了。

设置优化器

使用 Adam 优化器,并根据原始 Transformer 论文 中的公式自定义学习率调度器。

\[\Large{lrate = d_{model}^{-0.5} * \min(step{\_}num^{-0.5}, step{\_}num \cdot warmup{\_}steps^{-1.5})}\]

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  def __init__(self, d_model, warmup_steps=4000):
    super().__init__()

    self.d_model = d_model
    self.d_model = tf.cast(self.d_model, tf.float32)

    self.warmup_steps = warmup_steps

  def __call__(self, step):
    step = tf.cast(step, dtype=tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps ** -1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

实例化优化器(在本例中为 tf.keras.optimizers.Adam)。

learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)

测试自定义学习率调度器。

plt.plot(learning_rate(tf.range(40000, dtype=tf.float32)))
plt.ylabel('Learning Rate')
plt.xlabel('Train Step')

设置损失和指标。

由于目标序列已填充,因此在计算损失时应用填充掩码非常重要。使用交叉熵损失函数 (tf.keras.losses.SparseCategoricalCrossentropy)。

def masked_loss(label, pred):
  mask = label != 0
  loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
  loss = loss_object(label, pred)

  mask = tf.cast(mask, dtype=loss.dtype)
  loss *= mask

  loss = tf.reduce_sum(loss)/tf.reduce_sum(mask)
  return loss


def masked_accuracy(label, pred):
  pred = tf.argmax(pred, axis=2)
  label = tf.cast(label, pred.dtype)
  match = label == pred

  mask = label != 0

  match = match & mask

  match = tf.cast(match, dtype=tf.float32)
  mask = tf.cast(mask, dtype=tf.float32)
  return tf.reduce_sum(match)/tf.reduce_sum(mask)

训练模型。

所有组件准备就绪后,使用 model.compile 配置训练过程,然后使用 model.fit 运行它。

transformer.compile(
    loss=masked_loss,
    optimizer=optimizer,
    metrics=[masked_accuracy])
transformer.fit(train_batches,
                epochs=20,
                validation_data=val_batches)

运行推理。

现在可以通过执行翻译来测试模型。以下步骤用于推理。

  • 使用葡萄牙语分词器 (tokenizers.pt) 对输入句子进行编码。这是编码器输入。
  • 解码器输入初始化为 [START] 标记。
  • 计算填充掩码和前瞻掩码。
  • 然后,decoder 通过查看 encoder output 和它自己的输出(自注意力)来输出预测结果。
  • 将预测的标记连接到解码器输入,并将其传递给解码器。
  • 在这种方法中,解码器根据其预测的先前标记来预测下一个标记。

通过子类化 tf.Module 来定义 Translator 类。

class Translator(tf.Module):
  def __init__(self, tokenizers, transformer):
    self.tokenizers = tokenizers
    self.transformer = transformer

  def __call__(self, sentence, max_length=MAX_TOKENS):
    # The input sentence is Portuguese, hence adding the `[START]` and `[END]` tokens.
    assert isinstance(sentence, tf.Tensor)
    if len(sentence.shape) == 0:
      sentence = sentence[tf.newaxis]

    sentence = self.tokenizers.pt.tokenize(sentence).to_tensor()

    encoder_input = sentence

    # As the output language is English, initialize the output with the
    # English `[START]` token.
    start_end = self.tokenizers.en.tokenize([''])[0]
    start = start_end[0][tf.newaxis]
    end = start_end[1][tf.newaxis]

    # `tf.TensorArray` is required here (instead of a Python list), so that the
    # dynamic-loop can be traced by `tf.function`.
    output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
    output_array = output_array.write(0, start)

    for i in tf.range(max_length):
      output = tf.transpose(output_array.stack())
      predictions = self.transformer([encoder_input, output], training=False)

      # Select the last token from the `seq_len` dimension.
      predictions = predictions[:, -1:, :]  # Shape `(batch_size, 1, vocab_size)`.

      predicted_id = tf.argmax(predictions, axis=-1)

      # Concatenate the `predicted_id` to the output which is given to the
      # decoder as its input.
      output_array = output_array.write(i+1, predicted_id[0])

      if predicted_id == end:
        break

    output = tf.transpose(output_array.stack())
    # The output shape is `(1, tokens)`.
    text = tokenizers.en.detokenize(output)[0]  # Shape: `()`.

    tokens = tokenizers.en.lookup(output)[0]

    # `tf.function` prevents us from using the attention_weights that were
    # calculated on the last iteration of the loop.
    # So, recalculate them outside the loop.
    self.transformer([encoder_input, output[:,:-1]], training=False)
    attention_weights = self.transformer.decoder.last_attn_scores

    return text, tokens, attention_weights

创建此 Translator 类的实例,并尝试几次。

translator = Translator(tokenizers, transformer)
def print_translation(sentence, tokens, ground_truth):
  print(f'{"Input:":15s}: {sentence}')
  print(f'{"Prediction":15s}: {tokens.numpy().decode("utf-8")}')
  print(f'{"Ground truth":15s}: {ground_truth}')

示例 1

sentence = 'este é um problema que temos que resolver.'
ground_truth = 'this is a problem we have to solve .'

translated_text, translated_tokens, attention_weights = translator(
    tf.constant(sentence))
print_translation(sentence, translated_text, ground_truth)

示例 2

sentence = 'os meus vizinhos ouviram sobre esta ideia.'
ground_truth = 'and my neighboring homes heard about this idea .'

translated_text, translated_tokens, attention_weights = translator(
    tf.constant(sentence))
print_translation(sentence, translated_text, ground_truth)

示例 3

sentence = 'vou então muito rapidamente partilhar convosco algumas histórias de algumas coisas mágicas que aconteceram.'
ground_truth = "so i'll just share with you some stories very quickly of some magical things that have happened."

translated_text, translated_tokens, attention_weights = translator(
    tf.constant(sentence))
print_translation(sentence, translated_text, ground_truth)

创建注意力图。

您在上一节中创建的 Translator 类返回一个注意力热图字典,您可以使用它来可视化模型的内部工作原理。

例如

sentence = 'este é o primeiro livro que eu fiz.'
ground_truth = "this is the first book i've ever done."

translated_text, translated_tokens, attention_weights = translator(
    tf.constant(sentence))
print_translation(sentence, translated_text, ground_truth)

创建一个函数,在生成标记时绘制注意力。

def plot_attention_head(in_tokens, translated_tokens, attention):
  # The model didn't generate `<START>` in the output. Skip it.
  translated_tokens = translated_tokens[1:]

  ax = plt.gca()
  ax.matshow(attention)
  ax.set_xticks(range(len(in_tokens)))
  ax.set_yticks(range(len(translated_tokens)))

  labels = [label.decode('utf-8') for label in in_tokens.numpy()]
  ax.set_xticklabels(
      labels, rotation=90)

  labels = [label.decode('utf-8') for label in translated_tokens.numpy()]
  ax.set_yticklabels(labels)
head = 0
# Shape: `(batch=1, num_heads, seq_len_q, seq_len_k)`.
attention_heads = tf.squeeze(attention_weights, 0)
attention = attention_heads[head]
attention.shape

这些是输入(葡萄牙语)标记。

in_tokens = tf.convert_to_tensor([sentence])
in_tokens = tokenizers.pt.tokenize(in_tokens).to_tensor()
in_tokens = tokenizers.pt.lookup(in_tokens)[0]
in_tokens

这些是输出(英语翻译)标记。

translated_tokens
plot_attention_head(in_tokens, translated_tokens, attention)
def plot_attention_weights(sentence, translated_tokens, attention_heads):
  in_tokens = tf.convert_to_tensor([sentence])
  in_tokens = tokenizers.pt.tokenize(in_tokens).to_tensor()
  in_tokens = tokenizers.pt.lookup(in_tokens)[0]

  fig = plt.figure(figsize=(16, 8))

  for h, head in enumerate(attention_heads):
    ax = fig.add_subplot(2, 4, h+1)

    plot_attention_head(in_tokens, translated_tokens, head)

    ax.set_xlabel(f'Head {h+1}')

  plt.tight_layout()
  plt.show()
plot_attention_weights(sentence,
                       translated_tokens,
                       attention_weights[0])

该模型可以处理不熟悉的词语。'triceratops''encyclopédia' 都不在输入数据集中,即使没有共享词汇表,模型也会尝试对其进行音译。例如

sentence = 'Eu li sobre triceratops na enciclopédia.'
ground_truth = 'I read about triceratops in the encyclopedia.'

translated_text, translated_tokens, attention_weights = translator(
    tf.constant(sentence))
print_translation(sentence, translated_text, ground_truth)

plot_attention_weights(sentence, translated_tokens, attention_weights[0])

导出模型。

您已经测试了模型,并且推理正在工作。接下来,您可以将其导出为 tf.saved_model。要了解有关以 SavedModel 格式保存和加载模型的信息,请使用 本指南

通过子类化 tf.Module 并使用 tf.function__call__ 方法上,创建一个名为 ExportTranslator 的类。

class ExportTranslator(tf.Module):
  def __init__(self, translator):
    self.translator = translator

  @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.string)])
  def __call__(self, sentence):
    (result,
     tokens,
     attention_weights) = self.translator(sentence, max_length=MAX_TOKENS)

    return result

在上面的 tf.function 中,只返回输出句子。由于 非严格执行tf.function 中,任何不必要的数值都不会被计算。

translator 包装在新建的 ExportTranslator 中。

translator = ExportTranslator(translator)

由于模型使用 tf.argmax 解码预测结果,因此预测结果是确定性的。原始模型和从其 SavedModel 重新加载的模型应该给出相同的预测结果。

translator('este é o primeiro livro que eu fiz.').numpy()
tf.saved_model.save(translator, export_dir='translator')
reloaded = tf.saved_model.load('translator')
reloaded('este é o primeiro livro que eu fiz.').numpy()

结论

在本教程中,您了解了

  • Transformers 及其在机器学习中的重要性。
  • 注意力、自注意力和多头注意力。
  • 使用嵌入的词位编码。
  • 原始 Transformer 的编码器-解码器架构。
  • 自注意力中的掩码。
  • 如何将所有这些组合在一起以翻译文本。

此架构的缺点是

  • 对于时间序列,时间步长的输出是根据整个历史计算的,而不是仅根据输入和当前隐藏状态计算的。这可能效率较低。
  • 如果输入具有时间/空间关系,例如文本或图像,则必须添加一些词位编码,否则模型将有效地看到一个词袋。

如果您想练习,您可以尝试使用它做很多事情。例如

  • 使用不同的数据集来训练 Transformer。
  • 通过更改超参数来创建原始论文中的“基本 Transformer”或“Transformer XL”配置。
  • 使用此处定义的层来创建 BERT 的实现。
  • 使用束搜索来获得更好的预测结果。

存在各种基于 Transformer 的模型,其中许多模型在原始 Transformer 的 2017 版本的基础上进行了改进,包括编码器-解码器、仅编码器和仅解码器架构。

以下研究出版物介绍了其中一些模型。

您可以在以下 Google 博客文章中了解有关其他模型的更多信息。

如果您有兴趣研究基于注意力的模型如何在自然语言处理之外的任务中应用,请查看以下资源。