在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
概述
参数服务器训练 是一种常见的 数据并行方法,用于在多台机器上扩展模型训练。
参数服务器训练集群由 工作器 和 参数服务器 组成。变量在参数服务器上创建,并在每一步由工作器读取和更新。默认情况下,工作器独立地读取和更新这些变量,而不会彼此同步。这就是为什么有时参数服务器式训练被称为 异步训练。
在 TensorFlow 2 中,参数服务器训练由 tf.distribute.ParameterServerStrategy
类提供支持,该类将训练步骤分配到一个集群,该集群可以扩展到数千个工作器(以及参数服务器)。
支持的训练方法
主要支持两种训练方法
- Keras
Model.fit
API:如果您更喜欢高级抽象和训练处理。如果您正在训练一个tf.keras.Model
,通常建议使用此方法。 - 自定义训练循环:如果您更喜欢定义训练循环的细节(您可以参考关于 自定义训练、从头开始编写训练循环 和 使用 Keras 和 MultiWorkerMirroredStrategy 的自定义训练循环 的指南,了解更多详细信息)。
具有作业和任务的集群
无论您选择哪种 API (Model.fit
或自定义训练循环),TensorFlow 2 中的分布式训练都涉及一个 'cluster'
,其中包含多个 'jobs'
,每个作业可能包含一个或多个 'tasks'
。
使用参数服务器训练时,建议您拥有
- 一个 协调器 作业(作业名称为
chief
) - 多个 工作器 作业(作业名称为
worker
) - 多个 参数服务器 作业(作业名称为
ps
)
协调器 创建资源、调度训练任务、写入检查点并处理任务故障。工作器 和 参数服务器 运行 tf.distribute.Server
实例,这些实例监听来自协调器的请求。
使用 Model.fit
API 的参数服务器训练
使用 Model.fit
API 的参数服务器训练要求协调器使用 tf.distribute.ParameterServerStrategy
对象。类似于 Model.fit
在没有策略或使用其他策略的情况下使用,工作流程涉及创建和编译模型、准备回调以及调用 Model.fit
。
使用自定义训练循环的参数服务器训练
对于自定义训练循环,tf.distribute.coordinator.ClusterCoordinator
类是协调器使用的关键组件。
ClusterCoordinator
类需要与tf.distribute.ParameterServerStrategy
对象一起使用。- 此
tf.distribute.Strategy
对象需要提供集群的信息,并用于定义训练步骤,如 使用 tf.distribute.Strategy 的自定义训练 中所示。 ClusterCoordinator
对象随后将这些训练步骤的执行调度到远程工作器。
ClusterCoordinator
对象提供的最重要的 API 是 schedule
schedule
API 将一个tf.function
入队,并立即返回一个类似未来的RemoteValue
。- 排队的函数将在后台线程中调度到远程工作器,它们的
RemoteValue
将异步填充。 - 由于
schedule
不需要工作器分配,因此传入的tf.function
可以执行在任何可用的工作器上。 - 如果执行它的工作器在其完成之前变得不可用,则该函数将在另一个可用的工作器上重试。
- 由于这个事实以及函数执行不是原子的这一事实,单个函数调用可能会执行多次。
除了调度远程函数之外,ClusterCoordinator
还帮助在所有工作器上创建数据集,并在工作器从故障中恢复时重建这些数据集。
教程设置
本教程将分支到 Model.fit
和自定义训练循环路径,您可以选择适合您需求的路径。除“使用 X 训练”以外的部分适用于这两种路径。
pip install portpicker
集群设置
如上所述,参数服务器训练集群需要一个运行训练程序的协调器任务,一个或多个运行 TensorFlow 服务器的工作器和参数服务器任务——tf.distribute.Server
——以及可能还需要一个运行辅助评估的额外评估任务(请参阅下面的 辅助评估部分)。设置它们的必要条件是
- 协调器任务需要知道所有其他 TensorFlow 服务器的地址和端口,除了评估器。
- 工作器和参数服务器需要知道它们需要监听哪个端口。为了简单起见,您通常可以在这些任务上创建 TensorFlow 服务器时传入完整的集群信息。
- 评估器任务不必知道训练集群的设置。如果知道,它不应该尝试连接到训练集群。
- 工作器和参数服务器的任务类型应分别为
"worker"
和"ps"
。出于遗留原因,协调器应使用"chief"
作为任务类型。
在本教程中,您将创建一个进程内集群,以便整个参数服务器训练可以在 Colab 中运行。您将在后面的部分了解如何在 真实集群 中进行设置。
进程内集群
您将首先预先创建多个 TensorFlow 服务器,然后您将连接到它们。请注意,这仅用于本教程演示的目的,在实际训练中,服务器将在 "worker"
和 "ps"
机器上启动。
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
进程内集群设置经常用于单元测试,例如 这里。
本地测试的另一个选择是在本地机器上启动进程——查看 使用 Keras 的多工作器训练,了解此方法的示例。
实例化 ParameterServerStrategy
在深入研究训练代码之前,让我们实例化一个 tf.distribute.ParameterServerStrategy
对象。请注意,无论您是继续使用 Model.fit
还是自定义训练循环,都需要此对象。variable_partitioner
参数将在 变量分片部分 中解释。
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
为了使用 GPU 进行训练,请分配每个工作器可见的 GPU。ParameterServerStrategy
将使用每个工作器上所有可用的 GPU,但限制是所有工作器应具有相同数量的可用 GPU。
变量分片
变量分片是指将一个变量拆分为多个较小的变量,这些变量称为 分片。当访问这些分片时,变量分片可能有助于分发网络负载。它还有助于将普通变量的计算和存储分布到多个参数服务器上,例如,当使用可能不适合单个机器内存的非常大的嵌入时。
要启用变量分片,您可以在构造 ParameterServerStrategy
对象时传入一个 variable_partitioner
。每次创建变量时都会调用 variable_partitioner
,并且预计它将返回变量每个维度上的分片数量。提供了一些开箱即用的 variable_partitioner
,例如 tf.distribute.experimental.partitioners.MinSizePartitioner
。建议使用基于大小的分片器,例如 tf.distribute.experimental.partitioners.MinSizePartitioner
,以避免对小变量进行分片,这可能会对模型训练速度产生负面影响。
当传入 variable_partitioner
时,并且您直接在 Strategy.scope
下创建变量时,该变量将成为一个容器类型,具有一个 variables
属性,该属性提供对分片列表的访问。在大多数情况下,此容器将通过连接所有分片自动转换为张量。因此,它可以用作普通变量。另一方面,某些 TensorFlow 方法(例如 tf.nn.embedding_lookup
)为这种容器类型提供了高效的实现,并且在这些方法中将避免自动连接。
有关更多详细信息,请参阅 tf.distribute.ParameterServerStrategy
的 API 文档。
使用 Model.fit
训练
Keras 通过 Model.fit
提供了一个易于使用的训练 API,该 API 在幕后处理训练循环,并具有可覆盖的 train_step
和回调的灵活性,这些回调提供诸如检查点保存或 TensorBoard 的摘要保存等功能。使用 Model.fit
,可以使用相同的训练代码与其他策略一起使用,只需简单地交换策略对象即可。
输入数据
Keras Model.fit
与 tf.distribute.ParameterServerStrategy
一起使用时,可以接受以 tf.data.Dataset
、tf.distribute.DistributedDataset
或 tf.keras.utils.experimental.DatasetCreator
形式的输入数据,其中 Dataset
是推荐的选项,因为它易于使用。但是,如果您在使用 Dataset
时遇到内存问题,则可能需要使用 DatasetCreator
,并使用可调用的 dataset_fn
参数(有关详细信息,请参阅 tf.keras.utils.experimental.DatasetCreator
的 API 文档)。
如果您将数据集转换为 tf.data.Dataset
,则应使用 Dataset.shuffle
和 Dataset.repeat
,如下面的代码示例所示。
- 使用参数服务器训练的 Keras
Model.fit
假设每个工作器接收相同的数据集,除非数据集以不同的方式进行洗牌。因此,通过调用Dataset.shuffle
,您可以确保对数据的更均匀迭代。 - 由于工作器不会同步,因此它们可能在不同的时间完成处理其数据集。因此,使用参数服务器训练定义 epoch 的最简单方法是使用
Dataset.repeat
(在没有参数的情况下调用时,它会无限期地重复数据集),并在Model.fit
调用中指定steps_per_epoch
参数。
有关 shuffle
和 repeat
的更多详细信息,请参阅 tf.data 指南 的“训练工作流”部分。
global_batch_size = 64
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.batch(global_batch_size)
dataset = dataset.prefetch(2)
如果您改为使用 tf.keras.utils.experimental.DatasetCreator
创建数据集,则 dataset_fn
中的代码将在每个工作器机器上的输入设备(通常是 CPU)上调用。
模型构建和编译
现在,您将创建一个 tf.keras.Model
(一个简单的 tf.keras.models.Sequential
模型,用于演示目的),然后调用 Model.compile
来合并组件,例如优化器、指标和其他参数,例如 steps_per_execution
。
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)
回调和训练
在您调用 Keras Model.fit
进行实际训练之前,请准备任何需要的 回调,以执行常见任务,例如
tf.keras.callbacks.ModelCheckpoint
:以一定频率保存模型,例如在每个 epoch 之后。tf.keras.callbacks.BackupAndRestore
:通过备份模型和当前 epoch 编号来提供容错能力,如果集群遇到不可用情况(例如中止或抢占)。然后,您可以在作业失败后从重启中恢复训练状态,并从中断的 epoch 开始继续训练。tf.keras.callbacks.TensorBoard
:定期将模型日志写入摘要文件,这些文件可以在 TensorBoard 工具中可视化。
working_dir = "/tmp/my_working_dir"
log_dir = os.path.join(working_dir, "log")
ckpt_filepath = os.path.join(working_dir, "ckpt")
backup_dir = os.path.join(working_dir, "backup")
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)
直接使用 ClusterCoordinator
(可选)
即使您选择 Model.fit
训练路径,您也可以选择实例化一个 tf.distribute.coordinator.ClusterCoordinator
对象,以调度您希望在工作器上执行的其他函数。有关更多详细信息和示例,请参阅 使用自定义训练循环进行训练 部分。
使用自定义训练循环进行训练
使用 tf.distribute.Strategy
的自定义训练循环提供了定义训练循环的极大灵活性。使用上面定义的 ParameterServerStrategy
(作为 strategy
),您将使用 tf.distribute.coordinator.ClusterCoordinator
来调度训练步骤的执行到远程工作器。
然后,您将创建一个模型,定义一个数据集,并定义一个步骤函数,就像您在使用其他 tf.distribute.Strategy
进行训练循环时所做的那样。您可以在 使用 tf.distribute.Strategy 进行自定义训练 教程中找到更多详细信息。
为了确保有效的数据集预取,请使用下面 调度训练步骤到远程工作器 部分中提到的推荐的分布式数据集创建 API。此外,请确保在 worker_fn
中调用 Strategy.run
,以充分利用分配给工作器的 GPU。对于使用或不使用 GPU 进行训练,其余步骤相同。
让我们按照以下步骤创建这些组件
设置数据
首先,编写一个创建数据集的函数。
如果您希望使用 Keras 预处理层 或 Tensorflow Transform 层 对数据进行预处理,请在 **dataset_fn
外部** 和 **Strategy.scope
下** 创建这些层,就像您对任何其他 Keras 层所做的那样。这是因为 dataset_fn
将被包装到 tf.function
中,然后在每个工作器上执行以生成数据管道。
如果您不遵循上述步骤,创建层可能会创建 Tensorflow 状态,这些状态将从 tf.function
提升到协调器。因此,在工作器上访问它们将导致协调器和工作器之间重复的 RPC 调用,并导致显著的减速。
将层放置在 Strategy.scope
下将改为在所有工作器上创建它们。然后,您将通过 tf.data.Dataset.map
在 dataset_fn
中应用转换。有关使用分布式输入进行数据预处理的更多信息,请参阅 分布式输入 教程中的“数据预处理”。
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
在数据集中生成玩具示例
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
然后,创建一个包装在 dataset_fn
中的训练数据集
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
构建模型
接下来,创建模型和其他对象。确保在 Strategy.scope
下创建所有变量。
# These variables created under the `Strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(
units=1, activation="sigmoid",
kernel_regularizer=tf.keras.regularizers.L2(1e-4),
)(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.legacy.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
让我们确认 FixedShardsPartitioner
的使用将所有变量拆分为两个分片,并且每个分片都分配给不同的参数服务器
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
print(emb_layer.weights[0].device)
print(emb_layer.weights[1].device)
定义训练步骤
第三,创建一个包装在 tf.function
中的训练步骤
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
model_losses = model.losses
if model_losses:
loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
在上面的训练步骤函数中,在 step_fn
中调用 Strategy.run
和 Strategy.reduce
可以支持每个工作器上的多个 GPU。如果工作器分配了 GPU,Strategy.run
将在多个副本(GPU)上分配数据集。它们对 tf.nn.compute_average_loss()
的并行调用计算一个工作器副本(GPU)上的损失平均值,独立于工作器的总数。
调度训练步骤到远程工作器
在 ParameterServerStrategy
定义所有计算之后,您将使用 tf.distribute.coordinator.ClusterCoordinator
类来创建资源并将训练步骤分配到远程工作器。
让我们首先创建一个 ClusterCoordinator
对象,并将策略对象传递给它
coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)
然后,使用 ClusterCoordinator.create_per_worker_dataset
API 创建一个每个工作器的数据集和一个迭代器,该 API 将数据集复制到所有工作器。在下面的 per_worker_dataset_fn
中,建议将 dataset_fn
包装到 strategy.distribute_datasets_from_function
中,以允许有效地将预取无缝地分配到 GPU。
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
最后一步是使用 ClusterCoordinator.schedule
将计算分配到远程工作器
- 方法
schedule
将一个tf.function
加入队列,并立即返回一个类似于未来的RemoteValue
。排队的函数将在后台线程中分发到远程工作器,并且RemoteValue
将异步填充。 - 方法
join
(ClusterCoordinator.join
) 可用于等待所有已安排的函数执行完毕。
num_epochs = 4
steps_per_epoch = 5
for i in range(num_epochs):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
以下是获取 RemoteValue
结果的方法
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print("Final loss is %f" % loss.fetch())
或者,您可以启动所有步骤,并在等待完成时执行其他操作
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
有关此特定示例的完整训练和服务工作流程,请查看此 测试。
有关数据集创建的更多信息
上面代码中的数据集是使用 ClusterCoordinator.create_per_worker_dataset
API 创建的。它为每个工作器创建一个数据集,并返回一个容器对象。您可以对其调用 iter
方法以创建每个工作器的迭代器。每个工作器的迭代器包含每个工作器的一个迭代器,并且相应的工作器切片将被替换为传递给 ClusterCoordinator.schedule
方法的函数的输入参数中,在函数在特定工作器上执行之前。
方法 ClusterCoordinator.schedule
假设工作器是等效的,因此假设不同工作器上的数据集是相同的(除了它们可能以不同的方式进行洗牌)。因此,建议重复数据集,并安排有限数量的步骤,而不是依赖于从数据集接收 OutOfRangeError
。
另一个重要说明是,tf.data
数据集不支持跨任务边界的隐式序列化和反序列化。因此,重要的是在传递给 ClusterCoordinator.create_per_worker_dataset
的函数内部创建整个数据集。 create_per_worker_dataset
API 也可以直接接受 tf.data.Dataset
或 tf.distribute.DistributedDataset
作为输入。
评估
使用 tf.distribute.ParameterServerStrategy
训练执行评估的两种主要方法是内联评估和辅助评估。每种方法都有其自身的优缺点,如下所述。如果您没有偏好,建议使用内联评估方法。对于使用 Model.fit
的用户, Model.evaluate
在后台使用内联(分布式)评估。
内联评估
在这种方法中,协调器在训练和评估之间交替,因此称为内联评估。
内联评估有几个好处。例如
- 它可以支持单个任务无法容纳的大型评估模型和评估数据集。
- 评估结果可用于为训练下一个时期做出决策,例如是否提前停止训练。
有两种方法可以实现内联评估:直接评估和分布式评估。
- 直接评估:对于小型模型和评估数据集,协调器可以在分布式模型上直接运行评估,并在协调器上使用评估数据集
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print("Evaluation accuracy: %f" % eval_accuracy.result())
- 分布式评估:对于在协调器上直接运行不可行的大型模型或数据集,协调器任务可以通过
ClusterCoordinator.schedule
/ClusterCoordinator.join
方法将评估任务分发到工作器
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print("Evaluation accuracy: %f" % eval_accuracy.result())
启用完全一次评估
方法 schedule
和 join
的 tf.distribute.coordinator.ClusterCoordinator
默认情况下不支持访问保证或完全一次语义。换句话说,在上面的示例中,不能保证数据集中的所有评估示例都将被完全评估一次;有些可能不会被访问,有些可能被评估多次。
完全一次评估可能是首选,以减少跨时期的评估方差,并改进通过提前停止、超参数调整或其他方法完成的模型选择。有不同的方法可以启用完全一次评估
- 使用工作流程
Model.fit/.evaluate
,可以通过向Model.compile
添加参数来启用它。请参阅pss_evaluation_shards
参数的文档。 - API
tf.data
服务可用于在使用ParameterServerStrategy
时为评估提供完全一次访问(请参阅tf.data.experimental.service
API 文档的动态分片部分)。 - 辅助评估 默认情况下提供完全一次评估,因为评估是在一台机器上进行的。但是,这可能比在许多工作器上分布式执行评估要慢得多。
第一个选项,使用 Model.compile
,是大多数用户的建议解决方案。
完全一次评估有一些限制
- 不支持使用完全一次访问保证编写自定义分布式评估循环。如果您需要对此的支持,请提交 GitHub 问题。
- 它不能自动处理使用 API
Layer.add_metric
的指标的计算。这些指标应从评估中排除,或重新制作成Metric
对象。
辅助评估
在 tf.distribute.ParameterServerStrategy
训练中定义和运行评估循环的另一种方法称为辅助评估,您将在其中创建一个专用的评估器任务,该任务会重复读取检查点并在最新检查点上运行评估(有关检查点的更多详细信息,请参阅 本指南)。协调器和工作器任务不会花费任何时间进行评估,因此对于固定数量的迭代,整体训练时间应该比使用其他评估方法更短。但是,它需要一个额外的评估器任务和定期检查点以触发评估。
要为辅助评估编写评估循环,您有两个选择
- 使用 API
tf.keras.utils.SidecarEvaluator
。 - 创建一个自定义评估循环。
有关选项 1 的更多详细信息,请参阅 API tf.keras.utils.SidecarEvaluator
文档。
辅助评估仅支持单个任务。这意味着
保证每个示例都被评估一次。如果评估器被抢占或重新启动,它只需从最新的检查点重新启动评估循环,并且在重新启动之前完成的部分评估进度将被丢弃。
但是,在单个任务上运行评估意味着完整的评估可能需要很长时间。
如果模型的大小太大而无法放入评估器的内存中,则单个辅助评估不适用。
另一个需要注意的是,实现 tf.keras.utils.SidecarEvaluator
以及下面的自定义评估循环可能会跳过一些检查点,因为它总是选择最新的可用检查点,并且在评估时期,训练集群可能会生成多个检查点。您可以编写一个评估每个检查点的自定义评估循环,但这在本教程中没有介绍。另一方面,如果检查点的生成频率低于运行评估所需的时间,它可能会处于空闲状态。
自定义评估循环提供了对细节的更多控制,例如选择要评估的检查点,或提供与评估一起运行的任何其他逻辑。以下是一个可能的自定义辅助评估循环
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epochs)):
break
现实世界中的集群
在实际生产环境中,您将在不同机器上的不同进程中运行所有任务。在每个任务上配置集群信息的简便方法是设置环境变量 "TF_CONFIG"
,并使用 tf.distribute.cluster_resolver.TFConfigClusterResolver
解析 "TF_CONFIG"
。
有关环境变量 "TF_CONFIG"
的一般描述,请参阅 分布式训练 指南中的“设置环境变量 TF_CONFIG
”。
如果您使用 Kubernetes 或其他配置模板启动训练任务,那么这些模板很可能已经为您设置了 “TF_CONFIG"
。
设置环境变量 "TF_CONFIG"
假设您有 3 个工作器和 2 个参数服务器。那么工作器 1 的 "TF_CONFIG"
可以是
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
评估器的 "TF_CONFIG"
可以是
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
评估器中上述 "TF_CONFIG"
字符串中的 "cluster"
部分是可选的。
如果您对所有任务使用相同的二进制文件
如果您希望使用单个二进制文件运行所有这些任务,则需要在程序开始时让您的程序分支到不同的角色中
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run sidecar evaluation
else:
# Run the coordinator.
以下代码启动一个 TensorFlow 服务器并等待,这对角色 "worker"
和 "ps"
有用
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
处理任务失败
工作器失败
自定义训练循环 tf.distribute.coordinator.ClusterCoordinator
和方法 Model.fit
都为工作器失败提供了内置的容错能力。工作器恢复后, ClusterCoordinator
会在工作器上调用数据集重新创建。
参数服务器或协调器失败
但是,当协调器看到参数服务器错误时,它会立即引发一个UnavailableError
或AbortedError
。在这种情况下,您可以重新启动协调器。协调器本身也可能变得不可用。因此,建议使用某些工具以避免丢失训练进度。
对于
Model.fit
,您应该使用BackupAndRestore
回调,它会自动处理进度保存和恢复。有关示例,请参见上面的回调和训练部分。对于自定义训练循环,您应该定期检查模型变量,并在训练开始之前从检查点加载模型变量(如果有)。如果检查点包含优化器,则可以从
optimizer.iterations
大致推断出训练进度。
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epochs):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
获取RemoteValue
如果函数成功执行,则保证获取RemoteValue
会成功。这是因为目前在函数执行后,返回值会立即复制到协调器。如果在复制过程中出现任何工作器故障,该函数将在另一个可用的工作器上重试。因此,如果您想优化性能,可以安排没有返回值的函数。
错误报告
一旦协调器看到错误(例如来自参数服务器的UnavailableError
或其他应用程序错误,例如来自tf.debugging.check_numerics
的InvalidArgument
),它将在引发错误之前取消所有挂起和排队的函数。获取其相应的RemoteValue
将引发CancelledError
。
在引发错误后,协调器不会引发相同的错误或来自已取消函数的任何错误。
性能改进
使用tf.distribute.ParameterServerStrategy
和tf.distribute.coordinator.ClusterCoordinator
进行训练时,您可能会遇到性能问题的几个可能原因。
一个常见的原因是参数服务器负载不均衡,一些负载过重的参数服务器已达到容量。也可能存在多个根本原因。缓解此问题的几种简单方法是
- 通过在构建
ParameterServerStrategy
时指定variable_partitioner
来对大型模型变量进行分片。 避免创建所有参数服务器在单个步骤中都需要的一个热点变量,方法是
1) 在优化器中使用恒定学习率或子类化
tf.keras.optimizers.schedules.LearningRateSchedule
。这是因为默认行为是学习率将成为放置在特定参数服务器上的变量,并在每一步由所有其他参数服务器请求);以及2) 使用
tf.keras.optimizers.legacy.Optimizer
(标准tf.keras.optimizers.Optimizer
仍然可能导致热点变量)。在将大型词汇表传递给 Keras 预处理层之前,对其进行混洗。
性能问题的另一个可能原因是协调器。schedule
/join
的实现是基于 Python 的,因此可能存在线程开销。此外,协调器和工作器之间的延迟可能很大。如果是这种情况
对于
Model.fit
,您可以将Model.compile
中提供的steps_per_execution
参数设置为大于 1 的值。对于自定义训练循环,您可以将多个步骤打包到单个
tf.function
中。
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
随着库的进一步优化,希望大多数用户将来不必手动打包步骤。
此外,提高性能的一个小技巧是安排没有返回值的函数,如上面处理任务失败部分中所述。
已知限制
大多数已知限制已在以上部分中介绍。本节提供一个摘要。
ParameterServerStrategy
通用
- 每个任务(包括协调器)都需要
os.environment["grpc_fail_fast"]="use_caller"
,以使容错正常工作。 - 不支持同步参数服务器训练。
- 通常需要将多个步骤打包到单个函数中才能获得最佳性能。
- 不支持通过
tf.saved_model.load
加载包含分片变量的 saved_model。请注意,使用 TensorFlow Serving 加载此类 saved_model 预计会正常工作(有关详细信息,请参阅服务教程)。 - 不支持在不重启协调器任务的情况下从参数服务器故障中恢复。
- 创建
tf.lookup.StaticHashTable
(通常由一些 Keras 预处理层使用,例如tf.keras.layers.IntegerLookup
、tf.keras.layers.StringLookup
和tf.keras.layers.TextVectorization
)应该放在Strategy.scope
下。否则,资源将被放置在协调器上,并且从工作器到协调器的查找 RPC 会造成性能影响。
Model.fit
细节
Model.fit
中需要steps_per_epoch
参数。您可以选择一个在 epoch 中提供适当间隔的值。ParameterServerStrategy
不支持具有批次级调用的自定义回调,原因是性能问题。您应该将这些调用转换为具有适当选择的steps_per_epoch
的 epoch 级调用,以便它们每steps_per_epoch
个步骤调用一次。内置回调不受影响:它们的批次级调用已修改为具有高性能。正在计划支持ParameterServerStrategy
的批次级调用。- 出于同样的原因,与其他策略不同,进度条和指标仅在 epoch 边界处记录。
- 不支持
run_eagerly
。
自定义训练循环细节
ClusterCoordinator.schedule
通常不支持对数据集的访问保证,尽管可以通过Model.fit/.evaluate
实现对评估的访问保证。请参见启用一次性评估。- 当
ClusterCoordinator.create_per_worker_dataset
与可调用对象一起使用时,整个数据集必须在传递给它的函数内部创建。 - 在由
ClusterCoordinator.create_per_worker_dataset
创建的数据集中,会忽略tf.data.Options
。