从 TPU embedding_columns 迁移到 TPUEmbedding 层

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

本指南演示了如何将嵌入式训练从 TensorFlow 1 的 embedding_column API 与 TPUEstimator 迁移到 TensorFlow 2 的 TPUEmbedding 层 API 与 TPUStrategy,在 TPU 上进行。

嵌入是(大型)矩阵。它们是查找表,将稀疏特征空间映射到密集向量。嵌入提供高效且密集的表示,捕获特征之间复杂的相似性和关系。

TensorFlow 包含专门支持在 TPU 上训练嵌入。这种 TPU 特定的嵌入支持允许您训练比单个 TPU 设备内存更大的嵌入,并在 TPU 上使用稀疏和不规则输入。

有关更多信息,请参阅 tfrs.layers.embedding.TPUEmbedding 层的 API 文档,以及 tf.tpu.experimental.embedding.TableConfigtf.tpu.experimental.embedding.FeatureConfig 文档以获取更多信息。有关 tf.distribute.TPUStrategy 的概述,请查看 分布式训练 指南和 使用 TPU 指南。如果您要从 TPUEstimator 迁移到 TPUStrategy,请查看 TPU 迁移指南

设置

首先安装 TensorFlow Recommenders 并导入一些必要的包

pip install tensorflow-recommenders
import tensorflow as tf
import tensorflow.compat.v1 as tf1

# TPUEmbedding layer is not part of TensorFlow.
import tensorflow_recommenders as tfrs
/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/requests/__init__.py:104: RequestsDependencyWarning: urllib3 (1.26.8) or chardet (2.3.0)/charset_normalizer (2.0.12) doesn't match a supported version!
  RequestsDependencyWarning)

并准备一个简单的演示数据集

features = [[1., 1.5]]
embedding_features_indices = [[0, 0], [0, 1]]
embedding_features_values = [0, 5]
labels = [[0.3]]
eval_features = [[4., 4.5]]
eval_embedding_features_indices = [[0, 0], [0, 1]]
eval_embedding_features_values = [4, 3]
eval_labels = [[0.8]]

TensorFlow 1:使用 TPUEstimator 在 TPU 上训练嵌入

在 TensorFlow 1 中,您可以使用 tf.compat.v1.tpu.experimental.embedding_column API 设置 TPU 嵌入,并使用 tf.compat.v1.estimator.tpu.TPUEstimator 在 TPU 上训练/评估模型。

输入是范围从零到 TPU 嵌入表词汇量大小的整数。首先使用 tf.feature_column.categorical_column_with_identity 将输入编码为分类 ID。由于输入特征是整数值,因此使用 "sparse_feature" 作为 key 参数,而 num_buckets 是嵌入表的词汇量大小 (10)。

embedding_id_column = (
      tf1.feature_column.categorical_column_with_identity(
          key="sparse_feature", num_buckets=10))

接下来,使用 tpu.experimental.embedding_column 将稀疏分类输入转换为密集表示,其中 dimension 是嵌入表的宽度。它将为每个 num_buckets 存储一个嵌入向量。

embedding_column = tf1.tpu.experimental.embedding_column(
    embedding_id_column, dimension=5)

现在,通过 tf.estimator.tpu.experimental.EmbeddingConfigSpec 定义 TPU 特定的嵌入配置。稍后,您将将其作为 embedding_config_spec 参数传递给 tf.estimator.tpu.TPUEstimator

embedding_config_spec = tf1.estimator.tpu.experimental.EmbeddingConfigSpec(
    feature_columns=(embedding_column,),
    optimization_parameters=(
        tf1.tpu.experimental.AdagradParameters(0.05)))

接下来,要使用 TPUEstimator,请定义

  • 训练数据的输入函数
  • 评估数据的评估输入函数
  • 一个用于指导 TPUEstimator 如何使用特征和标签定义训练操作的模型函数
def _input_fn(params):
  dataset = tf1.data.Dataset.from_tensor_slices((
      {"dense_feature": features,
       "sparse_feature": tf1.SparseTensor(
           embedding_features_indices,
           embedding_features_values, [1, 2])},
           labels))
  dataset = dataset.repeat()
  return dataset.batch(params['batch_size'], drop_remainder=True)

def _eval_input_fn(params):
  dataset = tf1.data.Dataset.from_tensor_slices((
      {"dense_feature": eval_features,
       "sparse_feature": tf1.SparseTensor(
           eval_embedding_features_indices,
           eval_embedding_features_values, [1, 2])},
           eval_labels))
  dataset = dataset.repeat()
  return dataset.batch(params['batch_size'], drop_remainder=True)

def _model_fn(features, labels, mode, params):
  embedding_features = tf1.keras.layers.DenseFeatures(embedding_column)(features)
  concatenated_features = tf1.keras.layers.Concatenate(axis=1)(
      [embedding_features, features["dense_feature"]])
  logits = tf1.layers.Dense(1)(concatenated_features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  optimizer = tf1.tpu.CrossShardOptimizer(optimizer)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.tpu.TPUEstimatorSpec(mode, loss=loss, train_op=train_op)

定义完这些函数后,创建一个 tf.distribute.cluster_resolver.TPUClusterResolver 来提供集群信息,以及一个 tf.compat.v1.estimator.tpu.RunConfig 对象。

结合你定义的模型函数,现在可以创建一个 TPUEstimator。这里,我们将简化流程,跳过检查点保存。然后,你将为 TPUEstimator 指定训练和评估的批次大小。

cluster_resolver = tf1.distribute.cluster_resolver.TPUClusterResolver(tpu='')
print("All devices: ", tf1.config.list_logical_devices('TPU'))
All devices:  []
tpu_config = tf1.estimator.tpu.TPUConfig(
    iterations_per_loop=10,
    per_host_input_for_training=tf1.estimator.tpu.InputPipelineConfig
          .PER_HOST_V2)
config = tf1.estimator.tpu.RunConfig(
    cluster=cluster_resolver,
    save_checkpoints_steps=None,
    tpu_config=tpu_config)
estimator = tf1.estimator.tpu.TPUEstimator(
    model_fn=_model_fn, config=config, train_batch_size=8, eval_batch_size=8,
    embedding_config_spec=embedding_config_spec)
WARNING:tensorflow:Estimator's model_fn (<function _model_fn at 0x7f168033fc80>) includes params argument, but params are not passed to Estimator.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpxc9fm1_q
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpxc9fm1_q', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true
cluster_def {
  job {
    name: "worker"
    tasks {
      key: 0
      value: "10.240.1.10:8470"
    }
  }
}
isolate_session_state: true
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': None, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({'worker': ['10.240.1.10:8470']}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': 'grpc://10.240.1.10:8470', '_evaluation_master': 'grpc://10.240.1.10:8470', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_tpu_config': TPUConfig(iterations_per_loop=10, num_shards=None, num_cores_per_replica=None, per_host_input_for_training=3, tpu_job_name=None, initial_infeed_sleep_secs=None, input_partition_dims=None, eval_training_input_configuration=2, experimental_host_call_every_n_steps=1, experimental_allow_per_host_v2_parallel_get_next=False, experimental_feed_hook=None), '_cluster': <tensorflow.python.distribute.cluster_resolver.tpu.tpu_cluster_resolver.TPUClusterResolver object at 0x7f16803b4a20>}
INFO:tensorflow:_TPUContext: eval_on_tpu True

调用 TPUEstimator.train 开始训练模型

estimator.train(_input_fn, steps=1)
INFO:tensorflow:Querying Tensorflow master (grpc://10.240.1.10:8470) for TPU system metadata.
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, -1, 6349538157198932596)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 17179869184, 9059152445598865227)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 17179869184, 3922455949451878923)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 17179869184, -6084187114011162725)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 17179869184, 8400191476321474241)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 17179869184, 5484621084550964852)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 17179869184, -8416772895681308377)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 17179869184, 2490716523526845408)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 17179869184, 7973779273400954871)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 17179869184, 6478654019570154047)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 17179869184, 7299189593611257732)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/training_util.py:236: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/tpu/feature_column_v2.py:479: IdentityCategoricalColumn._num_buckets (from tensorflow.python.feature_column.feature_column_v2) is deprecated and will be removed in a future version.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
INFO:tensorflow:Querying Tensorflow master (grpc://10.240.1.10:8470) for TPU system metadata.
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, -1, 6349538157198932596)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 17179869184, 9059152445598865227)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 17179869184, 3922455949451878923)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 17179869184, -6084187114011162725)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 17179869184, 8400191476321474241)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 17179869184, 5484621084550964852)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 17179869184, -8416772895681308377)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 17179869184, 2490716523526845408)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 17179869184, 7973779273400954871)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 17179869184, 6478654019570154047)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 17179869184, 7299189593611257732)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow/python/training/adagrad.py:77: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Bypassing TPUEstimator hook
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:TPU job name worker
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/tpu/tpu_estimator.py:758: Variable.load (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Prefer Variable.assign which has equivalent behavior in 2.X.
INFO:tensorflow:Initialized dataset iterators in 0 seconds
INFO:tensorflow:Installing graceful shutdown hook.
INFO:tensorflow:Creating heartbeat manager for ['/job:worker/replica:0/task:0/device:CPU:0']
INFO:tensorflow:Configuring worker heartbeat: shutdown_mode: WAIT_FOR_COORDINATOR

INFO:tensorflow:Init TPU system
INFO:tensorflow:Initialized TPU in 8 seconds
INFO:tensorflow:Starting infeed thread controller.
INFO:tensorflow:Starting outfeed thread controller.
INFO:tensorflow:Enqueue next (1) batch(es) of data to infeed.
INFO:tensorflow:Dequeue next (1) batch(es) of data from outfeed.
INFO:tensorflow:Outfeed finished for iteration (0, 0)
INFO:tensorflow:loss = 0.6467617, step = 1
INFO:tensorflow:Stop infeed thread controller
INFO:tensorflow:Shutting down InfeedController thread.
INFO:tensorflow:InfeedController received shutdown signal, stopping.
INFO:tensorflow:Infeed thread finished, shutting down.
INFO:tensorflow:infeed marked as finished
INFO:tensorflow:Stop output thread controller
INFO:tensorflow:Shutting down OutfeedController thread.
INFO:tensorflow:OutfeedController received shutdown signal, stopping.
INFO:tensorflow:Outfeed thread finished, shutting down.
INFO:tensorflow:outfeed marked as finished
INFO:tensorflow:Shutdown TPU system.
INFO:tensorflow:Loss for final step: 0.6467617.
INFO:tensorflow:training_loop marked as finished
<tensorflow_estimator.python.estimator.tpu.tpu_estimator.TPUEstimator at 0x7f168035b128>

然后,调用 TPUEstimator.evaluate 使用评估数据评估模型

estimator.evaluate(_eval_input_fn, steps=1)
INFO:tensorflow:Could not find trained model in model_dir: /tmp/tmpxc9fm1_q, running initialization to evaluate.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Querying Tensorflow master (grpc://10.240.1.10:8470) for TPU system metadata.
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, -1, 6349538157198932596)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 17179869184, 9059152445598865227)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 17179869184, 3922455949451878923)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 17179869184, -6084187114011162725)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 17179869184, 8400191476321474241)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 17179869184, 5484621084550964852)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 17179869184, -8416772895681308377)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 17179869184, 2490716523526845408)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 17179869184, 7973779273400954871)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 17179869184, 6478654019570154047)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 17179869184, 7299189593611257732)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/tpu/tpu_estimator.py:3406: div (from tensorflow.python.ops.math_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2022-03-02T13:22:48
INFO:tensorflow:TPU job name worker
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Init TPU system
INFO:tensorflow:Initialized TPU in 11 seconds
INFO:tensorflow:Starting infeed thread controller.
INFO:tensorflow:Starting outfeed thread controller.
INFO:tensorflow:Initialized dataset iterators in 0 seconds
INFO:tensorflow:Enqueue next (1) batch(es) of data to infeed.
INFO:tensorflow:Dequeue next (1) batch(es) of data from outfeed.
INFO:tensorflow:Outfeed finished for iteration (0, 0)
INFO:tensorflow:Evaluation [1/1]
INFO:tensorflow:Stop infeed thread controller
INFO:tensorflow:Shutting down InfeedController thread.
INFO:tensorflow:InfeedController received shutdown signal, stopping.
INFO:tensorflow:Infeed thread finished, shutting down.
INFO:tensorflow:infeed marked as finished
INFO:tensorflow:Stop output thread controller
INFO:tensorflow:Shutting down OutfeedController thread.
INFO:tensorflow:OutfeedController received shutdown signal, stopping.
INFO:tensorflow:Outfeed thread finished, shutting down.
INFO:tensorflow:outfeed marked as finished
INFO:tensorflow:Shutdown TPU system.
INFO:tensorflow:Inference Time : 12.06464s
INFO:tensorflow:Finished evaluation at 2022-03-02-13:23:00
INFO:tensorflow:Saving dict for global step 1: global_step = 1, loss = 0.16138805
INFO:tensorflow:evaluation_loop marked as finished
{'loss': 0.16138805, 'global_step': 1}

TensorFlow 2:使用 TPUStrategy 在 TPU 上训练嵌入

在 TensorFlow 2 中,要在 TPU 工作节点上进行训练,请使用 tf.distribute.TPUStrategy 以及 Keras API 来定义模型和进行训练/评估。(有关使用 Keras Model.fit 和自定义训练循环(使用 tf.functiontf.GradientTape)的更多示例,请参阅 使用 TPU 指南。)

由于你需要执行一些初始化工作来连接到远程集群并初始化 TPU 工作节点,因此首先创建一个 TPUClusterResolver 来提供集群信息并连接到集群。(在 使用 TPU 指南的“TPU 初始化”部分中了解更多信息。)

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470
INFO:tensorflow:Finished initializing TPU system.
INFO:tensorflow:Finished initializing TPU system.
All devices:  [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]

接下来,准备你的数据。这与你在 TensorFlow 1 示例中创建数据集的方式类似,只是数据集函数现在传递了一个 tf.distribute.InputContext 对象,而不是一个 params 字典。你可以使用此对象来确定本地批次大小(以及此管道所在的宿主,以便你可以正确地划分你的数据)。

  • 使用 tfrs.layers.embedding.TPUEmbedding API 时,在使用 Dataset.batch 对数据集进行批处理时,务必包含 drop_remainder=True 选项,因为 TPUEmbedding 需要固定批次大小。
  • 此外,如果评估和训练在同一组设备上进行,则必须使用相同的批次大小。
  • 最后,你应该使用 tf.keras.utils.experimental.DatasetCreator 以及特殊输入选项 - experimental_fetch_to_device=False - 在 tf.distribute.InputOptions(它包含特定于策略的配置)中。这将在下面演示
global_batch_size = 8

def _input_dataset(context: tf.distribute.InputContext):
  dataset = tf.data.Dataset.from_tensor_slices((
      {"dense_feature": features,
       "sparse_feature": tf.SparseTensor(
           embedding_features_indices,
           embedding_features_values, [1, 2])},
           labels))
  dataset = dataset.shuffle(10).repeat()
  dataset = dataset.batch(
      context.get_per_replica_batch_size(global_batch_size),
      drop_remainder=True)
  return dataset.prefetch(2)

def _eval_dataset(context: tf.distribute.InputContext):
  dataset = tf.data.Dataset.from_tensor_slices((
      {"dense_feature": eval_features,
       "sparse_feature": tf.SparseTensor(
           eval_embedding_features_indices,
           eval_embedding_features_values, [1, 2])},
           eval_labels))
  dataset = dataset.repeat()
  dataset = dataset.batch(
      context.get_per_replica_batch_size(global_batch_size),
      drop_remainder=True)
  return dataset.prefetch(2)

input_options = tf.distribute.InputOptions(
    experimental_fetch_to_device=False)

input_dataset = tf.keras.utils.experimental.DatasetCreator(
    _input_dataset, input_options=input_options)

eval_dataset = tf.keras.utils.experimental.DatasetCreator(
    _eval_dataset, input_options=input_options)

接下来,准备完数据后,你将创建一个 TPUStrategy,并在该策略的范围内定义模型、指标和优化器 (Strategy.scope)。

你应该在 Model.compile 中为 steps_per_execution 选择一个数字,因为它指定了每次 tf.function 调用期间要运行的批次数量,对于性能至关重要。此参数类似于 TPUEstimator 中使用的 iterations_per_loop

在 TensorFlow 1 中通过 tf.tpu.experimental.embedding_column(和 tf.tpu.experimental.shared_embedding_column)指定的特征和表格配置,可以在 TensorFlow 2 中通过一对配置对象直接指定

(有关更多详细信息,请参阅相关的 API 文档。)

strategy = tf.distribute.TPUStrategy(cluster_resolver)
with strategy.scope():
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  dense_input = tf.keras.Input(shape=(2,), dtype=tf.float32, batch_size=global_batch_size)
  sparse_input = tf.keras.Input(shape=(), dtype=tf.int32, batch_size=global_batch_size)
  embedded_input = tfrs.layers.embedding.TPUEmbedding(
      feature_config=tf.tpu.experimental.embedding.FeatureConfig(
          table=tf.tpu.experimental.embedding.TableConfig(
              vocabulary_size=10,
              dim=5,
              initializer=tf.initializers.TruncatedNormal(mean=0.0, stddev=1)),
          name="sparse_input"),
      optimizer=optimizer)(sparse_input)
  input = tf.keras.layers.Concatenate(axis=1)([dense_input, embedded_input])
  result = tf.keras.layers.Dense(1)(input)
  model = tf.keras.Model(inputs={"dense_feature": dense_input, "sparse_feature": sparse_input}, outputs=result)
  model.compile(optimizer, "mse", steps_per_execution=10)
INFO:tensorflow:Found TPU system:
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

有了这些,你就可以使用训练数据集训练模型了

model.fit(input_dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
10/10 [==============================] - 2s 175ms/step - loss: 0.0057
10/10 [==============================] - 0s 3ms/step - loss: 0.0000e+00
10/10 [==============================] - 0s 3ms/step - loss: 0.0000e+00
10/10 [==============================] - 0s 3ms/step - loss: 0.0000e+00
10/10 [==============================] - 0s 3ms/step - loss: 0.0000e+00
<keras.callbacks.History at 0x7f16803b4a90>

最后,使用评估数据集评估模型

model.evaluate(eval_dataset, steps=1, return_dict=True)
1/1 [==============================] - 1s 1s/step - loss: 12.2297
{'loss': 12.229663848876953}

后续步骤

在 API 文档中了解有关设置特定于 TPU 的嵌入的更多信息

有关 TensorFlow 2 中 TPUStrategy 的更多信息,请考虑以下资源

要了解有关自定义训练的更多信息,请参阅

TPU - 谷歌专为机器学习设计的专用 ASIC - 可通过 Google ColabTPU Research CloudCloud TPU 获得。