使用 TPU

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

本指南演示了如何在 Tensor 处理单元 (TPU) 和 TPU Pod(通过专用高速网络接口连接的 TPU 设备集合)上使用 tf.keras 和自定义训练循环执行基本训练。

TPU 是 Google 定制开发的专用集成电路 (ASIC),用于加速机器学习工作负载。它们可通过 Google ColabTPU 研究云Cloud TPU 获得。

设置

在运行此 Colab 笔记本之前,请确保您的硬件加速器是 TPU,方法是检查笔记本设置:运行时 > 更改运行时类型 > 硬件加速器 > TPU

导入一些必要的库,包括 TensorFlow 数据集

import tensorflow as tf

import os
import tensorflow_datasets as tfds
2023-06-09 12:13:32.486552: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT

TPU 初始化

TPU 通常是 Cloud TPU 工作器,它们与运行用户 Python 程序的本地进程不同。因此,您需要执行一些初始化工作才能连接到远程集群并初始化 TPU。请注意,传递给 tf.distribute.cluster_resolver.TPUClusterResolvertpu 参数是专门用于 Colab 的特殊地址。如果您在 Google Compute Engine (GCE) 上运行代码,则应改为传递 Cloud TPU 的名称。

resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Deallocate tpu buffers before initializing tpu system.
2023-06-09 12:13:34.011755: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:266] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Deallocate tpu buffers before initializing tpu system.
INFO:tensorflow:Initializing the TPU system: grpc://10.25.167.66:8470
INFO:tensorflow:Initializing the TPU system: grpc://10.25.167.66:8470
INFO:tensorflow:Finished initializing TPU system.
INFO:tensorflow:Finished initializing TPU system.
All devices:  [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]

手动设备放置

TPU 初始化后,您可以使用手动设备放置将计算放置在单个 TPU 设备上

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

with tf.device('/TPU:0'):
  c = tf.matmul(a, b)

print("c device: ", c.device)
print(c)
c device:  /job:worker/replica:0/task:0/device:TPU:0
tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)

分布式策略

通常,您会在多个 TPU 上以数据并行的方式运行模型。为了在多个 TPU(以及多个 GPU 或多台机器)上分布模型,TensorFlow 提供了 tf.distribute.Strategy API。您可以替换分布式策略,模型将在任何给定的(TPU)设备上运行。在 使用 TensorFlow 进行分布式训练 指南中了解更多信息。

使用 tf.distribute.TPUStrategy 选项实现同步分布式训练。TPU 提供了跨多个 TPU 内核的高效全约简和其他集体操作的实现,这些实现被用于 TPUStrategy 中。

为了演示这一点,创建一个 tf.distribute.TPUStrategy 对象。

strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system:
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

为了复制一个计算以便它可以在所有 TPU 内核中运行,可以将其传递到 Strategy.run API 中。以下是一个示例,它展示了所有内核接收相同的输入 (a, b) 并在每个内核上独立执行矩阵乘法。输出将是所有副本的值。

@tf.function
def matmul_fn(x, y):
  z = tf.matmul(x, y)
  return z

z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{
  0: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  1: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  2: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  3: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  4: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  5: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  6: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  7: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)
}

TPU 上的分类

在介绍了基本概念之后,考虑一个更具体的例子。本节演示如何使用分布式策略 - tf.distribute.TPUStrategy - 在云 TPU 上训练 Keras 模型。

定义 Keras 模型

从定义一个 Sequential Keras 模型 开始,用于在 MNIST 数据集上进行图像分类。它与你在 CPU 或 GPU 上训练时使用的模型没有区别。请注意,Keras 模型创建需要在 Strategy.scope 内,以便变量可以在每个 TPU 设备上创建。代码的其他部分不需要在 Strategy 范围内。

def create_model():
  return tf.keras.Sequential(
      [tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(256, 3, activation='relu'),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dense(256, activation='relu'),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)])

加载数据集

在使用云 TPU 时,高效地使用 tf.data.Dataset API 至关重要。你可以在 输入管道性能指南 中了解更多关于数据集性能的信息。

如果你使用的是 TPU 节点,你需要将 TensorFlow Dataset 读取的所有数据文件存储在 Google Cloud Storage (GCS) 存储桶 中。如果你使用的是 TPU VM,你可以将数据存储在任何你想要的地方。有关 TPU 节点和 TPU VM 的更多信息,请参阅 TPU 系统架构 文档。

对于大多数用例,建议将你的数据转换为 TFRecord 格式,并使用 tf.data.TFRecordDataset 来读取它。查看 TFRecord 和 tf.Example 教程,了解如何执行此操作的详细信息。这不是一个硬性要求,你可以使用其他数据集读取器,例如 tf.data.FixedLengthRecordDatasettf.data.TextLineDataset

你可以使用 tf.data.Dataset.cache 将整个小型数据集加载到内存中。

无论使用哪种数据格式,强烈建议你使用大约 100MB 的大型文件。这在网络环境中尤其重要,因为打开文件的开销要高得多。

如以下代码所示,你应该使用 Tensorflow Datasets tfds.load 模块来获取 MNIST 训练和测试数据的副本。请注意,指定了 try_gcs 以使用公共 GCS 存储桶中可用的副本。如果你没有指定它,TPU 将无法访问下载的数据。

def get_dataset(batch_size, is_training=True):
  split = 'train' if is_training else 'test'
  dataset, info = tfds.load(name='mnist', split=split, with_info=True,
                            as_supervised=True, try_gcs=True)

  # Normalize the input data.
  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0
    return image, label

  dataset = dataset.map(scale)

  # Only shuffle and repeat the dataset in training. The advantage of having an
  # infinite dataset for training is to avoid the potential last partial batch
  # in each epoch, so that you don't need to think about scaling the gradients
  # based on the actual batch size.
  if is_training:
    dataset = dataset.shuffle(10000)
    dataset = dataset.repeat()

  dataset = dataset.batch(batch_size)

  return dataset

使用 Keras 高级 API 训练模型

你可以使用 Keras Model.fitModel.compile API 训练你的模型。此步骤中没有任何 TPU 特定的内容 - 你编写代码就像使用多个 GPU 和 MirroredStrategy 而不是 TPUStrategy 一样。你可以在 使用 Keras 进行分布式训练 教程中了解更多信息。

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size

train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
Epoch 1/5
300/300 [==============================] - 17s 32ms/step - loss: 0.1235 - sparse_categorical_accuracy: 0.9620 - val_loss: 0.0462 - val_sparse_categorical_accuracy: 0.9856
Epoch 2/5
300/300 [==============================] - 7s 24ms/step - loss: 0.0333 - sparse_categorical_accuracy: 0.9894 - val_loss: 0.0401 - val_sparse_categorical_accuracy: 0.9878
Epoch 3/5
300/300 [==============================] - 7s 24ms/step - loss: 0.0186 - sparse_categorical_accuracy: 0.9938 - val_loss: 0.0352 - val_sparse_categorical_accuracy: 0.9900
Epoch 4/5
300/300 [==============================] - 7s 25ms/step - loss: 0.0127 - sparse_categorical_accuracy: 0.9957 - val_loss: 0.0482 - val_sparse_categorical_accuracy: 0.9879
Epoch 5/5
300/300 [==============================] - 7s 24ms/step - loss: 0.0111 - sparse_categorical_accuracy: 0.9962 - val_loss: 0.0448 - val_sparse_categorical_accuracy: 0.9894
<keras.callbacks.History at 0x7f79107c8d30>

为了减少 Python 开销并最大限度地提高 TPU 的性能,将 steps_per_execution 参数传递给 Keras Model.compile。在这个例子中,它将吞吐量提高了大约 50%。

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                # Anything between 2 and `steps_per_epoch` could help here.
                steps_per_execution = 50,
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
Epoch 1/5
300/300 [==============================] - 14s 45ms/step - loss: 0.1306 - sparse_categorical_accuracy: 0.9591 - val_loss: 0.0420 - val_sparse_categorical_accuracy: 0.9863
Epoch 2/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0333 - sparse_categorical_accuracy: 0.9900 - val_loss: 0.0502 - val_sparse_categorical_accuracy: 0.9846
Epoch 3/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0193 - sparse_categorical_accuracy: 0.9936 - val_loss: 0.0406 - val_sparse_categorical_accuracy: 0.9879
Epoch 4/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0135 - sparse_categorical_accuracy: 0.9955 - val_loss: 0.0416 - val_sparse_categorical_accuracy: 0.9882
Epoch 5/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0110 - sparse_categorical_accuracy: 0.9962 - val_loss: 0.0463 - val_sparse_categorical_accuracy: 0.9882
<keras.callbacks.History at 0x7f7898488e20>

使用自定义训练循环训练模型

你也可以使用 tf.functiontf.distribute API 直接创建和训练你的模型。你可以使用 Strategy.experimental_distribute_datasets_from_function API 来分发给定数据集函数的 tf.data.Dataset。请注意,在下面的示例中,传递到 Dataset 的批次大小是每个副本的批次大小,而不是全局批次大小。要了解更多信息,请查看 使用 tf.distribute.Strategy 进行自定义训练 教程。

首先,创建模型、数据集和 tf.function

# Create the model, optimizer and metrics inside the `tf.distribute.Strategy`
# scope, so that the variables can be mirrored on each device.
with strategy.scope():
  model = create_model()
  optimizer = tf.keras.optimizers.Adam()
  training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
  training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      'training_accuracy', dtype=tf.float32)

# Calculate per replica batch size, and distribute the `tf.data.Dataset`s
# on each TPU worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync

train_dataset = strategy.experimental_distribute_datasets_from_function(
    lambda _: get_dataset(per_replica_batch_size, is_training=True))

@tf.function
def train_step(iterator):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From /tmpfs/tmp/ipykernel_9094/1509474074.py:14: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
rename to distribute_datasets_from_function
WARNING:tensorflow:From /tmpfs/tmp/ipykernel_9094/1509474074.py:14: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
rename to distribute_datasets_from_function

然后,运行训练循环。

steps_per_eval = 10000 // batch_size

train_iterator = iter(train_dataset)
for epoch in range(5):
  print('Epoch: {}/5'.format(epoch))

  for step in range(steps_per_epoch):
    train_step(train_iterator)
  print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
  training_loss.reset_states()
  training_accuracy.reset_states()
Epoch: 0/5
Current step: 300, training loss: 0.1465, accuracy: 95.4%
Epoch: 1/5
Current step: 600, training loss: 0.035, accuracy: 98.94%
Epoch: 2/5
Current step: 900, training loss: 0.0197, accuracy: 99.39%
Epoch: 3/5
Current step: 1200, training loss: 0.0126, accuracy: 99.59%
Epoch: 4/5
Current step: 1500, training loss: 0.0109, accuracy: 99.64%

tf.function 中使用多个步骤提高性能

你可以在 tf.function 中运行多个步骤来提高性能。这是通过将 Strategy.run 调用包装在 tf.range 中,并在 tf.function 内,AutoGraph 将将其转换为 TPU 工作器上的 tf.while_loop。你可以在 使用 tf.function 提高性能 指南中了解更多关于 tf.function 的信息。

尽管性能有所提高,但与在 tf.function 中运行单个步骤相比,这种方法存在一些权衡。在 tf.function 中运行多个步骤的灵活性较低 - 你无法在步骤中运行任何东西或任意 Python 代码。

@tf.function
def train_multiple_steps(iterator, steps):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  for _ in tf.range(steps):
    strategy.run(step_fn, args=(next(iterator),))

# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))

print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.009, accuracy: 99.72%

下一步

要了解有关云 TPU 及其使用方法的更多信息