迁移多工作器 CPU/GPU 训练

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

本指南演示了如何将您的多工作器分布式训练工作流程从 TensorFlow 1 迁移到 TensorFlow 2。

要使用 CPU/GPU 执行多工作器训练

设置

从一些必要的导入和一个简单的演示数据集开始

# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

您将需要 'TF_CONFIG' 配置环境变量才能在 TensorFlow 中的多台机器上进行训练。使用 'TF_CONFIG' 指定 'cluster''task' 的地址。(在 分布式训练 指南中了解更多信息。)

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

使用 del 语句删除变量(但在 TensorFlow 1 中的实际多工作器训练中,您不必这样做)

del os.environ['TF_CONFIG']

TensorFlow 1:使用 tf.estimator API 进行多工作器分布式训练

以下代码片段演示了 TF1 中多工作器训练的规范工作流程:您将使用 tf.estimator.Estimatortf.estimator.TrainSpectf.estimator.EvalSpectf.estimator.train_and_evaluate API 来分布式训练

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

TensorFlow 2:使用分布式策略进行多工作器训练

在 TensorFlow 2 中,通过 tf.distribute.Strategy 在多个工作器(包括 CPU、GPU 和 TPU)上进行分布式训练。

以下示例演示了如何使用两种这样的策略:tf.distribute.experimental.ParameterServerStrategytf.distribute.MultiWorkerMirroredStrategy,它们都设计用于使用多个工作器进行 CPU/GPU 训练。

ParameterServerStrategy 使用协调器'chief'),这使其更适合该 Colab 笔记本中的环境。您将在此处使用一些实用程序来设置支持元素,这些元素对于此处可运行的体验至关重要:您将创建一个进程内集群,其中使用线程来模拟参数服务器('ps')和工作器('worker')。有关参数服务器训练的更多信息,请参阅 使用 ParameterServerStrategy 进行参数服务器训练 教程。

在此示例中,首先使用 tf.distribute.cluster_resolver.TFConfigClusterResolver 定义 'TF_CONFIG' 环境变量,以提供集群信息。如果您正在使用集群管理系统进行分布式训练,请检查它是否已为您提供 'TF_CONFIG',在这种情况下,您无需显式设置此环境变量。(在 使用 TensorFlow 进行分布式训练 指南的设置 'TF_CONFIG' 环境变量部分中了解更多信息。)

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

然后,为工作节点和参数服务器分别创建 tf.distribute.Server

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

在实际的分布式训练中,您不会在协调器上启动所有 tf.distribute.Server,而是会使用多台机器,并将指定为 "worker""ps"(参数服务器)的机器分别运行一个 tf.distribute.Server。有关更多详细信息,请参阅 参数服务器训练 教程中的“现实世界中的集群”部分。

准备好所有内容后,创建 ParameterServerStrategy 对象。

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)

创建策略对象后,定义模型、优化器和其他变量,并在 Strategy.scope API 中调用 Keras Model.compile 来分布式训练。(有关更多信息,请参阅 Strategy.scope API 文档。)

如果您希望通过例如定义前向和后向传递来自定义训练,请参阅 参数服务器训练 教程中的“使用自定义训练循环进行训练”部分以了解更多详细信息。

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
model.evaluate(eval_dataset, steps=10, return_dict=True)

分区器 (tf.distribute.experimental.partitioners)

ParameterServerStrategy 在 TensorFlow 2 中支持变量分区,并提供与 TensorFlow 1 相同的分区器,但名称更清晰: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner:一个分区器,它将分片保持在最大大小以下)。 - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner:一个分区器,它为每个分片分配最小大小。 - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner:一个分区器,它分配固定数量的分片。

或者,您可以使用 MultiWorkerMirroredStrategy 对象。

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()

您可以用 MultiWorkerMirroredStrategy 对象替换上面使用的策略,以使用此策略执行训练。

tf.estimator API 一样,由于 MultiWorkerMirroredStrategy 是一个多客户端策略,因此没有简单的方法可以在此 Colab 笔记本中运行分布式训练。因此,用此策略替换上面的代码最终会以本地方式运行。多工作节点训练 使用 Keras Model.fit/自定义训练循环 教程演示了如何在 Colab 中设置 'TF_CONFIG' 变量,并在 localhost 上使用两个工作节点运行多工作节点训练。在实践中,您将在外部 IP 地址/端口上创建多个工作节点,并使用 'TF_CONFIG' 变量为每个工作节点指定集群配置。

下一步

要了解有关使用 TensorFlow 2 中的 tf.distribute.experimental.ParameterServerStrategytf.distribute.MultiWorkerMirroredStrategy 进行多工作节点分布式训练的更多信息,请考虑以下资源。