Transform TFX 管道组件

Transform TFX 管道组件使用由 SchemaGen 组件创建的数据模式,对从 ExampleGen 组件发出的 tf.Examples 进行特征工程,并发出 SavedModel 以及预转换和后转换数据的统计信息。执行时,SavedModel 将接受从 ExampleGen 组件发出的 tf.Examples,并发出转换后的特征数据。

  • 消耗:来自 ExampleGen 组件的 tf.Examples,以及来自 SchemaGen 组件的数据模式。
  • 发出:发送到 Trainer 组件的 SavedModel,预转换和后转换统计信息。

配置 Transform 组件

编写好 preprocessing_fn 后,需要将其定义在 Python 模块中,然后将其作为输入提供给 Transform 组件。该模块将由 transform 加载,并找到名为 preprocessing_fn 的函数,Transform 将使用该函数构建预处理管道。

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath(_taxi_transform_module_file))

此外,您可能希望为基于 TFDV 的预转换或后转换统计信息计算提供选项。为此,请在同一个模块中定义 stats_options_updater_fn

Transform 和 TensorFlow Transform

Transform 广泛使用 TensorFlow Transform 对您的数据集执行特征工程。TensorFlow Transform 是在将特征数据发送到模型之前以及作为训练过程的一部分转换特征数据的绝佳工具。常见的特征转换包括

  • 嵌入:通过找到从高维空间到低维空间的有意义映射,将稀疏特征(例如词汇表生成的整数 ID)转换为密集特征。有关嵌入的介绍,请参阅 机器学习速成课程中的嵌入单元
  • 词汇表生成:通过创建将每个唯一值映射到 ID 号的词汇表,将字符串或其他非数字特征转换为整数。
  • 规范化值:转换数字特征,使其全部落在类似范围内。
  • 分箱:通过将值分配到离散箱中,将连续值特征转换为分类特征。
  • 丰富文本特征:从原始数据(如标记、n 元组、实体、情感等)生成特征,以丰富特征集。

TensorFlow Transform 支持这些转换以及许多其他类型的转换

  • 从最新数据自动生成词汇表。

  • 在将数据发送到模型之前,对数据执行任意转换。TensorFlow Transform 将转换构建到模型的 TensorFlow 图中,因此在训练和推理时执行相同的转换。您可以定义引用数据全局属性的转换,例如所有训练实例中特征的最大值。

您可以在运行 TFX 之前根据需要转换数据。但是,如果您在 TensorFlow Transform 中执行此操作,则转换将成为 TensorFlow 图的一部分。这种方法有助于避免训练/服务偏差。

建模代码中的转换使用 FeatureColumns。使用 FeatureColumns,您可以定义分箱、使用预定义词汇表的整型化,或任何无需查看数据即可定义的其他转换。

相比之下,TensorFlow Transform 旨在用于需要对数据进行完整遍历以计算事先未知值的转换。例如,词汇表生成需要对数据进行完整遍历。

除了使用 Apache Beam 计算值之外,TensorFlow Transform 还允许用户将这些值嵌入到 TensorFlow 图中,然后可以将其加载到训练图中。例如,在规范化特征时,tft.scale_to_z_score 函数将计算特征的均值和标准差,以及在 TensorFlow 图中,减去均值并除以标准差的函数的表示形式。通过发出 TensorFlow 图,而不仅仅是统计信息,TensorFlow Transform 简化了编写预处理管道的过程。

由于预处理被表示为一个图,它可以在服务器上进行,并且保证在训练和服务之间保持一致。这种一致性消除了训练/服务偏差的一个来源。

TensorFlow Transform 允许用户使用 TensorFlow 代码指定他们的预处理管道。这意味着管道以与 TensorFlow 图相同的方式构建。如果在这个图中只使用了 TensorFlow 操作,那么管道将是一个纯映射,它接受输入批次并返回输出批次。这样的管道等同于在使用 tf.Estimator API 时将此图放在你的 input_fn 中。为了指定全通操作(例如计算分位数),TensorFlow Transform 提供了称为 analyzers 的特殊函数,这些函数看起来像 TensorFlow 操作,但实际上指定了将由 Apache Beam 完成的延迟计算,并将输出作为常量插入图中。虽然普通的 TensorFlow 操作将以单个批次作为输入,对该批次执行一些计算并发出一个批次,但 analyzer 将对所有批次执行全局归约(在 Apache Beam 中实现)并返回结果。

通过结合普通的 TensorFlow 操作和 TensorFlow Transform 分析器,用户可以创建复杂的管道来预处理他们的数据。例如,tft.scale_to_z_score 函数接受一个输入张量,并返回该张量,该张量被归一化为具有均值 0 和方差 1。它通过在幕后调用 meanvar 分析器来实现这一点,这将有效地在图中生成等于输入张量的均值和方差的常量。然后它将使用 TensorFlow 操作来减去均值并除以标准差。

TensorFlow Transform preprocessing_fn

TFX Transform 组件简化了 Transform 的使用,它处理与读取和写入数据相关的 API 调用,并将输出 SavedModel 写入磁盘。作为 TFX 用户,你只需要定义一个名为 preprocessing_fn 的函数。在 preprocessing_fn 中,你定义了一系列函数,这些函数操作输入张量字典以生成输出张量字典。你可以在 TensorFlow Transform API 中找到像 scale_to_0_1 和 compute_and_apply_vocabulary 这样的辅助函数,或者使用下面所示的常规 TensorFlow 函数。

def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in _DENSE_FLOAT_FEATURE_KEYS:
    # If sparse make it dense, setting nan's to 0 or '', and apply zscore.
    outputs[_transformed_name(key)] = transform.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in _VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[_transformed_name(
        key)] = transform.compute_and_apply_vocabulary(
            _fill_in_missing(inputs[key]),
            top_k=_VOCAB_SIZE,
            num_oov_buckets=_OOV_SIZE)

  for key in _BUCKET_FEATURE_KEYS:
    outputs[_transformed_name(key)] = transform.bucketize(
        _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT)

  for key in _CATEGORICAL_FEATURE_KEYS:
    outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
  tips = _fill_in_missing(inputs[_LABEL_KEY])
  outputs[_transformed_name(_LABEL_KEY)] = tf.where(
      tf.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs

了解 preprocessing_fn 的输入

The preprocessing_fn 描述了一系列对张量(即 TensorSparseTensorRaggedTensor)的操作。为了正确定义 preprocessing_fn,有必要了解数据是如何表示为张量的。 preprocessing_fn 的输入由模式决定。一个 Schema proto 最终会被转换为一个“特征规范”(有时称为“解析规范”,用于数据解析,有关转换逻辑的更多详细信息,请参阅 这里

使用 TensorFlow Transform 处理字符串标签

通常,人们希望使用 TensorFlow Transform 来生成词汇表并将该词汇表应用于将字符串转换为整数。在遵循此工作流程时,在模型中构建的 input_fn 将输出整数化的字符串。但是,标签是一个例外,因为为了使模型能够将输出(整数)标签映射回字符串,模型需要 input_fn 输出一个字符串标签,以及标签的可能值的列表。例如,如果标签是 catdog,那么 input_fn 的输出应该是这些原始字符串,并且键 ["cat", "dog"] 需要作为参数传递给估计器(见下文详细信息)。

为了处理字符串标签到整数的映射,你应该使用 TensorFlow Transform 来生成词汇表。我们在下面的代码片段中演示了这一点

def _preprocessing_fn(inputs):
  """Preprocess input features into transformed features."""

  ...


  education = inputs[features.RAW_LABEL_KEY]
  _ = tft.vocabulary(education, vocab_filename=features.RAW_LABEL_KEY)

  ...

上面的预处理函数接受原始输入特征(它也将作为预处理函数输出的一部分返回),并对其调用 tft.vocabulary。这将导致为 education 生成一个词汇表,可以在模型中访问该词汇表。

该示例还展示了如何转换标签,然后为转换后的标签生成词汇表。特别是,它接受原始标签 education,并将除前 5 个标签(按频率)之外的所有标签转换为 UNKNOWN,而不会将标签转换为整数。

在模型代码中,分类器必须将由 tft.vocabulary 生成的词汇表作为 label_vocabulary 参数提供。这是通过首先使用辅助函数将此词汇表读入列表来完成的。这在下面的代码片段中显示。请注意,示例代码使用上面讨论的转换后的标签,但在这里我们展示了使用原始标签的代码。

def create_estimator(pipeline_inputs, hparams):

  ...

  tf_transform_output = trainer_util.TFTransformOutput(
      pipeline_inputs.transform_dir)

  # vocabulary_by_name() returns a Python list.
  label_vocabulary = tf_transform_output.vocabulary_by_name(
      features.RAW_LABEL_KEY)

  return tf.contrib.learn.DNNLinearCombinedClassifier(
      ...
      n_classes=len(label_vocab),
      label_vocabulary=label_vocab,
      ...)

配置预转换和后转换统计信息

如上所述,Transform 组件调用 TFDV 来计算预转换和后转换统计信息。TFDV 接受一个可选的 StatsOptions 对象作为输入。用户可能希望配置此对象以启用某些额外的统计信息(例如 NLP 统计信息)或设置要验证的阈值(例如最小/最大令牌频率)。为此,在模块文件中定义一个 stats_options_updater_fn

def stats_options_updater_fn(stats_type, stats_options):
  ...
  if stats_type == stats_options_util.StatsType.PRE_TRANSFORM:
    # Update stats_options to modify pre-transform statistics computation.
    # Most constraints are specified in the schema which can be accessed
    # via stats_options.schema.
  if stats_type == stats_options_util.StatsType.POST_TRANSFORM
    # Update stats_options to modify post-transform statistics computation.
    # Most constraints are specified in the schema which can be accessed
    # via stats_options.schema.
  return stats_options

后转换统计信息通常受益于对用于预处理特征的词汇表的了解。词汇表名称到路径映射将为每个 TFT 生成的词汇表提供给 StatsOptions(因此也提供给 TFDV)。此外,可以通过以下两种方式添加外部创建的词汇表的映射:(i)直接修改 StatsOptions 中的 vocab_paths 字典,或者(ii)使用 tft.annotate_asset