在本地构建 TFX 管道

TFX 使得将机器学习 (ML) 工作流程作为管道进行编排变得更加容易,以便

  • 自动化您的机器学习流程,使您能够定期重新训练、评估和部署模型。
  • 创建机器学习管道,其中包括对模型性能的深入分析和对新训练模型的验证,以确保性能和可靠性。
  • 监控训练数据以发现异常并消除训练服务偏差
  • 通过使用不同的超参数集运行管道来提高实验速度。

典型的管道开发流程从本地机器开始,进行数据分析和组件设置,然后部署到生产环境中。本指南介绍了两种在本地构建管道的 方法。

  • 自定义 TFX 管道模板以满足您的机器学习工作流程的需求。TFX 管道模板是预构建的工作流程,演示了使用 TFX 标准组件的最佳实践。
  • 使用 TFX 构建管道。在此用例中,您无需从模板开始即可定义管道。

在开发管道时,可以使用 LocalDagRunner 运行它。然后,一旦管道组件已定义完善并经过测试,您就可以使用生产级编排器(如 Kubeflow 或 Airflow)。

开始之前

TFX 是一个 Python 包,因此您需要设置 Python 开发环境,例如虚拟环境或 Docker 容器。然后

pip install tfx

如果您不熟悉 TFX 管道,请在继续之前 了解有关 TFX 管道核心概念的更多信息

使用模板构建管道

TFX 管道模板通过提供一组预构建的管道定义来简化管道开发的入门工作,您可以根据自己的用例对其进行自定义。

以下部分介绍如何创建模板副本并对其进行自定义以满足您的需求。

创建管道模板副本

  1. 查看可用的 TFX 管道模板列表

    tfx template list
    
  2. 从列表中选择一个模板

    tfx template copy --model=template --pipeline_name=pipeline-name \
    --destination_path=destination-path
    

    替换以下内容

    • template: 您要复制的模板的名称。
    • pipeline-name: 要创建的管道的名称。
    • destination-path: 复制模板到的路径。

    了解有关 tfx template copy 命令 的更多信息。

  3. 管道模板副本已创建在您指定的路径中。

探索管道模板

本部分概述了模板创建的脚手架。

  1. 探索复制到管道根目录的目录和文件

    • 一个 pipeline 目录,其中包含
      • pipeline.py - 定义管道,并列出正在使用的组件
      • configs.py - 保存配置详细信息,例如数据来源或正在使用的编排器
    • 一个 data 目录
      • 这通常包含一个 data.csv 文件,它是 ExampleGen 的默认来源。您可以在 configs.py 中更改数据源。
    • 一个 models 目录,其中包含预处理代码和模型实现

    • 模板复制了本地环境和 Kubeflow 的 DAG 运行器。

    • 一些模板还包含 Python 笔记本,以便您可以使用机器学习元数据探索数据和工件。

  2. 在管道目录中运行以下命令

    tfx pipeline create --pipeline_path local_runner.py
    
    tfx run create --pipeline_name pipeline_name
    

    该命令使用 LocalDagRunner 创建管道运行,这会将以下目录添加到您的管道中

    • 一个 tfx_metadata 目录,其中包含本地使用的机器学习元数据存储。
    • 一个 tfx_pipeline_output 目录,其中包含管道的文件输出。
  3. 打开您的管道的 pipeline/configs.py 文件并查看内容。此脚本定义了管道和组件函数使用的配置选项。您可以在此处指定数据源位置或运行中的训练步骤数量等内容。

  4. 打开您的管道的 pipeline/pipeline.py 文件并查看内容。此脚本创建 TFX 管道。最初,管道仅包含一个 ExampleGen 组件。

    • 按照 pipeline.pyTODO 注释中的说明,向管道添加更多步骤。
  5. 打开 local_runner.py 文件并查看内容。此脚本创建管道运行并指定运行的参数,例如 data_pathpreprocessing_fn

  6. 您已查看模板创建的脚手架并使用 LocalDagRunner 创建了管道运行。接下来,自定义模板以满足您的需求。

自定义您的管道

本节概述如何开始自定义您的模板。

  1. 设计您的管道。模板提供的脚手架可帮助您使用 TFX 标准组件实现用于表格数据的管道。如果您要将现有的 ML 工作流迁移到管道中,您可能需要修改代码以充分利用 TFX 标准组件。您可能还需要创建 自定义组件,这些组件实现您的工作流特有的功能或 TFX 标准组件尚未支持的功能。

  2. 设计好管道后,使用以下过程迭代地自定义管道。从将数据导入管道的组件开始,通常是 ExampleGen 组件。

    1. 自定义管道或组件以适应您的用例。这些自定义可能包括以下更改:

      • 更改管道参数。
      • 向管道添加组件或删除组件。
      • 替换数据输入源。此数据源可以是文件,也可以是查询到 BigQuery 等服务。
      • 更改管道中组件的配置。
      • 更改组件的自定义函数。
    2. 使用 local_runner.py 脚本或其他合适的 DAG 运行器(如果您使用的是其他编排器)在本地运行组件。如果脚本失败,请调试故障并重试运行脚本。

    3. 此自定义完成后,继续进行下一个自定义。

  3. 通过迭代工作,您可以自定义模板工作流中的每个步骤以满足您的需求。

构建自定义管道

使用以下说明了解有关在不使用模板的情况下构建自定义管道的更多信息。

  1. 设计您的管道。TFX 标准组件提供经过验证的功能,可帮助您实现完整的 ML 工作流。如果您要将现有的 ML 工作流迁移到管道中,您可能需要修改代码以充分利用 TFX 标准组件。您可能还需要创建 自定义组件,这些组件实现数据增强等功能。

  2. 创建一个脚本文件以使用以下示例定义您的管道。本指南将此文件称为 my_pipeline.py

    import os
    from typing import Optional, Text, List
    from absl import logging
    from ml_metadata.proto import metadata_store_pb2
    import tfx.v1 as tfx
    
    PIPELINE_NAME = 'my_pipeline'
    PIPELINE_ROOT = os.path.join('.', 'my_pipeline_output')
    METADATA_PATH = os.path.join('.', 'tfx_metadata', PIPELINE_NAME, 'metadata.db')
    ENABLE_CACHE = True
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
          pipeline_name=PIPELINE_NAME,
          pipeline_root=PIPELINE_ROOT,
          enable_cache=ENABLE_CACHE,
          metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
          )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    
    if __name__ == '__main__':
      logging.set_verbosity(logging.INFO)
      run_pipeline()
    

    在接下来的步骤中,您将在 create_pipeline 中定义您的管道,并使用本地运行器在本地运行您的管道。

    使用以下过程迭代地构建您的管道。

    1. 自定义管道或组件以适应您的用例。这些自定义可能包括以下更改:

      • 更改管道参数。
      • 向管道添加组件或删除组件。
      • 替换数据输入文件。
      • 更改管道中组件的配置。
      • 更改组件的自定义函数。
    2. 使用本地运行器或通过直接运行脚本在本地运行组件。如果脚本失败,请调试故障并重试运行脚本。

    3. 此自定义完成后,继续进行下一个自定义。

    从管道工作流中的第一个节点开始,通常是第一个节点将数据导入您的管道。

  3. 将工作流中的第一个节点添加到您的管道。在此示例中,管道使用 ExampleGen 标准组件从 ./data 目录中的 CSV 加载数据。

    from tfx.components import CsvExampleGen
    
    DATA_PATH = os.path.join('.', 'data')
    
    def create_pipeline(
      pipeline_name: Text,
      pipeline_root:Text,
      data_path: Text,
      enable_cache: bool,
      metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
      beam_pipeline_args: Optional[List[Text]] = None
    ):
      components = []
    
      example_gen = tfx.components.CsvExampleGen(input_base=data_path)
      components.append(example_gen)
    
      return tfx.dsl.Pipeline(
            pipeline_name=pipeline_name,
            pipeline_root=pipeline_root,
            components=components,
            enable_cache=enable_cache,
            metadata_connection_config=metadata_connection_config,
            beam_pipeline_args=beam_pipeline_args, 
        )
    
    def run_pipeline():
      my_pipeline = create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_PATH,
        enable_cache=ENABLE_CACHE,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(METADATA_PATH)
        )
    
      tfx.orchestration.LocalDagRunner().run(my_pipeline)
    

    CsvExampleGen 使用指定数据路径处的 CSV 中的数据创建序列化示例记录。通过使用数据根设置 CsvExampleGen 组件的 input_base 参数。

  4. 在与 my_pipeline.py 相同的目录中创建一个 data 目录。将一个小 CSV 文件添加到 data 目录。

  5. 使用以下命令运行您的 my_pipeline.py 脚本。

    python my_pipeline.py
    

    结果应类似于以下内容

    INFO:absl:Component CsvExampleGen depends on [].
    INFO:absl:Component CsvExampleGen is scheduled.
    INFO:absl:Component CsvExampleGen is running.
    INFO:absl:Running driver for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Running executor for CsvExampleGen
    INFO:absl:Generating examples.
    INFO:absl:Using 1 process(es) for Local pipeline execution.
    INFO:absl:Processing input csv data ./data/* to TFExample.
    WARNING:root:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
    INFO:absl:Examples generated.
    INFO:absl:Running publisher for CsvExampleGen
    INFO:absl:MetadataStore with DB connection initialized
    INFO:absl:Component CsvExampleGen is finished.
    
  6. 继续迭代地将组件添加到您的管道。