TFX 中的 TensorFlow 2.x

TensorFlow 2.0 于 2019 年发布,具有 与 Keras 的紧密集成默认情况下使用 Eager Execution 以及 Pythonic 函数执行,以及其他 新功能和改进

本指南全面介绍了 TFX 中的 TF 2.x 技术概述。

使用哪个版本?

TFX 与 TensorFlow 2.x 兼容,TensorFlow 1.x 中存在的高级 API(尤其是 Estimators)仍然可以正常工作。

在 TensorFlow 2.x 中启动新项目

由于 TensorFlow 2.x 保留了 TensorFlow 1.x 的高级功能,因此在新的项目中使用旧版本没有任何优势,即使您不打算使用新功能。

因此,如果您要启动一个新的 TFX 项目,建议您使用 TensorFlow 2.x。您可能希望稍后更新代码,因为对 Keras 和其他新功能的全面支持即将推出,如果您从 TensorFlow 2.x 开始,更改范围将小得多,而不是将来尝试从 TensorFlow 1.x 升级。

将现有项目转换为 TensorFlow 2.x

为 TensorFlow 1.x 编写的代码在很大程度上与 TensorFlow 2.x 兼容,并且将在 TFX 中继续正常工作。

但是,如果您想利用 TF 2.x 中的改进和新功能,可以按照 迁移到 TF 2.x 的说明 进行操作。

Estimator

Estimator API 已在 TensorFlow 2.x 中保留,但不是新功能和开发的重点。使用 Estimators 在 TensorFlow 1.x 或 2.x 中编写的代码将在 TFX 中按预期继续正常工作。

以下是一个使用纯 Estimator 的端到端 TFX 示例:出租车示例(Estimator)

使用 model_to_estimator 的 Keras

Keras 模型可以使用 tf.keras.estimator.model_to_estimator 函数进行包装,这使得它们可以像 Estimators 一样工作。要使用此功能

  1. 构建 Keras 模型。
  2. 将编译后的模型传递给 model_to_estimator
  3. 在 Trainer 中使用 model_to_estimator 的结果,就像您通常使用 Estimator 一样。
# Build a Keras model.
def _keras_model_builder():
  """Creates a Keras model."""
  ...

  model = tf.keras.Model(inputs=inputs, outputs=output)
  model.compile()

  return model


# Write a typical trainer function
def trainer_fn(trainer_fn_args, schema):
  """Build the estimator, using model_to_estimator."""
  ...

  # Model to estimator
  estimator = tf.keras.estimator.model_to_estimator(
      keras_model=_keras_model_builder(), config=run_config)

  return {
      'estimator': estimator,
      ...
  }

除了 Trainer 的用户模块文件之外,管道中的其他部分保持不变。

原生 Keras(即没有 model_to_estimator 的 Keras)

示例和 Colab

以下是一些使用原生 Keras 的示例

我们还有一个针对每个组件的 Keras Colab

TFX 组件

以下部分介绍了相关 TFX 组件如何支持原生 Keras。

Transform

Transform 目前对 Keras 模型有实验性支持。

Transform 组件本身可以用于原生 Keras,无需更改。 preprocessing_fn 定义保持不变,使用 TensorFlowtf.Transform 操作。

服务函数和评估函数已针对原生 Keras 进行了更改。详细信息将在以下 Trainer 和 Evaluator 部分中讨论。

Trainer

要配置原生 Keras,需要将 GenericExecutor 设置为 Trainer 组件,以替换默认的基于 Estimator 的执行器。有关详细信息,请查看 此处

带有 Transform 的 Keras 模块文件

训练模块文件必须包含一个 run_fn,它将由 GenericExecutor 调用,一个典型的 Keras run_fn 看起来像这样

def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  # Train and eval files contains transformed examples.
  # _input_fn read dataset based on transformed schema from tft.
  train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
                            tf_transform_output.transformed_metadata.schema)
  eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
                           tf_transform_output.transformed_metadata.schema)

  model = _build_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

在上面的 run_fn 中,导出训练后的模型时需要一个服务签名,以便模型可以接收原始示例进行预测。一个典型的服务函数看起来像这样

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example."""

  # the layer is added as an attribute to the model in order to make sure that
  # the model assets are handled correctly when exporting.
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    transformed_features = model.tft_layer(parsed_features)

    return model(transformed_features)

  return serve_tf_examples_fn

在上面的服务函数中,需要使用 tft.TransformFeaturesLayer 层将 tf.Transform 变换应用于原始数据以进行推断。之前用于 Estimator 的 _serving_input_receiver_fn 在 Keras 中将不再需要。

没有 Transform 的 Keras 模块文件

这与上面显示的模块文件类似,但没有变换

def _get_serve_tf_examples_fn(model, schema):

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    feature_spec = _get_raw_feature_spec(schema)
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
    return model(parsed_features)

  return serve_tf_examples_fn


def run_fn(fn_args: TrainerFnArgs):
  schema = io_utils.parse_pbtxt_file(fn_args.schema_file, schema_pb2.Schema())

  # Train and eval files contains raw examples.
  # _input_fn reads the dataset based on raw data schema.
  train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor, schema)
  eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor, schema)

  model = _build_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model, schema).get_concrete_function(
              tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
tf.distribute.Strategy

目前,TFX 仅支持单工作器策略(例如,MirroredStrategyOneDeviceStrategy)。

要使用分布式策略,请创建一个合适的 tf.distribute.Strategy,并将 Keras 模型的创建和编译移到策略范围内。

例如,将上面的 model = _build_keras_model() 替换为

  mirrored_strategy = tf.distribute.MirroredStrategy()
  with mirrored_strategy.scope():
    model = _build_keras_model()

  # Rest of the code can be unchanged.
  model.fit(...)

要验证 MirroredStrategy 使用的设备(CPU/GPU),请启用信息级别的 TensorFlow 日志记录

import logging
logging.getLogger("tensorflow").setLevel(logging.INFO)

您应该能够在日志中看到 Using MirroredStrategy with devices (...)

Evaluator

在 TFMA v0.2x 中,ModelValidator 和 Evaluator 已合并到一个 新的 Evaluator 组件 中。新的 Evaluator 组件可以执行单模型评估,还可以验证当前模型与之前模型的比较。通过此更改,Pusher 组件现在从 Evaluator 而不是 ModelValidator 消费祝福结果。

新的 Evaluator 支持 Keras 模型和 Estimator 模型。之前需要的 _eval_input_receiver_fn 和评估保存的模型在 Keras 中将不再需要,因为 Evaluator 现在基于用于服务的相同 SavedModel

有关更多信息,请参阅 Evaluator.