在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
概述
DTensor 提供了一种方法,让您可以在多个设备上分布式训练模型,以提高效率、可靠性和可扩展性。有关更多详细信息,请查看 DTensor 概念 指南。
在本教程中,您将使用 DTensor 训练一个情感分析模型。该示例演示了三种分布式训练方案
- 数据并行训练,将训练样本分片(分区)到设备上。
- 模型并行训练,将模型变量分片到设备上。
- 空间并行训练,将输入数据的特征分片到设备上(也称为 空间分区)。
本教程的训练部分灵感来自 Kaggle 笔记本,名为 Kaggle 关于情感分析的指南。要了解完整的训练和评估工作流程(不使用 DTensor),请参考该笔记本。
本教程将逐步介绍以下步骤
- 一些数据清理,以获得一个
tf.data.Dataset
,其中包含标记化的句子及其极性。 - 然后,使用
tf.Module
构建一个具有自定义 Dense 和 BatchNorm 层的 MLP 模型,以跟踪推理变量。模型构造函数将接受额外的Layout
参数来控制变量的分片。 - 对于训练,您将首先使用数据并行训练以及
tf.experimental.dtensor
的检查点功能。然后,您将继续进行模型并行训练和空间并行训练。 - 最后一部分简要介绍了
tf.saved_model
和tf.experimental.dtensor
在 TensorFlow 2.9 中的交互。
设置
DTensor (tf.experimental.dtensor
) 已成为 TensorFlow 2.9.0 版本的一部分。
首先,安装或升级 TensorFlow Datasets
pip install --quiet --upgrade tensorflow-datasets
接下来,导入 tensorflow
和 dtensor
,并将 TensorFlow 配置为使用 8 个虚拟 CPU。
即使本示例使用虚拟 CPU,DTensor 在 CPU、GPU 或 TPU 设备上的工作方式相同。
import tempfile
import numpy as np
import tensorflow_datasets as tfds
import tensorflow as tf
from tensorflow.experimental import dtensor
print('TensorFlow version:', tf.__version__)
def configure_virtual_cpus(ncpu):
phy_devices = tf.config.list_physical_devices('CPU')
tf.config.set_logical_device_configuration(phy_devices[0], [
tf.config.LogicalDeviceConfiguration(),
] * ncpu)
configure_virtual_cpus(8)
DEVICES = [f'CPU:{i}' for i in range(8)]
tf.config.list_logical_devices('CPU')
下载数据集
下载 IMDB 评论数据集以训练情感分析模型
train_data = tfds.load('imdb_reviews', split='train', shuffle_files=True, batch_size=64)
train_data
准备数据
首先对文本进行标记。这里使用 one-hot 编码的扩展,即 tf.keras.layers.TextVectorization
的 'tf_idf'
模式。
- 为了提高速度,将标记数量限制为 1200。
- 为了使
tf.Module
更简单,在训练之前运行TextVectorization
作为预处理步骤。
数据清理部分的最终结果是一个 Dataset
,其中标记化的文本作为 x
,标签作为 y
。
text_vectorization = tf.keras.layers.TextVectorization(output_mode='tf_idf', max_tokens=1200, output_sequence_length=None)
text_vectorization.adapt(data=train_data.map(lambda x: x['text']))
def vectorize(features):
return text_vectorization(features['text']), features['label']
train_data_vec = train_data.map(vectorize)
train_data_vec
使用 DTensor 构建神经网络
现在使用 DTensor
构建一个多层感知器 (MLP) 网络。该网络将使用全连接的 Dense 和 BatchNorm 层。
DTensor
通过根据其输入 Tensor
和变量的 dtensor.Layout
属性,对常规 TensorFlow 操作进行单程序多数据 (SPMD) 扩展,从而扩展 TensorFlow。
DTensor
意识层的变量是 dtensor.DVariable
,并且 DTensor
意识层对象的构造函数除了通常的层参数外,还接受额外的 Layout
输入。
Dense 层
以下自定义 Dense 层定义了 2 个层变量:\(W_{ij}\) 是权重的变量,\(b_i\) 是偏差的变量。
\[ y_j = \sigma(\sum_i x_i W_{ij} + b_j) \]
布局推断
此结果来自以下观察结果
矩阵点积 \(t_j = \sum_i x_i W_{ij}\) 的操作数的首选 DTensor 分片是沿 \(i\) 轴以相同方式分片 \(\mathbf{W}\) 和 \(\mathbf{x}\)。
矩阵和 \(t_j + b_j\) 的操作数的首选 DTensor 分片是沿 \(j\) 轴以相同方式分片 \(\mathbf{t}\) 和 \(\mathbf{b}\)。
class Dense(tf.Module):
def __init__(self, input_size, output_size,
init_seed, weight_layout, activation=None):
super().__init__()
random_normal_initializer = tf.function(tf.random.stateless_normal)
self.weight = dtensor.DVariable(
dtensor.call_with_layout(
random_normal_initializer, weight_layout,
shape=[input_size, output_size],
seed=init_seed
))
if activation is None:
activation = lambda x:x
self.activation = activation
# bias is sharded the same way as the last axis of weight.
bias_layout = weight_layout.delete([0])
self.bias = dtensor.DVariable(
dtensor.call_with_layout(tf.zeros, bias_layout, [output_size]))
def __call__(self, x):
y = tf.matmul(x, self.weight) + self.bias
y = self.activation(y)
return y
BatchNorm
批归一化层有助于在训练期间避免模式崩溃。在这种情况下,添加批归一化层有助于模型训练避免生成仅产生零的模型。
下面自定义 BatchNorm
层的构造函数不接受 Layout
参数。这是因为 BatchNorm
没有层变量。这仍然适用于 DTensor,因为 'x'(层的唯一输入)已经是表示全局批次的 DTensor。
class BatchNorm(tf.Module):
def __init__(self):
super().__init__()
def __call__(self, x, training=True):
if not training:
# This branch is not used in the Tutorial.
pass
mean, variance = tf.nn.moments(x, axes=[0])
return tf.nn.batch_normalization(x, mean, variance, 0.0, 1.0, 1e-5)
一个功能齐全的批归一化层(例如 tf.keras.layers.BatchNormalization
)将需要其变量的 Layout 参数。
def make_keras_bn(bn_layout):
return tf.keras.layers.BatchNormalization(gamma_layout=bn_layout,
beta_layout=bn_layout,
moving_mean_layout=bn_layout,
moving_variance_layout=bn_layout,
fused=False)
将层组合在一起
接下来,使用上面的构建块构建一个多层感知器 (MLP) 网络。下图显示了输入 x
与两个 Dense
层的权重矩阵之间的轴关系,没有任何 DTensor 分片或复制应用。
第一个 Dense
层的输出传递到第二个 Dense
层的输入(在 BatchNorm
之后)。因此,第一个 Dense
层 (\(\mathbf{W_1}\)) 的输出和第二个 Dense
层 (\(\mathbf{W_2}\)) 的输入的首选 DTensor 分片是沿公共轴 \(\hat{j}\) 以相同方式分片 \(\mathbf{W_1}\) 和 \(\mathbf{W_2}\),
\[ \mathsf{Layout}[{W_{1,ij} }; i, j] = \left[\hat{i}, \hat{j}\right] \\ \mathsf{Layout}[{W_{2,jk} }; j, k] = \left[\hat{j}, \hat{k} \right] \]
即使布局推断表明 2 个布局不是独立的,为了简化模型接口,MLP
将接受 2 个 Layout
参数,每个 Dense 层一个。
from typing import Tuple
class MLP(tf.Module):
def __init__(self, dense_layouts: Tuple[dtensor.Layout, dtensor.Layout]):
super().__init__()
self.dense1 = Dense(
1200, 48, (1, 2), dense_layouts[0], activation=tf.nn.relu)
self.bn = BatchNorm()
self.dense2 = Dense(48, 2, (3, 4), dense_layouts[1])
def __call__(self, x):
y = x
y = self.dense1(y)
y = self.bn(y)
y = self.dense2(y)
return y
布局推断约束的正确性与 API 的简单性之间的权衡是使用 DTensor 的 API 的常见设计点。也可以使用不同的 API 来捕获 Layout
之间的依赖关系。例如,MLPStricter
类在构造函数中创建 Layout
对象。
class MLPStricter(tf.Module):
def __init__(self, mesh, input_mesh_dim, inner_mesh_dim1, output_mesh_dim):
super().__init__()
self.dense1 = Dense(
1200, 48, (1, 2), dtensor.Layout([input_mesh_dim, inner_mesh_dim1], mesh),
activation=tf.nn.relu)
self.bn = BatchNorm()
self.dense2 = Dense(48, 2, (3, 4), dtensor.Layout([inner_mesh_dim1, output_mesh_dim], mesh))
def __call__(self, x):
y = x
y = self.dense1(y)
y = self.bn(y)
y = self.dense2(y)
return y
为了确保模型运行,请使用完全复制的布局和完全复制的 'x'
输入批次来探测您的模型。
WORLD = dtensor.create_mesh([("world", 8)], devices=DEVICES)
model = MLP([dtensor.Layout.replicated(WORLD, rank=2),
dtensor.Layout.replicated(WORLD, rank=2)])
sample_x, sample_y = train_data_vec.take(1).get_single_element()
sample_x = dtensor.copy_to_mesh(sample_x, dtensor.Layout.replicated(WORLD, rank=2))
print(model(sample_x))
将数据移动到设备
通常,tf.data
迭代器(以及其他数据获取方法)会生成由本地主机设备内存支持的张量对象。此数据必须传输到支持 DTensor 组件张量的加速器设备内存。
dtensor.copy_to_mesh
不适合这种情况,因为它会由于 DTensor 的全局视角而将输入张量复制到所有设备。因此,在本教程中,您将使用一个辅助函数 repack_local_tensor
来促进数据的传输。此辅助函数使用 dtensor.pack
将打算发送给副本的全局批次的碎片发送(并且仅发送)到支持副本的设备。
此简化函数假设单客户端。在多客户端应用程序中,确定拆分本地张量的正确方法以及拆分部分与本地设备之间的映射可能很费力。
计划提供额外的 DTensor API 来简化 tf.data
集成,支持单客户端和多客户端应用程序。敬请关注。
def repack_local_tensor(x, layout):
"""Repacks a local Tensor-like to a DTensor with layout.
This function assumes a single-client application.
"""
x = tf.convert_to_tensor(x)
sharded_dims = []
# For every sharded dimension, use tf.split to split the along the dimension.
# The result is a nested list of split-tensors in queue[0].
queue = [x]
for axis, dim in enumerate(layout.sharding_specs):
if dim == dtensor.UNSHARDED:
continue
num_splits = layout.shape[axis]
queue = tf.nest.map_structure(lambda x: tf.split(x, num_splits, axis=axis), queue)
sharded_dims.append(dim)
# Now we can build the list of component tensors by looking up the location in
# the nested list of split-tensors created in queue[0].
components = []
for locations in layout.mesh.local_device_locations():
t = queue[0]
for dim in sharded_dims:
split_index = locations[dim] # Only valid on single-client mesh.
t = t[split_index]
components.append(t)
return dtensor.pack(components, layout)
数据并行训练
在本节中,您将使用数据并行训练来训练 MLP 模型。接下来的部分将演示模型并行训练和空间并行训练。
数据并行训练是分布式机器学习中常用的方案
- 模型变量在每个 N 个设备上复制。
- 全局批次被拆分为 N 个每个副本批次。
- 每个每个副本批次都在副本设备上进行训练。
- 在所有副本上集体执行权重更新数据之前,会减少梯度。
数据并行训练在设备数量方面提供了近乎线性的加速。
创建数据并行网格
典型的并行训练循环使用一个 DTensor Mesh
,该网格包含一个单一的 batch
维度,其中每个设备都成为一个副本,接收来自全局批次的碎片。
复制的模型在副本上运行,因此模型变量被完全复制(未分片)。
mesh = dtensor.create_mesh([("batch", 8)], devices=DEVICES)
model = MLP([dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh),
dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh),])
将训练数据打包到 DTensor 中
训练数据批次应该被打包成沿着 'batch'
(第一个)轴分片的 DTensors,这样 DTensor 就可以将训练数据均匀地分配到 'batch'
网格维度。
def repack_batch(x, y, mesh):
x = repack_local_tensor(x, layout=dtensor.Layout(['batch', dtensor.UNSHARDED], mesh))
y = repack_local_tensor(y, layout=dtensor.Layout(['batch'], mesh))
return x, y
sample_x, sample_y = train_data_vec.take(1).get_single_element()
sample_x, sample_y = repack_batch(sample_x, sample_y, mesh)
print('x', sample_x[:, 0])
print('y', sample_y)
训练步骤
此示例使用带有自定义训练循环 (CTL) 的随机梯度下降优化器。有关这些主题的更多信息,请参阅 自定义训练循环指南 和 演练。
train_step
被封装为一个 tf.function
,以指示此主体将被追踪为 TensorFlow 图。 train_step
的主体包含正向推理传递、反向梯度传递和变量更新。
请注意,train_step
的主体不包含任何特殊的 DTensor 注释。相反,train_step
只包含处理来自输入批次的全局视图的输入 x
和 y
的高级 TensorFlow 操作以及模型。所有 DTensor 注释(Mesh
、Layout
)都从训练步骤中分解出来。
# Refer to the CTL (custom training loop guide)
@tf.function
def train_step(model, x, y, learning_rate=tf.constant(1e-4)):
with tf.GradientTape() as tape:
logits = model(x)
# tf.reduce_sum sums the batch sharded per-example loss to a replicated
# global loss (scalar).
loss = tf.reduce_sum(
tf.nn.sparse_softmax_cross_entropy_with_logits(
logits=logits, labels=y))
parameters = model.trainable_variables
gradients = tape.gradient(loss, parameters)
for parameter, parameter_gradient in zip(parameters, gradients):
parameter.assign_sub(learning_rate * parameter_gradient)
# Define some metrics
accuracy = 1.0 - tf.reduce_sum(tf.cast(tf.argmax(logits, axis=-1, output_type=tf.int64) != y, tf.float32)) / x.shape[0]
loss_per_sample = loss / len(x)
return {'loss': loss_per_sample, 'accuracy': accuracy}
检查点
您可以使用开箱即用的 tf.train.Checkpoint
对 DTensor 模型进行检查点。保存和恢复分片 DVariables 将执行高效的分片保存和恢复。目前,当使用 tf.train.Checkpoint.save
和 tf.train.Checkpoint.restore
时,所有 DVariables 必须位于同一个主机网格上,并且 DVariables 和常规变量不能一起保存。您可以在 本指南 中了解更多关于检查点的知识。
当恢复 DTensor 检查点时,变量的 Layout
可能与保存检查点时不同。也就是说,保存 DTensor 模型与布局和网格无关,只影响分片保存的效率。您可以使用一个网格和布局保存 DTensor 模型,并在不同的网格和布局上恢复它。本教程利用此功能在模型并行训练和空间并行训练部分继续训练。
CHECKPOINT_DIR = tempfile.mkdtemp()
def start_checkpoint_manager(model):
ckpt = tf.train.Checkpoint(root=model)
manager = tf.train.CheckpointManager(ckpt, CHECKPOINT_DIR, max_to_keep=3)
if manager.latest_checkpoint:
print("Restoring a checkpoint")
ckpt.restore(manager.latest_checkpoint).assert_consumed()
else:
print("New training")
return manager
训练循环
对于数据并行训练方案,训练多个 epoch 并报告进度。3 个 epoch 不足以训练模型——50% 的准确率与随机猜测一样好。
启用检查点,以便您以后可以继续训练。在下一节中,您将加载检查点并使用不同的并行方案进行训练。
num_epochs = 2
manager = start_checkpoint_manager(model)
for epoch in range(num_epochs):
step = 0
pbar = tf.keras.utils.Progbar(target=int(train_data_vec.cardinality()), stateful_metrics=[])
metrics = {'epoch': epoch}
for x,y in train_data_vec:
x, y = repack_batch(x, y, mesh)
metrics.update(train_step(model, x, y, 1e-2))
pbar.update(step, values=metrics.items(), finalize=False)
step += 1
manager.save()
pbar.update(step, values=metrics.items(), finalize=True)
模型并行训练
如果您切换到二维 Mesh
,并将模型变量沿着第二个网格维度进行分片,那么训练就变成了模型并行。
在模型并行训练中,每个模型副本跨越多个设备(在本例中为 2 个)。
- 有 4 个模型副本,训练数据批次被分配到 4 个副本。
- 单个模型副本内的 2 个设备接收复制的训练数据。
mesh = dtensor.create_mesh([("batch", 4), ("model", 2)], devices=DEVICES)
model = MLP([dtensor.Layout([dtensor.UNSHARDED, "model"], mesh),
dtensor.Layout(["model", dtensor.UNSHARDED], mesh)])
由于训练数据仍然沿着批次维度进行分片,因此您可以重用与数据并行训练情况相同的 repack_batch
函数。DTensor 将自动将每个副本的批次沿着 "model"
网格维度复制到副本内的所有设备。
def repack_batch(x, y, mesh):
x = repack_local_tensor(x, layout=dtensor.Layout(['batch', dtensor.UNSHARDED], mesh))
y = repack_local_tensor(y, layout=dtensor.Layout(['batch'], mesh))
return x, y
接下来运行训练循环。训练循环重用与数据并行训练示例相同的检查点管理器,代码看起来相同。
您可以继续在模型并行训练下训练数据并行训练的模型。
num_epochs = 2
manager = start_checkpoint_manager(model)
for epoch in range(num_epochs):
step = 0
pbar = tf.keras.utils.Progbar(target=int(train_data_vec.cardinality()))
metrics = {'epoch': epoch}
for x,y in train_data_vec:
x, y = repack_batch(x, y, mesh)
metrics.update(train_step(model, x, y, 1e-2))
pbar.update(step, values=metrics.items(), finalize=False)
step += 1
manager.save()
pbar.update(step, values=metrics.items(), finalize=True)
空间并行训练
当训练非常高维度的训练数据(例如非常大的图像或视频)时,可能需要沿着特征维度进行分片。这被称为 空间分区,它最初是在 TensorFlow 中引入的,用于训练具有大型 3-d 输入样本的模型。
DTensor 也支持这种情况。您需要做的唯一更改是创建一个包含 feature
维度的 Mesh,并应用相应的 Layout
。
mesh = dtensor.create_mesh([("batch", 2), ("feature", 2), ("model", 2)], devices=DEVICES)
model = MLP([dtensor.Layout(["feature", "model"], mesh),
dtensor.Layout(["model", dtensor.UNSHARDED], mesh)])
在将输入张量打包到 DTensors 时,沿着 feature
维度对输入数据进行分片。您可以使用稍微不同的重新打包函数 repack_batch_for_spt
来执行此操作,其中 spt
代表空间并行训练。
def repack_batch_for_spt(x, y, mesh):
# Shard data on feature dimension, too
x = repack_local_tensor(x, layout=dtensor.Layout(["batch", 'feature'], mesh))
y = repack_local_tensor(y, layout=dtensor.Layout(["batch"], mesh))
return x, y
空间并行训练也可以从使用其他并行训练方案创建的检查点继续。
num_epochs = 2
manager = start_checkpoint_manager(model)
for epoch in range(num_epochs):
step = 0
metrics = {'epoch': epoch}
pbar = tf.keras.utils.Progbar(target=int(train_data_vec.cardinality()))
for x, y in train_data_vec:
x, y = repack_batch_for_spt(x, y, mesh)
metrics.update(train_step(model, x, y, 1e-2))
pbar.update(step, values=metrics.items(), finalize=False)
step += 1
manager.save()
pbar.update(step, values=metrics.items(), finalize=True)
SavedModel 和 DTensor
DTensor 和 SavedModel 的集成仍在开发中。
截至 TensorFlow 2.11
,tf.saved_model
可以保存分片和复制的 DTensor 模型,保存将在网格的不同设备上执行高效的分片保存。但是,在保存模型后,所有 DTensor 注释都会丢失,保存的签名只能与常规张量一起使用,不能与 DTensors 一起使用。
mesh = dtensor.create_mesh([("world", 1)], devices=DEVICES[:1])
mlp = MLP([dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh),
dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh)])
manager = start_checkpoint_manager(mlp)
model_for_saving = tf.keras.Sequential([
text_vectorization,
mlp
])
@tf.function(input_signature=[tf.TensorSpec([None], tf.string)])
def run(inputs):
return {'result': model_for_saving(inputs)}
tf.saved_model.save(
model_for_saving, "/tmp/saved_model",
signatures=run)
截至 TensorFlow 2.9.0,您只能使用常规张量或完全复制的 DTensor(将被转换为常规张量)来调用加载的签名。
sample_batch = train_data.take(1).get_single_element()
sample_batch
loaded = tf.saved_model.load("/tmp/saved_model")
run_sig = loaded.signatures["serving_default"]
result = run_sig(sample_batch['text'])['result']
np.mean(tf.argmax(result, axis=-1) == sample_batch['label'])
下一步是什么?
本教程演示了使用 DTensor 构建和训练 MLP 情感分析模型。
通过 Mesh
和 Layout
原语,DTensor 可以将 TensorFlow tf.function
转换为适合各种训练方案的分布式程序。
在实际的机器学习应用程序中,应应用评估和交叉验证,以避免产生过拟合模型。本教程中介绍的技术也可以应用于将并行性引入评估。
从头开始使用 tf.Module
组成模型需要大量工作,而重用现有的构建块(如层和辅助函数)可以极大地加快模型开发速度。截至 TensorFlow 2.9,tf.keras.layers
下的所有 Keras 层都接受 DTensor 布局作为其参数,并且可以用于构建 DTensor 模型。您甚至可以直接重用 Keras 模型和 DTensor,而无需修改模型实现。有关使用 DTensor Keras 的信息,请参阅 DTensor Keras 集成教程。