TensorFlow 2 TPUEmbeddingLayer: Quick Start


Overview

This Colab gives a brief introduction to the TPUEmbeddingLayer in TensorFlow 2.

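TPUEmbeddingLayer can use the embedding accelerator on Cloud TPU to speed up embedding lookups, especially when you have many large embedding tables. This is particularly useful for recommendation models, which typically have very large embedding tables.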

Follow the Google Cloud TPU quickstart to create a GCP account and a GCS bucket. You have $300 in free credit to get started with any GCP product. You can learn more about Cloud TPU at https://cloud.google.com/tpu/docs.

Setup

Install TensorFlow 2 and TensorFlow Recommenders.

pip install -U tensorflow-recommenders
import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs

Connect to the TPU node or local TPU and initialize the TPU system.

resolver = tf.distribute.cluster_resolver.TPUClusterResolver('').connect('')

Create a TPU strategy. Models that need to run on the TPU should be created within the TPUStrategy scope.

strategy = tf.distribute.TPUStrategy(resolver)

You can also inspect the TPU hardware features through the TPUStrategy object.

For example, you can check which version of the embedding feature this TPU supports. For detailed documentation, see tf.tpu.experimental.HardwareFeature.

embedding_feature = strategy.extended.tpu_hardware_feature.embedding_feature
assert embedding_feature == tf.tpu.experimental.HardwareFeature.EmbeddingFeature.V1, 'Make sure that you have the right TPU Hardware'

TPUEmbedding API breakdown

Feature and table configuration

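When you create an instance of this layer, you must specify:

  1. the full set of embedding tables,
  2. the features you want to look up in these tables, and
  3. the optimizer you want to use on the tables.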

For details on the full set of options, see the documentation for tf.tpu.experimental.embedding.TableConfig and tf.tpu.experimental.embedding.FeatureConfig. We cover the basic usage here.

Multiple FeatureConfig objects can use the same TableConfig object, which lets different features share the same table.

table_config_one = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=8, dim=8)
table_config_two = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=16, dim=4)
feature_config = {
    'feature_one':
        tf.tpu.experimental.embedding.FeatureConfig(table=table_config_one),
    'feature_two':
        tf.tpu.experimental.embedding.FeatureConfig(table=table_config_one),
    'feature_three':
        tf.tpu.experimental.embedding.FeatureConfig(table=table_config_two)
}
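To make the feature-to-table mapping concrete, here is a plain-Python toy model of the configuration above. It uses neither TensorFlow nor a TPU; the `make_table` and `lookup` helpers are hypothetical stand-ins for TableConfig and the layer's lookup. It shows that two features pointing at one TableConfig read the same rows, and that concatenating the flattened outputs yields a vector whose width is the sum of the table dims.

```python
# Toy model of the feature/table configuration (plain Python, no TPU).
# Each "table" is a list of `vocabulary_size` rows of `dim` floats. This only
# mirrors TableConfig(vocabulary_size, dim); it is NOT the TF implementation.
import random


def make_table(vocabulary_size, dim, seed):
    rng = random.Random(seed)
    return [[rng.uniform(-0.05, 0.05) for _ in range(dim)]
            for _ in range(vocabulary_size)]


table_one = make_table(vocabulary_size=8, dim=8, seed=0)
table_two = make_table(vocabulary_size=16, dim=4, seed=1)

# Like feature_config: feature_one and feature_two point at the SAME table.
feature_to_table = {
    "feature_one": table_one,
    "feature_two": table_one,
    "feature_three": table_two,
}


def lookup(features):
    """Returns one embedding row per feature for a single example."""
    return {name: feature_to_table[name][idx] for name, idx in features.items()}


example = {"feature_one": 3, "feature_two": 3, "feature_three": 5}
embeddings = lookup(example)

# Shared table: the same id in feature_one and feature_two gives the same row.
assert embeddings["feature_one"] is embeddings["feature_two"]

# tf.nest.flatten orders dict values by sorted key; concatenating them gives
# a vector whose width is the sum of the table dims: 8 + 4 + 8 = 20.
concatenated = [v for name in sorted(embeddings) for v in embeddings[name]]
print(len(concatenated))  # → 20
```

This width is what the Dense layer in the model-creation examples receives after `tf.concat(tf.nest.flatten(embedding), axis=1)`.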

Optimizers

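The optimizer can be specified globally by passing one of the following types to the optimizer argument:

  1. A string, one of 'sgd', 'adagrad', or 'adam', which uses the given optimizer with default parameters.
  2. An instance of a Keras optimizer.
  3. An instance of an optimizer class from the tf.tpu.experimental.embedding module.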

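You can also specify an optimizer at the table level through the optimizer argument of tf.tpu.experimental.embedding.TableConfig. This completely overrides the global optimizer for that table. For performance reasons, keep the total number of distinct optimizers to a minimum.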

optimizer=tf.tpu.experimental.embedding.SGD(0.1)
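The precedence rule can be sketched in plain Python. The optimizer names and the `resolve_optimizer` helper are hypothetical; in TensorFlow, a per-table override is passed as, for example, `tf.tpu.experimental.embedding.TableConfig(..., optimizer=tf.tpu.experimental.embedding.Adagrad(0.1))`.

```python
# Toy model of optimizer resolution (plain Python). The names below are
# hypothetical and are not part of the TensorFlow API.
GLOBAL_OPTIMIZER = "sgd(0.1)"

# Per-table optimizers: None means "fall back to the global optimizer".
table_optimizers = {
    "table_one": None,
    "table_two": "adagrad(0.1)",  # fully replaces the global optimizer
    "table_three": None,
}


def resolve_optimizer(table_optimizer, global_optimizer):
    """A table-level optimizer, when set, completely overrides the global one."""
    return table_optimizer if table_optimizer is not None else global_optimizer


effective = {name: resolve_optimizer(opt, GLOBAL_OPTIMIZER)
             for name, opt in table_optimizers.items()}
print(effective)

# Each distinct optimizer adds work, so fewer is better for performance.
print(len(set(effective.values())))  # → 2
```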

Model creation

Here are two examples of creating a Keras model with the TPU embedding layer.

For a functional Keras model:

with strategy.scope():
  embedding_inputs = {
      'feature_one':
          tf.keras.Input(batch_size=1024, shape=(1,), dtype=tf.int32),
      'feature_two':
          tf.keras.Input(
              batch_size=1024, shape=(1,), dtype=tf.int32, ragged=True),
      'feature_three':
          tf.keras.Input(batch_size=1024, shape=(1,), dtype=tf.int32)
  }
  # embedding, feature_config and embedding_inputs all have the same nested
  # structure.
  embedding = tfrs.layers.embedding.TPUEmbedding(
      feature_config=feature_config, optimizer=optimizer)(
          embedding_inputs)
  logits = tf.keras.layers.Dense(1)(
      tf.concat(tf.nest.flatten(embedding), axis=1))
  model = tf.keras.Model(embedding_inputs, logits)
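In the functional example, `feature_two` is declared with `ragged=True`, so a single example may carry several ids. The layer reduces them to one vector per example using the table's combiner; TableConfig defaults to `'mean'` and also accepts `'sum'` and `'sqrtn'`. The following plain-Python sketch is only an illustration of a mean combiner on a small ragged batch, not the TPU implementation; the table contents are made up so the averages are easy to read.

```python
# Toy model of a "mean" combiner over ragged ids (plain Python).
table = [[float(row)] * 4 for row in range(16)]  # row i is [i, i, i, i]

# A ragged batch: each example has a variable number of ids.
ragged_ids = [[1, 3], [5], [2, 4, 6]]


def mean_combine(ids, table):
    """Averages the rows for `ids`; assumes each example has at least one id."""
    dim = len(table[0])
    summed = [0.0] * dim
    for idx in ids:
        for d in range(dim):
            summed[d] += table[idx][d]
    return [s / len(ids) for s in summed]


# One fixed-width vector per example, regardless of how many ids it had.
combined = [mean_combine(ids, table) for ids in ragged_ids]
print(combined)  # → [[2.0, 2.0, 2.0, 2.0], [5.0, 5.0, 5.0, 5.0], [4.0, 4.0, 4.0, 4.0]]
```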

For a subclassed model:

class ModelWithEmbeddings(tf.keras.Model):

  def __init__(self):
    super(ModelWithEmbeddings, self).__init__()
    self.embedding_layer = tfrs.layers.embedding.TPUEmbedding(
        feature_config=feature_config, optimizer=optimizer)
    self.dense = tf.keras.layers.Dense(1)

  def call(self, inputs):
    embedding = self.embedding_layer(inputs)
    logits = self.dense(tf.concat(tf.nest.flatten(embedding), axis=1))
    return logits


# Make sure the TPU is reinitialized when you create another model.
tf.tpu.experimental.initialize_tpu_system(resolver)
with strategy.scope():
  model = ModelWithEmbeddings()
WARNING:tensorflow:TPU system grpc://10.3.32.50:8470 has already been initialized. Reinitializing the TPU can cause previously created variables on TPU to be lost.
tf.tpu.experimental.initialize_tpu_system(resolver)
WARNING:tensorflow:TPU system grpc://10.3.32.50:8470 has already been initialized. Reinitializing the TPU can cause previously created variables on TPU to be lost.
<tensorflow.python.tpu.topology.Topology at 0x7f2085f74400>

Simple TPUEmbeddingLayer example

In this tutorial, we use TPUEmbeddingLayer to build a simple ranking model on the MovieLens 100K dataset. The model predicts a rating from user_id and movie_id.

Install and import TensorFlow Datasets

pip install -q --upgrade tensorflow-datasets
import tensorflow_datasets as tfds

Read the data

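To make the dataset accessible to the Cloud TPU workers, you need to create a GCS bucket and download the dataset into it. Follow these instructions to create your GCS bucket.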

gcs_bucket = 'gs://YOUR-BUCKET-NAME'
from google.colab import auth
auth.authenticate_user()

First, we fetch the data with tensorflow_datasets. The data we need is movie_id, user_id, and user_rating.

Then we preprocess the data and convert the IDs to integers.

# Ratings data.
ratings = tfds.load(
    "movielens/100k-ratings", data_dir=gcs_bucket, split="train")

# Select the basic features and convert the string IDs to int32.
ratings = ratings.map(
    lambda x: {
        "movie_id": tf.strings.to_number(x["movie_id"], out_type=tf.int32),
        "user_id": tf.strings.to_number(x["user_id"], out_type=tf.int32),
        "user_rating": x["user_rating"],
    })
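A quick sanity check for this preprocessing is that every id ends up as an integer row index inside its table. This plain-Python sketch uses two made-up records in the same format, with `int()` standing in for `tf.strings.to_number(..., out_type=tf.int32)`. MovieLens 100K has 943 users and 1,682 movies, so the vocabulary size of 2048 chosen for both tables leaves room for every id.

```python
# Plain-Python stand-in for the id preprocessing; the records are made up.
movie_vocabulary_size = 2048
user_vocabulary_size = 2048

raw_records = [
    {"movie_id": b"357", "user_id": b"138", "user_rating": 4.0},
    {"movie_id": b"709", "user_id": b"92", "user_rating": 2.0},
]


def preprocess(record):
    # int() accepts ASCII digit bytes, much like parsing the tf.string ids.
    return {
        "movie_id": int(record["movie_id"]),
        "user_id": int(record["user_id"]),
        "user_rating": record["user_rating"],
    }


processed = [preprocess(r) for r in raw_records]

# Every id must be a valid row index: 0 <= id < vocabulary_size.
assert all(0 <= r["movie_id"] < movie_vocabulary_size for r in processed)
assert all(0 <= r["user_id"] < user_vocabulary_size for r in processed)
print(processed[0])
```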

Prepare the dataset and model

Here we define some hyperparameters for the model.

per_replica_batch_size = 16
movie_vocabulary_size = 2048
movie_embedding_size = 64
user_vocabulary_size = 2048
user_embedding_size = 64

We split the data by putting 80% of the ratings in the training set and 20% in the test set.

shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)
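The `reshuffle_each_iteration=False` flag matters for this split: `take` and `skip` each iterate over the shuffled dataset, so both must see the same order for the two sets to be disjoint. The plain-Python sketch below models a fixed order with one seeded shuffle and a per-pass reshuffle with two different seeds; list slicing stands in for the tf.data calls, and the seeds are arbitrary.

```python
# Plain-Python illustration of why the split uses reshuffle_each_iteration=False.
import random

NUM_RATINGS = 100_000
ids = list(range(NUM_RATINGS))

# Fixed order (like reshuffle_each_iteration=False): take/skip see one order.
order = ids[:]
random.Random(42).shuffle(order)
train = set(order[:80_000])          # shuffled.take(80_000)
test = set(order[80_000:100_000])    # shuffled.skip(80_000).take(20_000)
print(len(train), len(test), len(train & test))  # → 80000 20000 0

# A different order on each pass (like reshuffling every iteration): the two
# passes disagree, so the "test" set shares examples with the "train" set.
train_pass = random.Random(1).sample(ids, NUM_RATINGS)[:80_000]
test_pass = random.Random(2).sample(ids, NUM_RATINGS)[80_000:]
print(len(set(train_pass) & set(test_pass)) > 0)  # → True
```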

Batch the datasets and convert them into distributed datasets.

train_dataset = train.batch(
    per_replica_batch_size * strategy.num_replicas_in_sync,
    drop_remainder=True).cache()
test_dataset = test.batch(
    per_replica_batch_size * strategy.num_replicas_in_sync,
    drop_remainder=True).cache()
distribute_train_dataset = strategy.experimental_distribute_dataset(
    train_dataset,
    options=tf.distribute.InputOptions(experimental_fetch_to_device=False))
distribute_test_dataset = strategy.experimental_distribute_dataset(
    test_dataset,
    options=tf.distribute.InputOptions(experimental_fetch_to_device=False))
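With `drop_remainder=True`, every batch has the same static shape, which TPU computations generally require. The batch arithmetic is sketched below in plain Python, assuming 8 replicas (for example, a single TPU with 8 cores); on a real system the value comes from `strategy.num_replicas_in_sync`.

```python
# Batch arithmetic for the distributed datasets (plain Python).
# NUM_REPLICAS = 8 is an assumption; use strategy.num_replicas_in_sync.
per_replica_batch_size = 16
NUM_REPLICAS = 8

global_batch_size = per_replica_batch_size * NUM_REPLICAS


def num_batches(num_examples, batch_size):
    """Returns (full batches kept, examples dropped) with drop_remainder=True."""
    return num_examples // batch_size, num_examples % batch_size


print(global_batch_size)                       # → 128
print(num_batches(80_000, global_batch_size))  # → (625, 0)
print(num_batches(20_000, global_batch_size))  # → (156, 32)
```

Under this assumption, `steps_per_epoch=10` in the training step consumes 1,280 of the 80,000 training examples per epoch.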

Here we create the optimizer and specify the feature and table configurations. Then we create the model with the embedding layer.

optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.1)

user_table = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=user_vocabulary_size, dim=user_embedding_size)
movie_table = tf.tpu.experimental.embedding.TableConfig(
    vocabulary_size=movie_vocabulary_size, dim=movie_embedding_size)
feature_config = {
    "movie_id": tf.tpu.experimental.embedding.FeatureConfig(table=movie_table),
    "user_id": tf.tpu.experimental.embedding.FeatureConfig(table=user_table)
}


# Define a ranking model with embedding layer.
class EmbeddingModel(tfrs.models.Model):

  def __init__(self):
    super().__init__()

    self.embedding_layer = tfrs.layers.embedding.TPUEmbedding(
        feature_config=feature_config, optimizer=optimizer)
    self.ratings = tf.keras.Sequential([
        # Learn multiple dense layers.
        tf.keras.layers.Dense(256, activation="relu"),
        tf.keras.layers.Dense(64, activation="relu"),
        # Make rating predictions in the final layer.
        tf.keras.layers.Dense(1)
    ])
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
        loss=tf.keras.losses.MeanSquaredError(
            reduction=tf.keras.losses.Reduction.NONE),
        metrics=[tf.keras.metrics.RootMeanSquaredError()])

  def compute_loss(self, features, training=False):
    embedding = self.embedding_layer({
        "user_id": features["user_id"],
        "movie_id": features["movie_id"]
    })
    rating_predictions = self.ratings(
        tf.concat([embedding["user_id"], embedding["movie_id"]], axis=1))

    return tf.reduce_sum(
        self.task(
            labels=features["user_rating"], predictions=rating_predictions)) * (
                1 / (per_replica_batch_size * strategy.num_replicas_in_sync))

  def call(self, features, serving_config=None):
    embedding = self.embedding_layer(
        {
            "user_id": features["user_id"],
            "movie_id": features["movie_id"]
        },
        serving_config=serving_config)
    return self.ratings(
        tf.concat([embedding["user_id"], embedding["movie_id"]], axis=1))
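`compute_loss` sums the per-example losses on each replica (the task uses `Reduction.NONE`) and multiplies by 1 / (global batch size). Under the usual tf.distribute convention, where per-replica contributions are summed, the result equals the mean loss over the whole global batch. This plain-Python check uses made-up labels and predictions for 2 replicas with 2 examples each.

```python
# Plain-Python check of the loss scaling in compute_loss (illustrative numbers).
per_replica_batch_size = 2
num_replicas = 2
global_batch_size = per_replica_batch_size * num_replicas

labels = [[4.0, 3.0], [5.0, 1.0]]        # labels[replica][example]
predictions = [[3.5, 3.0], [4.0, 2.0]]

replica_losses = []
for y_true, y_pred in zip(labels, predictions):
    # Reduction.NONE: one squared error per example.
    per_example = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    # reduce_sum(...) * (1 / global_batch_size), as in compute_loss.
    replica_losses.append(sum(per_example) * (1 / global_batch_size))

# Summing the replica contributions gives the mean over the global batch.
total = sum(replica_losses)
all_errors = [(t - p) ** 2 for ys, ps in zip(labels, predictions)
              for t, p in zip(ys, ps)]
print(total, sum(all_errors) / len(all_errors))  # → 0.5625 0.5625
```

Dividing by the per-replica batch size instead would inflate the combined loss by the number of replicas.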

Make sure you create the model within the TPUStrategy scope.

with strategy.scope():
  model = EmbeddingModel()
  model.compile(optimizer=optimizer)

Train and evaluate the model

import os

Train the model

model.fit(distribute_train_dataset, steps_per_epoch=10, epochs=10)
Epoch 1/10
10/10 [==============================] - 7s 32ms/step - root_mean_squared_error: 2.7897 - loss: 0.0564 - regularization_loss: 0.0000e+00 - total_loss: 0.0564
Epoch 2/10
10/10 [==============================] - 0s 26ms/step - root_mean_squared_error: 1.1963 - loss: 0.0088 - regularization_loss: 0.0000e+00 - total_loss: 0.0088
Epoch 3/10
10/10 [==============================] - 0s 25ms/step - root_mean_squared_error: 1.1261 - loss: 0.0089 - regularization_loss: 0.0000e+00 - total_loss: 0.0089
Epoch 4/10
10/10 [==============================] - 0s 35ms/step - root_mean_squared_error: 1.1403 - loss: 0.0094 - regularization_loss: 0.0000e+00 - total_loss: 0.0094
Epoch 5/10
10/10 [==============================] - 0s 40ms/step - root_mean_squared_error: 1.1269 - loss: 0.0103 - regularization_loss: 0.0000e+00 - total_loss: 0.0103
Epoch 6/10
10/10 [==============================] - 0s 36ms/step - root_mean_squared_error: 1.1162 - loss: 0.0100 - regularization_loss: 0.0000e+00 - total_loss: 0.0100
Epoch 7/10
10/10 [==============================] - 0s 36ms/step - root_mean_squared_error: 1.1365 - loss: 0.0097 - regularization_loss: 0.0000e+00 - total_loss: 0.0097
Epoch 8/10
10/10 [==============================] - 0s 47ms/step - root_mean_squared_error: 1.1171 - loss: 0.0110 - regularization_loss: 0.0000e+00 - total_loss: 0.0110
Epoch 9/10
10/10 [==============================] - 0s 48ms/step - root_mean_squared_error: 1.1037 - loss: 0.0100 - regularization_loss: 0.0000e+00 - total_loss: 0.0100
Epoch 10/10
10/10 [==============================] - 0s 51ms/step - root_mean_squared_error: 1.0953 - loss: 0.0092 - regularization_loss: 0.0000e+00 - total_loss: 0.0092
<keras.callbacks.History at 0x7f2084d7ddf0>

Evaluate the model on the test dataset

model.evaluate(distribute_test_dataset, steps=10)
10/10 [==============================] - 4s 27ms/step - root_mean_squared_error: 1.1339 - loss: 0.0090 - regularization_loss: 0.0000e+00 - total_loss: 0.0090
[1.1338995695114136, 0.009662957862019539, 0, 0.009662957862019539]

Save and restore checkpoints

You can use a GCS bucket to store your checkpoints.

Make sure you follow the instructions to give the TPU workers access to the bucket.

model_dir = os.path.join(gcs_bucket, "saved_model")

Create a checkpoint for the TPU model and save it to the bucket.

checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
saved_tpu_model_path = checkpoint.save(os.path.join(model_dir, "ckpt"))

You can list the variables stored at that path.

tf.train.list_variables(saved_tpu_model_path)
[('_CHECKPOINTABLE_OBJECT_GRAPH', []),
 ('model/embedding_layer/_tpu_embedding/.ATTRIBUTES/TPUEmbedding_saveable',
  []),
 ('model/embedding_layer/_tpu_embedding/table_0/.ATTRIBUTES/VARIABLE_VALUE',
  [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_0/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_1/.ATTRIBUTES/VARIABLE_VALUE',
  [2048, 64]),
 ('model/embedding_layer/_tpu_embedding/table_1/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [2048, 64]),
 ('model/ratings/layer_with_weights-0/bias/.ATTRIBUTES/VARIABLE_VALUE', [256]),
 ('model/ratings/layer_with_weights-0/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [256]),
 ('model/ratings/layer_with_weights-0/kernel/.ATTRIBUTES/VARIABLE_VALUE',
  [128, 256]),
 ('model/ratings/layer_with_weights-0/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [128, 256]),
 ('model/ratings/layer_with_weights-1/bias/.ATTRIBUTES/VARIABLE_VALUE', [64]),
 ('model/ratings/layer_with_weights-1/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [64]),
 ('model/ratings/layer_with_weights-1/kernel/.ATTRIBUTES/VARIABLE_VALUE',
  [256, 64]),
 ('model/ratings/layer_with_weights-1/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [256, 64]),
 ('model/ratings/layer_with_weights-2/bias/.ATTRIBUTES/VARIABLE_VALUE', [1]),
 ('model/ratings/layer_with_weights-2/bias/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [1]),
 ('model/ratings/layer_with_weights-2/kernel/.ATTRIBUTES/VARIABLE_VALUE',
  [64, 1]),
 ('model/ratings/layer_with_weights-2/kernel/.OPTIMIZER_SLOT/optimizer/accumulator/.ATTRIBUTES/VARIABLE_VALUE',
  [64, 1]),
 ('model/task/_ranking_metrics/0/count/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('model/task/_ranking_metrics/0/total/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/decay/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/iter/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('optimizer/learning_rate/.ATTRIBUTES/VARIABLE_VALUE', []),
 ('save_counter/.ATTRIBUTES/VARIABLE_VALUE', [])]
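The shapes in this listing can be checked by hand. The first Dense kernel is [128, 256] because the model concatenates the 64-dimensional user and movie embeddings, and every weight has an Adagrad `accumulator` slot of the same shape. The plain-Python sketch below recomputes the parameter counts from the listed shapes; the short dictionary keys are abbreviations of the checkpoint paths.

```python
# Parameter counts recomputed from the shapes in the checkpoint listing.
from math import prod

shapes = {
    "table_0": [2048, 64],
    "table_1": [2048, 64],
    "dense_0/kernel": [128, 256],  # 128 = 64 (user) + 64 (movie), concatenated
    "dense_0/bias": [256],
    "dense_1/kernel": [256, 64],
    "dense_1/bias": [64],
    "dense_2/kernel": [64, 1],
    "dense_2/bias": [1],
}

params = {name: prod(shape) for name, shape in shapes.items()}
embedding_params = params["table_0"] + params["table_1"]
dense_params = sum(v for k, v in params.items() if k.startswith("dense"))

print(embedding_params, dense_params)  # → 262144 49537
# One Adagrad accumulator per weight doubles the stored trainable values.
print(2 * (embedding_params + dense_params))  # → 623362
```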

You can restore the checkpoint later. A common practice is to checkpoint the model at the end of every epoch and restore a checkpoint afterwards.

with strategy.scope():
  checkpoint.restore(saved_tpu_model_path)

You can also create a CPU model and restore the weights that were trained on the TPU.

cpu_model = EmbeddingModel()

# Create the cpu checkpoint and restore the tpu checkpoint.
cpu_checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=cpu_model)
cpu_checkpoint.restore(saved_tpu_model_path)
<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f20830fe5b0>

You can also restore only the embedding weights.

embedding_checkpoint = tf.train.Checkpoint(embedding=model.embedding_layer)
saved_embedding_path = embedding_checkpoint.save(
    os.path.join(model_dir, 'tpu-embedding'))
# Restore the embedding parameters on the CPU model.
cpu_embedding_checkpoint = tf.train.Checkpoint(
    embedding=cpu_model.embedding_layer)
cpu_embedding_checkpoint.restore(saved_embedding_path)
<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f20831bbeb0>
# Save the embedding parameters of the CPU model and restore them into the TPU model.
saved_cpu_embedding_path = cpu_embedding_checkpoint.save(
    os.path.join(model_dir, 'cpu-embedding'))
with strategy.scope():
  embedding_checkpoint.restore(saved_cpu_embedding_path)
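Partial restores depend on names: tf.train.Checkpoint matches saved values to objects through the keyword arguments used to build it, so a misspelled keyword such as `embeddign` matches nothing, and `restore()` does not raise an error on its own. The plain-Python toy below mimics that matching; the `restore` function and the saved values are hypothetical and are not the TensorFlow implementation.

```python
# Toy model of name-based checkpoint matching (plain Python, not TF internals).
saved = {"embedding/table_0": [0.1, 0.2], "embedding/table_1": [0.3, 0.4]}


def restore(saved_values, **named_objects):
    """Copies each saved value into the object whose keyword matches its root.

    Returns the saved paths that no object claimed.
    """
    unmatched = []
    for path, value in saved_values.items():
        root, _, attr = path.partition("/")
        if root in named_objects:
            named_objects[root][attr] = value
        else:
            unmatched.append(path)
    return unmatched


good_layer, typo_layer = {}, {}
print(restore(saved, embedding=good_layer))  # → []
print(restore(saved, embeddign=typo_layer))  # → ['embedding/table_0', 'embedding/table_1']
print(typo_layer)  # → {}
```

In TensorFlow, calling `assert_consumed()` or `assert_existing_objects_matched()` on the status returned by `restore` should surface such a mismatch instead of letting it pass unnoticed.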

Serving

Finally, you can use the exported CPU model for serving. Serving is done through the tf.saved_model API.

@tf.function
def serve_tensors(features):
  return cpu_model(features)


signatures = {
    'serving':
        serve_tensors.get_concrete_function(
            features={
                'movie_id':
                    tf.TensorSpec(shape=(1,), dtype=tf.int32, name='movie_id'),
                'user_id':
                    tf.TensorSpec(shape=(1,), dtype=tf.int32, name='user_id'),
            }),
}
tf.saved_model.save(
    cpu_model,
    export_dir=os.path.join(model_dir, 'exported_model'),
    signatures=signatures)
WARNING:tensorflow:Skipping full serialization of Keras layer <tensorflow_recommenders.tasks.ranking.Ranking object at 0x7f20831ead00>, because it is not built.

The exported model can now be loaded (in Python or C) and used for serving.

imported = tf.saved_model.load(os.path.join(model_dir, 'exported_model'))
predict_fn = imported.signatures['serving']

# Dummy serving data.
input_batch = {
    'movie_id': tf.constant(np.array([100]), dtype=tf.int32),
    'user_id': tf.constant(np.array([30]), dtype=tf.int32)
}
# The prediction it generates.
prediction = predict_fn(**input_batch)['output_0']
WARNING:tensorflow:Detecting that an object or model or tf.train.Checkpoint is being deleted with unrestored values. See the following logs for the specific values in question. To silence these warnings, use `status.expect_partial()`. See https://tensorflowcn.cn/api_docs/python/tf/train/Checkpoint#restorefor details about the status object returned by the restore function.
WARNING:tensorflow:An attribute in the restored object could not be found in the checkpoint. Object: (root).embedding_layer._tpu_embedding, attribute: ['TPUEmbedding_saveable']

You can also pass a serving config when serving.

Note: With a serving config, you can serve using only a subset of the trained embedding tables.

serving_config = {
    'movie_id': tf.tpu.experimental.embedding.FeatureConfig(table=movie_table),
    'user_id': tf.tpu.experimental.embedding.FeatureConfig(table=user_table)
}
prediction = cpu_model(input_batch, serving_config=serving_config)