TensorFlow 数据验证入门

TensorFlow 数据验证 (TFDV) 可以分析训练和服务数据以

核心 API 支持每个功能部分,并提供在其之上构建的便捷方法,这些方法可以在笔记本的上下文中调用。

计算描述性数据统计信息

TFDV 可以计算描述性 统计信息,这些统计信息提供数据的快速概述,包括存在的特征及其值分布的形状。诸如 Facets Overview 之类的工具可以提供这些统计信息的简洁可视化,以便于浏览。

例如,假设 path 指向 TFRecord 格式(包含 tensorflow.Example 类型的记录)中的文件。以下代码段说明了使用 TFDV 计算统计信息的示例

    stats = tfdv.generate_statistics_from_tfrecord(data_location=path)

返回值是 DatasetFeatureStatisticsList 协议缓冲区。 示例笔记本 包含使用 Facets Overview 对统计信息进行可视化的示例

    tfdv.visualize_statistics(stats)

Screenshot of statistics visualization

前面的示例假设数据存储在 TFRecord 文件中。TFDV 还支持 CSV 输入格式,并可扩展以支持其他常见格式。您可以找到可用的数据解码器 此处。此外,TFDV 提供了 tfdv.generate_statistics_from_dataframe 实用程序函数,供用户使用以 pandas DataFrame 表示的内存中数据。

除了计算默认的数据统计信息集之外,TFDV 还可以计算语义域(例如,图像、文本)的统计信息。要启用语义域统计信息的计算,请将 tfdv.StatsOptions 对象与 enable_semantic_domain_stats 设置为 True 传递给 tfdv.generate_statistics_from_tfrecord

在 Google Cloud 上运行

在内部,TFDV 使用 Apache Beam 的数据并行处理框架来扩展对大型数据集的统计信息计算。对于希望与 TFDV 深度集成(例如,在数据生成管道末尾附加统计信息生成、为自定义格式的数据生成统计信息)的应用程序,API 还公开了一个用于统计信息生成的 Beam PTransform。

要在 Google Cloud 上运行 TFDV,必须下载 TFDV 轮子文件并将其提供给 Dataflow 工作器。将轮子文件下载到当前目录,如下所示

pip download tensorflow_data_validation \
  --no-deps \
  --platform manylinux2010_x86_64 \
  --only-binary=:all:

以下代码段显示了在 Google Cloud 上使用 TFDV 的示例


import tensorflow_data_validation as tfdv
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions

PROJECT_ID = ''
JOB_NAME = ''
GCS_STAGING_LOCATION = ''
GCS_TMP_LOCATION = ''
GCS_DATA_LOCATION = ''
# GCS_STATS_OUTPUT_PATH is the file path to which to output the data statistics
# result.
GCS_STATS_OUTPUT_PATH = ''

PATH_TO_WHL_FILE = ''


# Create and set your PipelineOptions.
options = PipelineOptions()

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = GCS_STAGING_LOCATION
google_cloud_options.temp_location = GCS_TMP_LOCATION
options.view_as(StandardOptions).runner = 'DataflowRunner'

setup_options = options.view_as(SetupOptions)
# PATH_TO_WHL_FILE should point to the downloaded tfdv wheel file.
setup_options.extra_packages = [PATH_TO_WHL_FILE]

tfdv.generate_statistics_from_tfrecord(GCS_DATA_LOCATION,
                                       output_path=GCS_STATS_OUTPUT_PATH,
                                       pipeline_options=options)

在这种情况下,生成的统计信息协议缓冲区存储在写入 GCS_STATS_OUTPUT_PATH 的 TFRecord 文件中。

注意 在 Google Cloud 上调用任何 tfdv.generate_statistics_... 函数(例如,tfdv.generate_statistics_from_tfrecord)时,必须提供一个 output_path。指定 None 可能会导致错误。

推断数据上的模式

模式 描述了数据的预期属性。其中一些属性是

  • 预期存在的特征
  • 它们的类型
  • 每个示例中特征的值数量
  • 所有示例中每个特征的存在
  • 特征的预期域。

简而言之,模式描述了对“正确”数据的期望,因此可用于检测数据中的错误(如下所述)。此外,相同的模式可用于设置 TensorFlow Transform 以进行数据转换。请注意,模式应相当静态,例如,多个数据集可以符合相同的模式,而统计信息(如上所述)可能因数据集而异。

由于编写模式可能是一项繁琐的任务,特别是对于具有大量特征的数据集而言,TFDV 提供了一种方法,可以根据描述性统计信息生成模式的初始版本。

    schema = tfdv.infer_schema(stats)

通常,TFDV 使用保守的启发式方法从统计信息中推断出稳定的数据属性,以避免将模式过度拟合到特定数据集。强烈建议您 **查看推断出的模式并根据需要对其进行细化**,以捕获 TFDV 启发式方法可能遗漏的有关数据的任何领域知识。

默认情况下,tfdv.infer_schema 推断每个必需特征的形状,如果特征的 value_count.min 等于 value_count.max。将 infer_feature_shape 参数设置为 False 以禁用形状推断。

模式本身存储为 模式协议缓冲区,因此可以使用标准协议缓冲区 API 进行更新/编辑。TFDV 还提供了一些 实用程序方法 以简化这些更新。例如,假设模式包含以下节以描述一个必需的字符串特征 payment_type,该特征接受单个值

feature {
  name: "payment_type"
  value_count {
    min: 1
    max: 1
  }
  type: BYTES
  domain: "payment_type"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
}

要标记该特征应在至少 50% 的示例中填充

    tfdv.get_feature(schema, 'payment_type').presence.min_fraction = 0.5

示例笔记本 包含模式的简单可视化,以表格形式列出每个特征及其在模式中编码的主要特征。

Screenshot of schema visualization

检查数据是否存在错误

给定一个模式,可以检查数据集是否符合模式中设置的期望,或者是否存在任何 数据异常。您可以检查数据是否存在错误 (a) 在整个数据集的聚合中,通过将数据集的统计信息与模式进行匹配,或者 (b) 通过检查每个示例是否存在错误。

将数据集的统计信息与模式进行匹配

要检查聚合中是否存在错误,TFDV 将数据集的统计信息与模式进行匹配,并标记任何差异。例如

    # Assume that other_path points to another TFRecord file
    other_stats = tfdv.generate_statistics_from_tfrecord(data_location=other_path)
    anomalies = tfdv.validate_statistics(statistics=other_stats, schema=schema)

结果是 异常 协议缓冲区的实例,并描述统计信息与模式不一致的任何错误。例如,假设 other_path 处的数据包含具有特征 payment_type 值的示例,这些值超出了模式中指定的域。

这会产生一个异常

   payment_type  Unexpected string values  Examples contain values missing from the schema: Prcard (<1%).

表明在特征值的 < 1% 中发现了域外值。

如果这是预期的,则可以按如下方式更新模式

   tfdv.get_domain(schema, 'payment_type').value.append('Prcard')

如果异常确实表明数据错误,则应在使用数据进行训练之前修复基础数据。

此模块可以检测到的各种异常类型列在 此处

示例笔记本 包含异常的简单可视化,以表格形式列出检测到错误的特征以及每个错误的简短描述。

Screenshot of anomalies

检查每个示例是否存在错误

TFDV 还提供了一个选项,可以按每个示例验证数据,而不是将数据集范围内的统计信息与模式进行比较。TFDV 提供了用于按每个示例验证数据,然后生成所发现的异常示例的汇总统计信息的函数。例如

   options = tfdv.StatsOptions(schema=schema)
   anomalous_example_stats = tfdv.validate_examples_in_tfrecord(
       data_location=input, stats_options=options)

validate_examples_in_tfrecord 返回的 anomalous_example_stats 是一个 DatasetFeatureStatisticsList 协议缓冲区,其中每个数据集包含一组表现出特定异常的示例。您可以使用它来确定数据集中表现出给定异常的示例数量以及这些示例的特征。

模式环境

默认情况下,验证假设管道中的所有数据集都遵循单个模式。在某些情况下,需要引入轻微的模式变化,例如,用作标签的特征在训练期间是必需的(并且应进行验证),但在服务期间缺失。

**环境** 可用于表达这些要求。特别是,模式中的特征可以使用 default_environment、in_environment 和 not_in_environment 与一组环境相关联。

例如,如果 **tips** 特征用作训练中的标签,但在服务数据中缺失。如果没有指定环境,它将显示为异常。

    serving_stats = tfdv.generate_statistics_from_tfrecord(data_location=serving_data_path)
    serving_anomalies = tfdv.validate_statistics(serving_stats, schema)

Screenshot of serving anomalies

要解决此问题,我们需要将所有特征的默认环境设置为 'TRAINING' 和 'SERVING',并将 'tips' 特征从 SERVING 环境中排除。

    # All features are by default in both TRAINING and SERVING environments.
    schema.default_environment.append('TRAINING')
    schema.default_environment.append('SERVING')

    # Specify that 'tips' feature is not in SERVING environment.
    tfdv.get_feature(schema, 'tips').not_in_environment.append('SERVING')

    serving_anomalies_with_env = tfdv.validate_statistics(
        serving_stats, schema, environment='SERVING')

检查数据偏差和漂移

除了检查数据集是否符合模式中设置的期望外,TFDV 还提供功能来检测

  • 训练数据和服务数据之间的偏差
  • 不同训练数据日期之间的漂移

TFDV 通过根据模式中指定的漂移/偏差比较器比较不同数据集的统计信息来执行此检查。例如,要检查 'payment_type' 特征在训练数据集和服务数据集之间是否存在任何偏差

    # Assume we have already generated the statistics of training dataset, and
    # inferred a schema from it.
    serving_stats = tfdv.generate_statistics_from_tfrecord(data_location=serving_data_path)
    # Add a skew comparator to schema for 'payment_type' and set the threshold
    # of L-infinity norm for triggering skew anomaly to be 0.01.
    tfdv.get_feature(schema, 'payment_type').skew_comparator.infinity_norm.threshold = 0.01
    skew_anomalies = tfdv.validate_statistics(
        statistics=train_stats, schema=schema, serving_statistics=serving_stats)

注意 L-infinity 范数只会检测分类特征的偏差。而不是指定 infinity_norm 阈值,在 skew_comparator 中指定 jensen_shannon_divergence 阈值将检测数值特征和分类特征的偏差。

与检查数据集是否符合模式中设置的期望相同,结果也是 异常 协议缓冲区的实例,并描述训练数据集和服务数据集之间的任何偏差。例如,假设服务数据包含更多具有特征 payement_type 值为 Cash 的示例,这会产生一个偏差异常

   payment_type  High L-infinity distance between serving and training  The L-infinity distance between serving and training is 0.0435984 (up to six significant digits), above the threshold 0.01. The feature value with maximum difference is: Cash

如果异常确实表明训练数据和服务数据之间存在偏差,则需要进一步调查,因为这可能会对模型性能产生直接影响。

示例笔记本 包含检查基于偏差的异常的简单示例。

检测不同训练数据日期之间的漂移可以通过类似的方式完成

    # Assume we have already generated the statistics of training dataset for
    # day 2, and inferred a schema from it.
    train_day1_stats = tfdv.generate_statistics_from_tfrecord(data_location=train_day1_data_path)
    # Add a drift comparator to schema for 'payment_type' and set the threshold
    # of L-infinity norm for triggering drift anomaly to be 0.01.
    tfdv.get_feature(schema, 'payment_type').drift_comparator.infinity_norm.threshold = 0.01
    drift_anomalies = tfdv.validate_statistics(
        statistics=train_day2_stats, schema=schema, previous_statistics=train_day1_stats)

注意 L-infinity 范数只会检测分类特征的偏差。而不是指定 infinity_norm 阈值,在 skew_comparator 中指定 jensen_shannon_divergence 阈值将检测数值特征和分类特征的偏差。

编写自定义数据连接器

为了计算数据统计信息,TFDV 提供了几个 便捷方法 用于处理各种格式的输入数据(例如 TFRecordtf.train.Example、CSV 等)。如果您的数据格式不在此列表中,则需要编写一个自定义数据连接器来读取输入数据,并将其与 TFDV 核心 API 连接以计算数据统计信息。

TFDV 计算数据统计信息的核心 API 是一个 Beam PTransform,它接受输入示例批次的 PCollection(输入示例批次表示为 Arrow RecordBatch),并输出包含单个 DatasetFeatureStatisticsList 协议缓冲区的 PCollection。

一旦您实现了将输入示例批处理为 Arrow RecordBatch 的自定义数据连接器,您需要将其与 tfdv.GenerateStatistics API 连接以计算数据统计信息。以 TFRecordtf.train.Example 为例。 tfx_bsl 提供了 TFExampleRecord 数据连接器,以下是如何将其与 tfdv.GenerateStatistics API 连接的示例。

import tensorflow_data_validation as tfdv
from tfx_bsl.public import tfxio
import apache_beam as beam
from tensorflow_metadata.proto.v0 import statistics_pb2

DATA_LOCATION = ''
OUTPUT_LOCATION = ''

with beam.Pipeline() as p:
    _ = (
    p
    # 1. Read and decode the data with tfx_bsl.
    | 'TFXIORead' >> (
          tfxio.TFExampleRecord(
              file_pattern=[DATA_LOCATION],
              telemetry_descriptors=['my', 'tfdv']).BeamSource())
    # 2. Invoke TFDV `GenerateStatistics` API to compute the data statistics.
    | 'GenerateStatistics' >> tfdv.GenerateStatistics()
    # 3. Materialize the generated data statistics.
    | 'WriteStatsOutput' >> WriteStatisticsToTFRecord(OUTPUT_LOCATION))

计算数据切片的统计信息

TFDV 可以配置为计算数据切片的统计信息。可以通过提供切片函数来启用切片,这些函数接受 Arrow RecordBatch 并输出形式为 (slice key, record batch) 的元组序列。TFDV 提供了一种简单的方法来 生成基于特征值的切片函数,这些函数可以在计算统计信息时作为 tfdv.StatsOptions 的一部分提供。

启用切片后,输出 DatasetFeatureStatisticsList proto 包含多个 DatasetFeatureStatistics protos,每个切片一个。每个切片都由一个唯一的名称标识,该名称设置为 DatasetFeatureStatistics proto 中的数据集名称。默认情况下,TFDV 除了配置的切片之外,还会计算整个数据集的统计信息。

import tensorflow_data_validation as tfdv
from tensorflow_data_validation.utils import slicing_util

# Slice on country feature (i.e., every unique value of the feature).
slice_fn1 = slicing_util.get_feature_value_slicer(features={'country': None})

# Slice on the cross of country and state feature (i.e., every unique pair of
# values of the cross).
slice_fn2 = slicing_util.get_feature_value_slicer(
    features={'country': None, 'state': None})

# Slice on specific values of a feature.
slice_fn3 = slicing_util.get_feature_value_slicer(
    features={'age': [10, 50, 70]})

stats_options = tfdv.StatsOptions(
    slice_functions=[slice_fn1, slice_fn2, slice_fn3])