在 TensorFlow.org 上查看
|
在 Google Colab 中运行
|
在 GitHub 上查看源码
|
下载笔记本
|
概览
在本教程中,您将学习如何在 Keras 中使用 DTensor。
通过 DTensor 与 Keras 的集成,您可以重用现有的 Keras 层和模型来构建和训练分布式机器学习模型。
您将使用 MNIST 数据训练一个多层分类模型。本教程将演示如何为子类化模型、Sequential 模型和函数式模型设置布局 (Layout)。
本教程假设您已经阅读了 DTensor 编程指南,并熟悉 Mesh 和 Layout 等基本的 DTensor 概念。
本教程基于 使用 Keras 在 MNIST 上训练神经网络。
设置
自 2.9.0 版本起,DTensor (tf.experimental.dtensor) 已成为 TensorFlow 的一部分。
首先,安装或升级 TensorFlow Datasets
pip install --quiet --upgrade tensorflow-datasets接下来,导入 TensorFlow 和 dtensor,并将 TensorFlow 配置为使用 8 个虚拟 CPU。
尽管此示例使用虚拟 CPU,但 DTensor 在 CPU、GPU 或 TPU 设备上的工作方式相同。
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
def configure_virtual_cpus(ncpu):
phy_devices = tf.config.list_physical_devices('CPU')
tf.config.set_logical_device_configuration(
phy_devices[0],
[tf.config.LogicalDeviceConfiguration()] * ncpu)
configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')
devices = [f'CPU:{i}' for i in range(8)]
确定性伪随机数生成器
您需要注意的一点是,DTensor API 要求每个正在运行的客户端具有相同的随机种子,以便在初始化权重时具有确定性的行为。您可以通过 tf.keras.utils.set_random_seed() 在 Keras 中设置全局种子来实现这一点。
tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)
创建数据并行网格 (Data Parallel Mesh)
本教程演示数据并行训练。适应模型并行训练和空间并行训练只需切换到不同的 Layout 对象集即可。有关数据并行之外的分布式训练的更多信息,请参阅 使用 DTensor 进行分布式训练 教程。
数据并行训练是一种常用的并行训练方案,例如 tf.distribute.MirroredStrategy 也使用该方案。
使用 DTensor,数据并行训练循环使用由单个“batch”维度组成的 Mesh,其中每个设备运行模型的一个副本,该副本接收来自全局批次的一个分片 (shard)。
mesh = dtensor.create_mesh([("batch", 8)], devices=devices)
由于每个设备都运行模型的完整副本,模型变量应在网格中完全复制(未分片)。例如,此 Mesh 上秩为 2 的权重的完全复制布局如下所示
example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh) # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)
此 Mesh 上秩为 2 的数据张量的布局将在第一个维度上进行分片(有时称为 batch_sharded),
example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh) # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
使用布局创建 Keras 层
在数据并行方案中,通常使用完全复制的布局来创建模型权重,这样模型的每个副本都可以使用分片后的输入数据进行计算。
为了配置层权重的布局信息,Keras 为大多数内置层的层构造函数添加了一个额外的参数。
以下示例构建了一个带有完全复制权重布局的小型图像分类模型。您可以通过 tf.keras.layers.Dense 中的 kernel_layout 和 bias_layout 参数指定 kernel 和 bias 的布局信息。大多数内置 Keras 层都已准备好显式指定层权重的 Layout。
unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128,
activation='relu',
name='d1',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d),
tf.keras.layers.Dense(10,
name='d2',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d)
])
您可以通过检查权重上的 layout 属性来查看布局信息。
for weight in model.weights:
print(f'Weight name: {weight.name} with layout: {weight.layout}')
break
加载数据集并构建输入流水线
加载 MNIST 数据集并为其配置一些预处理输入流水线。数据集本身不与任何 DTensor 布局信息关联。
(ds_train, ds_test), ds_info = tfds.load(
'mnist',
split=['train', 'test'],
shuffle_files=True,
as_supervised=True,
with_info=True,
)
def normalize_img(image, label):
"""Normalizes images: `uint8` -> `float32`."""
return tf.cast(image, tf.float32) / 255., label
batch_size = 128
ds_train = ds_train.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
定义模型的训练逻辑
接下来,定义模型的训练和评估逻辑。
截至 TensorFlow 2.9,您必须为支持 DTensor 的 Keras 模型编写自定义训练循环。这是为了使用适当的布局信息打包输入数据,这并未集成到 Keras 标准的 tf.keras.Model.fit() 或 tf.keras.Model.eval() 函数中。您将在即将发布的版本中获得更多对 tf.data 的支持。
@tf.function
def train_step(model, x, y, optimizer, metrics):
with tf.GradientTape() as tape:
logits = model(x, training=True)
# tf.reduce_sum sums the batch sharded per-example loss to a replicated
# global loss (scalar).
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'loss': loss_per_sample}
return results
@tf.function
def eval_step(model, x, y, metrics):
logits = model(x, training=False)
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'eval_loss': loss_per_sample}
return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
num_local_devices = image_layout.mesh.num_local_devices()
images = tf.split(images, num_local_devices)
labels = tf.split(labels, num_local_devices)
images = dtensor.pack(images, image_layout)
labels = dtensor.pack(labels, label_layout)
return images, labels
指标和优化器
当将 DTensor API 与 Keras Metric 和 Optimizer 一起使用时,您需要提供额外的网格信息,以便任何内部状态变量和张量都能与模型中的变量协同工作。
对于优化器,DTensor 引入了一个新的实验性命名空间
keras.dtensor.experimental.optimizers,其中许多现有的 Keras 优化器已扩展为可以接收额外的mesh参数。在未来的版本中,它可能会与 Keras 核心优化器合并。对于指标 (metrics),您可以直接在构造函数中将
mesh指定为参数,使其成为兼容 DTensor 的Metric。
optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
训练模型
以下示例演示了如何从输入流水线按批次维度对数据进行分片,并使用具有完全复制权重的模型进行训练。
经过 3 个 epoch 后,模型应达到约 97% 的准确率
num_epochs = 3
image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
for epoch in range(num_epochs):
print("============================")
print("Epoch: ", epoch)
for metric in metrics.values():
metric.reset_state()
step = 0
results = {}
pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
for input in ds_train:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(train_step(model, images, labels, optimizer, metrics))
for metric_name, metric in metrics.items():
results[metric_name] = metric.result()
pbar.update(step, values=results.items(), finalize=False)
step += 1
pbar.update(step, values=results.items(), finalize=True)
for metric in eval_metrics.values():
metric.reset_state()
for input in ds_test:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(eval_step(model, images, labels, eval_metrics))
for metric_name, metric in eval_metrics.items():
results[metric_name] = metric.result()
for metric_name, metric in results.items():
print(f"{metric_name}: {metric.numpy()}")
为现有模型代码指定布局
通常您拥有的模型已经非常适合您的用例。为模型内的每个单独层指定 Layout 信息将是一项巨大的工作,需要进行大量修改。
为了帮助您轻松地将现有 Keras 模型转换为使用 DTensor API,您可以使用新的 tf.keras.dtensor.experimental.LayoutMap API,该 API 允许您从全局角度指定 Layout。
首先,您需要创建一个 LayoutMap 实例,这是一个类似字典的对象,其中包含您想要为模型权重指定的所有 Layout。
LayoutMap 在初始化时需要一个 Mesh 实例,它可用于为任何未配置布局的权重提供默认的复制 Layout。如果您希望所有模型权重都完全复制,则可以提供一个空的 LayoutMap,系统将使用默认网格来创建复制的 Layout。
LayoutMap 使用字符串作为键,使用 Layout 作为值。普通 Python 字典与此类之间存在行为差异:检索值时,字符串键将被视为正则表达式。
子类化模型
考虑以下使用 Keras 子类化模型语法定义的模型。
class SubclassedModel(tf.keras.Model):
def __init__(self, name=None):
super().__init__(name=name)
self.feature = tf.keras.layers.Dense(16)
self.feature_2 = tf.keras.layers.Dense(24)
self.dropout = tf.keras.layers.Dropout(0.1)
def call(self, inputs, training=None):
x = self.feature(inputs)
x = self.dropout(x, training=training)
return self.feature_2(x)
该模型中有 4 个权重,分别是两个 Dense 层的 kernel 和 bias。它们中的每一个都根据对象路径进行映射
model.feature.kernelmodel.feature.biasmodel.feature_2.kernelmodel.feature_2.bias
现在定义以下 LayoutMap 并将其应用于模型
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
subclassed_model = SubclassedModel()
模型权重在第一次调用时创建,因此请使用 DTensor 输入调用模型并确认权重具有预期的布局
dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)
print(subclassed_model.feature.kernel.layout)
有了这个,您可以快速将 Layout 映射到您的模型,而无需更新任何现有代码。
Sequential 和函数式模型
对于 Keras 函数式和 Sequential 模型,您也可以使用 tf.keras.dtensor.experimental.LayoutMap。
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
inputs = tf.keras.Input((16,), batch_size=16)
x = tf.keras.layers.Dense(16, name='feature')(inputs)
x = tf.keras.layers.Dropout(0.1)(x)
output = tf.keras.layers.Dense(32, name='feature_2')(x)
model = tf.keras.Model(inputs, output)
print(model.layers[1].kernel.layout)
with layout_map.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
tf.keras.layers.Dropout(0.1),
tf.keras.layers.Dense(32, name='feature_2')
])
print(model.layers[2].kernel.layout)
在 TensorFlow.org 上查看
在 Google Colab 中运行
在 GitHub 上查看源码
下载笔记本