在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看 | 下载笔记本 |
对于如下示例所示的图像,您的目标是生成一个标题,例如“一个冲浪者在冲浪”。
一个冲浪者,来自 wikimedia |
---|
此处使用的模型架构受 Show, Attend and Tell: Neural Image Caption Generation with Visual Attention 的启发,但已更新为使用 2 层 Transformer 解码器。为了充分利用本教程,您应该具备 文本生成、seq2seq 模型和注意力 或 Transformer 方面的一些经验。
在本教程中构建的模型架构如下所示。从图像中提取特征,并将其传递给 Transformer 解码器的交叉注意力层。
模型架构 |
---|
Transformer 解码器主要由注意力层构建。它使用自注意力来处理正在生成的序列,并使用交叉注意力来关注图像。
通过检查交叉注意力层的注意力权重,您将看到模型在生成单词时正在关注图像的哪些部分。
本笔记本是一个端到端示例。当您运行笔记本时,它会下载一个数据集,提取并缓存图像特征,并训练一个解码器模型。然后,它使用该模型为新图像生成字幕。
设置
apt install --allow-change-held-packages libcudnn8=8.6.0.163-1+cuda11.8
pip uninstall -y tensorflow estimator keras
pip install -U tensorflow_text tensorflow tensorflow_datasets
pip install einops
本教程使用了大量导入,主要用于加载数据集。
[可选] 数据处理
本节下载一个字幕数据集并将其准备用于训练。它对输入文本进行标记化,并缓存将所有图像通过预训练特征提取器模型运行的结果。理解本节中的所有内容并不重要。
数据已准备好进行训练
在这些预处理步骤之后,以下是数据集
train_ds = load_dataset('train_cache')
test_ds = load_dataset('test_cache')
train_ds.element_spec
该数据集现在返回 (input, label)
对,适用于使用 keras 进行训练。inputs
是 (images, input_tokens)
对。images
已使用特征提取器模型进行处理。对于 input_tokens
中的每个位置,模型都会查看迄今为止的文本并尝试预测下一个位置,该位置与 labels
中的相同位置对齐。
for (inputs, ex_labels) in train_ds.take(1):
(ex_img, ex_in_tok) = inputs
print(ex_img.shape)
print(ex_in_tok.shape)
print(ex_labels.shape)
输入标记和标签相同,只是偏移了 1 步
print(ex_in_tok[0].numpy())
print(ex_labels[0].numpy())
Transformer 解码器模型
此模型假设预训练的图像编码器已足够,并且仅专注于构建文本解码器。本教程使用 2 层 Transformer 解码器。
实现与 Transformers 教程 中的实现几乎相同。请参阅该教程以了解更多详情。
Transformer 编码器和解码器。 |
---|
该模型将分三个主要部分实现
- 输入 - 标记嵌入和位置编码 (
SeqEmbedding
)。 - 解码器 - 一组 Transformer 解码器层 (
DecoderLayer
),其中每个层包含- 因果自注意力层 (
CausalSelfAttention
),其中每个输出位置都可以关注迄今为止的输出。 - 交叉注意力层 (
CrossAttention
),其中每个输出位置都可以关注输入图像。 - 前馈网络 (
FeedForward
) 层,该层进一步独立处理每个输出位置。
- 因果自注意力层 (
- 输出 - 对输出词汇表进行多类分类。
输入
输入文本已分成标记并转换为 ID 序列。
记住,与 CNN 或 RNN 不同,Transformer 的注意力层对序列的顺序是不变的。如果没有位置输入,它只会看到一个无序的集合,而不是一个序列。因此,除了每个标记 ID 的简单向量嵌入之外,嵌入层还将包括序列中每个位置的嵌入。
下面定义的 SeqEmbedding
层
- 它查找每个标记的嵌入向量。
- 它查找每个序列位置的嵌入向量。
- 它将两者相加。
- 它使用
mask_zero=True
为模型初始化 keras 掩码。
class SeqEmbedding(tf.keras.layers.Layer):
def __init__(self, vocab_size, max_length, depth):
super().__init__()
self.pos_embedding = tf.keras.layers.Embedding(input_dim=max_length, output_dim=depth)
self.token_embedding = tf.keras.layers.Embedding(
input_dim=vocab_size,
output_dim=depth,
mask_zero=True)
self.add = tf.keras.layers.Add()
def call(self, seq):
seq = self.token_embedding(seq) # (batch, seq, depth)
x = tf.range(tf.shape(seq)[1]) # (seq)
x = x[tf.newaxis, :] # (1, seq)
x = self.pos_embedding(x) # (1, seq, depth)
return self.add([seq,x])
解码器
解码器是一个标准的 Transformer 解码器,它包含一个 DecoderLayers
堆栈,其中每个堆栈包含三个子层:CausalSelfAttention
、CrossAttention
和 FeedForward
。这些实现几乎与 Transformer 教程 中的实现相同,请参阅该教程以了解更多详细信息。
CausalSelfAttention
层如下所示
class CausalSelfAttention(tf.keras.layers.Layer):
def __init__(self, **kwargs):
super().__init__()
self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
# Use Add instead of + so the keras mask propagates through.
self.add = tf.keras.layers.Add()
self.layernorm = tf.keras.layers.LayerNormalization()
def call(self, x):
attn = self.mha(query=x, value=x,
use_causal_mask=True)
x = self.add([x, attn])
return self.layernorm(x)
CrossAttention
层如下所示。请注意 return_attention_scores
的用法。
class CrossAttention(tf.keras.layers.Layer):
def __init__(self,**kwargs):
super().__init__()
self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
self.add = tf.keras.layers.Add()
self.layernorm = tf.keras.layers.LayerNormalization()
def call(self, x, y, **kwargs):
attn, attention_scores = self.mha(
query=x, value=y,
return_attention_scores=True)
self.last_attention_scores = attention_scores
x = self.add([x, attn])
return self.layernorm(x)
FeedForward
层如下所示。请记住,layers.Dense
层应用于输入的最后一个轴。输入的形状为 (batch, sequence, channels)
,因此它会自动在 batch
和 sequence
轴上逐点应用。
class FeedForward(tf.keras.layers.Layer):
def __init__(self, units, dropout_rate=0.1):
super().__init__()
self.seq = tf.keras.Sequential([
tf.keras.layers.Dense(units=2*units, activation='relu'),
tf.keras.layers.Dense(units=units),
tf.keras.layers.Dropout(rate=dropout_rate),
])
self.layernorm = tf.keras.layers.LayerNormalization()
def call(self, x):
x = x + self.seq(x)
return self.layernorm(x)
接下来,将这三个层排列成一个更大的 DecoderLayer
。每个解码器层按顺序应用三个较小的层。在每个子层之后,out_seq
的形状为 (batch, sequence, channels)
。解码器层还返回 attention_scores
以供以后可视化。
class DecoderLayer(tf.keras.layers.Layer):
def __init__(self, units, num_heads=1, dropout_rate=0.1):
super().__init__()
self.self_attention = CausalSelfAttention(num_heads=num_heads,
key_dim=units,
dropout=dropout_rate)
self.cross_attention = CrossAttention(num_heads=num_heads,
key_dim=units,
dropout=dropout_rate)
self.ff = FeedForward(units=units, dropout_rate=dropout_rate)
def call(self, inputs, training=False):
in_seq, out_seq = inputs
# Text input
out_seq = self.self_attention(out_seq)
out_seq = self.cross_attention(out_seq, in_seq)
self.last_attention_scores = self.cross_attention.last_attention_scores
out_seq = self.ff(out_seq)
return out_seq
输出
输出层至少需要一个 layers.Dense
层,以便为每个位置的每个标记生成 logit 预测。
但是,你可以添加一些其他功能,让这项工作做得更好
处理无效标记:模型将生成文本。它永远不应该生成填充、未知或开始标记 (
''
、'[UNK]'
、'[START]'
)。因此,将这些标记的偏差设置为较大的负值。智能初始化:稠密层的默认初始化将提供一个模型,该模型最初几乎以均匀的可能性预测每个标记。实际标记分布远非均匀。输出层的初始偏差的最佳值为每个标记的概率的对数。因此,包含一个
adapt
方法来计数标记并设置最佳初始偏差。这将均匀分布的熵 (log(vocabulary_size)
) 的初始损失减少到分布的边际熵 (-p*log(p)
)。
智能初始化将显著减少初始损失
output_layer = TokenOutput(tokenizer, banned_tokens=('', '[UNK]', '[START]'))
# This might run a little faster if the dataset didn't also have to load the image data.
output_layer.adapt(train_ds.map(lambda inputs, labels: labels))
构建模型
要构建模型,您需要组合几个部分
- 图像
feature_extractor
和文本tokenizer
和。 seq_embedding
层,将批量的标记 ID 转换为向量(batch, sequence, channels)
。DecoderLayers
层的堆栈,它将处理文本和图像数据。output_layer
,它返回对下一个单词应该是什么的逐点预测。
class Captioner(tf.keras.Model):
@classmethod
def add_method(cls, fun):
setattr(cls, fun.__name__, fun)
return fun
def __init__(self, tokenizer, feature_extractor, output_layer, num_layers=1,
units=256, max_length=50, num_heads=1, dropout_rate=0.1):
super().__init__()
self.feature_extractor = feature_extractor
self.tokenizer = tokenizer
self.word_to_index = tf.keras.layers.StringLookup(
mask_token="",
vocabulary=tokenizer.get_vocabulary())
self.index_to_word = tf.keras.layers.StringLookup(
mask_token="",
vocabulary=tokenizer.get_vocabulary(),
invert=True)
self.seq_embedding = SeqEmbedding(
vocab_size=tokenizer.vocabulary_size(),
depth=units,
max_length=max_length)
self.decoder_layers = [
DecoderLayer(units, num_heads=num_heads, dropout_rate=dropout_rate)
for n in range(num_layers)]
self.output_layer = output_layer
在您调用模型进行训练时,它会接收一个 image, txt
对。为了使此函数更易于使用,请灵活处理输入
- 如果图像有 3 个通道,则通过 feature_extractor 运行它。否则,假设它已经存在。类似地
- 如果文本具有数据类型
tf.string
,则通过 tokenizer 运行它。
之后,运行模型只需要几个步骤
- 展平提取的图像特征,以便可以将其输入到解码器层。
- 查找标记嵌入。
- 在图像特征和文本嵌入上运行
DecoderLayer
的堆栈。 - 运行输出层以预测每个位置的下一个标记。
@Captioner.add_method
def call(self, inputs):
image, txt = inputs
if image.shape[-1] == 3:
# Apply the feature-extractor, if you get an RGB image.
image = self.feature_extractor(image)
# Flatten the feature map
image = einops.rearrange(image, 'b h w c -> b (h w) c')
if txt.dtype == tf.string:
# Apply the tokenizer if you get string inputs.
txt = tokenizer(txt)
txt = self.seq_embedding(txt)
# Look at the image
for dec_layer in self.decoder_layers:
txt = dec_layer(inputs=(image, txt))
txt = self.output_layer(txt)
return txt
model = Captioner(tokenizer, feature_extractor=mobilenet, output_layer=output_layer,
units=256, dropout_rate=0.5, num_layers=2, num_heads=2)
生成标题
在开始训练之前,编写一些代码来生成标题。您将使用它来查看训练的进度。
首先下载一张测试图片
image_url = 'https://tensorflowcn.cn/images/surf.jpg'
image_path = tf.keras.utils.get_file('surf.jpg', origin=image_url)
image = load_image(image_path)
要使用此模型为图像添加标题
- 提取
img_features
- 使用
[START]
令牌初始化输出令牌列表。 - 将
img_features
和tokens
传递到模型中。- 它返回一个逻辑列表。
- 根据这些逻辑选择下一个令牌。
- 将其添加到令牌列表中,并继续循环。
- 如果它生成
'[END]'
令牌,则跳出循环。
因此,添加一个“简单”的方法来执行此操作
@Captioner.add_method
def simple_gen(self, image, temperature=1):
initial = self.word_to_index([['[START]']]) # (batch, sequence)
img_features = self.feature_extractor(image[tf.newaxis, ...])
tokens = initial # (batch, sequence)
for n in range(50):
preds = self((img_features, tokens)).numpy() # (batch, sequence, vocab)
preds = preds[:,-1, :] #(batch, vocab)
if temperature==0:
next = tf.argmax(preds, axis=-1)[:, tf.newaxis] # (batch, 1)
else:
next = tf.random.categorical(preds/temperature, num_samples=1) # (batch, 1)
tokens = tf.concat([tokens, next], axis=1) # (batch, sequence)
if next[0] == self.word_to_index('[END]'):
break
words = index_to_word(tokens[0, 1:-1])
result = tf.strings.reduce_join(words, axis=-1, separator=' ')
return result.numpy().decode()
以下是该图像的一些生成标题,该模型未经训练,因此它们还没有多大意义
for t in (0.0, 0.5, 1.0):
result = model.simple_gen(image, temperature=t)
print(result)
温度参数允许您在 3 种模式之间进行插值
- 贪婪解码 (
temperature=0.0
) - 在每个步骤中选择最可能的下一个令牌。 - 根据逻辑进行随机采样 (
temperature=1.0
)。 - 统一随机采样 (
temperature >> 1.0
)。
由于模型未经训练,并且使用了基于频率的初始化,“贪婪”输出(第一个)通常只包含最常见的令牌:['a', '.', '[END]']
。
训练
要训练模型,您需要几个其他组件
- 损失和指标
- 优化器
- 可选回调
损失和指标
以下是掩码损失和准确性的实现
在计算损失的掩码时,请注意 loss < 1e8
。此项将 banned_tokens
的人为、不可能的高损失丢弃。
def masked_loss(labels, preds):
loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels, preds)
mask = (labels != 0) & (loss < 1e8)
mask = tf.cast(mask, loss.dtype)
loss = loss*mask
loss = tf.reduce_sum(loss)/tf.reduce_sum(mask)
return loss
def masked_acc(labels, preds):
mask = tf.cast(labels!=0, tf.float32)
preds = tf.argmax(preds, axis=-1)
labels = tf.cast(labels, tf.int64)
match = tf.cast(preds == labels, mask.dtype)
acc = tf.reduce_sum(match*mask)/tf.reduce_sum(mask)
return acc
回调
为了在训练期间获得反馈,请设置 keras.callbacks.Callback
在每个时期末为冲浪者图像生成一些标题。
class GenerateText(tf.keras.callbacks.Callback):
def __init__(self):
image_url = 'https://tensorflowcn.cn/images/surf.jpg'
image_path = tf.keras.utils.get_file('surf.jpg', origin=image_url)
self.image = load_image(image_path)
def on_epoch_end(self, epochs=None, logs=None):
print()
print()
for t in (0.0, 0.5, 1.0):
result = self.model.simple_gen(self.image, temperature=t)
print(result)
print()
它会生成三个输出字符串,如前面的示例所示,第一个是“贪婪”,在每一步选择 logits 的 argmax。
g = GenerateText()
g.model = model
g.on_epoch_end(0)
还要使用 callbacks.EarlyStopping
在模型开始过拟合时终止训练。
callbacks = [
GenerateText(),
tf.keras.callbacks.EarlyStopping(
patience=5, restore_best_weights=True)]
训练
配置并执行训练。
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
loss=masked_loss,
metrics=[masked_acc])
要进行更频繁的报告,请使用 Dataset.repeat()
方法,并将 steps_per_epoch
和 validation_steps
参数设置为 Model.fit
。
在 Flickr8k
上进行此设置后,对数据集的完整遍历是 900 多个批次,但以下报告时代为 100 个步骤。
history = model.fit(
train_ds.repeat(),
steps_per_epoch=100,
validation_data=test_ds.repeat(),
validation_steps=20,
epochs=100,
callbacks=callbacks)
绘制训练运行期间的损失和准确性
plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label='val_loss')
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch #')
plt.ylabel('CE/token')
plt.legend()
plt.plot(history.history['masked_acc'], label='accuracy')
plt.plot(history.history['val_masked_acc'], label='val_accuracy')
plt.ylim([0, max(plt.ylim())])
plt.xlabel('Epoch #')
plt.ylabel('CE/token')
plt.legend()
注意力图
现在,使用训练好的模型,对图像运行该 simple_gen
方法
result = model.simple_gen(image, temperature=0.0)
result
将输出拆分为标记
str_tokens = result.split()
str_tokens.append('[END]')
每个 DecoderLayers
都缓存其 CrossAttention
层的注意力分数。每个注意力图的形状为 (batch=1, heads, sequence, image)
attn_maps = [layer.last_attention_scores for layer in model.decoder_layers]
[map.shape for map in attn_maps]
因此,沿着 batch
轴堆叠地图,然后在 (batch, heads)
轴上取平均值,同时将 image
轴拆分为 height, width
attention_maps = tf.concat(attn_maps, axis=0)
attention_maps = einops.reduce(
attention_maps,
'batch heads sequence (height width) -> sequence height width',
height=7, width=7,
reduction='mean')
现在,您有一个单独的注意力图,用于每个序列预测。每个地图中的值应该合计为 1.
einops.reduce(attention_maps, 'sequence height width -> sequence', reduction='sum')
因此,这里就是模型在生成输出的每个标记时将注意力集中在哪里
def plot_attention_maps(image, str_tokens, attention_map):
fig = plt.figure(figsize=(16, 9))
len_result = len(str_tokens)
titles = []
for i in range(len_result):
map = attention_map[i]
grid_size = max(int(np.ceil(len_result/2)), 2)
ax = fig.add_subplot(3, grid_size, i+1)
titles.append(ax.set_title(str_tokens[i]))
img = ax.imshow(image)
ax.imshow(map, cmap='gray', alpha=0.6, extent=img.get_extent(),
clim=[0.0, np.max(map)])
plt.tight_layout()
plot_attention_maps(image/255, str_tokens, attention_maps)
现在将其放入一个更实用的函数中
@Captioner.add_method
def run_and_show_attention(self, image, temperature=0.0):
result_txt = self.simple_gen(image, temperature)
str_tokens = result_txt.split()
str_tokens.append('[END]')
attention_maps = [layer.last_attention_scores for layer in self.decoder_layers]
attention_maps = tf.concat(attention_maps, axis=0)
attention_maps = einops.reduce(
attention_maps,
'batch heads sequence (height width) -> sequence height width',
height=7, width=7,
reduction='mean')
plot_attention_maps(image/255, str_tokens, attention_maps)
t = plt.suptitle(result_txt)
t.set_y(1.05)
run_and_show_attention(model, image)
在您自己的图像上试用
为了好玩,下面为您提供了一种方法,您可以使用它为自己的图像添加您刚刚训练过的模型的字幕。请记住,它是用相对较少的数据训练的,并且您的图像可能与训练数据不同(因此请做好迎接奇怪结果的准备!)
image_url = 'https://tensorflowcn.cn/images/bedroom_hrnet_tutorial.jpg'
image_path = tf.keras.utils.get_file(origin=image_url)
image = load_image(image_path)
run_and_show_attention(model, image)