迁移容错机制

容错是指定期保存可跟踪对象(如参数和模型)状态的机制。这使您能够在训练期间程序/机器发生故障时恢复它们。

本指南首先演示了如何通过使用 tf.estimator.RunConfig 指定指标保存,将容错添加到使用 TensorFlow 1 中的 tf.estimator.Estimator 进行的训练中。然后,您将学习如何在 TensorFlow 2 中通过两种方式实现训练的容错。

这两种方法都将在 检查点 文件中备份和恢复训练状态。

设置

安装 tf-nightly,因为在 tf.keras.callbacks.BackupAndRestore 中使用 save_freq 参数在特定步骤保存检查点的频率是从 TensorFlow 2.10 开始引入的。

pip install tf-nightly
import tensorflow.compat.v1 as tf1
import tensorflow as tf
import numpy as np
import tempfile
import time
mnist = tf.keras.datasets.mnist

(x_train, y_train),(x_test, y_test) = mnist.load_data()
x_train
, x_test = x_train / 255.0, x_test / 255.0

TensorFlow 1:使用 tf.estimator.RunConfig 保存检查点

在 TensorFlow 1 中,您可以通过配置 tf.estimator.RunConfig 来配置 tf.estimator 以每一步保存检查点。

在本示例中,首先编写一个挂钩,该挂钩在第五个检查点期间人为地抛出错误。

class InterruptHook(tf1.train.SessionRunHook):
 
# A hook for artificially interrupting training.
 
def begin(self):
   
self._step = -1

 
def before_run(self, run_context):
   
self._step += 1

 
def after_run(self, run_context, run_values):
   
if self._step == 5:
     
raise RuntimeError('Interruption')

接下来,配置 tf.estimator.Estimator 以保存每个检查点,并使用 MNIST 数据集。

feature_columns = [tf1.feature_column.numeric_column("x", shape=[28, 28])]
config
= tf1.estimator.RunConfig(save_summary_steps=1,
                                 save_checkpoints_steps
=1)

path
= tempfile.mkdtemp()

classifier
= tf1.estimator.DNNClassifier(
    feature_columns
=feature_columns,
    hidden_units
=[256, 32],
    optimizer
=tf1.train.AdamOptimizer(0.001),
    n_classes
=10,
    dropout
=0.2,
    model_dir
=path,
    config
= config
)

train_input_fn
= tf1.estimator.inputs.numpy_input_fn(
    x
={"x": x_train},
    y
=y_train.astype(np.int32),
    num_epochs
=10,
    batch_size
=50,
    shuffle
=True,
)

开始训练模型。您之前定义的挂钩将引发人为异常。

try:
  classifier
.train(input_fn=train_input_fn,
                   hooks
=[InterruptHook()],
                   max_steps
=10)
except Exception as e:
 
print(f'{type(e).__name__}:{e}')

使用最后一个保存的检查点重建 tf.estimator.Estimator 并继续训练。

classifier = tf1.estimator.DNNClassifier(
    feature_columns
=feature_columns,
    hidden_units
=[256, 32],
    optimizer
=tf1.train.AdamOptimizer(0.001),
    n_classes
=10,
    dropout
=0.2,
    model_dir
=path,
    config
= config
)
classifier
.train(input_fn=train_input_fn,
                   max_steps
= 10)

TensorFlow 2:使用回调和 Model.fit 进行备份和恢复

在 TensorFlow 2 中,如果您使用 Keras Model.fit API 进行训练,您可以提供 tf.keras.callbacks.BackupAndRestore 回调以添加容错功能。

为了帮助演示这一点,首先定义一个 Keras Callback 类,该类在第四个 epoch 检查点期间人为地抛出错误。

class InterruptAtEpoch(tf.keras.callbacks.Callback):
 
# A callback for artificially interrupting training.
 
def __init__(self, interrupting_epoch=3):
   
self.interrupting_epoch = interrupting_epoch

 
def on_epoch_end(self, epoch, log=None):
   
if epoch == self.interrupting_epoch:
     
raise RuntimeError('Interruption')

然后,定义并实例化一个简单的 Keras 模型,定义损失函数,调用 Model.compile,并设置一个 tf.keras.callbacks.BackupAndRestore 回调,该回调将在 epoch 边界处将检查点保存到临时目录中。

def create_model():
 
return tf.keras.models.Sequential([
    tf
.keras.layers.Flatten(input_shape=(28, 28)),
    tf
.keras.layers.Dense(512, activation='relu'),
    tf
.keras.layers.Dropout(0.2),
    tf
.keras.layers.Dense(10)
 
])
loss
= tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model
= create_model()
model
.compile(optimizer='adam',
              loss
=loss,
              metrics
=['accuracy'])
log_dir
= tempfile.mkdtemp()
backup_restore_callback
= tf.keras.callbacks.BackupAndRestore(
    backup_dir
= log_dir)

使用 Model.fit 开始训练模型。在训练期间,由于上面实例化的 tf.keras.callbacks.BackupAndRestore,将保存检查点,而 InterruptAtEpoch 类将在第四个 epoch 之后引发人为异常以模拟故障。

try:
  model
.fit(x=x_train,
            y
=y_train,
            epochs
=10,
            steps_per_epoch
=100,
            validation_data
=(x_test, y_test),
            callbacks
=[backup_restore_callback, InterruptAtEpoch()])
except Exception as e:
 
print(f'{type(e).__name__}:{e}')

接下来,实例化 Keras 模型,调用 Model.compile,并使用 Model.fit 从先前保存的检查点继续训练模型。

model = create_model()
model
.compile(optimizer='adam',
              loss
=loss,
              metrics
=['accuracy'],
              steps_per_execution
=10)
model
.fit(x=x_train,
            y
=y_train,
            epochs
=10,
            steps_per_epoch
=100,
            validation_data
=(x_test, y_test),
            callbacks
=[backup_restore_callback])

定义另一个 Callback 类,该类在第 140 步期间人为地抛出错误。

class InterruptAtStep(tf.keras.callbacks.Callback):
 
# A callback for artificially interrupting training.
 
def __init__(self, interrupting_step=140):
   
self.total_step_count = 0
   
self.interrupting_step = interrupting_step

 
def on_batch_begin(self, batch, logs=None):
   
self.total_step_count += 1

 
def on_batch_end(self, batch, logs=None):
   
if self.total_step_count == self.interrupting_step:
     
print("\nInterrupting at step count", self.total_step_count)
     
raise RuntimeError('Interruption')

为了确保每 30 步保存一次检查点,请将 BackupAndRestore 回调中的 save_freq 设置为 30InterruptAtStep 将引发一个人工异常,以模拟在 epoch 1 和 step 40(总步数 140)处发生的故障。检查点将在 epoch 1 和 step 20 处最后保存。

log_dir_2 = tempfile.mkdtemp()

backup_restore_callback
= tf.keras.callbacks.BackupAndRestore(
    backup_dir
= log_dir_2, save_freq=30
)
model
= create_model()
model
.compile(optimizer='adam',
              loss
=loss,
              metrics
=['accuracy'])
try:
  model
.fit(x=x_train,
            y
=y_train,
            epochs
=10,
            steps_per_epoch
=100,
            validation_data
=(x_test, y_test),
            callbacks
=[backup_restore_callback, InterruptAtStep()])
except Exception as e:
 
print(f'{type(e).__name__}:{e}')

接下来,实例化 Keras 模型,调用 Model.compile,并使用 Model.fit 从之前保存的检查点继续训练模型。注意,训练从 epoch 2 和 step 21 开始。

model = create_model()
model
.compile(optimizer='adam',
              loss
=loss,
              metrics
=['accuracy'],
              steps_per_execution
=10)
model
.fit(x=x_train,
            y
=y_train,
            epochs
=10,
            steps_per_epoch
=100,
            validation_data
=(x_test, y_test),
            callbacks
=[backup_restore_callback])

TensorFlow 2:使用自定义训练循环编写手动检查点

如果您在 TensorFlow 2 中使用自定义训练循环,可以使用 tf.train.Checkpointtf.train.CheckpointManager API 实现容错机制。

此示例演示如何

首先定义并实例化 Keras 模型、优化器和损失函数。然后,创建一个 Checkpoint 来管理两个具有可跟踪状态的对象(模型和优化器),以及一个 CheckpointManager 用于在临时目录中记录和保存多个检查点。

model = create_model()
optimizer
= tf.keras.optimizers.SGD(learning_rate=0.001)
loss_fn
= tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
log_dir
= tempfile.mkdtemp()
epochs
= 5
steps_per_epoch
= 5

checkpoint
= tf.train.Checkpoint(model=model, optimizer=optimizer)
checkpoint_manager
= tf.train.CheckpointManager(
            checkpoint
, log_dir, max_to_keep=2)

现在,实现一个自定义训练循环,在第一个 epoch 之后,每次新 epoch 开始时都会加载最后一个检查点。

for epoch in range(epochs):
 
if epoch > 0:
      tf
.train.load_checkpoint(save_path)
 
print(f"\nStart of epoch {epoch}")

 
for step in range(steps_per_epoch):
   
with tf.GradientTape() as tape:

      logits
= model(x_train, training=True)
      loss_value
= loss_fn(y_train, logits)

      grads
= tape.gradient(loss_value, model.trainable_weights)
      optimizer
.apply_gradients(zip(grads, model.trainable_weights))

    save_path
= checkpoint_manager.save()
   
print(f"Checkpoint saved to {save_path}")
   
print(f"Training loss at step {step}: {loss_value}")

后续步骤

要详细了解 TensorFlow 2 中的容错和检查点,请参考以下文档

您可能还会发现以下与 分布式训练 相关的资料有用