TensorFlow 2.0 于 2019 年发布,具有 与 Keras 的紧密集成、默认情况下使用 Eager Execution 以及 Pythonic 函数执行,以及其他 新功能和改进。
本指南全面介绍了 TFX 中的 TF 2.x 技术概述。
使用哪个版本?
TFX 与 TensorFlow 2.x 兼容,TensorFlow 1.x 中存在的高级 API(尤其是 Estimators)仍然可以正常工作。
在 TensorFlow 2.x 中启动新项目
由于 TensorFlow 2.x 保留了 TensorFlow 1.x 的高级功能,因此在新的项目中使用旧版本没有任何优势,即使您不打算使用新功能。
因此,如果您要启动一个新的 TFX 项目,建议您使用 TensorFlow 2.x。您可能希望稍后更新代码,因为对 Keras 和其他新功能的全面支持即将推出,如果您从 TensorFlow 2.x 开始,更改范围将小得多,而不是将来尝试从 TensorFlow 1.x 升级。
将现有项目转换为 TensorFlow 2.x
为 TensorFlow 1.x 编写的代码在很大程度上与 TensorFlow 2.x 兼容,并且将在 TFX 中继续正常工作。
但是,如果您想利用 TF 2.x 中的改进和新功能,可以按照 迁移到 TF 2.x 的说明 进行操作。
Estimator
Estimator API 已在 TensorFlow 2.x 中保留,但不是新功能和开发的重点。使用 Estimators 在 TensorFlow 1.x 或 2.x 中编写的代码将在 TFX 中按预期继续正常工作。
以下是一个使用纯 Estimator 的端到端 TFX 示例:出租车示例(Estimator)
使用 model_to_estimator
的 Keras
Keras 模型可以使用 tf.keras.estimator.model_to_estimator
函数进行包装,这使得它们可以像 Estimators 一样工作。要使用此功能
- 构建 Keras 模型。
- 将编译后的模型传递给
model_to_estimator
。 - 在 Trainer 中使用
model_to_estimator
的结果,就像您通常使用 Estimator 一样。
# Build a Keras model.
def _keras_model_builder():
"""Creates a Keras model."""
...
model = tf.keras.Model(inputs=inputs, outputs=output)
model.compile()
return model
# Write a typical trainer function
def trainer_fn(trainer_fn_args, schema):
"""Build the estimator, using model_to_estimator."""
...
# Model to estimator
estimator = tf.keras.estimator.model_to_estimator(
keras_model=_keras_model_builder(), config=run_config)
return {
'estimator': estimator,
...
}
除了 Trainer 的用户模块文件之外,管道中的其他部分保持不变。
原生 Keras(即没有 model_to_estimator
的 Keras)
示例和 Colab
以下是一些使用原生 Keras 的示例
我们还有一个针对每个组件的 Keras Colab。
TFX 组件
以下部分介绍了相关 TFX 组件如何支持原生 Keras。
Transform
Transform 目前对 Keras 模型有实验性支持。
Transform 组件本身可以用于原生 Keras,无需更改。 preprocessing_fn
定义保持不变,使用 TensorFlow 和 tf.Transform 操作。
服务函数和评估函数已针对原生 Keras 进行了更改。详细信息将在以下 Trainer 和 Evaluator 部分中讨论。
Trainer
要配置原生 Keras,需要将 GenericExecutor
设置为 Trainer 组件,以替换默认的基于 Estimator 的执行器。有关详细信息,请查看 此处。
带有 Transform 的 Keras 模块文件
训练模块文件必须包含一个 run_fn
,它将由 GenericExecutor
调用,一个典型的 Keras run_fn
看起来像这样
def run_fn(fn_args: TrainerFnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
# Train and eval files contains transformed examples.
# _input_fn read dataset based on transformed schema from tft.
train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
tf_transform_output.transformed_metadata.schema)
eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
tf_transform_output.transformed_metadata.schema)
model = _build_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
signatures = {
'serving_default':
_get_serve_tf_examples_fn(model,
tf_transform_output).get_concrete_function(
tf.TensorSpec(
shape=[None],
dtype=tf.string,
name='examples')),
}
model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
在上面的 run_fn
中,导出训练后的模型时需要一个服务签名,以便模型可以接收原始示例进行预测。一个典型的服务函数看起来像这样
def _get_serve_tf_examples_fn(model, tf_transform_output):
"""Returns a function that parses a serialized tf.Example."""
# the layer is added as an attribute to the model in order to make sure that
# the model assets are handled correctly when exporting.
model.tft_layer = tf_transform_output.transform_features_layer()
@tf.function
def serve_tf_examples_fn(serialized_tf_examples):
"""Returns the output to be used in the serving signature."""
feature_spec = tf_transform_output.raw_feature_spec()
feature_spec.pop(_LABEL_KEY)
parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
transformed_features = model.tft_layer(parsed_features)
return model(transformed_features)
return serve_tf_examples_fn
在上面的服务函数中,需要使用 tft.TransformFeaturesLayer
层将 tf.Transform 变换应用于原始数据以进行推断。之前用于 Estimator 的 _serving_input_receiver_fn
在 Keras 中将不再需要。
没有 Transform 的 Keras 模块文件
这与上面显示的模块文件类似,但没有变换
def _get_serve_tf_examples_fn(model, schema):
@tf.function
def serve_tf_examples_fn(serialized_tf_examples):
feature_spec = _get_raw_feature_spec(schema)
feature_spec.pop(_LABEL_KEY)
parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
return model(parsed_features)
return serve_tf_examples_fn
def run_fn(fn_args: TrainerFnArgs):
schema = io_utils.parse_pbtxt_file(fn_args.schema_file, schema_pb2.Schema())
# Train and eval files contains raw examples.
# _input_fn reads the dataset based on raw data schema.
train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor, schema)
eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor, schema)
model = _build_keras_model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
signatures = {
'serving_default':
_get_serve_tf_examples_fn(model, schema).get_concrete_function(
tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')),
}
model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
tf.distribute.Strategy
目前,TFX 仅支持单工作器策略(例如,MirroredStrategy,OneDeviceStrategy)。
要使用分布式策略,请创建一个合适的 tf.distribute.Strategy,并将 Keras 模型的创建和编译移到策略范围内。
例如,将上面的 model = _build_keras_model()
替换为
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
model = _build_keras_model()
# Rest of the code can be unchanged.
model.fit(...)
要验证 MirroredStrategy
使用的设备(CPU/GPU),请启用信息级别的 TensorFlow 日志记录
import logging
logging.getLogger("tensorflow").setLevel(logging.INFO)
您应该能够在日志中看到 Using MirroredStrategy with devices (...)
。
Evaluator
在 TFMA v0.2x 中,ModelValidator 和 Evaluator 已合并到一个 新的 Evaluator 组件 中。新的 Evaluator 组件可以执行单模型评估,还可以验证当前模型与之前模型的比较。通过此更改,Pusher 组件现在从 Evaluator 而不是 ModelValidator 消费祝福结果。
新的 Evaluator 支持 Keras 模型和 Estimator 模型。之前需要的 _eval_input_receiver_fn
和评估保存的模型在 Keras 中将不再需要,因为 Evaluator 现在基于用于服务的相同 SavedModel
。