在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
概述
tf.distribute.Strategy
是 TensorFlow API,用于将训练分布到多个 GPU、多台机器或 TPU 上。使用此 API,您可以用最少的代码更改来分布现有模型和训练代码。
tf.distribute.Strategy
的设计考虑了以下关键目标
- 易于使用,并支持多个用户群体,包括研究人员、机器学习工程师等。
- 提供开箱即用的良好性能。
- 轻松切换策略。
您可以使用 tf.distribute.Strategy
和 Keras 等高级 API 进行分布式训练 Model.fit
,以及 自定义训练循环(以及一般情况下使用 TensorFlow 的任何计算)。
在 TensorFlow 2.x 中,您可以以 Eager 模式执行程序,也可以使用 tf.function
在图中执行。 tf.distribute.Strategy
旨在支持这两种执行模式,但最适合与 tf.function
一起使用。仅建议将 Eager 模式用于调试目的,不支持 tf.distribute.TPUStrategy
。虽然本指南的重点是训练,但此 API 也可用于在不同平台上进行分布式评估和预测。
您可以使用 tf.distribute.Strategy
对代码进行很少的更改,因为 TensorFlow 的底层组件已更改为支持策略。这包括变量、层、模型、优化器、指标、摘要和检查点。
在本指南中,您将了解各种策略类型以及如何在不同情况下使用它们。要了解如何调试性能问题,请查看 优化 TensorFlow GPU 性能 指南。
设置 TensorFlow
import tensorflow as tf
策略类型
tf.distribute.Strategy
旨在涵盖不同轴上的许多用例。目前支持其中一些组合,其他组合将在未来添加。这些轴中的一些是
- 同步与异步训练: 这是两种常见的分布式训练方法,使用数据并行。在同步训练中,所有工作器同步地对输入数据的不同切片进行训练,并在每一步聚合梯度。在异步训练中,所有工作器独立地对输入数据进行训练并异步更新变量。通常,同步训练通过全减少来支持,异步训练通过参数服务器架构来支持。
- 硬件平台: 您可能希望将训练扩展到一台机器上的多个 GPU,或网络中的多台机器(每台机器有 0 个或多个 GPU),或 Cloud TPU 上。
为了支持这些用例,TensorFlow 有 MirroredStrategy
、TPUStrategy
、MultiWorkerMirroredStrategy
、ParameterServerStrategy
、CentralStorageStrategy
以及其他可用的策略。下一节将解释 TensorFlow 中哪些场景支持这些策略。以下是一个快速概述
训练 API | MirroredStrategy |
TPUStrategy |
MultiWorkerMirroredStrategy |
CentralStorageStrategy |
ParameterServerStrategy |
---|---|---|---|---|---|
Keras Model.fit |
支持 | 支持 | 支持 | 实验性支持 | 实验性支持 |
自定义训练循环 | 支持 | 支持 | 支持 | 实验性支持 | 实验性支持 |
Estimator API | 有限支持 | 不支持 | 有限支持 | 有限支持 | 有限支持 |
MirroredStrategy
tf.distribute.MirroredStrategy
支持在一台机器上的多个 GPU 上进行同步分布式训练。它为每个 GPU 设备创建一个副本。模型中的每个变量都在所有副本中镜像。这些变量共同构成一个称为 MirroredVariable
的单个概念变量。这些变量通过应用相同的更新来保持同步。
高效的全减少算法用于在设备之间通信变量更新。全减少通过将张量加起来在所有设备上聚合张量,并在每个设备上提供它们。它是一种融合算法,非常高效,可以显着降低同步开销。根据设备之间可用的通信类型,有许多全减少算法和实现可用。默认情况下,它使用 NVIDIA 集体通信库 (NCCL) 作为全减少实现。您可以从其他几个选项中选择,也可以编写自己的选项。
以下是如何创建 MirroredStrategy
的最简单方法
mirrored_strategy = tf.distribute.MirroredStrategy()
这将创建一个 MirroredStrategy
实例,该实例将使用对 TensorFlow 可见的全部 GPU 以及 NCCL 作为跨设备通信。
如果您希望仅使用机器上的部分 GPU,则可以像这样操作
mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
如果您希望覆盖跨设备通信,则可以使用 cross_device_ops
参数通过提供 tf.distribute.CrossDeviceOps
的实例来实现。目前,tf.distribute.HierarchicalCopyAllReduce
和 tf.distribute.ReductionToOneDevice
是除 tf.distribute.NcclAllReduce
之外的两个选项,后者是默认选项。
mirrored_strategy = tf.distribute.MirroredStrategy(
cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
TPUStrategy
tf.distribute.TPUStrategy
使您能够在 张量处理单元 (TPU) 上运行 TensorFlow 训练。TPU 是 Google 专为大幅加速机器学习工作负载而设计的专用 ASIC。它们在 Google Colab、TPU 研究云 和 Cloud TPU 上可用。
在分布式训练架构方面,TPUStrategy
与 MirroredStrategy
相同,它实现了同步分布式训练。TPU 提供了它们自己的高效全减少和其他集体操作的实现,这些操作跨多个 TPU 内核进行,这些内核在 TPUStrategy
中使用。
以下是如何实例化 TPUStrategy
cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)
该 TPUClusterResolver
实例有助于定位 TPU。在 Colab 中,您无需为其指定任何参数。
如果您希望将其用于 Cloud TPU
- 您必须在
tpu
参数中指定 TPU 资源的名称。 - 您必须在程序的开头显式初始化 TPU 系统。这是在 TPU 可用于计算之前所需的。初始化 TPU 系统还会清除 TPU 内存,因此在执行此步骤之前完成此步骤非常重要,以避免丢失状态。
MultiWorkerMirroredStrategy
tf.distribute.MultiWorkerMirroredStrategy
与 MirroredStrategy
非常相似。它在多个工作器上实现了同步分布式训练,每个工作器可能具有多个 GPU。与 tf.distribute.MirroredStrategy
相似,它在所有工作器上的每个设备上创建模型中所有变量的副本。
以下是如何创建 MultiWorkerMirroredStrategy
的最简单方法
strategy = tf.distribute.MultiWorkerMirroredStrategy()
MultiWorkerMirroredStrategy
有两种跨设备通信的实现。 CommunicationImplementation.RING
是基于 RPC 的,支持 CPU 和 GPU。 CommunicationImplementation.NCCL
使用 NCCL 并提供 GPU 上的最佳性能,但它不支持 CPU。 CollectiveCommunication.AUTO
将选择权推迟到 Tensorflow。您可以通过以下方式指定它们
communication_options = tf.distribute.experimental.CommunicationOptions(
implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
communication_options=communication_options)
与多 GPU 训练相比,启动多工作器训练的关键区别之一是多工作器设置。 'TF_CONFIG'
环境变量是 TensorFlow 中的标准方法,用于向集群中包含的每个工作器指定集群配置。在本文档的 设置 TF_CONFIG 部分 中了解更多信息。
有关 MultiWorkerMirroredStrategy
的更多详细信息,请考虑以下教程
ParameterServerStrategy
参数服务器训练是一种常见的数据并行方法,用于在多台机器上扩展模型训练。参数服务器训练集群由工作器和参数服务器组成。变量是在参数服务器上创建的,它们在每一步由工作器读取和更新。有关详细信息,请查看 参数服务器训练 教程。
在 TensorFlow 2 中,参数服务器训练使用基于中央协调器的架构,通过 tf.distribute.experimental.coordinator.ClusterCoordinator
类来实现。
在此实现中, worker
和 parameter server
任务运行 tf.distribute.Server
,这些服务器监听来自协调器的任务。协调器创建资源、调度训练任务、写入检查点并处理任务故障。
在协调器上运行的程序中,您将使用 ParameterServerStrategy
对象来定义训练步骤,并使用 ClusterCoordinator
将训练步骤调度到远程工作器。以下是如何创建它们的简单方法
strategy = tf.distribute.experimental.ParameterServerStrategy(
tf.distribute.cluster_resolver.TFConfigClusterResolver(),
variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
strategy)
要详细了解 ParameterServerStrategy
,请查看 使用 Keras Model.fit 和自定义训练循环进行参数服务器训练 教程。
在 TensorFlow 1 中, ParameterServerStrategy
仅通过 tf.compat.v1.distribute.experimental.ParameterServerStrategy
符号与 Estimator 一起使用。
CentralStorageStrategy
tf.distribute.experimental.CentralStorageStrategy
也进行同步训练。变量不会镜像,而是放置在 CPU 上,操作在所有本地 GPU 上复制。如果只有一个 GPU,所有变量和操作都将放置在该 GPU 上。
通过以下方式创建 CentralStorageStrategy
的实例
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
这将创建一个 CentralStorageStrategy
实例,该实例将使用所有可见的 GPU 和 CPU。在将更新应用于变量之前,将聚合副本上的变量更新。
其他策略
除了上述策略之外,还有两种其他策略可能在使用 tf.distribute
API 进行原型设计和调试时有用。
默认策略
默认策略是一种分布式策略,当没有显式分布式策略在作用域内时存在。它实现了 tf.distribute.Strategy
接口,但只是一个直通,不提供实际的分布。例如,Strategy.run(fn)
将简单地调用 fn
。使用此策略编写的代码应该与没有使用任何策略编写的代码的行为完全相同。您可以将其视为一种“无操作”策略。
默认策略是一个单例——不能创建它的更多实例。可以使用 tf.distribute.get_strategy
在任何显式策略作用域之外获取它(与在显式策略作用域内获取当前策略相同的 API)。
default_strategy = tf.distribute.get_strategy()
此策略有两个主要目的
- 它允许无条件地编写分布式感知库代码。例如,在
tf.keras.optimizers
中,您可以使用tf.distribute.get_strategy
并使用该策略来减少梯度——它将始终返回一个可以在其上调用Strategy.reduce
API 的策略对象。
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None) # reduce some values
- 与库代码类似,它可以用来编写最终用户的程序,以便在有或没有分布式策略的情况下工作,而不需要条件逻辑。以下是一个说明此示例的代码片段
if tf.config.list_physical_devices('GPU'):
strategy = tf.distribute.MirroredStrategy()
else: # Use the Default Strategy
strategy = tf.distribute.get_strategy()
with strategy.scope():
# Do something interesting
print(tf.Variable(1.))
OneDeviceStrategy
tf.distribute.OneDeviceStrategy
是一种将所有变量和计算放置在单个指定设备上的策略。
strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
此策略在许多方面不同于默认策略。在默认策略中,与在没有使用任何分布式策略的情况下运行 TensorFlow 相比,变量放置逻辑保持不变。但是,当使用 OneDeviceStrategy
时,在其作用域内创建的所有变量都将显式放置在指定的设备上。此外,通过 OneDeviceStrategy.run
调用的任何函数也将被放置在指定的设备上。
通过此策略分发的输入将被预取到指定的设备。在默认策略中,没有输入分布。
与默认策略类似,此策略也可以用来在切换到实际将数据分布到多个设备/机器的其他策略之前测试您的代码。这将比默认策略更多地练习分布式策略机制,但不会像使用 MirroredStrategy
或 TPUStrategy
那样达到全部程度。如果您想要的行为与没有策略相同的代码,那么请使用默认策略。
到目前为止,您已经了解了不同的策略以及如何实例化它们。接下来的几节将展示使用它们来分布您的训练的不同方法。
将 tf.distribute.Strategy 与 Keras Model.fit 一起使用
tf.distribute.Strategy
集成到 tf.keras
中,它是 TensorFlow 对 Keras API 规范 的实现。 tf.keras
是一个用于构建和训练模型的高级 API。通过集成到 tf.keras
后端,您可以无缝地将您在 Keras 训练框架中编写的训练进行分布 使用 Model.fit。
以下是您需要在代码中更改的内容
- 创建一个适当的
tf.distribute.Strategy
实例。 - 将 Keras 模型、优化器和指标的创建移到
strategy.scope
内。因此,模型的call()
、train_step()
和test_step()
方法中的代码都将被分布并在加速器上执行。
TensorFlow 分布式策略支持所有类型的 Keras 模型——Sequential、Functional 和 子类化
以下是一个代码片段,用于对具有一个 Dense
层的非常简单的 Keras 模型执行此操作
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(1, input_shape=(1,),
kernel_regularizer=tf.keras.regularizers.L2(1e-4))])
model.compile(loss='mse', optimizer='sgd')
此示例使用 MirroredStrategy
,因此您可以在具有多个 GPU 的机器上运行它。 strategy.scope()
指示 Keras 使用哪个策略来分布训练。在该作用域内创建模型/优化器/指标允许您创建分布式变量而不是普通变量。设置好之后,您就可以像平常一样拟合您的模型。 MirroredStrategy
会负责在可用的 GPU 上复制模型的训练、聚合梯度等等。
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
这里一个 tf.data.Dataset
提供训练和评估输入。您也可以使用 NumPy 数组
import numpy as np
inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
在这两种情况下——使用 Dataset
或 NumPy——给定输入的每个批次都会在多个副本之间平均分配。例如,如果您使用的是具有 2 个 GPU 的 MirroredStrategy
,则每个大小为 10 的批次将在 2 个 GPU 之间分配,每个 GPU 在每个步骤中接收 5 个输入示例。然后,每个 epoch 的训练速度会随着您添加更多 GPU 而加快。通常,您希望在添加更多加速器时增加批次大小,以便有效地利用额外的计算能力。您还需要根据模型重新调整学习率。您可以使用 strategy.num_replicas_in_sync
来获取副本的数量。
mirrored_strategy.num_replicas_in_sync
# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)
LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15, 20:0.175}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]
现在支持什么?
训练 API | MirroredStrategy |
TPUStrategy |
MultiWorkerMirroredStrategy |
ParameterServerStrategy |
CentralStorageStrategy |
---|---|---|---|---|---|
Keras Model.fit |
支持 | 支持 | 支持 | 实验性支持 | 实验性支持 |
示例和教程
以下是一些教程和示例,它们说明了上述集成与 Keras Model.fit
的端到端集成
- 教程:使用
Model.fit
和MirroredStrategy
进行训练。 - 教程:使用
Model.fit
和MultiWorkerMirroredStrategy
进行训练。 - 指南:包含使用
Model.fit
和TPUStrategy
的示例。 - 教程:使用
Model.fit
和ParameterServerStrategy
进行参数服务器训练。 - 教程:使用
Model.fit
和TPUStrategy
对 GLUE 基准测试中的许多任务进行 BERT 微调。 - TensorFlow Model Garden 存储库,其中包含使用各种策略实现的最先进模型的集合。
将 tf.distribute.Strategy 与自定义训练循环一起使用
如上所示,将 tf.distribute.Strategy
与 Keras Model.fit
一起使用只需要更改几行代码。只需付出更多努力,您也可以使用 tf.distribute.Strategy
与自定义训练循环一起使用。
如果您需要比 Estimator 或 Keras 更灵活的训练循环控制,则可以编写自定义训练循环。例如,当使用 GAN 时,您可能希望在每轮中执行不同数量的生成器或鉴别器步骤。类似地,高级框架不太适合强化学习训练。
tf.distribute.Strategy
类提供了一组核心方法来支持自定义训练循环。使用这些方法可能需要对代码进行一些小的重构,但完成之后,您应该能够通过更改策略实例来轻松地在 GPU、TPU 和多台机器之间切换。
以下是一个简短的代码片段,说明了使用与之前相同的 Keras 模型的简单训练示例的这种用例。
首先,在策略的作用域内创建模型和优化器。这确保了使用模型和优化器创建的任何变量都是镜像变量。
with mirrored_strategy.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(1, input_shape=(1,),
kernel_regularizer=tf.keras.regularizers.L2(1e-4))])
optimizer = tf.keras.optimizers.SGD()
接下来,创建输入数据集并调用 tf.distribute.Strategy.experimental_distribute_dataset
来根据策略分布数据集。
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
然后,定义训练的一个步骤。使用 tf.GradientTape
来计算梯度,并使用优化器来应用这些梯度以更新模型的变量。要分布此训练步骤,请将其放在一个名为 train_step
的函数中,并将其与您从之前创建的 dist_dataset
获取的数据集输入一起传递给 tf.distribute.Strategy.run
# Sets `reduction=NONE` to leave it to tf.nn.compute_average_loss() below.
loss_object = tf.keras.losses.BinaryCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def train_step(inputs):
features, labels = inputs
with tf.GradientTape() as tape:
predictions = model(features, training=True)
per_example_loss = loss_object(labels, predictions)
loss = tf.nn.compute_average_loss(per_example_loss)
model_losses = model.losses
if model_losses:
loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
return loss
@tf.function
def distributed_train_step(dist_inputs):
per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
上面代码中需要注意的几点
您使用
tf.nn.compute_average_loss
将每个示例的预测损失减少为标量。tf.nn.compute_average_loss
对每个示例的损失求和,并将总和除以全局批次大小。这很重要,因为稍后在每个副本上计算梯度之后,它们会通过**求和**在副本之间进行聚合。默认情况下,全局批次大小被认为是
tf.get_strategy().num_replicas_in_sync * tf.shape(per_example_loss)[0]
。它也可以作为关键字参数global_batch_size=
显式指定。如果没有短批次,默认值等效于tf.nn.compute_average_loss(..., global_batch_size=global_batch_size)
,其中global_batch_size
定义如上。(有关短批次以及如何避免或处理它们的更多信息,请参阅 自定义训练教程。)您使用
tf.nn.scale_regularization_loss
将注册到Model
对象的正则化损失(如果有)也按1/num_replicas_in_sync
进行缩放。对于那些依赖于输入的正则化损失,它由建模代码而不是自定义训练循环来执行对每个副本(!) 批次大小的平均;这样,建模代码就可以保持对复制的不可知性,而训练循环就可以保持对正则化损失如何计算的不可知性。当您在分布式策略作用域内调用
apply_gradients
时,它的行为会发生改变。具体来说,在同步训练期间在每个并行实例上应用梯度之前,它会对所有副本的梯度进行求和。您还使用了
tf.distribute.Strategy.reduce
API 来聚合由tf.distribute.Strategy.run
返回的结果以进行报告。tf.distribute.Strategy.run
返回策略中每个本地副本的结果,并且有多种方法可以消耗此结果。您可以reduce
它们以获得聚合值。您也可以使用tf.distribute.Strategy.experimental_local_results
获取结果中包含的值列表,每个本地副本一个。
最后,在定义训练步骤后,您可以遍历 dist_dataset
并循环运行训练。
for dist_inputs in dist_dataset:
print(distributed_train_step(dist_inputs))
在上面的示例中,您遍历了 dist_dataset
来为您的训练提供输入。您还提供了 tf.distribute.Strategy.make_experimental_numpy_dataset
来支持 NumPy 输入。您可以在调用 tf.distribute.Strategy.experimental_distribute_dataset
之前使用此 API 创建数据集。
遍历数据的另一种方法是显式使用迭代器。当您希望运行特定数量的步骤而不是遍历整个数据集时,您可能需要这样做。上面的迭代现在将修改为首先创建一个迭代器,然后显式调用 next
来获取输入数据。
iterator = iter(dist_dataset)
for _ in range(10):
print(distributed_train_step(next(iterator)))
这涵盖了使用 tf.distribute.Strategy
API 分布自定义训练循环的最简单情况。
现在支持什么?
训练 API | MirroredStrategy |
TPUStrategy |
MultiWorkerMirroredStrategy |
ParameterServerStrategy |
CentralStorageStrategy |
---|---|---|---|---|---|
自定义训练循环 | 支持 | 支持 | 支持 | 实验性支持 | 实验性支持 |
示例和教程
以下是一些使用分布式策略与自定义训练循环的示例
- 教程: 使用自定义训练循环和
MirroredStrategy
进行训练。 - 教程: 使用自定义训练循环和
MultiWorkerMirroredStrategy
进行训练。 - 指南: 包含使用
TPUStrategy
的自定义训练循环示例。 - 教程: 使用自定义训练循环和
ParameterServerStrategy
进行参数服务器训练。 - TensorFlow Model Garden 存储库,其中包含使用各种策略实现的最先进模型的集合。
其他主题
本节涵盖与多个用例相关的主题。
设置 TF_CONFIG 环境变量
对于多工作器训练,如前所述,您需要为集群中运行的每个二进制文件设置 'TF_CONFIG'
环境变量。 'TF_CONFIG'
环境变量是一个 JSON 字符串,它指定构成集群的任务、它们的地址以及每个任务在集群中的角色。 tensorflow/ecosystem
存储库提供了一个 Kubernetes 模板,它为您的训练任务设置 'TF_CONFIG'
。
'TF_CONFIG'
有两个组成部分:集群和任务。
- 集群提供有关训练集群的信息,它是一个字典,包含不同类型的作业,例如工作器。在多工作器训练中,通常有一个工作器承担更多责任,例如保存检查点和为 TensorBoard 编写摘要文件,除了常规工作器所做的事情之外。此类工作器被称为“首席”工作器,并且通常将索引为
0
的工作器指定为首席工作器(实际上,这就是tf.distribute.Strategy
的实现方式)。 - 另一方面,任务提供有关当前任务的信息。第一个组件集群对所有工作器都相同,而第二个组件任务在每个工作器上都不同,并指定该工作器的类型和索引。
'TF_CONFIG'
的一个示例是
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"]
},
"task": {"type": "worker", "index": 1}
})
此 'TF_CONFIG'
指定在 "cluster"
中有三个工作器和两个 "ps"
任务,以及它们的主机和端口。 "task"
部分指定当前任务在 "cluster"
中的角色——工作器 1
(第二个工作器)。集群中的有效角色是 "chief"
、"worker"
、"ps"
和 "evaluator"
。除了使用 tf.distribute.experimental.ParameterServerStrategy
之外,不应该有 "ps"
作业。
下一步是什么?
tf.distribute.Strategy
正在积极开发中。试用它并使用 GitHub 问题 提供您的反馈。