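Overview
This Colab gives a brief introduction to the TPUEmbeddingLayer in TensorFlow 2.
TPUEmbeddingLayer can use the embedding accelerator on Cloud TPU to speed up embedding lookups, especially when you have many large embedding tables. This is particularly useful for recommendation models, which typically have very large embedding tables.
Follow the Google Cloud TPU quickstart to create a GCP account and a GCS bucket. You have $300 in free credits to get started with any GCP product. You can learn more about Cloud TPU at https://cloud.google.com/tpu/docs.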
Setup
Install TensorFlow 2 and TensorFlow Recommenders.
pip install -U tensorflow-recommenders
import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs
Connect to the TPU node or local TPU and initialize the TPU system.
resolver = tf.distribute.cluster_resolver.TPUClusterResolver('').connect('')
Create a TPU strategy. Models that need to run on TPU must be created under TPUStrategy.
strategy = tf.distribute.TPUStrategy(resolver)
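You can also check the TPU hardware features in the TPUStrategy object.
For example, you can check which version of the embedding feature this TPU supports. See tf.tpu.experimental.HardwareFeature for detailed documentation.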
embedding_feature = strategy.extended.tpu_hardware_feature.embedding_feature
assert embedding_feature == tf.tpu.experimental.HardwareFeature.EmbeddingFeature.V1, 'Make sure that you have the right TPU Hardware'
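Breakdown of the TPUEmbedding API
Feature and table configuration
When creating an instance of this layer, you must specify:
- the complete set of embedding tables,
- the features you want to look up in those tables, and
- the optimizer(s) you want to use on the tables.
See the documentation of tf.tpu.experimental.embedding.TableConfig and tf.tpu.experimental.embedding.FeatureConfig for more details on the full set of options. We cover the basic usage here.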
Multiple FeatureConfig objects can use the same TableConfig object, which lets different features share the same table.
table_config_one = tf.tpu.experimental.embedding.TableConfig(
vocabulary_size=8, dim=8)
table_config_two = tf.tpu.experimental.embedding.TableConfig(
vocabulary_size=16, dim=4)
feature_config = {
'feature_one':
tf.tpu.experimental.embedding.FeatureConfig(table=table_config_one),
'feature_two':
tf.tpu.experimental.embedding.FeatureConfig(table=table_config_one),
'feature_three':
tf.tpu.experimental.embedding.FeatureConfig(table=table_config_two)
}
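To make table sharing concrete, here is a minimal pure-Python sketch (no TPU or TensorFlow needed) of the lookup semantics of the configuration above, under simplifying assumptions: each table is a plain list of rows, and a feature lookup is a row gather from the table its FeatureConfig points to. The helpers `make_table` and `lookup` are hypothetical and not part of the TPUEmbedding API.

```python
import random

def make_table(vocabulary_size, dim, seed):
    """Hypothetical stand-in for a TableConfig: a vocabulary_size x dim matrix."""
    rng = random.Random(seed)
    return [[rng.uniform(-0.05, 0.05) for _ in range(dim)]
            for _ in range(vocabulary_size)]

# Same shapes as table_config_one (8 x 8) and table_config_two (16 x 4).
tables = {
    "table_one": make_table(vocabulary_size=8, dim=8, seed=0),
    "table_two": make_table(vocabulary_size=16, dim=4, seed=1),
}

# Mirrors feature_config: feature_one and feature_two share table_one.
feature_to_table = {
    "feature_one": "table_one",
    "feature_two": "table_one",
    "feature_three": "table_two",
}

def lookup(features):
    """Gathers one embedding row per id for every feature in the batch."""
    return {
        name: [tables[feature_to_table[name]][i] for i in ids]
        for name, ids in features.items()
    }

batch = {"feature_one": [3, 5], "feature_two": [3, 7], "feature_three": [12, 0]}
activations = lookup(batch)
```

Because `feature_one` and `feature_two` point at the same table, id 3 returns the very same row for both, so a gradient update to that row affects both features.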
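Optimizers
An optimizer can be specified globally by passing one of the following types to the optimizer argument:
- a string, one of 'sgd', 'adagrad' or 'adam', which uses the given optimizer with default parameters;
- an instance of a Keras optimizer;
- an instance of an optimizer class from the tf.tpu.experimental.embedding module.
You can also specify an optimizer at the table level via the optimizer argument of tf.tpu.experimental.embedding.TableConfig. This completely overrides the global optimizer for that table. For performance reasons, keep the total number of distinct optimizers to a minimum.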
optimizer=tf.tpu.experimental.embedding.SGD(0.1)
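The override rule described above can be modeled in plain Python. This is a hypothetical sketch of the rule, not TensorFlow code: `resolve_optimizers` takes a global optimizer and per-table overrides (`None` meaning "use the global one"), returns the optimizer each table ends up with, and counts the distinct optimizers, which is the number the text recommends keeping small. Optimizers are represented as simple (name, learning_rate) tuples.

```python
def resolve_optimizers(global_optimizer, table_optimizers):
    """Returns the effective optimizer per table and the number of distinct ones.

    table_optimizers maps a table name to its table-level optimizer, or to None
    when the table has no override and falls back to the global optimizer.
    """
    effective = {
        table: (opt if opt is not None else global_optimizer)
        for table, opt in table_optimizers.items()
    }
    return effective, len(set(effective.values()))

global_opt = ("sgd", 0.1)  # plays the role of tf.tpu.experimental.embedding.SGD(0.1)
effective, num_distinct = resolve_optimizers(
    global_opt,
    {"table_one": None, "table_two": ("adagrad", 0.05)},
)
```

Here the override on `table_two` fully replaces the global SGD optimizer for that table, so two distinct optimizers are in play.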
Model creation
Below are two examples of creating a Keras model with the TPU embedding layer.
For a functional Keras model:
with strategy.scope():
  embedding_inputs = {
      'feature_one':
          tf.keras.Input(batch_size=1024, shape=(1,), dtype=tf.int32),
      'feature_two':
          tf.keras.Input(
              batch_size=1024, shape=(1,), dtype=tf.int32, ragged=True),
      'feature_three':
          tf.keras.Input(batch_size=1024, shape=(1,), dtype=tf.int32)
  }
  # embedding, feature_config and embedding_inputs all have the same nested
  # structure.
  embedding = tfrs.layers.embedding.TPUEmbedding(
      feature_config=feature_config, optimizer=optimizer)(
          embedding_inputs)
  logits = tf.keras.layers.Dense(1)(
      tf.concat(tf.nest.flatten(embedding), axis=1))
  model = tf.keras.Model(embedding_inputs, logits)
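`feature_two` is declared as a ragged input, so one example can carry several ids. The table's combiner (the `combiner` argument of `TableConfig`, which defaults to `'mean'` and also accepts `'sum'` and `'sqrtn'`) reduces those rows to a single vector per example. The sketch below shows the three reductions on a tiny hand-made table in pure Python; it illustrates the arithmetic only and is not how the TPU kernels implement it.

```python
import math

def combine(table, ragged_ids, combiner="mean"):
    """Reduces each example's list of ids to one embedding vector.

    Assumes every example has at least one id.
    """
    dim = len(table[0])
    out = []
    for ids in ragged_ids:
        summed = [sum(table[i][d] for i in ids) for d in range(dim)]
        if combiner == "sum":
            divisor = 1.0
        elif combiner == "mean":
            divisor = float(len(ids))
        elif combiner == "sqrtn":
            divisor = math.sqrt(len(ids))
        else:
            raise ValueError(f"unknown combiner: {combiner}")
        out.append([x / divisor for x in summed])
    return out

table = [[1.0, 0.0], [3.0, 2.0], [5.0, 4.0]]  # vocabulary_size=3, dim=2
ragged_ids = [[0, 1], [2], [0, 1, 2]]         # 2, 1 and 3 ids per example
mean_vectors = combine(table, ragged_ids, "mean")
```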
For a subclassed model:
class ModelWithEmbeddings(tf.keras.Model):
  def __init__(self):
    super(ModelWithEmbeddings, self).__init__()
    self.embedding_layer = tfrs.layers.embedding.TPUEmbedding(
        feature_config=feature_config, optimizer=optimizer)
    self.dense = tf.keras.layers.Dense(1)
  def call(self, inputs):
    embedding = self.embedding_layer(inputs)
    logits = self.dense(tf.concat(tf.nest.flatten(embedding), axis=1))
    return logits
# Make sure that the TPU is reinitialized when you try to create another model.
tf.tpu.experimental.initialize_tpu_system(resolver)
with strategy.scope():
  model = ModelWithEmbeddings()
WARNING:tensorflow:TPU system grpc://10.3.32.50:8470 has already been initialized. Reinitializing the TPU can cause previously created variables on TPU to be lost.
tf.tpu.experimental.initialize_tpu_system(resolver)
WARNING:tensorflow:TPU system grpc://10.3.32.50:8470 has already been initialized. Reinitializing the TPU can cause previously created variables on TPU to be lost.
<tensorflow.python.tpu.topology.Topology at 0x7f2085f74400>
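Both models concatenate `tf.nest.flatten(embedding)` before the dense layer, so the dense layer's input width is the sum of the `dim` of the table each feature looks up. `tf.nest.flatten` visits dict entries in sorted key order, which also fixes where each feature's columns land. A plain-Python arithmetic check using the dimensions from `feature_config` above (`concat_layout` is a hypothetical helper):

```python
def concat_layout(feature_dims):
    """Column range of each feature after tf.concat(tf.nest.flatten(...), axis=1).

    Keys are visited in sorted order, matching how tf.nest flattens a dict.
    Returns the per-feature (start, end) ranges and the total width.
    """
    layout, start = {}, 0
    for name in sorted(feature_dims):
        layout[name] = (start, start + feature_dims[name])
        start += feature_dims[name]
    return layout, start

# feature_one and feature_two use table_config_one (dim=8);
# feature_three uses table_config_two (dim=4).
layout, width = concat_layout(
    {"feature_one": 8, "feature_two": 8, "feature_three": 4})
```

So `Dense(1)` sees a 20-wide input, and because "feature_three" sorts before "feature_two", its 4 columns sit between the two features that share `table_config_one`.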
A simple TPUEmbeddingLayer example
In this tutorial, we use TPUEmbeddingLayer to build a simple ranking model on the MovieLens 100K dataset. The model predicts a rating from user_id and movie_id.
Install and import TensorFlow Datasets
pip install -q --upgrade tensorflow-datasets
import tensorflow_datasets as tfds
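Read the data
To make the dataset accessible to the Cloud TPU workers, you need to create a GCS bucket and download the dataset into it. Follow these instructions to create your GCS bucket.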
gcs_bucket = 'gs://YOUR-BUCKET-NAME'
from google.colab import auth
auth.authenticate_user()
First, we fetch the data with tensorflow_datasets. The fields we need are movie_id, user_id and user_rating.
Then we preprocess the data and convert the IDs to integers.
# Ratings data.
ratings = tfds.load(
    "movielens/100k-ratings", data_dir=gcs_bucket, split="train")
# Select the basic features and convert the string IDs to integers.
ratings = ratings.map(
    lambda x: {
        "movie_id": tf.strings.to_number(x["movie_id"], out_type=tf.int32),
        "user_id": tf.strings.to_number(x["user_id"], out_type=tf.int32),
        "user_rating": x["user_rating"],
    })
Prepare the dataset and model
Here we define some hyperparameters for the model.
per_replica_batch_size = 16
movie_vocabulary_size = 2048
movie_embedding_size = 64
user_vocabulary_size = 2048
user_embedding_size = 64
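Each ID must be a valid row index, i.e. lie in [0, vocabulary_size). MovieLens 100K has 943 users and 1,682 movies, with IDs numbered from 1, so a vocabulary size of 2048 covers both. A quick plain-Python check of that reasoning; `fits_vocabulary` is a hypothetical helper, and the maximum IDs are the dataset's published counts rather than values read from the data here.

```python
def fits_vocabulary(max_id, vocabulary_size):
    """True if every id in [0, max_id] is a valid row of the embedding table."""
    return 0 <= max_id < vocabulary_size

movie_vocabulary_size = 2048
user_vocabulary_size = 2048
max_movie_id = 1682  # MovieLens 100K: movie ids 1..1682
max_user_id = 943    # MovieLens 100K: user ids 1..943
```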
We split the data by putting 80% of the ratings in the training set and 20% in the test set.
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)
train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)
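The split works because the shuffle uses a fixed seed with `reshuffle_each_iteration=False`, so `take` and `skip` see the same order and the two sets never overlap. The same idea in plain Python, on a hypothetical list of 100,000 record indices standing in for the ratings (Python's shuffle will not reproduce TensorFlow's order, only the shape of the split):

```python
import random

records = list(range(100_000))    # stand-in for the 100K ratings
order = records[:]
random.Random(42).shuffle(order)  # fixed seed, shuffled exactly once
train = order[:80_000]            # like shuffled.take(80_000)
test = order[80_000:100_000]      # like shuffled.skip(80_000).take(20_000)
```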
Batch the datasets and convert them into distributed datasets.
train_dataset = train.batch(
per_replica_batch_size * strategy.num_replicas_in_sync,
drop_remainder=True).cache()
test_dataset = test.batch(
per_replica_batch_size * strategy.num_replicas_in_sync,
drop_remainder=True).cache()
distribute_train_dataset = strategy.experimental_distribute_dataset(
train_dataset,
options=tf.distribute.InputOptions(experimental_fetch_to_device=False))
distribute_test_dataset = strategy.experimental_distribute_dataset(
test_dataset,
options=tf.distribute.InputOptions(experimental_fetch_to_device=False))
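The global batch size is `per_replica_batch_size * strategy.num_replicas_in_sync`, and `drop_remainder=True` discards the last partial batch. Assuming 8 replicas (for example, a TPU with 8 cores; your `num_replicas_in_sync` may differ), the arithmetic works out as follows:

```python
def batching(num_examples, per_replica_batch_size, num_replicas):
    """Returns (global batch size, full batches, examples dropped)."""
    global_batch = per_replica_batch_size * num_replicas
    full_batches = num_examples // global_batch
    dropped = num_examples - full_batches * global_batch
    return global_batch, full_batches, dropped

# 8 replicas is an assumption, not read from the strategy.
train_stats = batching(80_000, per_replica_batch_size=16, num_replicas=8)
test_stats = batching(20_000, per_replica_batch_size=16, num_replicas=8)
```

Under that assumption the training set yields 625 full batches of 128 with nothing dropped, while the test set yields 156 batches and drops its last 32 examples.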
Here we create the optimizer and specify the feature and table configurations. Then we create the model with the embedding layer.
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.1)
user_table = tf.tpu.experimental.embedding.TableConfig(
vocabulary_size=user_vocabulary_size, dim=user_embedding_size)
movie_table = tf.tpu.experimental.embedding.TableConfig(
vocabulary_size=movie_vocabulary_size, dim=movie_embedding_size)
feature_config = {
"movie_id": tf.tpu.experimental.embedding.FeatureConfig(table=movie_table),
"user_id": tf.tpu.experimental.embedding.FeatureConfig(table=user_table)
}
# Define a ranking model with an embedding layer.
class EmbeddingModel(tfrs.models.Model):
  def __init__(self):
    super().__init__()
    self.embedding_layer = tfrs.layers.embedding.TPUEmbedding(
        feature_config=feature_config, optimizer=optimizer)
    self.ratings = tf.keras.Sequential([
        # Learn multiple dense layers.
        tf.keras.layers.Dense(256, activation="relu"),
        tf.keras.layers.Dense(64, activation="relu"),
        # Make rating predictions in the final layer.
        tf.keras.layers.Dense(1)
    ])
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
        loss=tf.keras.losses.MeanSquaredError(
            reduction=tf.keras.losses.Reduction.NONE),
        metrics=[tf.keras.metrics.RootMeanSquaredError()])
  def compute_loss(self, features, training=False):
    embedding = self.embedding_layer({
        "user_id": features["user_id"],
        "movie_id": features["movie_id"]
    })
    rating_predictions = self.ratings(
        tf.concat([embedding["user_id"], embedding["movie_id"]], axis=1))
    # Sum the per-example losses on this replica and scale by the global
    # batch size, so the result summed across replicas is the global mean.
    return tf.reduce_sum(
        self.task(
            labels=features["user_rating"], predictions=rating_predictions)) * (
                1 / (per_replica_batch_size * strategy.num_replicas_in_sync))
  def call(self, features, serving_config=None):
    embedding = self.embedding_layer(
        {
            "user_id": features["user_id"],
            "movie_id": features["movie_id"]
        },
        serving_config=serving_config)
    return self.ratings(
        tf.concat([embedding["user_id"], embedding["movie_id"]], axis=1))
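`compute_loss` sums the per-example squared errors on each replica and multiplies by `1 / (per_replica_batch_size * strategy.num_replicas_in_sync)`. Summing those per-replica values across replicas then gives the mean loss over the global batch. A plain-Python check with illustrative per-example losses for two hypothetical replicas:

```python
def replica_loss(per_example_losses, global_batch_size):
    """Mirrors compute_loss: sum on this replica, scaled by the global batch size."""
    return sum(per_example_losses) * (1 / global_batch_size)

per_replica_batch_size = 4
num_replicas = 2
replica_0 = [1.0, 2.0, 3.0, 4.0]  # illustrative losses, not real model output
replica_1 = [0.5, 0.5, 1.5, 3.5]
global_batch = per_replica_batch_size * num_replicas

total = (replica_loss(replica_0, global_batch)
         + replica_loss(replica_1, global_batch))
all_losses = replica_0 + replica_1
global_mean = sum(all_losses) / len(all_losses)
```

Dividing by the per-replica batch size instead would make the summed result `num_replicas` times too large.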
Make sure you create the model under TPUStrategy.
with strategy.scope():
  model = EmbeddingModel()
  model.compile(optimizer=optimizer)
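The Adagrad optimizer passed here keeps one accumulator per parameter, which is why the checkpoint later stores an `accumulator` slot with the same shape as each table. The sketch below applies the textbook Adagrad rule (accumulator += g², w -= lr · g / √accumulator) sparsely, updating only the rows that were looked up. It assumes an initial accumulator value of 0.1 and omits the epsilon term; the exact constants used by the TPU embedding kernels may differ.

```python
import math

def sparse_adagrad_update(table, accumulator, ids, grads, learning_rate=0.1):
    """Applies Adagrad to the rows in `ids` only; other rows stay untouched.

    Gradients for repeated ids are summed first, as a gather's gradient would be.
    """
    row_grads = {}
    for i, g in zip(ids, grads):
        summed = row_grads.setdefault(i, [0.0] * len(g))
        for d, gd in enumerate(g):
            summed[d] += gd
    for i, g in row_grads.items():
        for d, gd in enumerate(g):
            accumulator[i][d] += gd * gd
            table[i][d] -= learning_rate * gd / math.sqrt(accumulator[i][d])

table = [[1.0, 1.0] for _ in range(4)]        # vocabulary_size=4, dim=2
accumulator = [[0.1, 0.1] for _ in range(4)]  # assumed initial value 0.1
# Row 2 is looked up twice in the batch; its gradients are summed to [0.5, 1.0].
sparse_adagrad_update(table, accumulator, ids=[2, 2],
                      grads=[[0.25, 0.0], [0.25, 1.0]])
```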
Train and evaluate the model
import os
Train the model
model.fit(distribute_train_dataset, steps_per_epoch=10, epochs=10)
Epoch 1/10
10/10 [==============================] - 7s 32ms/step - root_mean_squared_error: 2.7897 - loss: 0.0564 - regularization_loss: 0.0000e+00 - total_loss: 0.0564
Epoch 2/10
10/10 [==============================] - 0s 26ms/step - root_mean_squared_error: 1.1963 - loss: 0.0088 - regularization_loss: 0.0000e+00 - total_loss: 0.0088
Epoch 3/10
10/10 [==============================] - 0s 25ms/step - root_mean_squared_error: 1.1261 - loss: 0.0089 - regularization_loss: 0.0000e+00 - total_loss: 0.0089
Epoch 4/10
10/10 [==============================] - 0s 35ms/step - root_mean_squared_error: 1.1403 - loss: 0.0094 - regularization_loss: 0.0000e+00 - total_loss: 0.0094
Epoch 5/10
10/10 [==============================] - 0s 40ms/step - root_mean_squared_error: 1.1269 - loss: 0.0103 - regularization_loss: 0.0000e+00 - total_loss: 0.0103
Epoch 6/10
10/10 [==============================] - 0s 36ms/step - root_mean_squared_error: 1.1162 - loss: 0.0100 - regularization_loss: 0.0000e+00 - total_loss: 0.0100
Epoch 7/10
10/10 [==============================] - 0s 36ms/step - root_mean_squared_error: 1.1365 - loss: 0.0097 - regularization_loss: 0.0000e+00 - total_loss: 0.0097
Epoch 8/10
10/10 [==============================] - 0s 47ms/step - root_mean_squared_error: 1.1171 - loss: 0.0110 - regularization_loss: 0.0000e+00 - total_loss: 0.0110
Epoch 9/10
10/10 [==============================] - 0s 48ms/step - root_mean_squared_error: 1.1037 - loss: 0.0100 - regularization_loss: 0.0000e+00 - total_loss: 0.0100
Epoch 10/10
10/10 [==============================] - 0s 51ms/step - root_mean_squared_error: 1.0953 - loss: 0.0092 - regularization_loss: 0.0000e+00 - total_loss: 0.0092
<keras.callbacks.History at 0x7f2084d7ddf0>
Evaluate the model on the test dataset
model.evaluate(distribute_test_dataset, steps=10)
10/10 [==============================] - 4s 27ms/step - root_mean_squared_error: 1.1339 - loss: 0.0090 - regularization_loss: 0.0000e+00 - total_loss: 0.0090
[1.1338995695114136, 0.009662957862019539, 0, 0.009662957862019539]
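Save and restore checkpoints
You can use a GCS bucket to store your checkpoints.
Make sure you follow the instructions to give the TPU worker access to the bucket.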
model_dir = os.path.join(gcs_bucket, "saved_model")
Create a checkpoint for the TPU model and save the model to the bucket.
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
saved_tpu_model_path = checkpoint.save(os.path.join(model_dir, "ckpt"))
You can list the variables stored at that path.
tf.train.list_variables(saved_tpu_model_path)
[('_CHECKPOINTABLE_OBJECT_GRAPH', []),
 ('model/embedding_layer/_tpu_embedding/.ATTRIBUTES/TPUEmbedding_saveable', []),
 ('model/embedding_layer/_tpu_embedding/table_0/.ATTRIBUTES/VARIABLE_VALUE', [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_0/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_1/.ATTRIBUTES/VARIABLE_VALUE', [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_1/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [2048, 64]),
 ('model/ratings/layer_with_weights-0/bias/.ATTRIBUTES/VARIABLE_VALUE', [256]),
 ('model/ratings/layer_with_weights-0/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [256]),
 ('model/ratings/layer_with_weights-0/kernel/.ATTRIBUTES/VARIABLE_VALUE', [128, 256]),
 ('model/ratings/layer_with_weights-0/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [128, 256]),
 ('model/ratings/layer_with_weights-1/bias/.ATTRIBUTES/VARIABLE_VALUE', [64]),
 ('model/ratings/layer_with_weights-1/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [64]),
 ('model/ratings/layer_with_weights-1/kernel/.ATTRIBUTES/VARIABLE_VALUE', [256, 64]),
 ('model/ratings/layer_with_weights-1/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [256, 64]),
 ('model/ratings/layer_with_weights-2/bias/.ATTRIBUTES/VARIABLE_VALUE', [1]),
 ('model/ratings/layer_with_weights-2/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [1]),
 ('model/ratings/layer_with_weights-2/kernel/.ATTRIBUTES/VARIABLE_VALUE', [64, 1]),
 ('model/ratings/layer_with_weights-2/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE', [64, 1]),
 ('model/task/_ranking_metrics/0/count/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('model/task/_ranking_metrics/0/total/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/decay/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/iter/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/learning_rate/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('save_counter/.ATTRIBUTES/VARIABLE_VALUE', [])]
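The variable shapes in this listing can be cross-checked by hand: each table is vocabulary_size × dim = 2048 × 64, the first dense kernel takes the 64 + 64 = 128 concatenated embedding columns, and every trainable variable has an Adagrad accumulator of the same shape. A plain-Python tally, with the shapes copied from the listing (the dict keys are shortened, hypothetical labels):

```python
from math import prod

# Trainable variable shapes copied from the tf.train.list_variables output.
shapes = {
    "table_0": [2048, 64],
    "table_1": [2048, 64],
    "dense_0/kernel": [128, 256], "dense_0/bias": [256],
    "dense_1/kernel": [256, 64], "dense_1/bias": [64],
    "dense_2/kernel": [64, 1], "dense_2/bias": [1],
}
embedding_params = sum(prod(s) for name, s in shapes.items()
                       if name.startswith("table"))
dense_params = sum(prod(s) for name, s in shapes.items()
                   if name.startswith("dense"))
# Adagrad adds one accumulator slot per trainable variable, with the same shape.
total_with_slots = 2 * (embedding_params + dense_params)
```

Even in this small model, the two embedding tables hold about 84% of the trainable parameters (262,144 of 311,681).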
You can restore the checkpoint later. A common practice is to checkpoint the model at every epoch and restore a checkpoint afterward.
with strategy.scope():
  checkpoint.restore(saved_tpu_model_path)
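`Checkpoint.save` appends an increasing counter to the prefix it is given (for example `ckpt-1`, `ckpt-2`, ...), so checkpointing once per epoch yields one numbered checkpoint per epoch. Below is a plain-Python sketch of picking the most recent one by that counter; `latest_by_counter` is a hypothetical helper, and in TensorFlow you would normally rely on `tf.train.latest_checkpoint` or `tf.train.CheckpointManager` instead.

```python
def latest_by_counter(paths):
    """Returns the path whose trailing '-N' counter is largest, or None if empty."""
    if not paths:
        return None
    return max(paths, key=lambda p: int(p.rsplit("-", 1)[1]))

# Hypothetical paths returned by checkpoint.save(...) once per epoch.
saved = [f"gs://YOUR-BUCKET-NAME/saved_model/ckpt-{epoch}"
         for epoch in range(1, 11)]
latest = latest_by_counter(saved)
```

Comparing the counter numerically matters: a plain string `max` would pick `ckpt-9` over `ckpt-10`.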
You can also create a CPU model and restore the weights trained on TPU.
cpu_model = EmbeddingModel()
# Create the cpu checkpoint and restore the tpu checkpoint.
cpu_checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=cpu_model)
cpu_checkpoint.restore(saved_tpu_model_path)
<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f20830fe5b0>
You can also restore only the embedding weights.
embedding_checkpoint = tf.train.Checkpoint(embedding=model.embedding_layer)
saved_embedding_path = embedding_checkpoint.save(
os.path.join(model_dir, 'tpu-embedding'))
# Restore the embedding parameters on cpu model.
cpu_embedding_checkpoint = tf.train.Checkpoint(
    embedding=cpu_model.embedding_layer)
cpu_embedding_checkpoint.restore(saved_embedding_path)
<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f20831bbeb0>
# Save the embedding parameters of the CPU model and restore them into the TPU model.
saved_cpu_embedding_path = cpu_embedding_checkpoint.save(
    os.path.join(model_dir, 'cpu-embedding'))
with strategy.scope():
  embedding_checkpoint.restore(saved_cpu_embedding_path)
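Object-based checkpoints match values by the path of attribute names from the root object (compare keys such as `model/embedding_layer/...` in the variable listing), so the keyword passed to `tf.train.Checkpoint(embedding=...)` must be identical when saving and restoring. A mismatched name just leaves values unrestored, and TensorFlow only warns about it unless you call `assert_consumed()` on the returned status. A simplified plain-Python model of that matching; the `save`/`restore` helpers are hypothetical, and real checkpoints match through an object graph rather than a flat dict:

```python
def save(**objects):
    """Flattens {attribute_name: {variable_name: value}} into checkpoint keys."""
    return {f"{attr}/{var}": value
            for attr, variables in objects.items()
            for var, value in variables.items()}

def restore(checkpoint, **objects):
    """Copies matching keys into the given objects; returns the unmatched keys."""
    unmatched = []
    for attr, variables in objects.items():
        for var in variables:
            key = f"{attr}/{var}"
            if key in checkpoint:
                variables[var] = checkpoint[key]
            else:
                unmatched.append(key)
    return unmatched

tpu_embedding = {"table_0": [[0.5, 0.5]], "table_1": [[0.25, 0.75]]}
ckpt = save(embedding=tpu_embedding)

cpu_embedding = {"table_0": [[0.0, 0.0]], "table_1": [[0.0, 0.0]]}
missing_ok = restore(ckpt, embedding=cpu_embedding)   # names match
other = {"table_0": [[0.0, 0.0]], "table_1": [[0.0, 0.0]]}
missing_bad = restore(ckpt, embeddings=other)         # near-miss name
```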
Serving
Finally, you can use the exported CPU model for serving. Serving is done through the tf.saved_model API.
@tf.function
def serve_tensors(features):
  return cpu_model(features)
signatures = {
'serving':
serve_tensors.get_concrete_function(
features={
'movie_id':
tf.TensorSpec(shape=(1,), dtype=tf.int32, name='movie_id'),
'user_id':
tf.TensorSpec(shape=(1,), dtype=tf.int32, name='user_id'),
}),
}
tf.saved_model.save(
cpu_model,
export_dir=os.path.join(model_dir, 'exported_model'),
signatures=signatures)
WARNING:tensorflow:Skipping full serialization of Keras layer <tensorflow_recommenders.tasks.ranking.Ranking object at 0x7f20831ead00>, because it is not built.
The exported model can now be loaded (in Python or C) and used for serving.
imported = tf.saved_model.load(os.path.join(model_dir, 'exported_model'))
predict_fn = imported.signatures['serving']
# Dummy serving data.
input_batch = {
'movie_id': tf.constant(np.array([100]), dtype=tf.int32),
'user_id': tf.constant(np.array([30]), dtype=tf.int32)
}
# The prediction it generates.
prediction = predict_fn(**input_batch)['output_0']
WARNING:tensorflow:Detecting that an object or model or tf.train.Checkpoint is being deleted with unrestored values. See the following logs for the specific values in question. To silence these warnings, use `status.expect_partial()`. See https://tensorflowcn.cn/api_docs/python/tf/train/Checkpoint#restorefor details about the status object returned by the restore function.
WARNING:tensorflow:An attribute in the restored object could not be found in the checkpoint. Object: (root).embedding_layer._tpu_embedding, attribute: ['TPUEmbedding_saveable']
You can also pass a serving config when serving.
Note: by using a serving config, you can serve with a subset of the trained embedding tables.
serving_config = {
'movie_id': tf.tpu.experimental.embedding.FeatureConfig(table=movie_table),
'user_id': tf.tpu.experimental.embedding.FeatureConfig(table=user_table)
}
prediction = cpu_model(input_batch, serving_config=serving_config)
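Conceptually, a serving config tells the CPU lookup which table each served feature reads from, so only the tables it references need to be available. A plain-Python sketch of that idea, not the TFRS implementation: `serving_lookup` is a hypothetical helper, the serving config is simplified to a feature-to-table-name mapping, and the table contents are toy values.

```python
def serving_lookup(trained_tables, serving_config, inputs):
    """Looks up each served feature in the table named by serving_config.

    Only the tables referenced by serving_config are touched, so the others
    need not be available at serving time.
    """
    used = {serving_config[name] for name in inputs}
    missing = used - set(trained_tables)
    if missing:
        raise KeyError(f"tables not available for serving: {sorted(missing)}")
    return {name: [trained_tables[serving_config[name]][i] for i in ids]
            for name, ids in inputs.items()}

trained_tables = {
    "movie_table": [[0.1 * r, 0.2 * r] for r in range(4)],
    "user_table": [[1.0 * r, 2.0 * r] for r in range(4)],
}
# Serve only the movie feature, so only movie_table has to be provided.
movie_only = {"movie_table": trained_tables["movie_table"]}
outputs = serving_lookup(movie_only, {"movie_id": "movie_table"},
                         {"movie_id": [3]})
```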