在 TensorFlow.org 上查看
|
在 Google Colab 中运行
|
在 GitHub 上查看源码
|
下载笔记本
|
本教程演示了如何使用基于字符的 RNN 生成文本。您将使用 Andrej Karpathy 的文章《循环神经网络的不合理有效性》中的莎士比亚文集数据集。给定该数据中的一系列字符(例如“Shakespear”),训练模型以预测序列中的下一个字符(“e”)。通过反复调用该模型,可以生成更长的文本序列。
本教程包含使用 tf.keras 和 即时执行 (eager execution) 实现的可运行代码。以下是本教程中的模型在训练 30 个周期 (epoch) 并以提示词“Q”开头时的输出示例:
QUEENE: I had thought thou hadst a Roman; for the oracle, Thus by All bids the man against the word, Which are so weak of care, by old care done; Your children were in your holy love, And the precipitation through the bleeding throne. BISHOP OF ELY: Marry, and will, my lord, to weep in such a one were prettiest; Yet now I was adopted heir Of the world's lamentable day, To watch the next way with his father with his face? ESCALUS: The cause why then we are all resolved more sons. VOLUMNIA: O, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, no, it is no sin it should be dead, And love and pale as any will to that word. QUEEN ELIZABETH: But how long have I heard the soul for this world, And show his hands of life be proved to stand. PETRUCHIO: I say he look'd on, if I must be content To stay him from the fatal of our country's bliss. His lordship pluck'd from this sentence then for prey, And then let us twain, being the moon, were she such a case as fills m
虽然其中一些句子在语法上是通顺的,但大多数都没有意义。该模型还没有学习单词的含义,但请考虑
该模型是基于字符的。训练开始时,模型不知道如何拼写英语单词,甚至不知道单词是文本的一个单位。
输出的结构类似于戏剧——文本块通常以演讲者姓名开头,全部为大写字母,与数据集类似。
如下所示,该模型在小批量的文本(每个 100 个字符)上进行训练,并且仍然能够生成具有连贯结构的更长文本序列。
设置
导入 TensorFlow 和其他库
import tensorflow as tf
import numpy as np
import os
import time
下载莎士比亚数据集
若要使用您自己的数据运行此代码,请修改以下行。
path_to_file = tf.keras.utils.get_file('shakespeare.txt', 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt')
读取数据
首先,查看文本
# Read, then decode for py2 compat.
text = open(path_to_file, 'rb').read().decode(encoding='utf-8')
# length of text is the number of characters in it
print(f'Length of text: {len(text)} characters')
# Take a look at the first 250 characters in text
print(text[:250])
# The unique characters in the file
vocab = sorted(set(text))
print(f'{len(vocab)} unique characters')
处理文本
向量化文本
在训练之前,您需要将字符串转换为数值表示。
tf.keras.layers.StringLookup 层可以将每个字符转换为数字 ID。它只需要先将文本拆分为标记 (tokens)。
example_texts = ['abcdefg', 'xyz']
chars = tf.strings.unicode_split(example_texts, input_encoding='UTF-8')
chars
现在创建 tf.keras.layers.StringLookup 层
ids_from_chars = tf.keras.layers.StringLookup(
vocabulary=list(vocab), mask_token=None)
它将标记转换为字符 ID
ids = ids_from_chars(chars)
ids
由于本教程的目标是生成文本,因此反转此表示并从中恢复人类可读的字符串也很重要。为此,您可以使用 tf.keras.layers.StringLookup(..., invert=True)。
chars_from_ids = tf.keras.layers.StringLookup(
vocabulary=ids_from_chars.get_vocabulary(), invert=True, mask_token=None)
该层从 ID 向量中恢复字符,并将其作为字符的 tf.RaggedTensor 返回。
chars = chars_from_ids(ids)
chars
您可以使用 tf.strings.reduce_join 将字符连接回字符串。
tf.strings.reduce_join(chars, axis=-1).numpy()
def text_from_ids(ids):
return tf.strings.reduce_join(chars_from_ids(ids), axis=-1)
预测任务
给定一个字符或字符序列,下一个最可能的字符是什么?这就是您要训练模型执行的任务。模型的输入将是一个字符序列,您训练模型来预测输出——即每个时间步的下一个字符。
由于 RNN 维护着依赖于先前所见元素的内部状态,那么在给定截止目前计算出的所有字符的情况下,下一个字符是什么?
创建训练示例和目标
接下来将文本划分为示例序列。每个输入序列将包含来自文本的 seq_length 个字符。
对于每个输入序列,相应的目标包含相同长度的文本,只是向右移动了一个字符。
因此,将文本分解为 seq_length+1 的块。例如,假设 seq_length 为 4,我们的文本是“Hello”。输入序列将是“Hell”,目标序列为“ello”。
为此,首先使用 tf.data.Dataset.from_tensor_slices 函数将文本向量转换为字符索引流。
all_ids = ids_from_chars(tf.strings.unicode_split(text, 'UTF-8'))
all_ids
ids_dataset = tf.data.Dataset.from_tensor_slices(all_ids)
for ids in ids_dataset.take(10):
print(chars_from_ids(ids).numpy().decode('utf-8'))
seq_length = 100
batch 方法使您可以轻松地将这些单个字符转换为所需大小的序列。
sequences = ids_dataset.batch(seq_length+1, drop_remainder=True)
for seq in sequences.take(1):
print(chars_from_ids(seq))
如果您将标记连接回字符串,就更容易看出这是在做什么。
for seq in sequences.take(5):
print(text_from_ids(seq).numpy())
对于训练,您需要一个 (input, label) 对的数据集。其中 input 和 label 都是序列。在每个时间步,输入是当前字符,标签是下一个字符。
这是一个函数,它接收一个序列作为输入,对其进行复制并移动,以便为每个时间步对齐输入和标签。
def split_input_target(sequence):
input_text = sequence[:-1]
target_text = sequence[1:]
return input_text, target_text
split_input_target(list("Tensorflow"))
dataset = sequences.map(split_input_target)
for input_example, target_example in dataset.take(1):
print("Input :", text_from_ids(input_example).numpy())
print("Target:", text_from_ids(target_example).numpy())
创建训练批次
您使用了 tf.data 将文本拆分为可管理的序列。但在将此数据输入模型之前,您需要打乱数据并将其打包成批次。
# Batch size
BATCH_SIZE = 64
# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000
dataset = (
dataset
.shuffle(BUFFER_SIZE)
.batch(BATCH_SIZE, drop_remainder=True)
.prefetch(tf.data.experimental.AUTOTUNE))
dataset
构建模型
本节将模型定义为 keras.Model 的子类(有关详细信息,请参阅通过子类化创建新的层和模型)。
该模型有三层:
tf.keras.layers.Embedding:输入层。一个可训练的查找表,它将每个字符 ID 映射到具有embedding_dim维度的向量。tf.keras.layers.GRU:一种 RNN 类型,大小为units=rnn_units(您也可以在此处使用 LSTM 层)。tf.keras.layers.Dense:输出层,具有vocab_size个输出。它为词汇表中的每个字符输出一个对数 (logit)。这些是模型根据词汇表中每个字符得出的对数似然。
# Length of the vocabulary in StringLookup Layer
vocab_size = len(ids_from_chars.get_vocabulary())
# The embedding dimension
embedding_dim = 256
# Number of RNN units
rnn_units = 1024
class MyModel(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, rnn_units):
super().__init__(self)
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
self.gru = tf.keras.layers.GRU(rnn_units,
return_sequences=True,
return_state=True)
self.dense = tf.keras.layers.Dense(vocab_size)
def call(self, inputs, states=None, return_state=False, training=False):
x = inputs
x = self.embedding(x, training=training)
if states is None:
states = self.gru.get_initial_state(x)
x, states = self.gru(x, initial_state=states, training=training)
x = self.dense(x, training=training)
if return_state:
return x, states
else:
return x
model = MyModel(
vocab_size=vocab_size,
embedding_dim=embedding_dim,
rnn_units=rnn_units)
对于每个字符,模型会查找嵌入,将嵌入作为输入运行 GRU 一个时间步,并应用全连接层 (dense layer) 来生成预测下一个字符对数似然的对数。

尝试该模型
现在运行该模型,看看它是否按预期运行。
首先检查输出的形状
for input_example_batch, target_example_batch in dataset.take(1):
example_batch_predictions = model(input_example_batch)
print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")
在上面的示例中,输入的序列长度为 100,但模型可以在任何长度的输入上运行。
model.summary()
要从模型中获得实际预测,您需要从输出分布中进行采样,以获得实际的字符索引。该分布由字符词汇表上的对数定义。
在批次的第一个示例上尝试一下
sampled_indices = tf.random.categorical(example_batch_predictions[0], num_samples=1)
sampled_indices = tf.squeeze(sampled_indices, axis=-1).numpy()
这为我们在每个时间步提供了下一个字符索引的预测
sampled_indices
解码这些内容以查看此未训练模型预测的文本
print("Input:\n", text_from_ids(input_example_batch[0]).numpy())
print()
print("Next Char Predictions:\n", text_from_ids(sampled_indices).numpy())
训练模型
至此,该问题可以被视为一个标准的分类问题。在给定先前 RNN 状态和当前时间步输入的情况下,预测下一个字符的类别。
附加优化器和损失函数
标准的 tf.keras.losses.sparse_categorical_crossentropy 损失函数在此情况下有效,因为它应用于预测的最后一个维度。
因为您的模型返回对数,所以需要设置 from_logits 标志。
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True)
example_batch_mean_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("Mean loss: ", example_batch_mean_loss)
新初始化的模型不应该太过自信,输出的对数应该都具有相似的量级。为了确认这一点,您可以检查平均损失的指数是否大约等于词汇表大小。高得多的损失意味着模型对其错误的答案很有把握,且初始化得很糟糕。
tf.exp(example_batch_mean_loss).numpy()
使用 tf.keras.Model.compile 方法配置训练过程。使用带有默认参数的 tf.keras.optimizers.Adam 和损失函数。
model.compile(optimizer='adam', loss=loss)
配置检查点
使用 tf.keras.callbacks.ModelCheckpoint 确保在训练期间保存检查点。
# Directory where the checkpoints will be saved
checkpoint_dir = './training_checkpoints'
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
filepath=checkpoint_prefix,
save_weights_only=True)
执行训练
为了保持合理的训练时间,使用 10 个周期来训练模型。在 Colab 中,将运行时设置为 GPU 以获得更快的训练速度。
EPOCHS = 20
history = model.fit(dataset, epochs=EPOCHS, callbacks=[checkpoint_callback])
生成文本
使用此模型生成文本的最简单方法是在循环中运行它,并在执行时跟踪模型的内部状态。

每次调用模型时,您都会传入一些文本和内部状态。模型返回对下一个字符的预测及其新状态。将预测和状态传回以继续生成文本。
以下代码执行单步预测
class OneStep(tf.keras.Model):
def __init__(self, model, chars_from_ids, ids_from_chars, temperature=1.0):
super().__init__()
self.temperature = temperature
self.model = model
self.chars_from_ids = chars_from_ids
self.ids_from_chars = ids_from_chars
# Create a mask to prevent "[UNK]" from being generated.
skip_ids = self.ids_from_chars(['[UNK]'])[:, None]
sparse_mask = tf.SparseTensor(
# Put a -inf at each bad index.
values=[-float('inf')]*len(skip_ids),
indices=skip_ids,
# Match the shape to the vocabulary
dense_shape=[len(ids_from_chars.get_vocabulary())])
self.prediction_mask = tf.sparse.to_dense(sparse_mask)
@tf.function
def generate_one_step(self, inputs, states=None):
# Convert strings to token IDs.
input_chars = tf.strings.unicode_split(inputs, 'UTF-8')
input_ids = self.ids_from_chars(input_chars).to_tensor()
# Run the model.
# predicted_logits.shape is [batch, char, next_char_logits]
predicted_logits, states = self.model(inputs=input_ids, states=states,
return_state=True)
# Only use the last prediction.
predicted_logits = predicted_logits[:, -1, :]
predicted_logits = predicted_logits/self.temperature
# Apply the prediction mask: prevent "[UNK]" from being generated.
predicted_logits = predicted_logits + self.prediction_mask
# Sample the output logits to generate token IDs.
predicted_ids = tf.random.categorical(predicted_logits, num_samples=1)
predicted_ids = tf.squeeze(predicted_ids, axis=-1)
# Convert from token ids to characters
predicted_chars = self.chars_from_ids(predicted_ids)
# Return the characters and model state.
return predicted_chars, states
one_step_model = OneStep(model, chars_from_ids, ids_from_chars)
在循环中运行它以生成一些文本。查看生成的文本,您会发现模型知道何时大写、何时分段,并模仿了类似莎士比亚的写作词汇。由于训练周期数较少,它尚未学会形成连贯的句子。
start = time.time()
states = None
next_char = tf.constant(['ROMEO:'])
result = [next_char]
for n in range(1000):
next_char, states = one_step_model.generate_one_step(next_char, states=states)
result.append(next_char)
result = tf.strings.join(result)
end = time.time()
print(result[0].numpy().decode('utf-8'), '\n\n' + '_'*80)
print('\nRun time:', end - start)
改进结果的最简单方法是延长训练时间(尝试 EPOCHS = 30)。
您还可以尝试不同的起始字符串,尝试添加另一个 RNN 层以提高模型的准确性,或者调整温度参数以生成更随机或更确定的预测。
如果您希望模型更快地生成文本,最简单的方法是对文本生成进行批处理。在下面的示例中,模型生成 5 个输出所花费的时间与上面生成 1 个输出的时间大致相同。
start = time.time()
states = None
next_char = tf.constant(['ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:', 'ROMEO:'])
result = [next_char]
for n in range(1000):
next_char, states = one_step_model.generate_one_step(next_char, states=states)
result.append(next_char)
result = tf.strings.join(result)
end = time.time()
print(result, '\n\n' + '_'*80)
print('\nRun time:', end - start)
导出生成器
此单步模型可以轻松地保存和恢复,从而允许您在任何接受 tf.saved_model 的地方使用它。
tf.saved_model.save(one_step_model, 'one_step')
one_step_reloaded = tf.saved_model.load('one_step')
states = None
next_char = tf.constant(['ROMEO:'])
result = [next_char]
for n in range(100):
next_char, states = one_step_reloaded.generate_one_step(next_char, states=states)
result.append(next_char)
print(tf.strings.join(result)[0].numpy().decode("utf-8"))
进阶:自定义训练
上述训练过程很简单,但无法提供太多控制权。它使用了教师强制 (teacher-forcing),这可以防止错误的预测被反馈回模型,因此模型永远不会学会如何从错误中恢复。
既然您已经了解了如何手动运行模型,接下来您将实现训练循环。这为您提供了一个起点,例如,如果您想实现课程学习 (curriculum learning) 以帮助稳定模型的开环输出。
自定义训练循环中最重要的部分是训练步长函数。
使用 tf.GradientTape 跟踪梯度。您可以通过阅读即时执行指南了解有关此方法的更多信息。
基本过程是:
- 在
tf.GradientTape下执行模型并计算损失。 - 计算更新并使用优化器将它们应用于模型。
class CustomTraining(MyModel):
@tf.function
def train_step(self, inputs):
inputs, labels = inputs
with tf.GradientTape() as tape:
predictions = self(inputs, training=True)
loss = self.loss(labels, predictions)
grads = tape.gradient(loss, model.trainable_variables)
self.optimizer.apply_gradients(zip(grads, model.trainable_variables))
return {'loss': loss}
上述 train_step 方法的实现遵循了 Keras 的 train_step 约定。这是可选的,但它允许您更改训练步骤的行为,同时仍然使用 Keras 的 Model.compile 和 Model.fit 方法。
model = CustomTraining(
vocab_size=len(ids_from_chars.get_vocabulary()),
embedding_dim=embedding_dim,
rnn_units=rnn_units)
model.compile(optimizer = tf.keras.optimizers.Adam(),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))
model.fit(dataset, epochs=1)
或者,如果您需要更多控制权,可以编写自己的完整自定义训练循环。
EPOCHS = 10
mean = tf.metrics.Mean()
for epoch in range(EPOCHS):
start = time.time()
mean.reset_states()
for (batch_n, (inp, target)) in enumerate(dataset):
logs = model.train_step([inp, target])
mean.update_state(logs['loss'])
if batch_n % 50 == 0:
template = f"Epoch {epoch+1} Batch {batch_n} Loss {logs['loss']:.4f}"
print(template)
# saving (checkpoint) the model every 5 epochs
if (epoch + 1) % 5 == 0:
model.save_weights(checkpoint_prefix.format(epoch=epoch))
print()
print(f'Epoch {epoch+1} Loss: {mean.result().numpy():.4f}')
print(f'Time taken for 1 epoch {time.time() - start:.2f} sec')
print("_"*80)
model.save_weights(checkpoint_prefix.format(epoch=epoch))
在 TensorFlow.org 上查看
在 Google Colab 中运行
在 GitHub 上查看源码
下载笔记本