This guide demonstrates how to perform basic training on Tensor Processing Units (TPUs) and TPU Pods, which are collections of TPU devices connected by dedicated high-speed network interfaces, with tf.keras and custom training loops.
TPUs are Google's custom-developed application-specific integrated circuits (ASICs) used to accelerate machine learning workloads. They are available through Google Colab, the TPU Research Cloud, and Cloud TPU.
Setup
Before you run this Colab notebook, make sure that your hardware accelerator is a TPU by checking your notebook settings: Runtime > Change runtime type > Hardware accelerator > TPU.
Import some necessary libraries, including TensorFlow Datasets:
import tensorflow as tf
import os
import tensorflow_datasets as tfds
2023-06-09 12:13:32.486552: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
TPU initialization
TPUs are typically Cloud TPU workers, which are different from the local process running the user's Python program. Thus, you need to do some initialization work to connect to the remote cluster and initialize the TPUs. Note that the tpu argument to tf.distribute.cluster_resolver.TPUClusterResolver is a special address just for Colab. If you are running your code on Google Compute Engine (GCE), you should instead pass in the name of your Cloud TPU.
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Deallocate tpu buffers before initializing tpu system. 2023-06-09 12:13:34.011755: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:266] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Deallocate tpu buffers before initializing tpu system. INFO:tensorflow:Initializing the TPU system: grpc://10.25.167.66:8470 INFO:tensorflow:Initializing the TPU system: grpc://10.25.167.66:8470 INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Finished initializing TPU system. All devices: [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]
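If you run this code on Google Compute Engine instead of Colab, only the resolver call changes: pass the name of your Cloud TPU rather than the empty Colab address. A minimal sketch, assuming a hypothetical Cloud TPU named 'my-tpu-name':

# On GCE, reference the Cloud TPU by name ('my-tpu-name' is a placeholder).
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='my-tpu-name')
tf.config.experimental_connect_to_cluster(resolver)
tf.tpu.experimental.initialize_tpu_system(resolver)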
Manual device placement
After the TPU is initialized, you can use manual device placement to place the computation on a single TPU device:
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
with tf.device('/TPU:0'):
  c = tf.matmul(a, b)

print("c device: ", c.device)
print(c)
c device: /job:worker/replica:0/task:0/device:TPU:0 tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32)
Distribution strategies
Usually, you run your model on multiple TPUs in a data-parallel way. To distribute your model on multiple TPUs (as well as multiple GPUs or multiple machines), TensorFlow offers the tf.distribute.Strategy API. You can replace your distribution strategy and the model will run on any given (TPU) device. Learn more in the Distributed training with TensorFlow guide.
Using the tf.distribute.TPUStrategy option implements synchronous distributed training. TPUs provide their own implementation of efficient all-reduce and other collective operations across multiple TPU cores, and these implementations are used in TPUStrategy.
To demonstrate this, create a tf.distribute.TPUStrategy object:
strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system: INFO:tensorflow:Found TPU system: INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
To replicate a computation so it can run in all TPU cores, you can pass it into the Strategy.run API. Below is an example that shows all the cores receiving the same inputs (a, b) and performing the matrix multiplication on each core independently. The outputs will be the values from all the replicas.
@tf.function
def matmul_fn(x, y):
  z = tf.matmul(x, y)
  return z

z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{ 0: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 1: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 2: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 3: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 4: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 5: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 6: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 7: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32) }
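The result above is a PerReplica value holding one tensor per core. If you need a single aggregated tensor on the host, you can combine the replica results with Strategy.reduce. A minimal sketch, not part of the original example:

# Sum the per-replica results across all cores. `axis=None` reduces only
# across replicas, so the (2, 2) matrix shape is preserved.
z_sum = strategy.reduce(tf.distribute.ReduceOp.SUM, z, axis=None)
print(z_sum)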
Classification on TPUs
Having covered the basic concepts, consider a more concrete example. This section demonstrates how to use the distribution strategy, tf.distribute.TPUStrategy, to train a Keras model on a Cloud TPU.
Define a Keras model
Begin with a definition of a Sequential Keras model for image classification on the MNIST dataset. It is no different from what you would use if you were training on CPUs or GPUs. Note that Keras model creation needs to be inside Strategy.scope, so the variables can be created on each TPU device. Other parts of the code do not need to be inside the Strategy scope.
def create_model():
  return tf.keras.Sequential(
      [tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(256, 3, activation='relu'),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dense(256, activation='relu'),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)])
Load the dataset
Efficient use of the tf.data.Dataset API is critical when using a Cloud TPU. You can learn more about dataset performance in the Input pipeline performance guide.
If you are using TPU Nodes, you need to store all data files read by the TensorFlow Dataset in Google Cloud Storage (GCS) buckets. If you are using TPU VMs, you can store data wherever you like. For more information on TPU Nodes and TPU VMs, refer to the TPU System Architecture documentation.
For most use cases, it is recommended to convert your data into the TFRecord format and use a tf.data.TFRecordDataset to read it. Check the TFRecord and tf.Example tutorial for details on how to do this. It is not a hard requirement, and you can use other dataset readers, such as tf.data.FixedLengthRecordDataset or tf.data.TextLineDataset.
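As a minimal sketch of the recommended path (the GCS paths and feature spec below are hypothetical placeholders, not files provided by this tutorial), reading image/label pairs from TFRecord files might look like this:

# Hypothetical TFRecord shards stored in a GCS bucket (placeholder paths).
filenames = tf.io.gfile.glob('gs://your-bucket/mnist/train-*.tfrecord')

def parse_example(serialized):
  # Assumed feature spec: a raw image byte string and an int64 label.
  features = tf.io.parse_single_example(serialized, {
      'image': tf.io.FixedLenFeature([], tf.string),
      'label': tf.io.FixedLenFeature([], tf.int64),
  })
  image = tf.io.decode_raw(features['image'], tf.uint8)
  image = tf.reshape(tf.cast(image, tf.float32) / 255.0, (28, 28, 1))
  return image, features['label']

dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(parse_example, num_parallel_calls=tf.data.AUTOTUNE)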
You can load entire small datasets into memory using tf.data.Dataset.cache.
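For example, a dataset as small as MNIST could be cached after preprocessing so that later epochs read from memory instead of from storage (a sketch, continuing the hypothetical dataset above):

# Keep the parsed examples in memory after the first full pass over the data.
dataset = dataset.cache()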
Regardless of the data format used, it is strongly recommended that you use large files on the order of 100 MB. This is especially important in this networked setting, as the overhead of opening a file is significantly higher.
As shown in the code below, you should use the TensorFlow Datasets tfds.load module to get a copy of the MNIST training and test data. Note that try_gcs is specified to use a copy that is available in a public GCS bucket. If you don't specify this, the TPU will not be able to access the downloaded data.
def get_dataset(batch_size, is_training=True):
  split = 'train' if is_training else 'test'
  dataset, info = tfds.load(name='mnist', split=split, with_info=True,
                            as_supervised=True, try_gcs=True)

  # Normalize the input data.
  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0
    return image, label

  dataset = dataset.map(scale)

  # Only shuffle and repeat the dataset in training. The advantage of having an
  # infinite dataset for training is to avoid the potential last partial batch
  # in each epoch, so that you don't need to think about scaling the gradients
  # based on the actual batch size.
  if is_training:
    dataset = dataset.shuffle(10000)
    dataset = dataset.repeat()

  dataset = dataset.batch(batch_size)

  return dataset
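As an optional tweak that is not part of the tutorial code above, you could also overlap host-side preprocessing with TPU execution by prefetching the dataset returned by get_dataset:

# Optional: prepare the next batches while the TPU works on the current one.
train_dataset = get_dataset(batch_size=200, is_training=True).prefetch(
    tf.data.AUTOTUNE)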
Train the model using Keras high-level APIs
You can train your model with the Keras Model.fit and Model.compile APIs. There is nothing TPU-specific in this step; you write the code as if you were using multiple GPUs and a MirroredStrategy instead of the TPUStrategy. You can learn more in the Distributed training with Keras tutorial.
with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size

train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 17s 32ms/step - loss: 0.1235 - sparse_categorical_accuracy: 0.9620 - val_loss: 0.0462 - val_sparse_categorical_accuracy: 0.9856 Epoch 2/5 300/300 [==============================] - 7s 24ms/step - loss: 0.0333 - sparse_categorical_accuracy: 0.9894 - val_loss: 0.0401 - val_sparse_categorical_accuracy: 0.9878 Epoch 3/5 300/300 [==============================] - 7s 24ms/step - loss: 0.0186 - sparse_categorical_accuracy: 0.9938 - val_loss: 0.0352 - val_sparse_categorical_accuracy: 0.9900 Epoch 4/5 300/300 [==============================] - 7s 25ms/step - loss: 0.0127 - sparse_categorical_accuracy: 0.9957 - val_loss: 0.0482 - val_sparse_categorical_accuracy: 0.9879 Epoch 5/5 300/300 [==============================] - 7s 24ms/step - loss: 0.0111 - sparse_categorical_accuracy: 0.9962 - val_loss: 0.0448 - val_sparse_categorical_accuracy: 0.9894 <keras.callbacks.History at 0x7f79107c8d30>
To reduce Python overhead and maximize the performance of your TPU, pass the steps_per_execution argument to Keras Model.compile. In this example, it increases throughput by about 50%:
with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                # Anything between 2 and `steps_per_epoch` could help here.
                steps_per_execution = 50,
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 14s 45ms/step - loss: 0.1306 - sparse_categorical_accuracy: 0.9591 - val_loss: 0.0420 - val_sparse_categorical_accuracy: 0.9863 Epoch 2/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0333 - sparse_categorical_accuracy: 0.9900 - val_loss: 0.0502 - val_sparse_categorical_accuracy: 0.9846 Epoch 3/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0193 - sparse_categorical_accuracy: 0.9936 - val_loss: 0.0406 - val_sparse_categorical_accuracy: 0.9879 Epoch 4/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0135 - sparse_categorical_accuracy: 0.9955 - val_loss: 0.0416 - val_sparse_categorical_accuracy: 0.9882 Epoch 5/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0110 - sparse_categorical_accuracy: 0.9962 - val_loss: 0.0463 - val_sparse_categorical_accuracy: 0.9882 <keras.callbacks.History at 0x7f7898488e20>
Train the model using a custom training loop
You can also create and train your model using the tf.function and tf.distribute APIs directly. You can use the Strategy.experimental_distribute_datasets_from_function API to distribute the tf.data.Dataset given a dataset function. Note that in the example below the batch size passed into the Dataset is the per-replica batch size instead of the global batch size. To learn more, check out the Custom training with tf.distribute.Strategy tutorial.
First, create the model, datasets, and a tf.function:
# Create the model, optimizer and metrics inside the `tf.distribute.Strategy`
# scope, so that the variables can be mirrored on each device.
with strategy.scope():
  model = create_model()
  optimizer = tf.keras.optimizers.Adam()
  training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
  training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      'training_accuracy', dtype=tf.float32)

# Calculate per replica batch size, and distribute the `tf.data.Dataset`s
# on each TPU worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync

train_dataset = strategy.experimental_distribute_datasets_from_function(
    lambda _: get_dataset(per_replica_batch_size, is_training=True))

@tf.function
def train_step(iterator):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From /tmpfs/tmp/ipykernel_9094/1509474074.py:14: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function WARNING:tensorflow:From /tmpfs/tmp/ipykernel_9094/1509474074.py:14: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function
Then, run the training loop:
steps_per_eval = 10000 // batch_size

train_iterator = iter(train_dataset)
for epoch in range(5):
  print('Epoch: {}/5'.format(epoch))

  for step in range(steps_per_epoch):
    train_step(train_iterator)
  print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
  training_loss.reset_states()
  training_accuracy.reset_states()
Epoch: 0/5 Current step: 300, training loss: 0.1465, accuracy: 95.4% Epoch: 1/5 Current step: 600, training loss: 0.035, accuracy: 98.94% Epoch: 2/5 Current step: 900, training loss: 0.0197, accuracy: 99.39% Epoch: 3/5 Current step: 1200, training loss: 0.0126, accuracy: 99.59% Epoch: 4/5 Current step: 1500, training loss: 0.0109, accuracy: 99.64%
Improving performance with multiple steps inside tf.function
You can improve performance by running multiple steps within a tf.function. This is achieved by wrapping the Strategy.run call with a tf.range inside the tf.function; AutoGraph will convert it to a tf.while_loop on the TPU worker. You can learn more about tf.function in the Better performance with tf.function guide.
Despite the improved performance, there are tradeoffs with this method compared to running a single step inside a tf.function. Running multiple steps in a tf.function is less flexible: you cannot run things eagerly or execute arbitrary Python code within the steps.
@tf.function
def train_multiple_steps(iterator, steps):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  for _ in tf.range(steps):
    strategy.run(step_fn, args=(next(iterator),))

# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))

print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.009, accuracy: 99.72%
Next steps
To learn more about Cloud TPUs and how to use them:
- Google Cloud TPU: The Google Cloud TPU homepage.
- Google Cloud TPU documentation: Google Cloud TPU documentation, which includes:
  - Introduction to Cloud TPU: An overview of working with Cloud TPUs.
  - Cloud TPU quickstarts: Quickstart introductions to working with Cloud TPU VMs using TensorFlow and other main machine learning frameworks.
- Google Cloud TPU Colab notebooks: End-to-end training examples.
- Google Cloud TPU performance guide: Enhance Cloud TPU performance further by adjusting Cloud TPU configuration parameters for your application.
- Distributed training with TensorFlow: How to use distribution strategies, including tf.distribute.TPUStrategy, with examples showing best practices.
- TPU embeddings: TensorFlow includes specialized support for training embeddings on TPUs via tf.tpu.experimental.embedding. In addition, TensorFlow Recommenders has tfrs.layers.embedding.TPUEmbedding. Embeddings provide efficient and dense representations, capturing complex similarities and relationships between features. TensorFlow's TPU-specific embedding support allows you to train embeddings that are larger than the memory of a single TPU device, and to use sparse and ragged inputs on TPUs.
- TPU Research Cloud (TRC): TRC enables researchers to apply for access to a cluster of more than 1,000 Cloud TPU devices.