在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
概述
本教程演示了如何使用 Keras 模型和 Model.fit
API,以及 tf.distribute.MultiWorkerMirroredStrategy
API 执行多工作业分布式训练。借助此策略,为单工作业设计的 Keras 模型可以无缝地在多个工作作业上运行,只需进行最少的代码更改。
要了解如何将 MultiWorkerMirroredStrategy
与 Keras 和自定义训练循环结合使用,请参阅 使用 Keras 和 MultiWorkerMirroredStrategy 的自定义训练循环。
本教程包含一个最小的多工作器示例,其中包含两个工作器,用于演示目的。
选择正确的策略
在深入研究之前,请确保 tf.distribute.MultiWorkerMirroredStrategy
是您加速器和训练的正确选择。以下是两种使用数据并行性进行分布式训练的常见方法
- 同步训练,其中训练步骤在工作器和副本之间同步,例如
tf.distribute.MirroredStrategy
、tf.distribute.TPUStrategy
和tf.distribute.MultiWorkerMirroredStrategy
。所有工作器同步地对输入数据的不同切片进行训练,并在每一步聚合梯度。 - 异步训练,其中训练步骤并非严格同步,例如
tf.distribute.experimental.ParameterServerStrategy
。所有工作器独立地对输入数据进行训练,并异步更新变量。
如果您正在寻找没有 TPU 的多工作器同步训练,那么 tf.distribute.MultiWorkerMirroredStrategy
是您的选择。它在所有工作器上的每个设备上创建模型层中所有变量的副本。它使用 CollectiveOps
(一种用于集体通信的 TensorFlow 操作)来聚合梯度并保持变量同步。对于有兴趣的人,请查看 tf.distribute.experimental.CommunicationOptions
参数以了解集体实现选项。
有关 tf.distribute.Strategy
API 的概述,请参阅 TensorFlow 中的分布式训练。
设置
从一些必要的导入开始
import json
import os
import sys
在导入 TensorFlow 之前,对环境进行一些更改
- 在实际应用中,每个工作器都将在不同的机器上。在本教程中,所有工作器都将在 **这** 台机器上运行。因此,禁用所有 GPU 以防止所有工作器尝试使用同一个 GPU 导致的错误。
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- 重置
TF_CONFIG
环境变量(您将在后面了解有关此变量的更多信息)
os.environ.pop('TF_CONFIG', None)
- 确保当前目录位于 Python 的路径上 - 这允许笔记本导入
%%writefile
稍后编写的文件
if '.' not in sys.path:
sys.path.insert(0, '.')
安装 tf-nightly
,因为 tf.keras.callbacks.BackupAndRestore
中的 save_freq
参数在特定步骤处的检查点保存频率是从 TensorFlow 2.10 开始引入的
pip install tf-nightly
最后,导入 TensorFlow
import tensorflow as tf
数据集和模型定义
接下来,创建一个包含简单模型和数据集设置的 mnist_setup.py
文件。本教程中的工作器进程将使用此 Python 文件
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
在单个工作器上训练模型
尝试训练模型几个 epoch,并观察单个工作器的结果,以确保一切正常。随着训练的进行,损失应该下降,准确率应该提高。
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
多工作器配置
现在让我们进入多工作器训练的世界。
包含作业和任务的集群
在 TensorFlow 中,分布式训练涉及一个 'cluster'
,其中包含多个作业,每个作业可能包含一个或多个 'task'
。
您将需要 TF_CONFIG
配置环境变量才能在多台机器上进行训练,每台机器可能具有不同的角色。 TF_CONFIG
是一个 JSON 字符串,用于指定集群中每个工作器的集群配置。
TF_CONFIG
变量有两个组成部分: 'cluster'
和 'task'
。
'cluster'
对所有工作器都相同,并提供有关训练集群的信息,这是一个字典,包含不同类型的作业,例如'worker'
或'chief'
。- 在使用
tf.distribute.MultiWorkerMirroredStrategy
的多工作器训练中,通常有一个'worker'
承担更多责任,例如保存检查点和为 TensorBoard 编写摘要文件,除了常规'worker'
所做的工作之外。这种'worker'
被称为首席工作器(作业名称为'chief'
)。 - 通常,
'index'
为0
的工作器是'chief'
。
- 在使用
'task'
提供有关当前任务的信息,每个工作器都不相同。它指定该工作器的'type'
和'index'
。
以下是一个配置示例
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
请注意, tf_config
只是 Python 中的局部变量。要将其用于训练配置,请将其序列化为 JSON 并将其放置在 TF_CONFIG
环境变量中。
json.dumps(tf_config)
在上面的配置示例中,您将任务 'type'
设置为 'worker'
,并将任务 'index'
设置为 0
。因此,这台机器是第一个工作器。它将被指定为 'chief'
工作器。
在实践中,您将在外部 IP 地址/端口上创建多个工作器,并在每个工作器上相应地设置 TF_CONFIG
变量。为了说明目的,本教程展示了如何在 localhost
上使用两个工作器设置 TF_CONFIG
变量。
- 第一个(
'chief'
)工作器的TF_CONFIG
如上所示。 - 对于第二个工作器,您将设置
tf_config['task']['index']=1
笔记本中的环境变量和子进程
子进程继承其父进程的环境变量。因此,如果您在此 Jupyter Notebook 进程中设置了一个环境变量
os.environ['GREETINGS'] = 'Hello TensorFlow!'
... 那么您可以从子进程中访问环境变量
echo ${GREETINGS}
在下一节中,您将使用此方法将 TF_CONFIG
传递给工作器子进程。在实际场景中,您永远不会真正以这种方式启动作业 - 本教程只是展示了如何使用最小的多工作器示例来做到这一点。
训练模型
要训练模型,首先创建一个 tf.distribute.MultiWorkerMirroredStrategy
实例
strategy = tf.distribute.MultiWorkerMirroredStrategy()
通过将 tf.distribute.Strategy
API 集成到 tf.keras
中,您将进行的唯一更改是将模型构建和 model.compile()
调用包含在 strategy.scope()
中。分布式策略的作用域决定了变量的创建方式和位置,在 MultiWorkerMirroredStrategy
的情况下,创建的变量是 MirroredVariable
,并且它们在每个工作器上都进行了复制。
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
要真正使用 MultiWorkerMirroredStrategy
运行,您需要运行工作器进程并将 TF_CONFIG
传递给它们。
与之前编写的 mnist_setup.py
文件类似,这里有每个工作器将运行的 main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
在上面的代码片段中,请注意,传递给 Dataset.batch
的 global_batch_size
被设置为 per_worker_batch_size * num_workers
。这确保了每个工作器处理的批次包含 per_worker_batch_size
个示例,无论工作器数量如何。
当前目录现在包含这两个 Python 文件
ls *.py
将 TF_CONFIG
序列化为 JSON 并将其添加到环境变量中
os.environ['TF_CONFIG'] = json.dumps(tf_config)
现在,您可以启动一个工作器进程,该进程将运行 main.py
并使用 TF_CONFIG
# first kill any previous runs
%killbgscripts
python main.py &> job_0.log
关于上面的命令,需要注意几点
- 它使用
%%bash
,这是一个 笔记本“魔法”,用于运行一些 bash 命令。 - 它使用
--bg
标志在后台运行bash
进程,因为此工作器不会终止。它在开始之前等待所有工作器。
后台工作器进程不会将输出打印到此笔记本,因此 &>
将其输出重定向到一个文件,以便您稍后可以在日志文件中检查发生了什么。
因此,请等待几秒钟,让进程启动
import time
time.sleep(10)
现在,检查到目前为止输出到工作器日志文件的内容
cat job_0.log
日志文件的最后一行应该显示: Started server with target: grpc://127.0.0.1:12345
。第一个工作器现在已准备就绪,正在等待所有其他工作器准备就绪以继续。
因此,更新第二个工作器进程的 tf_config
以便其拾取
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
启动第二个工作器。这将开始训练,因为所有工作器都处于活动状态(因此无需将此进程置于后台)。
python main.py
如果您重新检查第一个工作器写入的日志,您将了解它参与了该模型的训练。
cat job_0.log
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
深入了解多工作器训练
到目前为止,您已经了解了如何执行基本的多工作器设置。本教程的其余部分将详细介绍其他因素,这些因素对于实际用例可能有用或重要。
数据集分片
在多工作器训练中,需要数据集分片来确保收敛和性能。
上一节中的示例依赖于由 tf.distribute.Strategy
API 提供的默认自动分片。您可以通过设置 tf.data.experimental.AutoShardPolicy
来控制分片 tf.data.experimental.DistributeOptions
.
要详细了解自动分片,请参阅 分布式输入指南.
以下是如何关闭自动分片的快速示例,以便每个副本处理每个示例(不推荐)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
评估
如果您将 validation_data
也传递到 Model.fit
中,它将在每个 epoch 之间交替进行训练和评估。评估工作分布在同一组工作器上,其结果被聚合并可供所有工作器使用。
与训练类似,验证数据集在文件级别自动分片。您需要在验证数据集中设置全局批次大小,并设置 validation_steps
。
建议对评估使用重复数据集(通过调用 tf.data.Dataset.repeat
)。
或者,您还可以创建另一个任务,该任务定期读取检查点并运行评估。这就是 Estimator 所做的。但这不是执行评估的推荐方法,因此省略了其详细信息。
性能
要调整多工作器训练的性能,您可以尝试以下操作
tf.distribute.MultiWorkerMirroredStrategy
提供多个 集体通信实现RING
使用 gRPC 作为跨主机通信层来实现基于环的集体通信。NCCL
使用 NVIDIA 集体通信库 来实现集体通信。AUTO
将选择推迟到运行时。
集体通信实现的最佳选择取决于集群中 GPU 的数量、GPU 的类型以及网络互连。要覆盖自动选择,请指定
MultiWorkerMirroredStrategy
构造函数的communication_options
参数。例如communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
如果可能,将变量转换为
tf.float
- 官方 ResNet 模型包含 一个示例,说明如何执行此操作。
容错
在同步训练中,如果其中一个工作器出现故障且不存在故障恢复机制,则集群将失败。
将 Keras 与 tf.distribute.Strategy
一起使用具有容错的优势,在工作器死亡或不稳定时,您可以通过将训练状态保存在您选择的分布式文件系统中来实现这一点,这样,当先前失败或抢占的实例重新启动时,训练状态将被恢复。
当一个工作器不可用时,其他工作器将失败(可能在超时后)。在这种情况下,需要重新启动不可用的工作器,以及其他已失败的工作器。
ModelCheckpoint
回调
ModelCheckpoint
回调不再提供容错功能,请改用 BackupAndRestore
回调。
ModelCheckpoint
回调仍然可以用来保存检查点。但有了它,如果训练被打断或成功完成,为了从检查点继续训练,用户需要手动加载模型。
可选地,用户可以选择在 ModelCheckpoint
回调之外保存和恢复模型/权重。
模型保存和加载
要使用 model.save
或 tf.saved_model.save
保存模型,每个工作器的保存目标都需要不同。
- 对于非首席工作器,您需要将模型保存到临时目录。
- 对于首席工作器,您需要保存到提供的模型目录。
工作器上的临时目录需要是唯一的,以防止多个工作器尝试写入同一位置而导致错误。
所有目录中保存的模型都相同,通常只应引用首席工作器保存的模型以进行恢复或服务。
您应该有一些清理逻辑,在训练完成后删除工作器创建的临时目录。
在首席工作器和工作器上同时保存的原因是,您可能在检查点过程中聚合变量,这需要首席工作器和工作器都参与 allreduce 通信协议。另一方面,让首席工作器和工作器保存到同一个模型目录会导致由于争用而导致错误。
使用 MultiWorkerMirroredStrategy
,程序在每个工作器上运行,为了知道当前工作器是否是首席工作器,它利用了具有属性 task_type
和 task_id
的集群解析器对象
task_type
告诉您当前作业是什么(例如,'worker'
)。task_id
告诉您工作器的标识符。- 具有
task_id == 0
的工作器被指定为首席工作器。
在下面的代码片段中,write_filepath
函数提供要写入的文件路径,该路径取决于工作器的 task_id
- 对于首席工作器(具有
task_id == 0
),它写入原始文件路径。 - 对于其他工作器,它创建一个临时目录——
temp_dir
——在目录路径中使用task_id
来写入
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configurations.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type` is `None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
有了它,您现在就可以保存了
multi_worker_model.save(write_model_path)
如上所述,稍后模型应仅从首席工作器保存到的文件路径加载。因此,删除非首席工作器保存的临时文件
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
现在,当您需要加载时,可以使用方便的 tf.keras.models.load_model
API,并继续进行进一步的工作。
这里假设只使用单个工作器来加载和继续训练,在这种情况下,您不会在另一个 strategy.scope()
中调用 tf.keras.models.load_model
(注意 strategy = tf.distribute.MultiWorkerMirroredStrategy()
,如前所述)
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
检查点保存和恢复
另一方面,检查点允许您保存模型的权重并恢复它们,而无需保存整个模型。
在这里,您将创建一个 tf.train.Checkpoint
来跟踪模型,该模型由 tf.train.CheckpointManager
管理,因此只保留最新的检查点
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
设置好 CheckpointManager
后,您现在就可以保存并删除非首席工作器保存的检查点了
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
现在,当您需要恢复模型时,可以使用方便的 tf.train.latest_checkpoint
函数找到保存的最新检查点。恢复检查点后,您可以继续训练。
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
BackupAndRestore
回调
tf.keras.callbacks.BackupAndRestore
回调通过在 BackupAndRestore
的 backup_dir
参数下备份模型和当前训练状态到临时检查点文件来提供容错功能。
一旦作业被打断并重新启动,BackupAndRestore
回调将恢复最后一个检查点,您可以从上次保存训练状态的 epoch 和 step 的开头继续训练。
要使用它,请在 Model.fit
调用中提供 tf.keras.callbacks.BackupAndRestore
的实例。
使用 MultiWorkerMirroredStrategy
,如果一个 worker 被中断,整个集群将暂停,直到被中断的 worker 重新启动。其他 worker 也将重新启动,被中断的 worker 将重新加入集群。然后,每个 worker 将读取之前保存的检查点文件,并恢复其以前的状态,从而使集群重新同步。然后,训练将继续。分布式数据集迭代器状态将被重新初始化,不会被恢复。
The BackupAndRestore
回调使用 CheckpointManager
来保存和恢复训练状态,它会生成一个名为 checkpoint 的文件,该文件跟踪现有的检查点以及最新的检查点。因此,backup_dir
不应重复使用来存储其他检查点,以避免名称冲突。
目前,BackupAndRestore
回调支持没有策略的单 worker 训练 - MirroredStrategy
- 以及使用 MultiWorkerMirroredStrategy
的多 worker 训练。
以下是多 worker 训练和单 worker 训练的两个示例
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback. The training state
# is backed up at epoch boundaries by default.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
如果 BackupAndRestore
回调中的 save_freq
参数设置为 'epoch'
,则模型将在每个 epoch 之后备份。
# The training state is backed up at epoch boundaries because `save_freq` is
# set to `epoch`.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
如果 BackupAndRestore
回调中的 save_freq
参数设置为大于 0
的整数,则模型将在每 save_freq
个批次之后备份。
# The training state is backed up at every 30 steps because `save_freq` is set
# to an integer value of `30`.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup', save_freq=30)]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
如果您检查在 BackupAndRestore
中指定的 backup_dir
目录,您可能会注意到一些临时生成的检查点文件。这些文件是用于恢复之前丢失的实例,它们将在您的训练成功退出后,由库在 Model.fit
结束时被删除。
其他资源
- The TensorFlow 中的分布式训练 指南概述了可用的分布式策略。
- The 使用 Keras 和 MultiWorkerMirroredStrategy 的自定义训练循环 教程展示了如何将
MultiWorkerMirroredStrategy
与 Keras 和自定义训练循环一起使用。 - 查看 官方模型,其中许多模型可以配置为运行多个分布式策略。
- The 使用 tf.function 提高性能 指南提供了有关其他策略和工具的信息,例如 TensorFlow Profiler,您可以使用它来优化 TensorFlow 模型的性能。