TensorFlow 分布式训练

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

概述

tf.distribute.Strategy 是 TensorFlow API,用于将训练分布到多个 GPU、多台机器或 TPU 上。使用此 API,您可以用最少的代码更改来分布现有模型和训练代码。

tf.distribute.Strategy 的设计考虑了以下关键目标

  • 易于使用,并支持多个用户群体,包括研究人员、机器学习工程师等。
  • 提供开箱即用的良好性能。
  • 轻松切换策略。

您可以使用 tf.distribute.Strategy 和 Keras 等高级 API 进行分布式训练 Model.fit,以及 自定义训练循环(以及一般情况下使用 TensorFlow 的任何计算)。

在 TensorFlow 2.x 中,您可以以 Eager 模式执行程序,也可以使用 tf.function 在图中执行。 tf.distribute.Strategy 旨在支持这两种执行模式,但最适合与 tf.function 一起使用。仅建议将 Eager 模式用于调试目的,不支持 tf.distribute.TPUStrategy。虽然本指南的重点是训练,但此 API 也可用于在不同平台上进行分布式评估和预测。

您可以使用 tf.distribute.Strategy 对代码进行很少的更改,因为 TensorFlow 的底层组件已更改为支持策略。这包括变量、层、模型、优化器、指标、摘要和检查点。

在本指南中,您将了解各种策略类型以及如何在不同情况下使用它们。要了解如何调试性能问题,请查看 优化 TensorFlow GPU 性能 指南。

设置 TensorFlow

import tensorflow as tf

策略类型

tf.distribute.Strategy 旨在涵盖不同轴上的许多用例。目前支持其中一些组合,其他组合将在未来添加。这些轴中的一些是

  • 同步与异步训练: 这是两种常见的分布式训练方法,使用数据并行。在同步训练中,所有工作器同步地对输入数据的不同切片进行训练,并在每一步聚合梯度。在异步训练中,所有工作器独立地对输入数据进行训练并异步更新变量。通常,同步训练通过全减少来支持,异步训练通过参数服务器架构来支持。
  • 硬件平台: 您可能希望将训练扩展到一台机器上的多个 GPU,或网络中的多台机器(每台机器有 0 个或多个 GPU),或 Cloud TPU 上。

为了支持这些用例,TensorFlow 有 MirroredStrategyTPUStrategyMultiWorkerMirroredStrategyParameterServerStrategyCentralStorageStrategy 以及其他可用的策略。下一节将解释 TensorFlow 中哪些场景支持这些策略。以下是一个快速概述

训练 API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Keras Model.fit 支持 支持 支持 实验性支持 实验性支持
自定义训练循环 支持 支持 支持 实验性支持 实验性支持
Estimator API 有限支持 不支持 有限支持 有限支持 有限支持

MirroredStrategy

tf.distribute.MirroredStrategy 支持在一台机器上的多个 GPU 上进行同步分布式训练。它为每个 GPU 设备创建一个副本。模型中的每个变量都在所有副本中镜像。这些变量共同构成一个称为 MirroredVariable 的单个概念变量。这些变量通过应用相同的更新来保持同步。

高效的全减少算法用于在设备之间通信变量更新。全减少通过将张量加起来在所有设备上聚合张量,并在每个设备上提供它们。它是一种融合算法,非常高效,可以显着降低同步开销。根据设备之间可用的通信类型,有许多全减少算法和实现可用。默认情况下,它使用 NVIDIA 集体通信库 (NCCL) 作为全减少实现。您可以从其他几个选项中选择,也可以编写自己的选项。

以下是如何创建 MirroredStrategy 的最简单方法

mirrored_strategy = tf.distribute.MirroredStrategy()

这将创建一个 MirroredStrategy 实例,该实例将使用对 TensorFlow 可见的全部 GPU 以及 NCCL 作为跨设备通信。

如果您希望仅使用机器上的部分 GPU,则可以像这样操作

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

如果您希望覆盖跨设备通信,则可以使用 cross_device_ops 参数通过提供 tf.distribute.CrossDeviceOps 的实例来实现。目前,tf.distribute.HierarchicalCopyAllReducetf.distribute.ReductionToOneDevice 是除 tf.distribute.NcclAllReduce 之外的两个选项,后者是默认选项。

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())

TPUStrategy

tf.distribute.TPUStrategy 使您能够在 张量处理单元 (TPU) 上运行 TensorFlow 训练。TPU 是 Google 专为大幅加速机器学习工作负载而设计的专用 ASIC。它们在 Google ColabTPU 研究云Cloud TPU 上可用。

在分布式训练架构方面,TPUStrategyMirroredStrategy 相同,它实现了同步分布式训练。TPU 提供了它们自己的高效全减少和其他集体操作的实现,这些操作跨多个 TPU 内核进行,这些内核在 TPUStrategy 中使用。

以下是如何实例化 TPUStrategy

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

TPUClusterResolver 实例有助于定位 TPU。在 Colab 中,您无需为其指定任何参数。

如果您希望将其用于 Cloud TPU

  • 您必须在 tpu 参数中指定 TPU 资源的名称。
  • 您必须在程序的开头显式初始化 TPU 系统。这是在 TPU 可用于计算之前所需的。初始化 TPU 系统还会清除 TPU 内存,因此在执行此步骤之前完成此步骤非常重要,以避免丢失状态。

MultiWorkerMirroredStrategy

tf.distribute.MultiWorkerMirroredStrategyMirroredStrategy 非常相似。它在多个工作器上实现了同步分布式训练,每个工作器可能具有多个 GPU。与 tf.distribute.MirroredStrategy 相似,它在所有工作器上的每个设备上创建模型中所有变量的副本。

以下是如何创建 MultiWorkerMirroredStrategy 的最简单方法

strategy = tf.distribute.MultiWorkerMirroredStrategy()

MultiWorkerMirroredStrategy 有两种跨设备通信的实现。 CommunicationImplementation.RING 是基于 RPC 的,支持 CPU 和 GPU。 CommunicationImplementation.NCCL 使用 NCCL 并提供 GPU 上的最佳性能,但它不支持 CPU。 CollectiveCommunication.AUTO 将选择权推迟到 Tensorflow。您可以通过以下方式指定它们

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)

与多 GPU 训练相比,启动多工作器训练的关键区别之一是多工作器设置。 'TF_CONFIG' 环境变量是 TensorFlow 中的标准方法,用于向集群中包含的每个工作器指定集群配置。在本文档的 设置 TF_CONFIG 部分 中了解更多信息。

有关 MultiWorkerMirroredStrategy 的更多详细信息,请考虑以下教程

ParameterServerStrategy

参数服务器训练是一种常见的数据并行方法,用于在多台机器上扩展模型训练。参数服务器训练集群由工作器和参数服务器组成。变量是在参数服务器上创建的,它们在每一步由工作器读取和更新。有关详细信息,请查看 参数服务器训练 教程。

在 TensorFlow 2 中,参数服务器训练使用基于中央协调器的架构,通过 tf.distribute.experimental.coordinator.ClusterCoordinator 类来实现。

在此实现中, workerparameter server 任务运行 tf.distribute.Server,这些服务器监听来自协调器的任务。协调器创建资源、调度训练任务、写入检查点并处理任务故障。

在协调器上运行的程序中,您将使用 ParameterServerStrategy 对象来定义训练步骤,并使用 ClusterCoordinator 将训练步骤调度到远程工作器。以下是如何创建它们的简单方法

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

要详细了解 ParameterServerStrategy,请查看 使用 Keras Model.fit 和自定义训练循环进行参数服务器训练 教程。

在 TensorFlow 1 中, ParameterServerStrategy 仅通过 tf.compat.v1.distribute.experimental.ParameterServerStrategy 符号与 Estimator 一起使用。

CentralStorageStrategy

tf.distribute.experimental.CentralStorageStrategy 也进行同步训练。变量不会镜像,而是放置在 CPU 上,操作在所有本地 GPU 上复制。如果只有一个 GPU,所有变量和操作都将放置在该 GPU 上。

通过以下方式创建 CentralStorageStrategy 的实例

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()

这将创建一个 CentralStorageStrategy 实例,该实例将使用所有可见的 GPU 和 CPU。在将更新应用于变量之前,将聚合副本上的变量更新。

其他策略

除了上述策略之外,还有两种其他策略可能在使用 tf.distribute API 进行原型设计和调试时有用。

默认策略

默认策略是一种分布式策略,当没有显式分布式策略在作用域内时存在。它实现了 tf.distribute.Strategy 接口,但只是一个直通,不提供实际的分布。例如,Strategy.run(fn) 将简单地调用 fn。使用此策略编写的代码应该与没有使用任何策略编写的代码的行为完全相同。您可以将其视为一种“无操作”策略。

默认策略是一个单例——不能创建它的更多实例。可以使用 tf.distribute.get_strategy 在任何显式策略作用域之外获取它(与在显式策略作用域内获取当前策略相同的 API)。

default_strategy = tf.distribute.get_strategy()

此策略有两个主要目的

# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
  • 与库代码类似,它可以用来编写最终用户的程序,以便在有或没有分布式策略的情况下工作,而不需要条件逻辑。以下是一个说明此示例的代码片段
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))

OneDeviceStrategy

tf.distribute.OneDeviceStrategy 是一种将所有变量和计算放置在单个指定设备上的策略。

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

此策略在许多方面不同于默认策略。在默认策略中,与在没有使用任何分布式策略的情况下运行 TensorFlow 相比,变量放置逻辑保持不变。但是,当使用 OneDeviceStrategy 时,在其作用域内创建的所有变量都将显式放置在指定的设备上。此外,通过 OneDeviceStrategy.run 调用的任何函数也将被放置在指定的设备上。

通过此策略分发的输入将被预取到指定的设备。在默认策略中,没有输入分布。

与默认策略类似,此策略也可以用来在切换到实际将数据分布到多个设备/机器的其他策略之前测试您的代码。这将比默认策略更多地练习分布式策略机制,但不会像使用 MirroredStrategyTPUStrategy 那样达到全部程度。如果您想要的行为与没有策略相同的代码,那么请使用默认策略。

到目前为止,您已经了解了不同的策略以及如何实例化它们。接下来的几节将展示使用它们来分布您的训练的不同方法。

将 tf.distribute.Strategy 与 Keras Model.fit 一起使用

tf.distribute.Strategy 集成到 tf.keras 中,它是 TensorFlow 对 Keras API 规范 的实现。 tf.keras 是一个用于构建和训练模型的高级 API。通过集成到 tf.keras 后端,您可以无缝地将您在 Keras 训练框架中编写的训练进行分布 使用 Model.fit

以下是您需要在代码中更改的内容

  1. 创建一个适当的 tf.distribute.Strategy 实例。
  2. 将 Keras 模型、优化器和指标的创建移到 strategy.scope 内。因此,模型的 call()train_step()test_step() 方法中的代码都将被分布并在加速器上执行。

TensorFlow 分布式策略支持所有类型的 Keras 模型——SequentialFunctional子类化

以下是一个代码片段,用于对具有一个 Dense 层的非常简单的 Keras 模型执行此操作

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(1, input_shape=(1,),
                            kernel_regularizer=tf.keras.regularizers.L2(1e-4))])
  model.compile(loss='mse', optimizer='sgd')

此示例使用 MirroredStrategy,因此您可以在具有多个 GPU 的机器上运行它。 strategy.scope() 指示 Keras 使用哪个策略来分布训练。在该作用域内创建模型/优化器/指标允许您创建分布式变量而不是普通变量。设置好之后,您就可以像平常一样拟合您的模型。 MirroredStrategy 会负责在可用的 GPU 上复制模型的训练、聚合梯度等等。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)

这里一个 tf.data.Dataset 提供训练和评估输入。您也可以使用 NumPy 数组

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)

在这两种情况下——使用 Dataset 或 NumPy——给定输入的每个批次都会在多个副本之间平均分配。例如,如果您使用的是具有 2 个 GPU 的 MirroredStrategy,则每个大小为 10 的批次将在 2 个 GPU 之间分配,每个 GPU 在每个步骤中接收 5 个输入示例。然后,每个 epoch 的训练速度会随着您添加更多 GPU 而加快。通常,您希望在添加更多加速器时增加批次大小,以便有效地利用额外的计算能力。您还需要根据模型重新调整学习率。您可以使用 strategy.num_replicas_in_sync 来获取副本的数量。

mirrored_strategy.num_replicas_in_sync
# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15, 20:0.175}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

现在支持什么?

训练 API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Keras Model.fit 支持 支持 支持 实验性支持 实验性支持

示例和教程

以下是一些教程和示例,它们说明了上述集成与 Keras Model.fit 的端到端集成

  1. 教程:使用 Model.fitMirroredStrategy 进行训练。
  2. 教程:使用 Model.fitMultiWorkerMirroredStrategy 进行训练。
  3. 指南:包含使用 Model.fitTPUStrategy 的示例。
  4. 教程:使用 Model.fitParameterServerStrategy 进行参数服务器训练。
  5. 教程:使用 Model.fitTPUStrategy 对 GLUE 基准测试中的许多任务进行 BERT 微调。
  6. TensorFlow Model Garden 存储库,其中包含使用各种策略实现的最先进模型的集合。

将 tf.distribute.Strategy 与自定义训练循环一起使用

如上所示,将 tf.distribute.Strategy 与 Keras Model.fit 一起使用只需要更改几行代码。只需付出更多努力,您也可以使用 tf.distribute.Strategy 与自定义训练循环一起使用

如果您需要比 Estimator 或 Keras 更灵活的训练循环控制,则可以编写自定义训练循环。例如,当使用 GAN 时,您可能希望在每轮中执行不同数量的生成器或鉴别器步骤。类似地,高级框架不太适合强化学习训练。

tf.distribute.Strategy 类提供了一组核心方法来支持自定义训练循环。使用这些方法可能需要对代码进行一些小的重构,但完成之后,您应该能够通过更改策略实例来轻松地在 GPU、TPU 和多台机器之间切换。

以下是一个简短的代码片段,说明了使用与之前相同的 Keras 模型的简单训练示例的这种用例。

首先,在策略的作用域内创建模型和优化器。这确保了使用模型和优化器创建的任何变量都是镜像变量。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(1, input_shape=(1,),
                            kernel_regularizer=tf.keras.regularizers.L2(1e-4))])
  optimizer = tf.keras.optimizers.SGD()

接下来,创建输入数据集并调用 tf.distribute.Strategy.experimental_distribute_dataset 来根据策略分布数据集。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

然后,定义训练的一个步骤。使用 tf.GradientTape 来计算梯度,并使用优化器来应用这些梯度以更新模型的变量。要分布此训练步骤,请将其放在一个名为 train_step 的函数中,并将其与您从之前创建的 dist_dataset 获取的数据集输入一起传递给 tf.distribute.Strategy.run

# Sets `reduction=NONE` to leave it to tf.nn.compute_average_loss() below.
loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    per_example_loss = loss_object(labels, predictions)
    loss = tf.nn.compute_average_loss(per_example_loss)
    model_losses = model.losses
    if model_losses:
      loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

上面代码中需要注意的几点

  1. 您使用 tf.nn.compute_average_loss 将每个示例的预测损失减少为标量。 tf.nn.compute_average_loss 对每个示例的损失求和,并将总和除以全局批次大小。这很重要,因为稍后在每个副本上计算梯度之后,它们会通过**求和**在副本之间进行聚合。

    默认情况下,全局批次大小被认为是 tf.get_strategy().num_replicas_in_sync * tf.shape(per_example_loss)[0]。它也可以作为关键字参数 global_batch_size= 显式指定。如果没有短批次,默认值等效于 tf.nn.compute_average_loss(..., global_batch_size=global_batch_size),其中 global_batch_size 定义如上。(有关短批次以及如何避免或处理它们的更多信息,请参阅 自定义训练教程。)

  2. 您使用 tf.nn.scale_regularization_loss 将注册到 Model 对象的正则化损失(如果有)也按 1/num_replicas_in_sync 进行缩放。对于那些依赖于输入的正则化损失,它由建模代码而不是自定义训练循环来执行对每个副本(!) 批次大小的平均;这样,建模代码就可以保持对复制的不可知性,而训练循环就可以保持对正则化损失如何计算的不可知性。

  3. 当您在分布式策略作用域内调用 apply_gradients 时,它的行为会发生改变。具体来说,在同步训练期间在每个并行实例上应用梯度之前,它会对所有副本的梯度进行求和。

  4. 您还使用了 tf.distribute.Strategy.reduce API 来聚合由 tf.distribute.Strategy.run 返回的结果以进行报告。 tf.distribute.Strategy.run 返回策略中每个本地副本的结果,并且有多种方法可以消耗此结果。您可以 reduce 它们以获得聚合值。您也可以使用 tf.distribute.Strategy.experimental_local_results 获取结果中包含的值列表,每个本地副本一个。

最后,在定义训练步骤后,您可以遍历 dist_dataset 并循环运行训练。

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))

在上面的示例中,您遍历了 dist_dataset 来为您的训练提供输入。您还提供了 tf.distribute.Strategy.make_experimental_numpy_dataset 来支持 NumPy 输入。您可以在调用 tf.distribute.Strategy.experimental_distribute_dataset 之前使用此 API 创建数据集。

遍历数据的另一种方法是显式使用迭代器。当您希望运行特定数量的步骤而不是遍历整个数据集时,您可能需要这样做。上面的迭代现在将修改为首先创建一个迭代器,然后显式调用 next 来获取输入数据。

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))

这涵盖了使用 tf.distribute.Strategy API 分布自定义训练循环的最简单情况。

现在支持什么?

训练 API MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
自定义训练循环 支持 支持 支持 实验性支持 实验性支持

示例和教程

以下是一些使用分布式策略与自定义训练循环的示例

  1. 教程: 使用自定义训练循环和 MirroredStrategy 进行训练。
  2. 教程: 使用自定义训练循环和 MultiWorkerMirroredStrategy 进行训练。
  3. 指南: 包含使用 TPUStrategy 的自定义训练循环示例。
  4. 教程: 使用自定义训练循环和 ParameterServerStrategy 进行参数服务器训练。
  5. TensorFlow Model Garden 存储库,其中包含使用各种策略实现的最先进模型的集合。

其他主题

本节涵盖与多个用例相关的主题。

设置 TF_CONFIG 环境变量

对于多工作器训练,如前所述,您需要为集群中运行的每个二进制文件设置 'TF_CONFIG' 环境变量。 'TF_CONFIG' 环境变量是一个 JSON 字符串,它指定构成集群的任务、它们的地址以及每个任务在集群中的角色。 tensorflow/ecosystem 存储库提供了一个 Kubernetes 模板,它为您的训练任务设置 'TF_CONFIG'

'TF_CONFIG' 有两个组成部分:集群和任务。

  • 集群提供有关训练集群的信息,它是一个字典,包含不同类型的作业,例如工作器。在多工作器训练中,通常有一个工作器承担更多责任,例如保存检查点和为 TensorBoard 编写摘要文件,除了常规工作器所做的事情之外。此类工作器被称为“首席”工作器,并且通常将索引为 0 的工作器指定为首席工作器(实际上,这就是 tf.distribute.Strategy 的实现方式)。
  • 另一方面,任务提供有关当前任务的信息。第一个组件集群对所有工作器都相同,而第二个组件任务在每个工作器上都不同,并指定该工作器的类型和索引。

'TF_CONFIG' 的一个示例是

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

'TF_CONFIG' 指定在 "cluster" 中有三个工作器和两个 "ps" 任务,以及它们的主机和端口。 "task" 部分指定当前任务在 "cluster" 中的角色——工作器 1(第二个工作器)。集群中的有效角色是 "chief""worker""ps""evaluator"。除了使用 tf.distribute.experimental.ParameterServerStrategy 之外,不应该有 "ps" 作业。

下一步是什么?

tf.distribute.Strategy 正在积极开发中。试用它并使用 GitHub 问题 提供您的反馈。