在 TensorFlow.org 上查看
|
在 Google Colab 中运行
|
在 GitHub 上查看源代码
|
下载笔记本
|
概述
参数服务器训练 是一种常见的 数据并行方法,用于在多台机器上扩展模型训练。
参数服务器训练集群由 工作器 和 参数服务器 组成。变量在参数服务器上创建,并在每一步由工作器读取和更新。默认情况下,工作器独立地读取和更新这些变量,而不会彼此同步。这就是为什么有时参数服务器式训练被称为 异步训练。
在 TensorFlow 2 中,参数服务器训练由 tf.distribute.ParameterServerStrategy 类提供支持,该类将训练步骤分配到一个集群,该集群可以扩展到数千个工作器(以及参数服务器)。
支持的训练方法
主要支持两种训练方法
- Keras
Model.fitAPI:如果您更喜欢高级抽象和训练处理。如果您正在训练一个tf.keras.Model,通常建议使用此方法。 - 自定义训练循环:如果您更喜欢定义训练循环的细节(您可以参考关于 自定义训练、从头开始编写训练循环 和 使用 Keras 和 MultiWorkerMirroredStrategy 的自定义训练循环 的指南,了解更多详细信息)。
具有作业和任务的集群
无论您选择哪种 API (Model.fit 或自定义训练循环),TensorFlow 2 中的分布式训练都涉及一个 'cluster',其中包含多个 'jobs',每个作业可能包含一个或多个 'tasks'。
使用参数服务器训练时,建议您拥有
- 一个 协调器 作业(作业名称为
chief) - 多个 工作器 作业(作业名称为
worker) - 多个 参数服务器 作业(作业名称为
ps)
协调器 创建资源、调度训练任务、写入检查点并处理任务故障。工作器 和 参数服务器 运行 tf.distribute.Server 实例,这些实例监听来自协调器的请求。
使用 Model.fit API 的参数服务器训练
使用 Model.fit API 的参数服务器训练要求协调器使用 tf.distribute.ParameterServerStrategy 对象。类似于 Model.fit 在没有策略或使用其他策略的情况下使用,工作流程涉及创建和编译模型、准备回调以及调用 Model.fit。
使用自定义训练循环的参数服务器训练
对于自定义训练循环,tf.distribute.coordinator.ClusterCoordinator 类是协调器使用的关键组件。
ClusterCoordinator类需要与tf.distribute.ParameterServerStrategy对象一起使用。- 此
tf.distribute.Strategy对象需要提供集群的信息,并用于定义训练步骤,如 使用 tf.distribute.Strategy 的自定义训练 中所示。 ClusterCoordinator对象随后将这些训练步骤的执行调度到远程工作器。
ClusterCoordinator 对象提供的最重要的 API 是 schedule
scheduleAPI 将一个tf.function入队,并立即返回一个类似未来的RemoteValue。- 排队的函数将在后台线程中调度到远程工作器,它们的
RemoteValue将异步填充。 - 由于
schedule不需要工作器分配,因此传入的tf.function可以执行在任何可用的工作器上。 - 如果执行它的工作器在其完成之前变得不可用,则该函数将在另一个可用的工作器上重试。
- 由于这个事实以及函数执行不是原子的这一事实,单个函数调用可能会执行多次。
除了调度远程函数之外,ClusterCoordinator 还帮助在所有工作器上创建数据集,并在工作器从故障中恢复时重建这些数据集。
教程设置
本教程将分支到 Model.fit 和自定义训练循环路径,您可以选择适合您需求的路径。除“使用 X 训练”以外的部分适用于这两种路径。
pip install portpicker
集群设置
如上所述,参数服务器训练集群需要一个运行训练程序的协调器任务,一个或多个运行 TensorFlow 服务器的工作器和参数服务器任务——tf.distribute.Server——以及可能还需要一个运行辅助评估的额外评估任务(请参阅下面的 辅助评估部分)。设置它们的必要条件是
- 协调器任务需要知道所有其他 TensorFlow 服务器的地址和端口,除了评估器。
- 工作器和参数服务器需要知道它们需要监听哪个端口。为了简单起见,您通常可以在这些任务上创建 TensorFlow 服务器时传入完整的集群信息。
- 评估器任务不必知道训练集群的设置。如果知道,它不应该尝试连接到训练集群。
- 工作器和参数服务器的任务类型应分别为
"worker"和"ps"。出于遗留原因,协调器应使用"chief"作为任务类型。
在本教程中,您将创建一个进程内集群,以便整个参数服务器训练可以在 Colab 中运行。您将在后面的部分了解如何在 真实集群 中进行设置。
进程内集群
您将首先预先创建多个 TensorFlow 服务器,然后您将连接到它们。请注意,这仅用于本教程演示的目的,在实际训练中,服务器将在 "worker" 和 "ps" 机器上启动。
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
进程内集群设置经常用于单元测试,例如 这里。
本地测试的另一个选择是在本地机器上启动进程——查看 使用 Keras 的多工作器训练,了解此方法的示例。
实例化 ParameterServerStrategy
在深入研究训练代码之前,让我们实例化一个 tf.distribute.ParameterServerStrategy 对象。请注意,无论您是继续使用 Model.fit 还是自定义训练循环,都需要此对象。variable_partitioner 参数将在 变量分片部分 中解释。
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
为了使用 GPU 进行训练,请分配每个工作器可见的 GPU。ParameterServerStrategy 将使用每个工作器上所有可用的 GPU,但限制是所有工作器应具有相同数量的可用 GPU。
变量分片
变量分片是指将一个变量拆分为多个较小的变量,这些变量称为 分片。当访问这些分片时,变量分片可能有助于分发网络负载。它还有助于将普通变量的计算和存储分布到多个参数服务器上,例如,当使用可能不适合单个机器内存的非常大的嵌入时。
要启用变量分片,您可以在构造 ParameterServerStrategy 对象时传入一个 variable_partitioner。每次创建变量时都会调用 variable_partitioner,并且预计它将返回变量每个维度上的分片数量。提供了一些开箱即用的 variable_partitioner,例如 tf.distribute.experimental.partitioners.MinSizePartitioner。建议使用基于大小的分片器,例如 tf.distribute.experimental.partitioners.MinSizePartitioner,以避免对小变量进行分片,这可能会对模型训练速度产生负面影响。
当传入 variable_partitioner 时,并且您直接在 Strategy.scope 下创建变量时,该变量将成为一个容器类型,具有一个 variables 属性,该属性提供对分片列表的访问。在大多数情况下,此容器将通过连接所有分片自动转换为张量。因此,它可以用作普通变量。另一方面,某些 TensorFlow 方法(例如 tf.nn.embedding_lookup)为这种容器类型提供了高效的实现,并且在这些方法中将避免自动连接。
有关更多详细信息,请参阅 tf.distribute.ParameterServerStrategy 的 API 文档。
使用 Model.fit 训练
Keras 通过 Model.fit 提供了一个易于使用的训练 API,该 API 在幕后处理训练循环,并具有可覆盖的 train_step 和回调的灵活性,这些回调提供诸如检查点保存或 TensorBoard 的摘要保存等功能。使用 Model.fit,可以使用相同的训练代码与其他策略一起使用,只需简单地交换策略对象即可。
输入数据
Keras Model.fit 与 tf.distribute.ParameterServerStrategy 一起使用时,可以接受以 tf.data.Dataset、tf.distribute.DistributedDataset 或 tf.keras.utils.experimental.DatasetCreator 形式的输入数据,其中 Dataset 是推荐的选项,因为它易于使用。但是,如果您在使用 Dataset 时遇到内存问题,则可能需要使用 DatasetCreator,并使用可调用的 dataset_fn 参数(有关详细信息,请参阅 tf.keras.utils.experimental.DatasetCreator 的 API 文档)。
如果您将数据集转换为 tf.data.Dataset,则应使用 Dataset.shuffle 和 Dataset.repeat,如下面的代码示例所示。
- 使用参数服务器训练的 Keras
Model.fit假设每个工作器接收相同的数据集,除非数据集以不同的方式进行洗牌。因此,通过调用Dataset.shuffle,您可以确保对数据的更均匀迭代。 - 由于工作器不会同步,因此它们可能在不同的时间完成处理其数据集。因此,使用参数服务器训练定义 epoch 的最简单方法是使用
Dataset.repeat(在没有参数的情况下调用时,它会无限期地重复数据集),并在Model.fit调用中指定steps_per_epoch参数。
有关 shuffle 和 repeat 的更多详细信息,请参阅 tf.data 指南 的“训练工作流”部分。
global_batch_size = 64
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.batch(global_batch_size)
dataset = dataset.prefetch(2)
如果您改为使用 tf.keras.utils.experimental.DatasetCreator 创建数据集,则 dataset_fn 中的代码将在每个工作器机器上的输入设备(通常是 CPU)上调用。
模型构建和编译
现在,您将创建一个 tf.keras.Model(一个简单的 tf.keras.models.Sequential 模型,用于演示目的),然后调用 Model.compile 来合并组件,例如优化器、指标和其他参数,例如 steps_per_execution。
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.legacy.SGD(), loss="mse", steps_per_execution=10)
回调和训练
在您调用 Keras Model.fit 进行实际训练之前,请准备任何需要的 回调,以执行常见任务,例如
tf.keras.callbacks.ModelCheckpoint:以一定频率保存模型,例如在每个 epoch 之后。tf.keras.callbacks.BackupAndRestore:通过备份模型和当前 epoch 编号来提供容错能力,如果集群遇到不可用情况(例如中止或抢占)。然后,您可以在作业失败后从重启中恢复训练状态,并从中断的 epoch 开始继续训练。tf.keras.callbacks.TensorBoard:定期将模型日志写入摘要文件,这些文件可以在 TensorBoard 工具中可视化。
working_dir = "/tmp/my_working_dir"
log_dir = os.path.join(working_dir, "log")
ckpt_filepath = os.path.join(working_dir, "ckpt")
backup_dir = os.path.join(working_dir, "backup")
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dataset, epochs=5, steps_per_epoch=20, callbacks=callbacks)
直接使用 ClusterCoordinator(可选)
即使您选择 Model.fit 训练路径,您也可以选择实例化一个 tf.distribute.coordinator.ClusterCoordinator 对象,以调度您希望在工作器上执行的其他函数。有关更多详细信息和示例,请参阅 使用自定义训练循环进行训练 部分。
使用自定义训练循环进行训练
使用 tf.distribute.Strategy 的自定义训练循环提供了定义训练循环的极大灵活性。使用上面定义的 ParameterServerStrategy(作为 strategy),您将使用 tf.distribute.coordinator.ClusterCoordinator 来调度训练步骤的执行到远程工作器。
然后,您将创建一个模型,定义一个数据集,并定义一个步骤函数,就像您在使用其他 tf.distribute.Strategy 进行训练循环时所做的那样。您可以在 使用 tf.distribute.Strategy 进行自定义训练 教程中找到更多详细信息。
为了确保有效的数据集预取,请使用下面 调度训练步骤到远程工作器 部分中提到的推荐的分布式数据集创建 API。此外,请确保在 worker_fn 中调用 Strategy.run,以充分利用分配给工作器的 GPU。对于使用或不使用 GPU 进行训练,其余步骤相同。
让我们按照以下步骤创建这些组件
设置数据
首先,编写一个创建数据集的函数。
如果您希望使用 Keras 预处理层 或 Tensorflow Transform 层 对数据进行预处理,请在 **dataset_fn 外部** 和 **Strategy.scope 下** 创建这些层,就像您对任何其他 Keras 层所做的那样。这是因为 dataset_fn 将被包装到 tf.function 中,然后在每个工作器上执行以生成数据管道。
如果您不遵循上述步骤,创建层可能会创建 Tensorflow 状态,这些状态将从 tf.function 提升到协调器。因此,在工作器上访问它们将导致协调器和工作器之间重复的 RPC 调用,并导致显著的减速。
将层放置在 Strategy.scope 下将改为在所有工作器上创建它们。然后,您将通过 tf.data.Dataset.map 在 dataset_fn 中应用转换。有关使用分布式输入进行数据预处理的更多信息,请参阅 分布式输入 教程中的“数据预处理”。
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
在数据集中生成玩具示例
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
然后,创建一个包装在 dataset_fn 中的训练数据集
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
构建模型
接下来,创建模型和其他对象。确保在 Strategy.scope 下创建所有变量。
# These variables created under the `Strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(
units=1, activation="sigmoid",
kernel_regularizer=tf.keras.regularizers.L2(1e-4),
)(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.legacy.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
让我们确认 FixedShardsPartitioner 的使用将所有变量拆分为两个分片,并且每个分片都分配给不同的参数服务器
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
print(emb_layer.weights[0].device)
print(emb_layer.weights[1].device)
定义训练步骤
第三,创建一个包装在 tf.function 中的训练步骤
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
model_losses = model.losses
if model_losses:
loss += tf.nn.scale_regularization_loss(tf.add_n(model_losses))
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
在上面的训练步骤函数中,在 step_fn 中调用 Strategy.run 和 Strategy.reduce 可以支持每个工作器上的多个 GPU。如果工作器分配了 GPU,Strategy.run 将在多个副本(GPU)上分配数据集。它们对 tf.nn.compute_average_loss() 的并行调用计算一个工作器副本(GPU)上的损失平均值,独立于工作器的总数。
调度训练步骤到远程工作器
在 ParameterServerStrategy 定义所有计算之后,您将使用 tf.distribute.coordinator.ClusterCoordinator 类来创建资源并将训练步骤分配到远程工作器。
让我们首先创建一个 ClusterCoordinator 对象,并将策略对象传递给它
coordinator = tf.distribute.coordinator.ClusterCoordinator(strategy)
然后,使用 ClusterCoordinator.create_per_worker_dataset API 创建一个每个工作器的数据集和一个迭代器,该 API 将数据集复制到所有工作器。在下面的 per_worker_dataset_fn 中,建议将 dataset_fn 包装到 strategy.distribute_datasets_from_function 中,以允许有效地将预取无缝地分配到 GPU。
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
最后一步是使用 ClusterCoordinator.schedule 将计算分配到远程工作器
- 方法
schedule将一个tf.function加入队列,并立即返回一个类似于未来的RemoteValue。排队的函数将在后台线程中分发到远程工作器,并且RemoteValue将异步填充。 - 方法
join(ClusterCoordinator.join) 可用于等待所有已安排的函数执行完毕。
num_epochs = 4
steps_per_epoch = 5
for i in range(num_epochs):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
以下是获取 RemoteValue 结果的方法
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print("Final loss is %f" % loss.fetch())
或者,您可以启动所有步骤,并在等待完成时执行其他操作
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
有关此特定示例的完整训练和服务工作流程,请查看此 测试。
有关数据集创建的更多信息
上面代码中的数据集是使用 ClusterCoordinator.create_per_worker_dataset API 创建的。它为每个工作器创建一个数据集,并返回一个容器对象。您可以对其调用 iter 方法以创建每个工作器的迭代器。每个工作器的迭代器包含每个工作器的一个迭代器,并且相应的工作器切片将被替换为传递给 ClusterCoordinator.schedule 方法的函数的输入参数中,在函数在特定工作器上执行之前。
方法 ClusterCoordinator.schedule 假设工作器是等效的,因此假设不同工作器上的数据集是相同的(除了它们可能以不同的方式进行洗牌)。因此,建议重复数据集,并安排有限数量的步骤,而不是依赖于从数据集接收 OutOfRangeError。
另一个重要说明是,tf.data 数据集不支持跨任务边界的隐式序列化和反序列化。因此,重要的是在传递给 ClusterCoordinator.create_per_worker_dataset 的函数内部创建整个数据集。 create_per_worker_dataset API 也可以直接接受 tf.data.Dataset 或 tf.distribute.DistributedDataset 作为输入。
评估
使用 tf.distribute.ParameterServerStrategy 训练执行评估的两种主要方法是内联评估和辅助评估。每种方法都有其自身的优缺点,如下所述。如果您没有偏好,建议使用内联评估方法。对于使用 Model.fit 的用户, Model.evaluate 在后台使用内联(分布式)评估。
内联评估
在这种方法中,协调器在训练和评估之间交替,因此称为内联评估。
内联评估有几个好处。例如
- 它可以支持单个任务无法容纳的大型评估模型和评估数据集。
- 评估结果可用于为训练下一个时期做出决策,例如是否提前停止训练。
有两种方法可以实现内联评估:直接评估和分布式评估。
- 直接评估:对于小型模型和评估数据集,协调器可以在分布式模型上直接运行评估,并在协调器上使用评估数据集
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print("Evaluation accuracy: %f" % eval_accuracy.result())
- 分布式评估:对于在协调器上直接运行不可行的大型模型或数据集,协调器任务可以通过
ClusterCoordinator.schedule/ClusterCoordinator.join方法将评估任务分发到工作器
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print("Evaluation accuracy: %f" % eval_accuracy.result())
启用完全一次评估
方法 schedule 和 join 的 tf.distribute.coordinator.ClusterCoordinator 默认情况下不支持访问保证或完全一次语义。换句话说,在上面的示例中,不能保证数据集中的所有评估示例都将被完全评估一次;有些可能不会被访问,有些可能被评估多次。
完全一次评估可能是首选,以减少跨时期的评估方差,并改进通过提前停止、超参数调整或其他方法完成的模型选择。有不同的方法可以启用完全一次评估
- 使用工作流程
Model.fit/.evaluate,可以通过向Model.compile添加参数来启用它。请参阅pss_evaluation_shards参数的文档。 - API
tf.data服务可用于在使用ParameterServerStrategy时为评估提供完全一次访问(请参阅tf.data.experimental.serviceAPI 文档的动态分片部分)。 - 辅助评估 默认情况下提供完全一次评估,因为评估是在一台机器上进行的。但是,这可能比在许多工作器上分布式执行评估要慢得多。
第一个选项,使用 Model.compile,是大多数用户的建议解决方案。
完全一次评估有一些限制
- 不支持使用完全一次访问保证编写自定义分布式评估循环。如果您需要对此的支持,请提交 GitHub 问题。
- 它不能自动处理使用 API
Layer.add_metric的指标的计算。这些指标应从评估中排除,或重新制作成Metric对象。
辅助评估
在 tf.distribute.ParameterServerStrategy 训练中定义和运行评估循环的另一种方法称为辅助评估,您将在其中创建一个专用的评估器任务,该任务会重复读取检查点并在最新检查点上运行评估(有关检查点的更多详细信息,请参阅 本指南)。协调器和工作器任务不会花费任何时间进行评估,因此对于固定数量的迭代,整体训练时间应该比使用其他评估方法更短。但是,它需要一个额外的评估器任务和定期检查点以触发评估。
要为辅助评估编写评估循环,您有两个选择
- 使用 API
tf.keras.utils.SidecarEvaluator。 - 创建一个自定义评估循环。
有关选项 1 的更多详细信息,请参阅 API tf.keras.utils.SidecarEvaluator 文档。
辅助评估仅支持单个任务。这意味着
保证每个示例都被评估一次。如果评估器被抢占或重新启动,它只需从最新的检查点重新启动评估循环,并且在重新启动之前完成的部分评估进度将被丢弃。
但是,在单个任务上运行评估意味着完整的评估可能需要很长时间。
如果模型的大小太大而无法放入评估器的内存中,则单个辅助评估不适用。
另一个需要注意的是,实现 tf.keras.utils.SidecarEvaluator 以及下面的自定义评估循环可能会跳过一些检查点,因为它总是选择最新的可用检查点,并且在评估时期,训练集群可能会生成多个检查点。您可以编写一个评估每个检查点的自定义评估循环,但这在本教程中没有介绍。另一方面,如果检查点的生成频率低于运行评估所需的时间,它可能会处于空闲状态。
自定义评估循环提供了对细节的更多控制,例如选择要评估的检查点,或提供与评估一起运行的任何其他逻辑。以下是一个可能的自定义辅助评估循环
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epochs)):
break
现实世界中的集群
在实际生产环境中,您将在不同机器上的不同进程中运行所有任务。在每个任务上配置集群信息的简便方法是设置环境变量 "TF_CONFIG",并使用 tf.distribute.cluster_resolver.TFConfigClusterResolver 解析 "TF_CONFIG"。
有关环境变量 "TF_CONFIG" 的一般描述,请参阅 分布式训练 指南中的“设置环境变量 TF_CONFIG”。
如果您使用 Kubernetes 或其他配置模板启动训练任务,那么这些模板很可能已经为您设置了 “TF_CONFIG"。
设置环境变量 "TF_CONFIG"
假设您有 3 个工作器和 2 个参数服务器。那么工作器 1 的 "TF_CONFIG" 可以是
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
评估器的 "TF_CONFIG" 可以是
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
评估器中上述 "TF_CONFIG" 字符串中的 "cluster" 部分是可选的。
如果您对所有任务使用相同的二进制文件
如果您希望使用单个二进制文件运行所有这些任务,则需要在程序开始时让您的程序分支到不同的角色中
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run sidecar evaluation
else:
# Run the coordinator.
以下代码启动一个 TensorFlow 服务器并等待,这对角色 "worker" 和 "ps" 有用
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
处理任务失败
工作器失败
自定义训练循环 tf.distribute.coordinator.ClusterCoordinator 和方法 Model.fit 都为工作器失败提供了内置的容错能力。工作器恢复后, ClusterCoordinator 会在工作器上调用数据集重新创建。
参数服务器或协调器失败
但是,当协调器看到参数服务器错误时,它会立即引发一个UnavailableError或AbortedError。在这种情况下,您可以重新启动协调器。协调器本身也可能变得不可用。因此,建议使用某些工具以避免丢失训练进度。
对于
Model.fit,您应该使用BackupAndRestore回调,它会自动处理进度保存和恢复。有关示例,请参见上面的回调和训练部分。对于自定义训练循环,您应该定期检查模型变量,并在训练开始之前从检查点加载模型变量(如果有)。如果检查点包含优化器,则可以从
optimizer.iterations大致推断出训练进度。
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epochs):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
获取RemoteValue
如果函数成功执行,则保证获取RemoteValue会成功。这是因为目前在函数执行后,返回值会立即复制到协调器。如果在复制过程中出现任何工作器故障,该函数将在另一个可用的工作器上重试。因此,如果您想优化性能,可以安排没有返回值的函数。
错误报告
一旦协调器看到错误(例如来自参数服务器的UnavailableError或其他应用程序错误,例如来自tf.debugging.check_numerics的InvalidArgument),它将在引发错误之前取消所有挂起和排队的函数。获取其相应的RemoteValue将引发CancelledError。
在引发错误后,协调器不会引发相同的错误或来自已取消函数的任何错误。
性能改进
使用tf.distribute.ParameterServerStrategy和tf.distribute.coordinator.ClusterCoordinator进行训练时,您可能会遇到性能问题的几个可能原因。
一个常见的原因是参数服务器负载不均衡,一些负载过重的参数服务器已达到容量。也可能存在多个根本原因。缓解此问题的几种简单方法是
- 通过在构建
ParameterServerStrategy时指定variable_partitioner来对大型模型变量进行分片。 避免创建所有参数服务器在单个步骤中都需要的一个热点变量,方法是
1) 在优化器中使用恒定学习率或子类化
tf.keras.optimizers.schedules.LearningRateSchedule。这是因为默认行为是学习率将成为放置在特定参数服务器上的变量,并在每一步由所有其他参数服务器请求);以及2) 使用
tf.keras.optimizers.legacy.Optimizer(标准tf.keras.optimizers.Optimizer仍然可能导致热点变量)。在将大型词汇表传递给 Keras 预处理层之前,对其进行混洗。
性能问题的另一个可能原因是协调器。schedule/join的实现是基于 Python 的,因此可能存在线程开销。此外,协调器和工作器之间的延迟可能很大。如果是这种情况
对于
Model.fit,您可以将Model.compile中提供的steps_per_execution参数设置为大于 1 的值。对于自定义训练循环,您可以将多个步骤打包到单个
tf.function中。
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
随着库的进一步优化,希望大多数用户将来不必手动打包步骤。
此外,提高性能的一个小技巧是安排没有返回值的函数,如上面处理任务失败部分中所述。
已知限制
大多数已知限制已在以上部分中介绍。本节提供一个摘要。
ParameterServerStrategy通用
- 每个任务(包括协调器)都需要
os.environment["grpc_fail_fast"]="use_caller",以使容错正常工作。 - 不支持同步参数服务器训练。
- 通常需要将多个步骤打包到单个函数中才能获得最佳性能。
- 不支持通过
tf.saved_model.load加载包含分片变量的 saved_model。请注意,使用 TensorFlow Serving 加载此类 saved_model 预计会正常工作(有关详细信息,请参阅服务教程)。 - 不支持在不重启协调器任务的情况下从参数服务器故障中恢复。
- 创建
tf.lookup.StaticHashTable(通常由一些 Keras 预处理层使用,例如tf.keras.layers.IntegerLookup、tf.keras.layers.StringLookup和tf.keras.layers.TextVectorization)应该放在Strategy.scope下。否则,资源将被放置在协调器上,并且从工作器到协调器的查找 RPC 会造成性能影响。
Model.fit 细节
Model.fit中需要steps_per_epoch参数。您可以选择一个在 epoch 中提供适当间隔的值。ParameterServerStrategy不支持具有批次级调用的自定义回调,原因是性能问题。您应该将这些调用转换为具有适当选择的steps_per_epoch的 epoch 级调用,以便它们每steps_per_epoch个步骤调用一次。内置回调不受影响:它们的批次级调用已修改为具有高性能。正在计划支持ParameterServerStrategy的批次级调用。- 出于同样的原因,与其他策略不同,进度条和指标仅在 epoch 边界处记录。
- 不支持
run_eagerly。
自定义训练循环细节
ClusterCoordinator.schedule通常不支持对数据集的访问保证,尽管可以通过Model.fit/.evaluate实现对评估的访问保证。请参见启用一次性评估。- 当
ClusterCoordinator.create_per_worker_dataset与可调用对象一起使用时,整个数据集必须在传递给它的函数内部创建。 - 在由
ClusterCoordinator.create_per_worker_dataset创建的数据集中,会忽略tf.data.Options。
在 TensorFlow.org 上查看
在 Google Colab 中运行
在 GitHub 上查看源代码
下载笔记本