Apache Beam 和 TFX

Apache Beam 提供了一个框架,用于运行在各种执行引擎上运行的批处理和流式数据处理作业。几个 TFX 库使用 Beam 来运行任务,这使得跨计算集群实现高度可扩展性成为可能。Beam 包含对各种执行引擎或“运行器”的支持,包括一个直接运行器,它在单个计算节点上运行,对于开发、测试或小型部署非常有用。Beam 提供了一个抽象层,使 TFX 能够在任何支持的运行器上运行,而无需修改代码。TFX 使用 Beam Python API,因此它仅限于 Python API 支持的运行器。

部署和可扩展性

随着工作负载需求的增加,Beam 可以扩展到跨大型计算集群的非常大的部署。这仅受底层运行器可扩展性的限制。大型部署中的运行器通常会部署到容器编排系统(如 Kubernetes 或 Apache Mesos)中,以自动执行应用程序部署、扩展和管理。

有关 Apache Beam 的更多信息,请参阅 Apache Beam 文档。

对于 Google Cloud 用户,Dataflow 是推荐的运行器,它通过自动扩展资源、动态工作重新平衡、与其他 Google Cloud 服务的深度集成、内置安全性和监控来提供无服务器且经济高效的平台。

自定义 Python 代码和依赖项

在 TFX 管道中使用 Beam 的一个显着复杂性是处理自定义代码和/或来自其他 Python 模块所需的依赖项。以下是一些可能出现此问题的情况示例

  • preprocessing_fn 需要引用用户的 Python 模块
  • Evaluator 组件的自定义提取器
  • 从 TFX 组件子类化的自定义模块

TFX 依赖于 Beam 对 管理 Python 管道依赖项 的支持来处理 Python 依赖项。目前有两种方法可以管理此问题

  1. 将 Python 代码和依赖项作为源包提供
  2. [仅限 Dataflow] 使用容器映像作为工作器

接下来将讨论这些内容。

将 Python 代码和依赖项作为源包提供

建议熟悉以下用户的用户

  1. 熟悉 Python 打包
  2. 仅使用 Python 源代码(即没有 C 模块或共享库)。

请按照 管理 Python 管道依赖项 中的路径之一进行操作,使用以下 beam_pipeline_args 之一提供此内容

  • --setup_file
  • --extra_package
  • --requirements_file

注意:在上述任何情况下,请确保将相同版本的 tfx 列为依赖项。

[仅限 Dataflow] 使用容器映像作为工作器

TFX 0.26.0 及更高版本实验性地支持使用 自定义容器映像 作为 Dataflow 工作器。

为了使用此功能,您必须

  • 构建一个 Docker 映像,其中预先安装了 tfx 以及用户的自定义代码和依赖项。
    • 对于 (1) 使用 tfx>=0.26 且 (2) 使用 python 3.7 开发其管道的用户,最简单的方法是扩展官方 tensorflow/tfx 映像的相应版本
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.

ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
  • 将构建的映像推送到 Dataflow 使用的项目可访问的容器映像注册表中。
    • Google Cloud 用户可以考虑使用 Cloud Build,它可以很好地自动执行上述步骤。
  • 提供以下 beam_pipeline_args
beam_pipeline_args.extend([
    '--runner=DataflowRunner',
    '--project={project-id}',
    '--worker_harness_container_image={image-ref}',
    '--experiments=use_runner_v2',
])

TODO(b/171733562): 在 Dataflow 成为默认值后,删除 use_runner_v2。

TODO(b/179738639): 在 https://issues.apache.org/jira/browse/BEAM-5440 之后,创建有关如何在本地测试自定义容器的文档

Beam 管道参数

几个 TFX 组件依赖于 Beam 进行分布式数据处理。它们使用 beam_pipeline_args 进行配置,该参数在管道创建期间指定

my_pipeline = Pipeline(
    ...,
    beam_pipeline_args=[...])

TFX 0.30 及更高版本添加了一个接口 with_beam_pipeline_args,用于扩展每个组件的管道级 beam 参数

example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])