使用 DTensors 与 Keras

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

概述

在本教程中,您将学习如何使用 DTensors 与 Keras。

通过 DTensor 与 Keras 的集成,您可以重用现有的 Keras 层和模型来构建和训练分布式机器学习模型。

您将使用 MNIST 数据训练一个多层分类模型。将演示为子类化模型、顺序模型和函数式模型设置布局。

本教程假设您已阅读 DTensor 编程指南,并熟悉 MeshLayout 等基本 DTensor 概念。

本教程基于 使用 Keras 在 MNIST 上训练神经网络

设置

DTensor (tf.experimental.dtensor) 自 TensorFlow 2.9.0 版本起已成为 TensorFlow 的一部分。

首先,安装或升级 TensorFlow 数据集

pip install --quiet --upgrade tensorflow-datasets

接下来,导入 TensorFlow 和 dtensor,并将 TensorFlow 配置为使用 8 个虚拟 CPU。

即使本示例使用虚拟 CPU,DTensor 在 CPU、GPU 或 TPU 设备上的工作方式也相同。

import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
def configure_virtual_cpus(ncpu):
  phy_devices = tf.config.list_physical_devices('CPU')
  tf.config.set_logical_device_configuration(
        phy_devices[0], 
        [tf.config.LogicalDeviceConfiguration()] * ncpu)

configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')

devices = [f'CPU:{i}' for i in range(8)]

确定性伪随机数生成器

需要注意的是,DTensor API 要求每个运行的客户端具有相同的随机种子,以便在初始化权重时具有确定性行为。可以通过在 Keras 中使用 tf.keras.utils.set_random_seed() 设置全局种子来实现这一点。

tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)

创建数据并行网格

本教程演示了数据并行训练。适应模型并行训练和空间并行训练就像切换到不同的 Layout 对象集一样简单。有关数据并行之外的分布式训练的更多信息,请参阅 使用 DTensor 进行分布式训练 教程。

数据并行训练是一种常用的并行训练方案,例如,也由 tf.distribute.MirroredStrategy 使用。

使用 DTensor,数据并行训练循环使用一个 Mesh,该网格包含一个单独的“批次”维度,其中每个设备运行模型的副本,该副本接收来自全局批次的碎片。

mesh = dtensor.create_mesh([("batch", 8)], devices=devices)

由于每个设备都运行模型的完整副本,因此模型变量将在整个网格中完全复制(未分片)。例如,此 Mesh 上的秩 2 权重的完全复制布局如下所示

example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh)  # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)

Mesh 上的秩 2 数据张量的布局将在第一个维度(有时称为 batch_sharded)上分片,

example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh)  # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)

使用布局创建 Keras 层

在数据并行方案中,通常使用完全复制的布局创建模型权重,以便模型的每个副本都可以使用分片输入数据进行计算。

为了配置层权重的布局信息,Keras 在大多数内置层的层构造函数中公开了一个额外的参数。

以下示例构建了一个具有完全复制权重布局的小型图像分类模型。可以通过参数 kernel_layoutbias_layouttf.keras.layers.Dense 中指定布局信息 kernelbias。大多数内置的 Keras 层都已准备好显式指定层权重的 Layout

unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(128, 
                        activation='relu',
                        name='d1',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d),
  tf.keras.layers.Dense(10,
                        name='d2',
                        kernel_layout=unsharded_layout_2d, 
                        bias_layout=unsharded_layout_1d)
])

可以通过检查权重上的 layout 属性来检查布局信息。

for weight in model.weights:
  print(f'Weight name: {weight.name} with layout: {weight.layout}')
  break

加载数据集并构建输入管道

加载 MNIST 数据集并为其配置一些预处理输入管道。数据集本身与任何 DTensor 布局信息无关。

(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)
def normalize_img(image, label):
  """Normalizes images: `uint8` -> `float32`."""
  return tf.cast(image, tf.float32) / 255., label
batch_size = 128

ds_train = ds_train.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
    normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)

定义模型的训练逻辑

接下来,定义模型的训练和评估逻辑。

从 TensorFlow 2.9 开始,您必须为支持 DTensor 的 Keras 模型编写自定义训练循环。这是为了使用正确的布局信息打包输入数据,这些信息没有集成到 Keras 中的标准 tf.keras.Model.fit()tf.keras.Model.eval() 函数中。您将在即将发布的版本中获得更多 tf.data 支持。

@tf.function
def train_step(model, x, y, optimizer, metrics):
  with tf.GradientTape() as tape:
    logits = model(x, training=True)
    # tf.reduce_sum sums the batch sharded per-example loss to a replicated
    # global loss (scalar).
    loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'loss': loss_per_sample}
  return results
@tf.function
def eval_step(model, x, y, metrics):
  logits = model(x, training=False)
  loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
        y, logits, from_logits=True))

  for metric in metrics.values():
    metric.update_state(y_true=y, y_pred=logits)

  loss_per_sample = loss / len(x)
  results = {'eval_loss': loss_per_sample}
  return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
  num_local_devices = image_layout.mesh.num_local_devices()
  images = tf.split(images, num_local_devices)
  labels = tf.split(labels, num_local_devices)
  images = dtensor.pack(images, image_layout)
  labels = dtensor.pack(labels, label_layout)
  return  images, labels

指标和优化器

在将 DTensor API 与 Keras MetricOptimizer 一起使用时,您需要提供额外的网格信息,以便任何内部状态变量和张量都可以与模型中的变量一起使用。

  • 对于优化器,DTensor 引入了一个新的实验命名空间 keras.dtensor.experimental.optimizers,其中许多现有的 Keras 优化器被扩展为接收一个额外的 mesh 参数。在将来的版本中,它可能会与 Keras 核心优化器合并。

  • 对于指标,您可以直接将 mesh 指定为构造函数的参数,以使其成为与 DTensor 兼容的 Metric

optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}

训练模型

以下示例演示了如何在批次维度上对来自输入管道的數據进行分片,并使用具有完全复制权重的模型进行训练。

经过 3 个 epoch 后,模型应该达到大约 97% 的准确率

num_epochs = 3

image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

for epoch in range(num_epochs):
  print("============================") 
  print("Epoch: ", epoch)
  for metric in metrics.values():
    metric.reset_state()
  step = 0
  results = {}
  pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
  for input in ds_train:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)

    results.update(train_step(model, images, labels, optimizer, metrics))
    for metric_name, metric in metrics.items():
      results[metric_name] = metric.result()

    pbar.update(step, values=results.items(), finalize=False)
    step += 1
  pbar.update(step, values=results.items(), finalize=True)

  for metric in eval_metrics.values():
    metric.reset_state()
  for input in ds_test:
    images, labels = input[0], input[1]
    images, labels = pack_dtensor_inputs(
        images, labels, image_layout, label_layout)
    results.update(eval_step(model, images, labels, eval_metrics))

  for metric_name, metric in eval_metrics.items():
    results[metric_name] = metric.result()

  for metric_name, metric in results.items():
    print(f"{metric_name}: {metric.numpy()}")

为现有模型代码指定布局

通常,您拥有适合您的用例的模型。为模型中每个单独的层指定 Layout 信息将是一项大量工作,需要大量编辑。

为了帮助您轻松地将现有的 Keras 模型转换为使用 DTensor API,您可以使用新的 tf.keras.dtensor.experimental.LayoutMap API,它允许您从全局角度指定 Layout

首先,您需要创建一个 LayoutMap 实例,它是一个类似字典的对象,包含您要为模型权重指定的 Layout

LayoutMap 在初始化时需要一个 Mesh 实例,该实例可用于为任何未配置布局的权重提供默认复制的 Layout。如果您希望所有模型权重都完全复制,则可以提供空的 LayoutMap,并且将使用默认网格来创建复制的 Layout

LayoutMap 使用字符串作为键,使用 Layout 作为值。普通 Python 字典和此类之间存在行为差异。在检索值时,字符串键将被视为正则表达式。

子类模型

考虑以下使用 Keras 子类化模型语法定义的模型。

class SubclassedModel(tf.keras.Model):

  def __init__(self, name=None):
    super().__init__(name=name)
    self.feature = tf.keras.layers.Dense(16)
    self.feature_2 = tf.keras.layers.Dense(24)
    self.dropout = tf.keras.layers.Dropout(0.1)

  def call(self, inputs, training=None):
    x = self.feature(inputs)
    x = self.dropout(x, training=training)
    return self.feature_2(x)

此模型中有 4 个权重,分别是两个 Dense 层的 kernelbias。它们中的每一个都根据对象路径进行映射

  • model.feature.kernel
  • model.feature.bias
  • model.feature_2.kernel
  • model.feature_2.bias

现在定义以下 LayoutMap 并将其应用于模型

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)

with layout_map.scope():
  subclassed_model = SubclassedModel()

模型权重在第一次调用时创建,因此使用 DTensor 输入调用模型并确认权重具有预期的布局

dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)

print(subclassed_model.feature.kernel.layout)

这样,您就可以快速将 Layout 映射到您的模型,而无需更新任何现有代码。

顺序模型和函数式模型

对于 Keras 函数式模型和顺序模型,您也可以使用 tf.keras.dtensor.experimental.LayoutMap

layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)

layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
  inputs = tf.keras.Input((16,), batch_size=16)
  x = tf.keras.layers.Dense(16, name='feature')(inputs)
  x = tf.keras.layers.Dropout(0.1)(x)
  output = tf.keras.layers.Dense(32, name='feature_2')(x)
  model = tf.keras.Model(inputs, output)

print(model.layers[1].kernel.layout)
with layout_map.scope():
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
      tf.keras.layers.Dropout(0.1),
      tf.keras.layers.Dense(32, name='feature_2')
  ])

print(model.layers[2].kernel.layout)