在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
概述
在本教程中,您将学习如何使用 DTensors 与 Keras。
通过 DTensor 与 Keras 的集成,您可以重用现有的 Keras 层和模型来构建和训练分布式机器学习模型。
您将使用 MNIST 数据训练一个多层分类模型。将演示为子类化模型、顺序模型和函数式模型设置布局。
本教程假设您已阅读 DTensor 编程指南,并熟悉 Mesh
和 Layout
等基本 DTensor 概念。
本教程基于 使用 Keras 在 MNIST 上训练神经网络。
设置
DTensor (tf.experimental.dtensor
) 自 TensorFlow 2.9.0 版本起已成为 TensorFlow 的一部分。
首先,安装或升级 TensorFlow 数据集
pip install --quiet --upgrade tensorflow-datasets
接下来,导入 TensorFlow 和 dtensor
,并将 TensorFlow 配置为使用 8 个虚拟 CPU。
即使本示例使用虚拟 CPU,DTensor 在 CPU、GPU 或 TPU 设备上的工作方式也相同。
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
def configure_virtual_cpus(ncpu):
phy_devices = tf.config.list_physical_devices('CPU')
tf.config.set_logical_device_configuration(
phy_devices[0],
[tf.config.LogicalDeviceConfiguration()] * ncpu)
configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')
devices = [f'CPU:{i}' for i in range(8)]
确定性伪随机数生成器
需要注意的是,DTensor API 要求每个运行的客户端具有相同的随机种子,以便在初始化权重时具有确定性行为。可以通过在 Keras 中使用 tf.keras.utils.set_random_seed()
设置全局种子来实现这一点。
tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)
创建数据并行网格
本教程演示了数据并行训练。适应模型并行训练和空间并行训练就像切换到不同的 Layout
对象集一样简单。有关数据并行之外的分布式训练的更多信息,请参阅 使用 DTensor 进行分布式训练 教程。
数据并行训练是一种常用的并行训练方案,例如,也由 tf.distribute.MirroredStrategy
使用。
使用 DTensor,数据并行训练循环使用一个 Mesh
,该网格包含一个单独的“批次”维度,其中每个设备运行模型的副本,该副本接收来自全局批次的碎片。
mesh = dtensor.create_mesh([("batch", 8)], devices=devices)
由于每个设备都运行模型的完整副本,因此模型变量将在整个网格中完全复制(未分片)。例如,此 Mesh
上的秩 2 权重的完全复制布局如下所示
example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh) # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)
此 Mesh
上的秩 2 数据张量的布局将在第一个维度(有时称为 batch_sharded
)上分片,
example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh) # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
使用布局创建 Keras 层
在数据并行方案中,通常使用完全复制的布局创建模型权重,以便模型的每个副本都可以使用分片输入数据进行计算。
为了配置层权重的布局信息,Keras 在大多数内置层的层构造函数中公开了一个额外的参数。
以下示例构建了一个具有完全复制权重布局的小型图像分类模型。可以通过参数 kernel_layout
和 bias_layout
在 tf.keras.layers.Dense
中指定布局信息 kernel
和 bias
。大多数内置的 Keras 层都已准备好显式指定层权重的 Layout
。
unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128,
activation='relu',
name='d1',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d),
tf.keras.layers.Dense(10,
name='d2',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d)
])
可以通过检查权重上的 layout
属性来检查布局信息。
for weight in model.weights:
print(f'Weight name: {weight.name} with layout: {weight.layout}')
break
加载数据集并构建输入管道
加载 MNIST 数据集并为其配置一些预处理输入管道。数据集本身与任何 DTensor 布局信息无关。
(ds_train, ds_test), ds_info = tfds.load(
'mnist',
split=['train', 'test'],
shuffle_files=True,
as_supervised=True,
with_info=True,
)
def normalize_img(image, label):
"""Normalizes images: `uint8` -> `float32`."""
return tf.cast(image, tf.float32) / 255., label
batch_size = 128
ds_train = ds_train.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
定义模型的训练逻辑
接下来,定义模型的训练和评估逻辑。
从 TensorFlow 2.9 开始,您必须为支持 DTensor 的 Keras 模型编写自定义训练循环。这是为了使用正确的布局信息打包输入数据,这些信息没有集成到 Keras 中的标准 tf.keras.Model.fit()
或 tf.keras.Model.eval()
函数中。您将在即将发布的版本中获得更多 tf.data
支持。
@tf.function
def train_step(model, x, y, optimizer, metrics):
with tf.GradientTape() as tape:
logits = model(x, training=True)
# tf.reduce_sum sums the batch sharded per-example loss to a replicated
# global loss (scalar).
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'loss': loss_per_sample}
return results
@tf.function
def eval_step(model, x, y, metrics):
logits = model(x, training=False)
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'eval_loss': loss_per_sample}
return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
num_local_devices = image_layout.mesh.num_local_devices()
images = tf.split(images, num_local_devices)
labels = tf.split(labels, num_local_devices)
images = dtensor.pack(images, image_layout)
labels = dtensor.pack(labels, label_layout)
return images, labels
指标和优化器
在将 DTensor API 与 Keras Metric
和 Optimizer
一起使用时,您需要提供额外的网格信息,以便任何内部状态变量和张量都可以与模型中的变量一起使用。
对于优化器,DTensor 引入了一个新的实验命名空间
keras.dtensor.experimental.optimizers
,其中许多现有的 Keras 优化器被扩展为接收一个额外的mesh
参数。在将来的版本中,它可能会与 Keras 核心优化器合并。对于指标,您可以直接将
mesh
指定为构造函数的参数,以使其成为与 DTensor 兼容的Metric
。
optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
训练模型
以下示例演示了如何在批次维度上对来自输入管道的數據进行分片,并使用具有完全复制权重的模型进行训练。
经过 3 个 epoch 后,模型应该达到大约 97% 的准确率
num_epochs = 3
image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
for epoch in range(num_epochs):
print("============================")
print("Epoch: ", epoch)
for metric in metrics.values():
metric.reset_state()
step = 0
results = {}
pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
for input in ds_train:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(train_step(model, images, labels, optimizer, metrics))
for metric_name, metric in metrics.items():
results[metric_name] = metric.result()
pbar.update(step, values=results.items(), finalize=False)
step += 1
pbar.update(step, values=results.items(), finalize=True)
for metric in eval_metrics.values():
metric.reset_state()
for input in ds_test:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(eval_step(model, images, labels, eval_metrics))
for metric_name, metric in eval_metrics.items():
results[metric_name] = metric.result()
for metric_name, metric in results.items():
print(f"{metric_name}: {metric.numpy()}")
为现有模型代码指定布局
通常,您拥有适合您的用例的模型。为模型中每个单独的层指定 Layout
信息将是一项大量工作,需要大量编辑。
为了帮助您轻松地将现有的 Keras 模型转换为使用 DTensor API,您可以使用新的 tf.keras.dtensor.experimental.LayoutMap
API,它允许您从全局角度指定 Layout
。
首先,您需要创建一个 LayoutMap
实例,它是一个类似字典的对象,包含您要为模型权重指定的 Layout
。
LayoutMap
在初始化时需要一个 Mesh
实例,该实例可用于为任何未配置布局的权重提供默认复制的 Layout
。如果您希望所有模型权重都完全复制,则可以提供空的 LayoutMap
,并且将使用默认网格来创建复制的 Layout
。
LayoutMap
使用字符串作为键,使用 Layout
作为值。普通 Python 字典和此类之间存在行为差异。在检索值时,字符串键将被视为正则表达式。
子类模型
考虑以下使用 Keras 子类化模型语法定义的模型。
class SubclassedModel(tf.keras.Model):
def __init__(self, name=None):
super().__init__(name=name)
self.feature = tf.keras.layers.Dense(16)
self.feature_2 = tf.keras.layers.Dense(24)
self.dropout = tf.keras.layers.Dropout(0.1)
def call(self, inputs, training=None):
x = self.feature(inputs)
x = self.dropout(x, training=training)
return self.feature_2(x)
此模型中有 4 个权重,分别是两个 Dense
层的 kernel
和 bias
。它们中的每一个都根据对象路径进行映射
model.feature.kernel
model.feature.bias
model.feature_2.kernel
model.feature_2.bias
现在定义以下 LayoutMap
并将其应用于模型
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
subclassed_model = SubclassedModel()
模型权重在第一次调用时创建,因此使用 DTensor 输入调用模型并确认权重具有预期的布局
dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)
print(subclassed_model.feature.kernel.layout)
这样,您就可以快速将 Layout
映射到您的模型,而无需更新任何现有代码。
顺序模型和函数式模型
对于 Keras 函数式模型和顺序模型,您也可以使用 tf.keras.dtensor.experimental.LayoutMap
。
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
inputs = tf.keras.Input((16,), batch_size=16)
x = tf.keras.layers.Dense(16, name='feature')(inputs)
x = tf.keras.layers.Dropout(0.1)(x)
output = tf.keras.layers.Dense(32, name='feature_2')(x)
model = tf.keras.Model(inputs, output)
print(model.layers[1].kernel.layout)
with layout_map.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
tf.keras.layers.Dropout(0.1),
tf.keras.layers.Dense(32, name='feature_2')
])
print(model.layers[2].kernel.layout)