使用视觉注意力进行图像加标题

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看 下载笔记本

对于如下示例所示的图像,您的目标是生成一个标题,例如“一个冲浪者在冲浪”。

一个冲浪者,来自 wikimedia

此处使用的模型架构受 Show, Attend and Tell: Neural Image Caption Generation with Visual Attention 的启发,但已更新为使用 2 层 Transformer 解码器。为了充分利用本教程,您应该具备 文本生成seq2seq 模型和注意力Transformer 方面的一些经验。

在本教程中构建的模型架构如下所示。从图像中提取特征,并将其传递给 Transformer 解码器的交叉注意力层。

模型架构

Transformer 解码器主要由注意力层构建。它使用自注意力来处理正在生成的序列,并使用交叉注意力来关注图像。

通过检查交叉注意力层的注意力权重,您将看到模型在生成单词时正在关注图像的哪些部分。

Prediction

本笔记本是一个端到端示例。当您运行笔记本时,它会下载一个数据集,提取并缓存图像特征,并训练一个解码器模型。然后,它使用该模型为新图像生成字幕。

设置

apt install --allow-change-held-packages libcudnn8=8.6.0.163-1+cuda11.8
pip uninstall -y tensorflow estimator keras
pip install -U tensorflow_text tensorflow tensorflow_datasets
pip install einops

本教程使用了大量导入,主要用于加载数据集。

[可选] 数据处理

本节下载一个字幕数据集并将其准备用于训练。它对输入文本进行标记化,并缓存将所有图像通过预训练特征提取器模型运行的结果。理解本节中的所有内容并不重要。

数据已准备好进行训练

在这些预处理步骤之后,以下是数据集

train_ds = load_dataset('train_cache')
test_ds = load_dataset('test_cache')
train_ds.element_spec

该数据集现在返回 (input, label) 对,适用于使用 keras 进行训练。inputs(images, input_tokens) 对。images 已使用特征提取器模型进行处理。对于 input_tokens 中的每个位置,模型都会查看迄今为止的文本并尝试预测下一个位置,该位置与 labels 中的相同位置对齐。

for (inputs, ex_labels) in train_ds.take(1):
  (ex_img, ex_in_tok) = inputs

print(ex_img.shape)
print(ex_in_tok.shape)
print(ex_labels.shape)

输入标记和标签相同,只是偏移了 1 步

print(ex_in_tok[0].numpy())
print(ex_labels[0].numpy())

Transformer 解码器模型

此模型假设预训练的图像编码器已足够,并且仅专注于构建文本解码器。本教程使用 2 层 Transformer 解码器。

实现与 Transformers 教程 中的实现几乎相同。请参阅该教程以了解更多详情。

Transformer 编码器和解码器。

该模型将分三个主要部分实现

  1. 输入 - 标记嵌入和位置编码 (SeqEmbedding)。
  2. 解码器 - 一组 Transformer 解码器层 (DecoderLayer),其中每个层包含
    1. 因果自注意力层 (CausalSelfAttention),其中每个输出位置都可以关注迄今为止的输出。
    2. 交叉注意力层 (CrossAttention),其中每个输出位置都可以关注输入图像。
    3. 前馈网络 (FeedForward) 层,该层进一步独立处理每个输出位置。
  3. 输出 - 对输出词汇表进行多类分类。

输入

输入文本已分成标记并转换为 ID 序列。

记住,与 CNN 或 RNN 不同,Transformer 的注意力层对序列的顺序是不变的。如果没有位置输入,它只会看到一个无序的集合,而不是一个序列。因此,除了每个标记 ID 的简单向量嵌入之外,嵌入层还将包括序列中每个位置的嵌入。

下面定义的 SeqEmbedding

  • 它查找每个标记的嵌入向量。
  • 它查找每个序列位置的嵌入向量。
  • 它将两者相加。
  • 它使用 mask_zero=True 为模型初始化 keras 掩码。
class SeqEmbedding(tf.keras.layers.Layer):
  def __init__(self, vocab_size, max_length, depth):
    super().__init__()
    self.pos_embedding = tf.keras.layers.Embedding(input_dim=max_length, output_dim=depth)

    self.token_embedding = tf.keras.layers.Embedding(
        input_dim=vocab_size,
        output_dim=depth,
        mask_zero=True)

    self.add = tf.keras.layers.Add()

  def call(self, seq):
    seq = self.token_embedding(seq) # (batch, seq, depth)

    x = tf.range(tf.shape(seq)[1])  # (seq)
    x = x[tf.newaxis, :]  # (1, seq)
    x = self.pos_embedding(x)  # (1, seq, depth)

    return self.add([seq,x])

解码器

解码器是一个标准的 Transformer 解码器,它包含一个 DecoderLayers 堆栈,其中每个堆栈包含三个子层:CausalSelfAttentionCrossAttentionFeedForward。这些实现几乎与 Transformer 教程 中的实现相同,请参阅该教程以了解更多详细信息。

CausalSelfAttention 层如下所示

class CausalSelfAttention(tf.keras.layers.Layer):
  def __init__(self, **kwargs):
    super().__init__()
    self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
    # Use Add instead of + so the keras mask propagates through.
    self.add = tf.keras.layers.Add() 
    self.layernorm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    attn = self.mha(query=x, value=x,
                    use_causal_mask=True)
    x = self.add([x, attn])
    return self.layernorm(x)

CrossAttention 层如下所示。请注意 return_attention_scores 的用法。

class CrossAttention(tf.keras.layers.Layer):
  def __init__(self,**kwargs):
    super().__init__()
    self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
    self.add = tf.keras.layers.Add() 
    self.layernorm = tf.keras.layers.LayerNormalization()

  def call(self, x, y, **kwargs):
    attn, attention_scores = self.mha(
             query=x, value=y,
             return_attention_scores=True)

    self.last_attention_scores = attention_scores

    x = self.add([x, attn])
    return self.layernorm(x)

FeedForward 层如下所示。请记住,layers.Dense 层应用于输入的最后一个轴。输入的形状为 (batch, sequence, channels),因此它会自动在 batchsequence 轴上逐点应用。

class FeedForward(tf.keras.layers.Layer):
  def __init__(self, units, dropout_rate=0.1):
    super().__init__()
    self.seq = tf.keras.Sequential([
        tf.keras.layers.Dense(units=2*units, activation='relu'),
        tf.keras.layers.Dense(units=units),
        tf.keras.layers.Dropout(rate=dropout_rate),
    ])

    self.layernorm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    x = x + self.seq(x)
    return self.layernorm(x)

接下来,将这三个层排列成一个更大的 DecoderLayer。每个解码器层按顺序应用三个较小的层。在每个子层之后,out_seq 的形状为 (batch, sequence, channels)。解码器层还返回 attention_scores 以供以后可视化。

class DecoderLayer(tf.keras.layers.Layer):
  def __init__(self, units, num_heads=1, dropout_rate=0.1):
    super().__init__()

    self.self_attention = CausalSelfAttention(num_heads=num_heads,
                                              key_dim=units,
                                              dropout=dropout_rate)
    self.cross_attention = CrossAttention(num_heads=num_heads,
                                          key_dim=units,
                                          dropout=dropout_rate)
    self.ff = FeedForward(units=units, dropout_rate=dropout_rate)


  def call(self, inputs, training=False):
    in_seq, out_seq = inputs

    # Text input
    out_seq = self.self_attention(out_seq)

    out_seq = self.cross_attention(out_seq, in_seq)

    self.last_attention_scores = self.cross_attention.last_attention_scores

    out_seq = self.ff(out_seq)

    return out_seq

输出

输出层至少需要一个 layers.Dense 层,以便为每个位置的每个标记生成 logit 预测。

但是,你可以添加一些其他功能,让这项工作做得更好

  1. 处理无效标记:模型将生成文本。它永远不应该生成填充、未知或开始标记 ('''[UNK]''[START]')。因此,将这些标记的偏差设置为较大的负值。

  2. 智能初始化:稠密层的默认初始化将提供一个模型,该模型最初几乎以均匀的可能性预测每个标记。实际标记分布远非均匀。输出层的初始偏差的最佳值为每个标记的概率的对数。因此,包含一个 adapt 方法来计数标记并设置最佳初始偏差。这将均匀分布的熵 (log(vocabulary_size)) 的初始损失减少到分布的边际熵 (-p*log(p))。

智能初始化将显著减少初始损失

output_layer = TokenOutput(tokenizer, banned_tokens=('', '[UNK]', '[START]'))
# This might run a little faster if the dataset didn't also have to load the image data.
output_layer.adapt(train_ds.map(lambda inputs, labels: labels))

构建模型

要构建模型,您需要组合几个部分

  1. 图像 feature_extractor 和文本 tokenizer 和。
  2. seq_embedding 层,将批量的标记 ID 转换为向量 (batch, sequence, channels)
  3. DecoderLayers 层的堆栈,它将处理文本和图像数据。
  4. output_layer,它返回对下一个单词应该是什么的逐点预测。
class Captioner(tf.keras.Model):
  @classmethod
  def add_method(cls, fun):
    setattr(cls, fun.__name__, fun)
    return fun

  def __init__(self, tokenizer, feature_extractor, output_layer, num_layers=1,
               units=256, max_length=50, num_heads=1, dropout_rate=0.1):
    super().__init__()
    self.feature_extractor = feature_extractor
    self.tokenizer = tokenizer
    self.word_to_index = tf.keras.layers.StringLookup(
        mask_token="",
        vocabulary=tokenizer.get_vocabulary())
    self.index_to_word = tf.keras.layers.StringLookup(
        mask_token="",
        vocabulary=tokenizer.get_vocabulary(),
        invert=True) 

    self.seq_embedding = SeqEmbedding(
        vocab_size=tokenizer.vocabulary_size(),
        depth=units,
        max_length=max_length)

    self.decoder_layers = [
        DecoderLayer(units, num_heads=num_heads, dropout_rate=dropout_rate)
        for n in range(num_layers)]

    self.output_layer = output_layer

在您调用模型进行训练时,它会接收一个 image, txt 对。为了使此函数更易于使用,请灵活处理输入

  • 如果图像有 3 个通道,则通过 feature_extractor 运行它。否则,假设它已经存在。类似地
  • 如果文本具有数据类型 tf.string,则通过 tokenizer 运行它。

之后,运行模型只需要几个步骤

  1. 展平提取的图像特征,以便可以将其输入到解码器层。
  2. 查找标记嵌入。
  3. 在图像特征和文本嵌入上运行 DecoderLayer 的堆栈。
  4. 运行输出层以预测每个位置的下一个标记。
@Captioner.add_method
  def call(self, inputs):
    image, txt = inputs

    if image.shape[-1] == 3:
      # Apply the feature-extractor, if you get an RGB image.
      image = self.feature_extractor(image)

    # Flatten the feature map
    image = einops.rearrange(image, 'b h w c -> b (h w) c')


    if txt.dtype == tf.string:
      # Apply the tokenizer if you get string inputs.
      txt = tokenizer(txt)

    txt = self.seq_embedding(txt)

    # Look at the image
    for dec_layer in self.decoder_layers:
      txt = dec_layer(inputs=(image, txt))

    txt = self.output_layer(txt)

    return txt
model = Captioner(tokenizer, feature_extractor=mobilenet, output_layer=output_layer,
                  units=256, dropout_rate=0.5, num_layers=2, num_heads=2)

生成标题

在开始训练之前,编写一些代码来生成标题。您将使用它来查看训练的进度。

首先下载一张测试图片

image_url = 'https://tensorflowcn.cn/images/surf.jpg'
image_path = tf.keras.utils.get_file('surf.jpg', origin=image_url)
image = load_image(image_path)

要使用此模型为图像添加标题

  • 提取 img_features
  • 使用 [START] 令牌初始化输出令牌列表。
  • img_featurestokens 传递到模型中。
    • 它返回一个逻辑列表。
    • 根据这些逻辑选择下一个令牌。
    • 将其添加到令牌列表中,并继续循环。
    • 如果它生成 '[END]' 令牌,则跳出循环。

因此,添加一个“简单”的方法来执行此操作

@Captioner.add_method
def simple_gen(self, image, temperature=1):
  initial = self.word_to_index([['[START]']]) # (batch, sequence)
  img_features = self.feature_extractor(image[tf.newaxis, ...])

  tokens = initial # (batch, sequence)
  for n in range(50):
    preds = self((img_features, tokens)).numpy()  # (batch, sequence, vocab)
    preds = preds[:,-1, :]  #(batch, vocab)
    if temperature==0:
        next = tf.argmax(preds, axis=-1)[:, tf.newaxis]  # (batch, 1)
    else:
        next = tf.random.categorical(preds/temperature, num_samples=1)  # (batch, 1)
    tokens = tf.concat([tokens, next], axis=1) # (batch, sequence) 

    if next[0] == self.word_to_index('[END]'):
      break
  words = index_to_word(tokens[0, 1:-1])
  result = tf.strings.reduce_join(words, axis=-1, separator=' ')
  return result.numpy().decode()

以下是该图像的一些生成标题,该模型未经训练,因此它们还没有多大意义

for t in (0.0, 0.5, 1.0):
  result = model.simple_gen(image, temperature=t)
  print(result)

温度参数允许您在 3 种模式之间进行插值

  1. 贪婪解码 (temperature=0.0) - 在每个步骤中选择最可能的下一个令牌。
  2. 根据逻辑进行随机采样 (temperature=1.0)。
  3. 统一随机采样 (temperature >> 1.0)。

由于模型未经训练,并且使用了基于频率的初始化,“贪婪”输出(第一个)通常只包含最常见的令牌:['a', '.', '[END]']

训练

要训练模型,您需要几个其他组件

  • 损失和指标
  • 优化器
  • 可选回调

损失和指标

以下是掩码损失和准确性的实现

在计算损失的掩码时,请注意 loss < 1e8。此项将 banned_tokens 的人为、不可能的高损失丢弃。

def masked_loss(labels, preds):  
  loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels, preds)

  mask = (labels != 0) & (loss < 1e8) 
  mask = tf.cast(mask, loss.dtype)

  loss = loss*mask
  loss = tf.reduce_sum(loss)/tf.reduce_sum(mask)
  return loss

def masked_acc(labels, preds):
  mask = tf.cast(labels!=0, tf.float32)
  preds = tf.argmax(preds, axis=-1)
  labels = tf.cast(labels, tf.int64)
  match = tf.cast(preds == labels, mask.dtype)
  acc = tf.reduce_sum(match*mask)/tf.reduce_sum(mask)
  return acc

回调

为了在训练期间获得反馈,请设置 keras.callbacks.Callback 在每个时期末为冲浪者图像生成一些标题。

class GenerateText(tf.keras.callbacks.Callback):
  def __init__(self):
    image_url = 'https://tensorflowcn.cn/images/surf.jpg'
    image_path = tf.keras.utils.get_file('surf.jpg', origin=image_url)
    self.image = load_image(image_path)

  def on_epoch_end(self, epochs=None, logs=None):
    print()
    print()
    for t in (0.0, 0.5, 1.0):
      result = self.model.simple_gen(self.image, temperature=t)
      print(result)
    print()

它会生成三个输出字符串,如前面的示例所示,第一个是“贪婪”,在每一步选择 logits 的 argmax。

g = GenerateText()
g.model = model
g.on_epoch_end(0)

还要使用 callbacks.EarlyStopping 在模型开始过拟合时终止训练。

callbacks = [
    GenerateText(),
    tf.keras.callbacks.EarlyStopping(
        patience=5, restore_best_weights=True)]

训练

配置并执行训练。

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
           loss=masked_loss,
           metrics=[masked_acc])

要进行更频繁的报告,请使用 Dataset.repeat() 方法,并将 steps_per_epochvalidation_steps 参数设置为 Model.fit

Flickr8k 上进行此设置后,对数据集的完整遍历是 900 多个批次,但以下报告时代为 100 个步骤。

history = model.fit(
    train_ds.repeat(),
    steps_per_epoch=100,
    validation_data=test_ds.repeat(),
    validation_steps=20,
    epochs=100,
    callbacks=callbacks)

绘制训练运行期间的损失和准确性

plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch #')
plt.ylabel('CE/token')
plt.legend()
plt.plot(history.history['masked_acc'], label='accuracy')
plt.plot(history.history['val_masked_acc'], label='val_accuracy')
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch #')
plt.ylabel('CE/token')
plt.legend()

注意力图

现在,使用训练好的模型,对图像运行该 simple_gen 方法

result = model.simple_gen(image, temperature=0.0)
result

将输出拆分为标记

str_tokens = result.split()
str_tokens.append('[END]')

每个 DecoderLayers 都缓存其 CrossAttention 层的注意力分数。每个注意力图的形状为 (batch=1, heads, sequence, image)

attn_maps = [layer.last_attention_scores for layer in model.decoder_layers]
[map.shape for map in attn_maps]

因此,沿着 batch 轴堆叠地图,然后在 (batch, heads) 轴上取平均值,同时将 image 轴拆分为 height, width

attention_maps = tf.concat(attn_maps, axis=0)
attention_maps = einops.reduce(
    attention_maps,
    'batch heads sequence (height width) -> sequence height width',
    height=7, width=7,
    reduction='mean')

现在,您有一个单独的注意力图,用于每个序列预测。每个地图中的值应该合计为 1.

einops.reduce(attention_maps, 'sequence height width -> sequence', reduction='sum')

因此,这里就是模型在生成输出的每个标记时将注意力集中在哪里

def plot_attention_maps(image, str_tokens, attention_map):
    fig = plt.figure(figsize=(16, 9))

    len_result = len(str_tokens)

    titles = []
    for i in range(len_result):
      map = attention_map[i]
      grid_size = max(int(np.ceil(len_result/2)), 2)
      ax = fig.add_subplot(3, grid_size, i+1)
      titles.append(ax.set_title(str_tokens[i]))
      img = ax.imshow(image)
      ax.imshow(map, cmap='gray', alpha=0.6, extent=img.get_extent(),
                clim=[0.0, np.max(map)])

    plt.tight_layout()
plot_attention_maps(image/255, str_tokens, attention_maps)

现在将其放入一个更实用的函数中

@Captioner.add_method
def run_and_show_attention(self, image, temperature=0.0):
  result_txt = self.simple_gen(image, temperature)
  str_tokens = result_txt.split()
  str_tokens.append('[END]')

  attention_maps = [layer.last_attention_scores for layer in self.decoder_layers]
  attention_maps = tf.concat(attention_maps, axis=0)
  attention_maps = einops.reduce(
      attention_maps,
      'batch heads sequence (height width) -> sequence height width',
      height=7, width=7,
      reduction='mean')

  plot_attention_maps(image/255, str_tokens, attention_maps)
  t = plt.suptitle(result_txt)
  t.set_y(1.05)
run_and_show_attention(model, image)

在您自己的图像上试用

为了好玩,下面为您提供了一种方法,您可以使用它为自己的图像添加您刚刚训练过的模型的字幕。请记住,它是用相对较少的数据训练的,并且您的图像可能与训练数据不同(因此请做好迎接奇怪结果的准备!)

image_url = 'https://tensorflowcn.cn/images/bedroom_hrnet_tutorial.jpg'
image_path = tf.keras.utils.get_file(origin=image_url)
image = load_image(image_path)

run_and_show_attention(model, image)