TensorFlow Extended (TFX) 的特征工程组件
此示例 Colab 笔记本提供了一个非常简单的示例,说明如何使用 TensorFlow Transform (tf.Transform
) 使用完全相同的代码来预处理数据,用于训练模型和在生产中提供推断。
TensorFlow Transform 是一个用于预处理 TensorFlow 输入数据的库,包括创建需要对整个训练数据集进行完整遍历的特征。例如,使用 TensorFlow Transform,您可以
- 使用均值和标准差对输入值进行归一化
- 通过对所有输入值生成词汇表将字符串转换为整数
- 根据观察到的数据分布将浮点数分配到桶中,从而将浮点数转换为整数
TensorFlow 内置支持对单个示例或一批示例进行操作。 tf.Transform
扩展了这些功能以支持对整个训练数据集进行完整遍历。
tf.Transform
的输出以 TensorFlow 图的形式导出,您可以将其用于训练和服务。在训练和服务中使用相同的图可以防止偏差,因为两个阶段都应用了相同的转换。
升级 Pip
为了避免在本地运行时升级系统中的 Pip,请确保我们在 Colab 中运行。本地系统当然可以单独升级。
try:
import colab
!pip install --upgrade pip
except:
pass
安装 TensorFlow Transform
pip install -q -U tensorflow_transform
# This cell is only necessary because packages were installed while python was
# running. It avoids the need to restart the runtime when running in Colab.
import pkg_resources
import importlib
importlib.reload(pkg_resources)
/tmpfs/tmp/ipykernel_192169/639106435.py:3: DeprecationWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html import pkg_resources <module 'pkg_resources' from '/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/pkg_resources/__init__.py'>
导入
import pathlib
import pprint
import tempfile
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
from tensorflow_transform.keras_lib import tf_keras
2024-04-30 10:54:48.029467: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-04-30 10:54:48.029516: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-04-30 10:54:48.030987: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
数据:创建一些虚拟数据
我们将为简单的示例创建一些简单的虚拟数据
raw_data
是我们将要预处理的初始原始数据raw_data_metadata
包含告诉我们raw_data
中每一列类型的模式。在本例中,它非常简单。
raw_data = [
{'x': 1, 'y': 1, 's': 'hello'},
{'x': 2, 'y': 2, 's': 'world'},
{'x': 3, 'y': 3, 's': 'hello'}
]
raw_data_metadata = dataset_metadata.DatasetMetadata(
schema_utils.schema_from_feature_spec({
'y': tf.io.FixedLenFeature([], tf.float32),
'x': tf.io.FixedLenFeature([], tf.float32),
's': tf.io.FixedLenFeature([], tf.string),
}))
转换:创建预处理函数
预处理函数 是 tf.Transform 中最重要的概念。预处理函数是数据集真正发生转换的地方。它接受并返回一个张量字典,其中张量表示 Tensor
或 SparseTensor
。通常构成预处理函数核心的 API 调用主要分为两组
- TensorFlow 操作:任何接受并返回张量的函数,通常表示 TensorFlow 操作。这些将 TensorFlow 操作添加到图中,该图将原始数据转换为一次一个特征向量,从而转换为转换后的数据。这些将在训练和服务期间针对每个示例运行。
- Tensorflow Transform 分析器/映射器:tf.Transform 提供的任何分析器/映射器。这些也接受并返回张量,并且通常包含 TensorFlow 操作和 Beam 计算的组合,但与 TensorFlow 操作不同,它们仅在需要对整个训练数据集进行完整遍历的分析期间的 Beam 管道中运行。Beam 计算仅运行一次(在训练之前,在分析期间),并且通常对整个训练数据集进行完整遍历。它们创建
tf.constant
张量,这些张量将添加到您的图中。例如,tft.min
计算训练数据集上张量的最小值。
def preprocessing_fn(inputs):
"""Preprocess input columns into transformed columns."""
x = inputs['x']
y = inputs['y']
s = inputs['s']
x_centered = x - tft.mean(x)
y_normalized = tft.scale_to_0_1(y)
s_integerized = tft.compute_and_apply_vocabulary(s)
x_centered_times_y_normalized = (x_centered * y_normalized)
return {
'x_centered': x_centered,
'y_normalized': y_normalized,
's_integerized': s_integerized,
'x_centered_times_y_normalized': x_centered_times_y_normalized,
}
语法
您几乎准备好将所有内容整合在一起并使用 Apache Beam 来运行它。
Apache Beam 使用 特殊的语法来定义和调用转换。例如,在这一行中
result = pass_this | 'name this step' >> to_this_call
方法 to_this_call
正在被调用并传递了名为 pass_this
的对象,并且 此操作将在堆栈跟踪中被称为 name this step
。对 to_this_call
的调用的结果将在 result
中返回。您经常会看到像这样链接在一起的管道阶段
result = apache_beam.Pipeline() | 'first step' >> do_this_first() | 'second step' >> do_this_last()
并且由于它从一个新的管道开始,您可以像这样继续
next_result = result | 'doing more stuff' >> another_function()
将所有内容整合在一起
现在我们准备转换我们的数据。我们将使用 Apache Beam 与直接运行器,并提供三个输入
raw_data
- 我们上面创建的原始输入数据raw_data_metadata
- 原始数据的模式preprocessing_fn
- 我们创建的用于进行转换的函数
def main(output_dir):
# Ignore the warnings
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
transformed_dataset, transform_fn = ( # pylint: disable=unused-variable
(raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset # pylint: disable=unused-variable
# Save the transform_fn to the output_dir
_ = (
transform_fn
| 'WriteTransformFn' >> tft_beam.WriteTransformFn(output_dir))
return transformed_data, transformed_metadata
output_dir = pathlib.Path(tempfile.mkdtemp())
transformed_data, transformed_metadata = main(str(output_dir))
print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features. WARNING:absl:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:absl:You are passing instance dicts and DatasetMetadata to TFT which will not provide optimal performance. Consider following the TFT guide to upgrade to the TFXIO format (Apache Arrow RecordBatch). WARNING:absl:You are outputting instance dicts from `TransformDataset` which will not provide optimal performance. Consider setting `output_record_batches=True` to upgrade to the TFXIO format (Apache Arrow RecordBatch). Encoding functionality in this module works with both formats. WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpgsoge9im.json', '--HistoryManager.hist_file=:memory:'] INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/c576d13575254973b6f7263cfcf3ffc3/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/c576d13575254973b6f7263cfcf3ffc3/assets INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/b9fda3835766458d8e33d05f6357bed2/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmp8s0_zhbm/tftransform_tmp/b9fda3835766458d8e33d05f6357bed2/assets INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available. WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['/tmpfs/src/tf_docs_env/lib/python3.9/site-packages/ipykernel_launcher.py', '-f', '/tmpfs/tmp/tmpgsoge9im.json', '--HistoryManager.hist_file=:memory:'] Raw data: [{'s': 'hello', 'x': 1, 'y': 1}, {'s': 'world', 'x': 2, 'y': 2}, {'s': 'hello', 'x': 3, 'y': 3}] Transformed data: [{'s_integerized': 0, 'x_centered': -1.0, 'x_centered_times_y_normalized': -0.0, 'y_normalized': 0.0}, {'s_integerized': 1, 'x_centered': 0.0, 'x_centered_times_y_normalized': 0.0, 'y_normalized': 0.5}, {'s_integerized': 0, 'x_centered': 1.0, 'x_centered_times_y_normalized': 1.0, 'y_normalized': 1.0}]
这是正确的答案吗?
之前,我们使用 tf.Transform
来执行此操作
x_centered = x - tft.mean(x)
y_normalized = tft.scale_to_0_1(y)
s_integerized = tft.compute_and_apply_vocabulary(s)
x_centered_times_y_normalized = (x_centered * y_normalized)
- x_centered - 对于输入
[1, 2, 3]
,x 的平均值为 2,我们将它从 x 中减去以将我们的 x 值中心化到 0。因此,我们的结果[-1.0, 0.0, 1.0]
是正确的。 - y_normalized - 我们希望将我们的 y 值缩放到 0 到 1 之间。我们的输入是
[1, 2, 3]
,因此我们的结果[0.0, 0.5, 1.0]
是正确的。 - s_integerized - 我们希望将我们的字符串映射到词汇表中的索引,并且我们的词汇表中只有 2 个词(“hello” 和 “world”)。因此,对于输入
["hello", "world", "hello"]
,我们的结果[0, 1, 0]
是正确的。由于 “hello” 在此数据中出现频率最高,因此它将是词汇表中的第一个条目。 - x_centered_times_y_normalized - 我们希望通过使用乘法来交叉
x_centered
和y_normalized
来创建一个新特征。请注意,这会将结果相乘,而不是将原始值相乘,并且我们的新结果[-0.0, 0.0, 1.0]
是正确的。
使用生成的结果 transform_fn
ls -l {output_dir}
total 8 drwxr-xr-x 4 kbuilder kbuilder 4096 Apr 30 10:54 transform_fn drwxr-xr-x 2 kbuilder kbuilder 4096 Apr 30 10:54 transformed_metadata
该 transform_fn/
目录包含一个 tf.saved_model
,它使用所有常量 tensorflow-transform 分析结果来实现图。
可以使用 tf.saved_model.load
直接加载它,但这并不容易使用
loaded = tf.saved_model.load(str(output_dir/'transform_fn'))
loaded.signatures['serving_default']
<ConcreteFunction (*, inputs: TensorSpec(shape=(None,), dtype=tf.string, name='inputs'), inputs_1: TensorSpec(shape=(None,), dtype=tf.float32, name='inputs_1'), inputs_2: TensorSpec(shape=(None,), dtype=tf.float32, name='inputs_2')) -> Dict[['x_centered', TensorSpec(shape=(None,), dtype=tf.float32, name='x_centered')], ['s_integerized', TensorSpec(shape=<unknown>, dtype=tf.int64, name='s_integerized')], ['x_centered_times_y_normalized', TensorSpec(shape=(None,), dtype=tf.float32, name='x_centered_times_y_normalized')], ['y_normalized', TensorSpec(shape=(None,), dtype=tf.float32, name='y_normalized')]] at 0x7F452C2C6400>
更好的方法是使用 tft.TFTransformOutput
加载它。该 TFTransformOutput.transform_features_layer
方法返回一个 tft.TransformFeaturesLayer
对象,该对象可用于应用转换
tf_transform_output = tft.TFTransformOutput(output_dir)
tft_layer = tf_transform_output.transform_features_layer()
tft_layer
INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:struct2tensor is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_decision_forests is not available. INFO:tensorflow:tensorflow_text is not available. INFO:tensorflow:tensorflow_text is not available. <tensorflow_transform.output_wrapper.TransformFeaturesLayer at 0x7f46bc272700>
此 tft.TransformFeaturesLayer
期望一个批处理特征的字典。因此,从 raw_data
中的 List[Dict[str, Any]]
创建一个 Dict[str, tf.Tensor]
raw_data_batch = {
's': tf.constant([ex['s'] for ex in raw_data]),
'x': tf.constant([ex['x'] for ex in raw_data], dtype=tf.float32),
'y': tf.constant([ex['y'] for ex in raw_data], dtype=tf.float32),
}
您可以单独使用 tft.TransformFeaturesLayer
transformed_batch = tft_layer(raw_data_batch)
{key: value.numpy() for key, value in transformed_batch.items()}
{'x_centered': array([-1., 0., 1.], dtype=float32), 'x_centered_times_y_normalized': array([-0., 0., 1.], dtype=float32), 'y_normalized': array([0. , 0.5, 1. ], dtype=float32), 's_integerized': array([0, 1, 0])}
导出
更典型的用例将使用 tf.Transform
将转换应用于训练和评估数据集(有关示例,请参阅 下一个教程)。然后,在训练后,在导出模型之前,将 tft.TransformFeaturesLayer
附加为第一层,以便您可以将其导出为您的 tf.saved_model
的一部分。有关具体示例,请继续阅读。
一个示例训练模型
下面是一个模型,它
- 获取转换后的批次,
- 将它们全部堆叠成一个简单的
(batch, features)
矩阵, - 将它们通过几个密集层运行,并且
- 产生 10 个线性输出。
在实际用例中,您将对 s_integerized
特征应用独热编码。
您可以使用 tf.Transform
转换的数据集来训练此模型
class StackDict(tf_keras.layers.Layer):
def call(self, inputs):
values = [
tf.cast(v, tf.float32)
for k,v in sorted(inputs.items(), key=lambda kv: kv[0])]
return tf.stack(values, axis=1)
class TrainedModel(tf_keras.Model):
def __init__(self):
super().__init__(self)
self.concat = StackDict()
self.body = tf_keras.Sequential([
tf_keras.layers.Dense(64, activation='relu'),
tf_keras.layers.Dense(64, activation='relu'),
tf_keras.layers.Dense(10),
])
def call(self, inputs, training=None):
x = self.concat(inputs)
return self.body(x, training)
trained_model = TrainedModel()
假设我们训练了模型。
trained_model.compile(loss=..., optimizer='adam')
trained_model.fit(...)
此模型在转换后的输入上运行
trained_model_output = trained_model(transformed_batch)
trained_model_output.shape
TensorShape([3, 10])
一个示例导出包装器
假设您已经训练了上面的模型并希望导出它。
您将希望在导出的模型中包含转换函数
class ExportModel(tf.Module):
def __init__(self, trained_model, input_transform):
self.trained_model = trained_model
self.input_transform = input_transform
@tf.function
def __call__(self, inputs, training=None):
x = self.input_transform(inputs)
return self.trained_model(x)
export_model = ExportModel(trained_model=trained_model,
input_transform=tft_layer)
此组合模型在原始数据上运行,并产生与直接调用训练模型完全相同的结果
export_model_output = export_model(raw_data_batch)
export_model_output.shape
TensorShape([3, 10])
tf.reduce_max(abs(export_model_output - trained_model_output)).numpy()
0.0
此 export_model
包含 tft.TransformFeaturesLayer
并且是完全自包含的。您可以保存它并在另一个环境中恢复它,并且仍然获得完全相同的结果
import tempfile
model_dir = tempfile.mkdtemp(suffix='tft')
tf.saved_model.save(export_model, model_dir)
INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpjz93eylstft/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tmpjz93eylstft/assets
reloaded = tf.saved_model.load(model_dir)
reloaded_model_output = reloaded(raw_data_batch)
reloaded_model_output.shape
TensorShape([3, 10])
tf.reduce_max(abs(export_model_output - reloaded_model_output)).numpy()
0.0