使用 Estimator 进行多工作器训练

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

概述

本教程演示了如何使用 tf.distribute.Strategy 使用 tf.estimator 进行分布式多工作器训练。如果您使用 tf.estimator 编写代码,并且您有兴趣将训练扩展到单个机器之外并实现高性能,那么本教程适合您。

在开始之前,请阅读 分布式策略 指南。 多 GPU 训练教程 也与本教程相关,因为本教程使用相同的模型。

设置

首先,设置 TensorFlow 并导入必要的库。

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

输入函数

本教程使用来自 TensorFlow Datasets 的 MNIST 数据集。此处的代码类似于 多 GPU 训练教程,但有一个关键区别:在使用 Estimator 进行多工作器训练时,需要根据工作器数量对数据集进行分片,以确保模型收敛。输入数据按工作器索引进行分片,以便每个工作器处理 1/num_workers 个不同的数据集部分。

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

另一种实现收敛的合理方法是在每个工作器上使用不同的种子对数据集进行洗牌。

多工作器配置

本教程(与 多 GPU 训练教程 相比)的关键区别之一是多工作器设置。 TF_CONFIG 环境变量是为集群中每个工作器指定集群配置的标准方法。

TF_CONFIG 包含两个组件: clustertaskcluster 提供有关整个集群的信息,即集群中的工作器和参数服务器。 task 提供有关当前任务的信息。第一个组件 cluster 对集群中的所有工作器和参数服务器都是相同的,而第二个组件 task 在每个工作器和参数服务器上都不同,并指定其自己的 typeindex。在本例中,任务 typeworker,任务 index0

为了说明目的,本教程展示了如何在 localhost 上设置具有 2 个工作器的 TF_CONFIG。在实践中,您将在外部 IP 地址和端口上创建多个工作器,并在每个工作器上适当地设置 TF_CONFIG,即修改任务 index

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

定义模型

编写用于训练的层、优化器和损失函数。本教程使用 Keras 层定义模型,类似于 多 GPU 训练教程

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

MultiWorkerMirroredStrategy

要训练模型,请使用 tf.distribute.experimental.MultiWorkerMirroredStrategy 的实例。 MultiWorkerMirroredStrategy 在所有工作器上的每个设备上创建模型层中所有变量的副本。它使用 CollectiveOps(一种用于集体通信的 TensorFlow 操作)来聚合梯度并保持变量同步。 tf.distribute.Strategy 指南 中提供了有关此策略的更多详细信息。

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

训练和评估模型

接下来,在 Estimator 的 RunConfig 中指定分布策略,并通过调用 tf.estimator.train_and_evaluate 来训练和评估。本教程仅通过 train_distribute 指定策略来分布训练。也可以通过 eval_distribute 分布评估。

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)

优化训练性能

现在您拥有一个模型和一个由 tf.distribute.Strategy 提供支持的多工作器 Estimator。您可以尝试以下技术来优化多工作器训练的性能

  • 增加批次大小: 此处指定的批次大小是每个 GPU 的。通常,建议使用适合 GPU 内存的最大批次大小。
  • 转换变量: 如果可能,将变量转换为 tf.float。官方 ResNet 模型包含 一个示例,说明如何执行此操作。
  • 使用集体通信: MultiWorkerMirroredStrategy 提供多个 集体通信实现

    • RING 使用 gRPC 作为跨主机通信层来实现基于环的集体操作。
    • NCCL 使用 Nvidia 的 NCCL 来实现集体操作。
    • AUTO 将选择推迟到运行时。

    最佳集体操作实现选择取决于 GPU 的数量和类型以及集群中的网络互连。要覆盖自动选择,请为 MultiWorkerMirroredStrategy 构造函数的 communication 参数指定一个有效值,例如 communication=tf.distribute.experimental.CollectiveCommunication.NCCL

访问指南中的 性能部分,了解有关其他策略和 工具 的更多信息,您可以使用这些策略和工具来优化 TensorFlow 模型的性能。

其他代码示例

  1. 端到端示例,用于在 tensorflow/ecosystem 中使用 Kubernetes 模板进行多工作器训练。此示例从 Keras 模型开始,并使用 tf.keras.estimator.model_to_estimator API 将其转换为 Estimator。
  2. 官方模型,其中许多模型可以配置为运行多个分布策略。