通过子类化创建新层和模型

作者: fchollet

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 在 keras.io 上查看

设置

import tensorflow as tf
from tensorflow import keras

Layer 类:状态(权重)和一些计算的组合

Keras 中的核心抽象之一是 Layer 类。层封装了状态(层的“权重”)和从输入到输出的转换(“调用”,层的正向传递)。

这是一个密集连接层。它有一个状态:变量 w 和 b。

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super().__init__()
        self.w = self.add_weight(
            shape=(input_dim, units), initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

您可以通过对一些张量输入进行调用来使用层,就像 Python 函数一样。

x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor(
[[-0.02419483 -0.06813122  0.00395634 -0.03124779]
 [-0.02419483 -0.06813122  0.00395634 -0.03124779]], shape=(2, 4), dtype=float32)

请注意,权重 w 和 b 在被设置为层属性后会自动由层跟踪

assert linear_layer.weights == [linear_layer.w, linear_layer.b]

层可以具有不可训练的权重

除了可训练的权重之外,您还可以向层添加不可训练的权重。此类权重在反向传播期间不应被考虑在内,当您训练层时。

以下是添加和使用不可训练权重的方法

class ComputeSum(keras.layers.Layer):
    def __init__(self, input_dim):
        super().__init__()
        self.total = self.add_weight(
            initializer="zeros", shape=(input_dim,), trainable=False
        )

    def call(self, inputs):
        self.total.assign_add(tf.reduce_sum(inputs, axis=0))
        return self.total


x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())
[2. 2.]
[4. 4.]

它是 layer.weights 的一部分,但它被归类为不可训练的权重

print("weights:", len(my_sum.weights))
print("non-trainable weights:", len(my_sum.non_trainable_weights))

# It's not included in the trainable weights:
print("trainable_weights:", my_sum.trainable_weights)
weights: 1
non-trainable weights: 1
trainable_weights: []

最佳实践:推迟权重创建,直到知道输入的形状

我们上面的 Linear 层接受了一个 input_dim 参数,该参数用于在 __init__() 中计算权重 w 和 b 的形状

class Linear(keras.layers.Layer):
    def __init__(self, units=32, input_dim=32):
        super().__init__()
        self.w = self.add_weight(
            shape=(input_dim, units), initializer="random_normal", trainable=True
        )
        self.b = self.add_weight(shape=(units,), initializer="zeros", trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

在许多情况下,您可能事先不知道输入的大小,并且希望在该值已知时延迟创建权重,即在实例化层后的某个时间点。

在 Keras API 中,我们建议在层的 build(self, inputs_shape) 方法中创建层权重。像这样

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

您层的 __call__() 方法会在第一次调用时自动运行构建。现在您拥有一个延迟加载的层,因此更易于使用。

# At instantiation, we don't know on what inputs this is going to get called
linear_layer = Linear(32)

# The layer's weights are created dynamically the first time the layer is called
y = linear_layer(x)

如上所示,单独实现 build() 可以很好地将仅创建一次权重与每次调用中使用权重区分开来。但是,对于某些高级自定义层,将状态创建和计算分开可能不切实际。层实现者可以将权重创建延迟到第一次 __call__(),但需要确保后续调用使用相同的权重。此外,由于 __call__() 很可能在 tf.function 内部首次执行,因此在 __call__() 中进行的任何变量创建都应包装在 tf.init_scope 中。

层是递归可组合的。

如果您将层实例作为另一个层的属性分配,则外部层将开始跟踪内部层创建的权重。

我们建议在 __init__() 方法中创建此类子层,并将其留给第一次 __call__() 来触发构建其权重。

class MLPBlock(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.linear_1 = Linear(32)
        self.linear_2 = Linear(32)
        self.linear_3 = Linear(1)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)


mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64)))  # The first call to the `mlp` will create the weights
print("weights:", len(mlp.weights))
print("trainable weights:", len(mlp.trainable_weights))
weights: 6
trainable weights: 6

The add_loss() 方法

在编写层的 call() 方法时,您可以创建损失张量,您将在编写训练循环时使用这些张量。这可以通过调用 self.add_loss(value) 来实现。

# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(keras.layers.Layer):
    def __init__(self, rate=1e-2):
        super().__init__()
        self.rate = rate

    def call(self, inputs):
        self.add_loss(self.rate * tf.reduce_mean(inputs))
        return inputs

请注意,add_loss() 可以接受普通 TensorFlow 操作的结果。这里不需要调用 Loss 对象。

这些损失(包括任何内部层创建的损失)可以通过 layer.losses 获取。此属性在每次 __call__() 开始时都会重置为顶层层,因此 layer.losses 始终包含上次前向传递期间创建的损失值。

class OuterLayer(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.activity_reg = ActivityRegularizationLayer(1e-2)

    def call(self, inputs):
        return self.activity_reg(inputs)


layer = OuterLayer()
assert len(layer.losses) == 0  # No losses yet since the layer has never been called

_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # We created one loss value

# `layer.losses` gets reset at the start of each __call__
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1  # This is the loss created during the call above

此外,loss 属性还包含为任何内部层的权重创建的正则化损失。

class OuterLayerWithKernelRegularizer(keras.layers.Layer):
    def __init__(self):
        super().__init__()
        self.dense = keras.layers.Dense(
            32, kernel_regularizer=keras.regularizers.l2(1e-3)
        )

    def call(self, inputs):
        return self.dense(inputs)


layer = OuterLayerWithKernelRegularizer()
_ = layer(tf.zeros((1, 1)))

# This is `1e-3 * sum(layer.dense.kernel ** 2)`,
# created by the `kernel_regularizer` above.
print(layer.losses)
[<tf.Tensor: shape=(), dtype=float32, numpy=0.0017542194>]

这些损失旨在在编写训练循环时考虑在内,例如

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Iterate over the batches of a dataset.
for x_batch_train, y_batch_train in train_dataset:
    with tf.GradientTape() as tape:
        logits = layer(x_batch_train)  # Logits for this minibatch
        # Loss value for this minibatch
        loss_value = loss_fn(y_batch_train, logits)
        # Add extra losses created during this forward pass:
        loss_value += sum(model.losses)

    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

有关编写训练循环的详细指南,请参阅 从头开始编写训练循环的指南

这些损失也与 fit() 无缝协作(如果存在,它们会自动求和并添加到主损失中)。

import numpy as np

inputs = keras.Input(shape=(3,))
outputs = ActivityRegularizationLayer()(inputs)
model = keras.Model(inputs, outputs)

# If there is a loss passed in `compile`, the regularization
# losses get added to it
model.compile(optimizer="adam", loss="mse")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))

# It's also possible not to pass any loss in `compile`,
# since the model already has a loss to minimize, via the `add_loss`
# call during the forward pass!
model.compile(optimizer="adam")
model.fit(np.random.random((2, 3)), np.random.random((2, 3)))
1/1 [==============================] - 0s 75ms/step - loss: 0.1081
1/1 [==============================] - 0s 31ms/step - loss: 0.0044
<keras.src.callbacks.History at 0x7fb23c0e3f40>

您可以选择在您的层上启用序列化。

如果您需要将自定义层序列化为 函数式模型 的一部分,您可以选择实现 get_config() 方法。

class Linear(keras.layers.Layer):
    def __init__(self, units=32):
        super().__init__()
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        return {"units": self.units}


# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'units': 64}

请注意,基类 Layer__init__() 方法接受一些关键字参数,特别是 namedtype。在 __init__() 中将这些参数传递给父类并将其包含在层配置中是一个好习惯。

class Linear(keras.layers.Layer):
    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(self.units,), initializer="random_normal", trainable=True
        )

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        config = super().get_config()
        config.update({"units": self.units})
        return config


layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'name': 'linear_7', 'trainable': True, 'dtype': 'float32', 'units': 64}

如果您在从其配置反序列化层时需要更多灵活性,您还可以覆盖 from_config() 类方法。这是 from_config() 的基本实现。

def from_config(cls, config):
  return cls(**config)

要了解有关序列化和保存的更多信息,请参阅完整的 保存和序列化模型的指南

call() 方法中的特权 training 参数

某些层,特别是 BatchNormalization 层和 Dropout 层,在训练和推理期间的行为不同。对于此类层,通常的做法是在 call() 方法中公开一个 training(布尔值)参数。

通过在 call() 中公开此参数,您可以使内置的训练和评估循环(例如 fit())能够在训练和推理中正确使用该层。

class CustomDropout(keras.layers.Layer):
    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=False):
        if training:
            return tf.nn.dropout(inputs, rate=self.rate)
        return inputs

call() 方法中的特权 mask 参数

The call() 支持的另一个特权参数是 mask 参数。

您将在所有 Keras RNN 层中找到它。掩码是一个布尔张量(输入中每个时间步长一个布尔值),用于在处理时间序列数据时跳过某些输入时间步长。

当由先前层生成掩码时,Keras 会自动将正确的 mask 参数传递给支持它的层的 __call__()。生成掩码的层是配置为 mask_zero=TrueEmbedding 层,以及 Masking 层。

要了解有关掩码以及如何编写支持掩码的层的更多信息,请查看指南 "了解填充和掩码"

The Model

通常,您将使用 Layer 类来定义内部计算块,并将使用 Model 类来定义外部模型 - 您将要训练的对象。

例如,在 ResNet50 模型中,您将有几个子类化 Layer 的 ResNet 块,以及一个包含整个 ResNet50 网络的 Model

The Model 类具有与 Layer 相同的 API,但有以下区别

  • 它公开了内置的训练、评估和预测循环 (model.fit()model.evaluate()model.predict())。
  • 它公开了其内部层的列表,通过 model.layers 属性。
  • 它公开了保存和序列化 API (save()save_weights()...)

实际上,Layer 类对应于我们在文献中所指的“层”(如“卷积层”或“循环层”)或“块”(如“ResNet 块”或“Inception 块”)。

同时,Model 类对应于我们在文献中所指的“模型”(如“深度学习模型”)或“网络”(如“深度神经网络”)。

因此,如果您想知道,“我应该使用 Layer 类还是 Model 类?”,问问自己:我是否需要在上面调用 fit()?我是否需要在上面调用 save()?如果是,请选择 Model。如果不是(要么是因为您的类只是更大系统中的一个块,要么是因为您自己编写了训练和保存代码),请使用 Layer

例如,我们可以使用上面的迷你 ResNet 示例来构建一个 Model,我们可以使用 fit() 训练它,并使用 save_weights() 保存它。

class ResNet(keras.Model):

    def __init__(self, num_classes=1000):
        super().__init__()
        self.block_1 = ResNetBlock()
        self.block_2 = ResNetBlock()
        self.global_pool = layers.GlobalAveragePooling2D()
        self.classifier = Dense(num_classes)

    def call(self, inputs):
        x = self.block_1(inputs)
        x = self.block_2(x)
        x = self.global_pool(x)
        return self.classifier(x)


resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save(filepath.keras)

将所有内容整合在一起:一个端到端示例

以下是您到目前为止学到的内容

  • 一个 Layer 封装了一个状态(在 __init__()build() 中创建)和一些计算(在 call() 中定义)。
  • 层可以递归嵌套以创建新的、更大的计算块。
  • 层可以通过 add_loss() 创建和跟踪损失(通常是正则化损失)。
  • 外部容器,您要训练的东西,是一个 Model。一个 Model 就像一个 Layer,但增加了训练和序列化实用程序。

让我们将所有这些内容整合到一个端到端示例中:我们将实现一个变分自动编码器 (VAE)。我们将使用 MNIST 数字对其进行训练。

我们的 VAE 将是 Model 的子类,构建为子类化 Layer 的层的嵌套组合。它将具有正则化损失(KL 散度)。

from keras import layers


@keras.saving.register_keras_serializable()
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon


@keras.saving.register_keras_serializable()
class Encoder(layers.Layer):
    """Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""

    def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_mean = layers.Dense(latent_dim)
        self.dense_log_var = layers.Dense(latent_dim)
        self.sampling = Sampling()

    def call(self, inputs):
        x = self.dense_proj(inputs)
        z_mean = self.dense_mean(x)
        z_log_var = self.dense_log_var(x)
        z = self.sampling((z_mean, z_log_var))
        return z_mean, z_log_var, z


@keras.saving.register_keras_serializable()
class Decoder(layers.Layer):
    """Converts z, the encoded digit vector, back into a readable digit."""

    def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
        super().__init__(name=name, **kwargs)
        self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
        self.dense_output = layers.Dense(original_dim, activation="sigmoid")

    def call(self, inputs):
        x = self.dense_proj(inputs)
        return self.dense_output(x)


@keras.saving.register_keras_serializable()
class VariationalAutoEncoder(keras.Model):
    """Combines the encoder and decoder into an end-to-end model for training."""

    def __init__(
        self,
        original_dim,
        intermediate_dim=64,
        latent_dim=32,
        name="autoencoder",
        **kwargs
    ):
        super().__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
        self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)

    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstructed = self.decoder(z)
        # Add KL divergence regularization loss.
        kl_loss = -0.5 * tf.reduce_mean(
            z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1
        )
        self.add_loss(kl_loss)
        return reconstructed

让我们在 MNIST 上编写一个简单的训练循环

original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)

optimizer = keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = keras.losses.MeanSquaredError()

loss_metric = keras.metrics.Mean()

(x_train, _), _ = keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype("float32") / 255

train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)

epochs = 2

# Iterate over epochs.
for epoch in range(epochs):
    print("Start of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, x_batch_train in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            reconstructed = vae(x_batch_train)
            # Compute reconstruction loss
            loss = mse_loss_fn(x_batch_train, reconstructed)
            loss += sum(vae.losses)  # Add KLD regularization loss

        grads = tape.gradient(loss, vae.trainable_weights)
        optimizer.apply_gradients(zip(grads, vae.trainable_weights))

        loss_metric(loss)

        if step % 100 == 0:
            print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
Start of epoch 0
WARNING:tensorflow:5 out of the last 5 calls to <function _BaseOptimizer._update_step_xla at 0x7fb220066af0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has reduce_retracing=True option that can avoid unnecessary retracing. For (3), please refer to https://tensorflowcn.cn/guide/function#controlling_retracing and https://tensorflowcn.cn/api_docs/python/tf/function for  more details.
WARNING:tensorflow:6 out of the last 6 calls to <function _BaseOptimizer._update_step_xla at 0x7fb220066af0> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has reduce_retracing=True option that can avoid unnecessary retracing. For (3), please refer to https://tensorflowcn.cn/guide/function#controlling_retracing and https://tensorflowcn.cn/api_docs/python/tf/function for  more details.
step 0: mean loss = 0.3433
step 100: mean loss = 0.1257
step 200: mean loss = 0.0994
step 300: mean loss = 0.0893
step 400: mean loss = 0.0844
step 500: mean loss = 0.0810
step 600: mean loss = 0.0788
step 700: mean loss = 0.0772
step 800: mean loss = 0.0760
step 900: mean loss = 0.0750
Start of epoch 1
step 0: mean loss = 0.0747
step 100: mean loss = 0.0741
step 200: mean loss = 0.0736
step 300: mean loss = 0.0731
step 400: mean loss = 0.0727
step 500: mean loss = 0.0723
step 600: mean loss = 0.0720
step 700: mean loss = 0.0717
step 800: mean loss = 0.0715
step 900: mean loss = 0.0712

请注意,由于 VAE 是 Model 的子类,因此它具有内置的训练循环。因此,您也可以这样训练它

vae = VariationalAutoEncoder(784, 64, 32)

optimizer = keras.optimizers.Adam(learning_rate=1e-3)

vae.compile(optimizer, loss=keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=2, batch_size=64)
Epoch 1/2
938/938 [==============================] - 4s 3ms/step - loss: 0.0746
Epoch 2/2
938/938 [==============================] - 3s 3ms/step - loss: 0.0676
<keras.src.callbacks.History at 0x7fb1e0533580>