在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
本指南演示了如何将您的多工作器分布式训练工作流程从 TensorFlow 1 迁移到 TensorFlow 2。
要使用 CPU/GPU 执行多工作器训练
- 在 TensorFlow 1 中,您通常使用
tf.estimator.train_and_evaluate
和tf.estimator.Estimator
API。 - 在 TensorFlow 2 中,使用 Keras API 编写模型、损失函数、优化器和指标。然后,使用 Keras
Model.fit
API 或自定义训练循环(使用tf.GradientTape
)通过tf.distribute.experimental.ParameterServerStrategy
或tf.distribute.MultiWorkerMirroredStrategy
在多个工作器上进行分布式训练。有关更多详细信息,请参阅以下教程
设置
从一些必要的导入和一个简单的演示数据集开始
# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker
import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]
您将需要 'TF_CONFIG'
配置环境变量才能在 TensorFlow 中的多台机器上进行训练。使用 'TF_CONFIG'
指定 'cluster'
和 'task'
的地址。(在 分布式训练 指南中了解更多信息。)
import json
import os
tf_config = {
'cluster': {
'chief': ['localhost:11111'],
'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
'ps': ['localhost:12121', 'localhost:13131'],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
使用 del
语句删除变量(但在 TensorFlow 1 中的实际多工作器训练中,您不必这样做)
del os.environ['TF_CONFIG']
TensorFlow 1:使用 tf.estimator API 进行多工作器分布式训练
以下代码片段演示了 TF1 中多工作器训练的规范工作流程:您将使用 tf.estimator.Estimator
、tf.estimator.TrainSpec
、tf.estimator.EvalSpec
和 tf.estimator.train_and_evaluate
API 来分布式训练
def _input_fn():
return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)
def _eval_input_fn():
return tf1.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).batch(1)
def _model_fn(features, labels, mode):
logits = tf1.layers.Dense(1)(features)
loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
optimizer = tf1.train.AdagradOptimizer(0.05)
train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
TensorFlow 2:使用分布式策略进行多工作器训练
在 TensorFlow 2 中,通过 tf.distribute.Strategy
在多个工作器(包括 CPU、GPU 和 TPU)上进行分布式训练。
以下示例演示了如何使用两种这样的策略:tf.distribute.experimental.ParameterServerStrategy
和 tf.distribute.MultiWorkerMirroredStrategy
,它们都设计用于使用多个工作器进行 CPU/GPU 训练。
ParameterServerStrategy
使用协调器('chief'
),这使其更适合该 Colab 笔记本中的环境。您将在此处使用一些实用程序来设置支持元素,这些元素对于此处可运行的体验至关重要:您将创建一个进程内集群,其中使用线程来模拟参数服务器('ps'
)和工作器('worker'
)。有关参数服务器训练的更多信息,请参阅 使用 ParameterServerStrategy 进行参数服务器训练 教程。
在此示例中,首先使用 tf.distribute.cluster_resolver.TFConfigClusterResolver
定义 'TF_CONFIG'
环境变量,以提供集群信息。如果您正在使用集群管理系统进行分布式训练,请检查它是否已为您提供 'TF_CONFIG'
,在这种情况下,您无需显式设置此环境变量。(在 使用 TensorFlow 进行分布式训练 指南的设置 'TF_CONFIG'
环境变量部分中了解更多信息。)
# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker
chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]
# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
'cluster': {
'chief': ["localhost:%s" % chief_port],
'worker': ["localhost:%s" % port for port in worker_ports],
'ps': ["localhost:%s" % port for port in ps_ports],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
然后,为工作节点和参数服务器分别创建 tf.distribute.Server
。
# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4
for i in range(3):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="worker",
task_index=i,
config=worker_config)
for i in range(2):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="ps",
task_index=i)
在实际的分布式训练中,您不会在协调器上启动所有 tf.distribute.Server
,而是会使用多台机器,并将指定为 "worker"
和 "ps"
(参数服务器)的机器分别运行一个 tf.distribute.Server
。有关更多详细信息,请参阅 参数服务器训练 教程中的“现实世界中的集群”部分。
准备好所有内容后,创建 ParameterServerStrategy
对象。
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
创建策略对象后,定义模型、优化器和其他变量,并在 Strategy.scope
API 中调用 Keras Model.compile
来分布式训练。(有关更多信息,请参阅 Strategy.scope
API 文档。)
如果您希望通过例如定义前向和后向传递来自定义训练,请参阅 参数服务器训练 教程中的“使用自定义训练循环进行训练”部分以了解更多详细信息。
dataset = tf.data.Dataset.from_tensor_slices(
(features, labels)).shuffle(10).repeat().batch(64)
eval_dataset = tf.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).repeat().batch(1)
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
optimizer = tf.keras.optimizers.legacy.Adagrad(learning_rate=0.05)
model.compile(optimizer, "mse")
model.fit(dataset, epochs=5, steps_per_epoch=10)
model.evaluate(eval_dataset, steps=10, return_dict=True)
分区器 (
tf.distribute.experimental.partitioners
)
ParameterServerStrategy
在 TensorFlow 2 中支持变量分区,并提供与 TensorFlow 1 相同的分区器,但名称更清晰: -tf.compat.v1.variable_axis_size_partitioner
->tf.distribute.experimental.partitioners.MaxSizePartitioner
:一个分区器,它将分片保持在最大大小以下)。 -tf.compat.v1.min_max_variable_partitioner
->tf.distribute.experimental.partitioners.MinSizePartitioner
:一个分区器,它为每个分片分配最小大小。 -tf.compat.v1.fixed_size_partitioner
->tf.distribute.experimental.partitioners.FixedShardsPartitioner
:一个分区器,它分配固定数量的分片。
或者,您可以使用 MultiWorkerMirroredStrategy
对象。
# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
您可以用 MultiWorkerMirroredStrategy
对象替换上面使用的策略,以使用此策略执行训练。
与 tf.estimator
API 一样,由于 MultiWorkerMirroredStrategy
是一个多客户端策略,因此没有简单的方法可以在此 Colab 笔记本中运行分布式训练。因此,用此策略替换上面的代码最终会以本地方式运行。多工作节点训练 使用 Keras Model.fit/自定义训练循环 教程演示了如何在 Colab 中设置 'TF_CONFIG'
变量,并在 localhost 上使用两个工作节点运行多工作节点训练。在实践中,您将在外部 IP 地址/端口上创建多个工作节点,并使用 'TF_CONFIG'
变量为每个工作节点指定集群配置。
下一步
要了解有关使用 TensorFlow 2 中的 tf.distribute.experimental.ParameterServerStrategy
和 tf.distribute.MultiWorkerMirroredStrategy
进行多工作节点分布式训练的更多信息,请考虑以下资源。