在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
本教程介绍了自动编码器,并提供了三个示例:基础知识、图像去噪和异常检测。
自动编码器是一种特殊的类型的神经网络,它经过训练可以将输入复制到输出。例如,给定一个手写数字的图像,自动编码器首先将图像编码为一个低维潜在表示,然后将潜在表示解码回图像。自动编码器学习压缩数据,同时最大限度地减少重建误差。
要了解更多关于自动编码器的知识,请阅读 Ian Goodfellow、Yoshua Bengio 和 Aaron Courville 合著的《深度学习》第 14 章。 深度学习
导入 TensorFlow 和其他库
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model
加载数据集
首先,您将使用 Fashion MNIST 数据集训练基本自动编码器。此数据集中的每个图像都是 28x28 像素。
(x_train, _), (x_test, _) = fashion_mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
print (x_train.shape)
print (x_test.shape)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz 29515/29515 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz 26421880/26421880 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz 5148/5148 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz 4422102/4422102 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step (60000, 28, 28) (10000, 28, 28)
第一个示例:基本自动编码器
定义一个具有两个密集层的自动编码器:一个 encoder
,它将图像压缩成 64 维的潜在向量,以及一个 decoder
,它从潜在空间重建原始图像。
要定义您的模型,请使用 Keras 模型子类化 API。
class Autoencoder(Model):
def __init__(self, latent_dim, shape):
super(Autoencoder, self).__init__()
self.latent_dim = latent_dim
self.shape = shape
self.encoder = tf.keras.Sequential([
layers.Flatten(),
layers.Dense(latent_dim, activation='relu'),
])
self.decoder = tf.keras.Sequential([
layers.Dense(tf.math.reduce_prod(shape).numpy(), activation='sigmoid'),
layers.Reshape(shape)
])
def call(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
shape = x_test.shape[1:]
latent_dim = 64
autoencoder = Autoencoder(latent_dim, shape)
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
使用 x_train
作为输入和目标来训练模型。 encoder
将学习将数据集从 784 维压缩到潜在空间,而 decoder
将学习重建原始图像。.
autoencoder.fit(x_train, x_train,
epochs=10,
shuffle=True,
validation_data=(x_test, x_test))
Epoch 1/10 WARNING: All log messages before absl::InitializeLog() is called are written to STDERR I0000 00:00:1713230807.172583 14227 service.cc:145] XLA service 0x7fe4cc007e20 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices: I0000 00:00:1713230807.172638 14227 service.cc:153] StreamExecutor device (0): Tesla T4, Compute Capability 7.5 I0000 00:00:1713230807.172643 14227 service.cc:153] StreamExecutor device (1): Tesla T4, Compute Capability 7.5 I0000 00:00:1713230807.172646 14227 service.cc:153] StreamExecutor device (2): Tesla T4, Compute Capability 7.5 I0000 00:00:1713230807.172648 14227 service.cc:153] StreamExecutor device (3): Tesla T4, Compute Capability 7.5 131/1875 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.1034 I0000 00:00:1713230807.858362 14227 device_compiler.h:188] Compiled cluster using XLA! This line is logged at most once for the lifetime of the process. 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 5s 2ms/step - loss: 0.0397 - val_loss: 0.0134 Epoch 2/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0124 - val_loss: 0.0111 Epoch 3/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 3s 1ms/step - loss: 0.0102 - val_loss: 0.0097 Epoch 4/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0095 - val_loss: 0.0094 Epoch 5/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0092 - val_loss: 0.0092 Epoch 6/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 3s 1ms/step - loss: 0.0090 - val_loss: 0.0090 Epoch 7/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0089 - val_loss: 0.0089 Epoch 8/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0088 - val_loss: 0.0090 Epoch 9/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0087 - val_loss: 0.0089 Epoch 10/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 2s 1ms/step - loss: 0.0087 - val_loss: 0.0088 <keras.src.callbacks.history.History at 0x7fe6854d9b50>
现在模型已经训练完成,让我们通过对测试集中的图像进行编码和解码来测试它。
encoded_imgs = autoencoder.encoder(x_test).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
# display original
ax = plt.subplot(2, n, i + 1)
plt.imshow(x_test[i])
plt.title("original")
plt.gray()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
# display reconstruction
ax = plt.subplot(2, n, i + 1 + n)
plt.imshow(decoded_imgs[i])
plt.title("reconstructed")
plt.gray()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.show()
第二个示例:图像去噪
自动编码器还可以被训练用来从图像中去除噪声。在下一节中,您将通过对每个图像应用随机噪声来创建 Fashion MNIST 数据集的噪声版本。然后,您将使用噪声图像作为输入,使用原始图像作为目标来训练自动编码器。
让我们重新导入数据集以省略之前进行的修改。
(x_train, _), (x_test, _) = fashion_mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]
print(x_train.shape)
(60000, 28, 28, 1)
向图像添加随机噪声
noise_factor = 0.2
x_train_noisy = x_train + noise_factor * tf.random.normal(shape=x_train.shape)
x_test_noisy = x_test + noise_factor * tf.random.normal(shape=x_test.shape)
x_train_noisy = tf.clip_by_value(x_train_noisy, clip_value_min=0., clip_value_max=1.)
x_test_noisy = tf.clip_by_value(x_test_noisy, clip_value_min=0., clip_value_max=1.)
绘制噪声图像。
n = 10
plt.figure(figsize=(20, 2))
for i in range(n):
ax = plt.subplot(1, n, i + 1)
plt.title("original + noise")
plt.imshow(tf.squeeze(x_test_noisy[i]))
plt.gray()
plt.show()
定义卷积自动编码器
在本例中,您将使用 Conv2D 层在 encoder
中训练卷积自动编码器,并在 decoder
中使用 Conv2DTranspose 层。
class Denoise(Model):
def __init__(self):
super(Denoise, self).__init__()
self.encoder = tf.keras.Sequential([
layers.Input(shape=(28, 28, 1)),
layers.Conv2D(16, (3, 3), activation='relu', padding='same', strides=2),
layers.Conv2D(8, (3, 3), activation='relu', padding='same', strides=2)])
self.decoder = tf.keras.Sequential([
layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),
layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')])
def call(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
autoencoder = Denoise()
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
autoencoder.fit(x_train_noisy, x_train,
epochs=10,
shuffle=True,
validation_data=(x_test_noisy, x_test))
Epoch 1/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 7s 2ms/step - loss: 0.0332 - val_loss: 0.0094 Epoch 2/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 3s 2ms/step - loss: 0.0089 - val_loss: 0.0081 Epoch 3/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 2ms/step - loss: 0.0079 - val_loss: 0.0076 Epoch 4/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 2ms/step - loss: 0.0074 - val_loss: 0.0073 Epoch 5/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 2ms/step - loss: 0.0072 - val_loss: 0.0071 Epoch 6/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 2ms/step - loss: 0.0070 - val_loss: 0.0070 Epoch 7/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 2ms/step - loss: 0.0069 - val_loss: 0.0069 Epoch 8/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 2ms/step - loss: 0.0068 - val_loss: 0.0068 Epoch 9/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 2ms/step - loss: 0.0068 - val_loss: 0.0067 Epoch 10/10 1875/1875 ━━━━━━━━━━━━━━━━━━━━ 3s 2ms/step - loss: 0.0067 - val_loss: 0.0067 <keras.src.callbacks.history.History at 0x7fe6883e5a30>
让我们看一下编码器的摘要。注意图像如何从 28x28 降采样到 7x7。
autoencoder.encoder.summary()
解码器将图像从 7x7 上采样回 28x28。
autoencoder.decoder.summary()
绘制噪声图像和自动编码器生成的去噪图像。
encoded_imgs = autoencoder.encoder(x_test_noisy).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
# display original + noise
ax = plt.subplot(2, n, i + 1)
plt.title("original + noise")
plt.imshow(tf.squeeze(x_test_noisy[i]))
plt.gray()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
# display reconstruction
bx = plt.subplot(2, n, i + n + 1)
plt.title("reconstructed")
plt.imshow(tf.squeeze(decoded_imgs[i]))
plt.gray()
bx.get_xaxis().set_visible(False)
bx.get_yaxis().set_visible(False)
plt.show()
第三个示例:异常检测
概述
在本例中,您将训练一个自动编码器来检测 ECG5000 数据集 上的异常。此数据集包含 5,000 个 心电图,每个心电图都有 140 个数据点。您将使用数据集的简化版本,其中每个示例都被标记为 0
(对应于异常心律)或 1
(对应于正常心律)。您有兴趣识别异常心律。
您将如何使用自动编码器检测异常?回想一下,自动编码器经过训练可以最小化重建误差。您将仅对正常心律训练自动编码器,然后使用它来重建所有数据。我们的假设是异常心律将具有更高的重建误差。然后,如果重建误差超过固定阈值,您将把心律分类为异常。
加载心电图数据
您将使用的数据集基于来自 timeseriesclassification.com 的数据集。
# Download the dataset
dataframe = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)
raw_data = dataframe.values
dataframe.head()
# The last element contains the labels
labels = raw_data[:, -1]
# The other data points are the electrocadriogram data
data = raw_data[:, 0:-1]
train_data, test_data, train_labels, test_labels = train_test_split(
data, labels, test_size=0.2, random_state=21
)
将数据归一化到 [0,1]
。
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)
train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)
train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)
您将仅使用正常心律训练自动编码器,这些心律在本数据集中标记为 1
。将正常心律与异常心律分开。
train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)
normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]
anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]
绘制正常心电图。
plt.grid()
plt.plot(np.arange(140), normal_train_data[0])
plt.title("A Normal ECG")
plt.show()
绘制异常心电图。
plt.grid()
plt.plot(np.arange(140), anomalous_train_data[0])
plt.title("An Anomalous ECG")
plt.show()
构建模型
class AnomalyDetector(Model):
def __init__(self):
super(AnomalyDetector, self).__init__()
self.encoder = tf.keras.Sequential([
layers.Dense(32, activation="relu"),
layers.Dense(16, activation="relu"),
layers.Dense(8, activation="relu")])
self.decoder = tf.keras.Sequential([
layers.Dense(16, activation="relu"),
layers.Dense(32, activation="relu"),
layers.Dense(140, activation="sigmoid")])
def call(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded
autoencoder = AnomalyDetector()
autoencoder.compile(optimizer='adam', loss='mae')
注意,自动编码器仅使用正常心电图进行训练,但使用完整的测试集进行评估。
history = autoencoder.fit(normal_train_data, normal_train_data,
epochs=20,
batch_size=512,
validation_data=(test_data, test_data),
shuffle=True)
Epoch 1/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 5s 494ms/step - loss: 0.0575 - val_loss: 0.0526 Epoch 2/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0549 - val_loss: 0.0506 Epoch 3/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - loss: 0.0514 - val_loss: 0.0488 Epoch 4/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - loss: 0.0475 - val_loss: 0.0466 Epoch 5/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0432 - val_loss: 0.0439 Epoch 6/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - loss: 0.0389 - val_loss: 0.0421 Epoch 7/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - loss: 0.0353 - val_loss: 0.0404 Epoch 8/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - loss: 0.0319 - val_loss: 0.0388 Epoch 9/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0290 - val_loss: 0.0374 Epoch 10/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - loss: 0.0276 - val_loss: 0.0368 Epoch 11/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0264 - val_loss: 0.0363 Epoch 12/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0254 - val_loss: 0.0356 Epoch 13/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - loss: 0.0242 - val_loss: 0.0352 Epoch 14/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0234 - val_loss: 0.0350 Epoch 15/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0229 - val_loss: 0.0345 Epoch 16/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0222 - val_loss: 0.0343 Epoch 17/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 13ms/step - loss: 0.0216 - val_loss: 0.0341 Epoch 18/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0212 - val_loss: 0.0338 Epoch 19/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0210 - val_loss: 0.0338 Epoch 20/20 5/5 ━━━━━━━━━━━━━━━━━━━━ 0s 14ms/step - loss: 0.0208 - val_loss: 0.0339
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()
<matplotlib.legend.Legend at 0x7fe6704c3520>
您将很快将心电图分类为异常,如果重建误差大于训练集中正常示例的标准差。首先,让我们绘制训练集中正常心电图、经过自动编码器编码和解码后的重建以及重建误差。
encoded_data = autoencoder.encoder(normal_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()
plt.plot(normal_test_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(140), decoded_data[0], normal_test_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()
创建一个类似的图,这次用于异常测试示例。
encoded_data = autoencoder.encoder(anomalous_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()
plt.plot(anomalous_test_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(140), decoded_data[0], anomalous_test_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()
检测异常
通过计算重建损失是否大于固定阈值来检测异常。在本教程中,您将计算训练集中正常示例的平均绝对误差,然后如果重建误差高于训练集的标准差,则将未来示例分类为异常。
绘制训练集中正常心电图的重建误差
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)
plt.hist(train_loss[None,:], bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()
74/74 ━━━━━━━━━━━━━━━━━━━━ 1s 6ms/step
选择一个高于平均值的标准差的阈值。
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)
Threshold: 0.033047937
如果您检查测试集中异常示例的重建误差,您会注意到大多数重建误差大于阈值。通过改变阈值,您可以调整分类器的 精度 和 召回率。
reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)
plt.hist(test_loss[None, :], bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()
14/14 ━━━━━━━━━━━━━━━━━━━━ 0s 22ms/step
如果重建误差大于阈值,则将心电图分类为异常。
def predict(model, data, threshold):
reconstructions = model(data)
loss = tf.keras.losses.mae(reconstructions, data)
return tf.math.less(loss, threshold)
def print_stats(predictions, labels):
print("Accuracy = {}".format(accuracy_score(labels, predictions)))
print("Precision = {}".format(precision_score(labels, predictions)))
print("Recall = {}".format(recall_score(labels, predictions)))
preds = predict(autoencoder, test_data, threshold)
print_stats(preds, test_labels)
Accuracy = 0.944 Precision = 0.9921875 Recall = 0.9071428571428571
下一步
要了解有关使用自动编码器进行异常检测的更多信息,请查看 Victor Dibia 使用 TensorFlow.js 构建的这个出色的 交互式示例。对于实际应用案例,您可以了解 空中客车如何使用 TensorFlow 检测国际空间站遥测数据中的异常。要了解有关基础知识的更多信息,请阅读 François Chollet 的这篇 博客文章。有关更多详细信息,请阅读 Ian Goodfellow、Yoshua Bengio 和 Aaron Courville 合著的《深度学习》第 14 章。 深度学习