Apache Beam 提供了一个框架,用于运行在各种执行引擎上运行的批处理和流式数据处理作业。几个 TFX 库使用 Beam 来运行任务,这使得跨计算集群实现高度可扩展性成为可能。Beam 包含对各种执行引擎或“运行器”的支持,包括一个直接运行器,它在单个计算节点上运行,对于开发、测试或小型部署非常有用。Beam 提供了一个抽象层,使 TFX 能够在任何支持的运行器上运行,而无需修改代码。TFX 使用 Beam Python API,因此它仅限于 Python API 支持的运行器。
部署和可扩展性
随着工作负载需求的增加,Beam 可以扩展到跨大型计算集群的非常大的部署。这仅受底层运行器可扩展性的限制。大型部署中的运行器通常会部署到容器编排系统(如 Kubernetes 或 Apache Mesos)中,以自动执行应用程序部署、扩展和管理。
有关 Apache Beam 的更多信息,请参阅 Apache Beam 文档。
对于 Google Cloud 用户,Dataflow 是推荐的运行器,它通过自动扩展资源、动态工作重新平衡、与其他 Google Cloud 服务的深度集成、内置安全性和监控来提供无服务器且经济高效的平台。
自定义 Python 代码和依赖项
在 TFX 管道中使用 Beam 的一个显着复杂性是处理自定义代码和/或来自其他 Python 模块所需的依赖项。以下是一些可能出现此问题的情况示例
- preprocessing_fn 需要引用用户的 Python 模块
- Evaluator 组件的自定义提取器
- 从 TFX 组件子类化的自定义模块
TFX 依赖于 Beam 对 管理 Python 管道依赖项 的支持来处理 Python 依赖项。目前有两种方法可以管理此问题
- 将 Python 代码和依赖项作为源包提供
- [仅限 Dataflow] 使用容器映像作为工作器
接下来将讨论这些内容。
将 Python 代码和依赖项作为源包提供
建议熟悉以下用户的用户
- 熟悉 Python 打包
- 仅使用 Python 源代码(即没有 C 模块或共享库)。
请按照 管理 Python 管道依赖项 中的路径之一进行操作,使用以下 beam_pipeline_args 之一提供此内容
- --setup_file
- --extra_package
- --requirements_file
注意:在上述任何情况下,请确保将相同版本的 tfx
列为依赖项。
[仅限 Dataflow] 使用容器映像作为工作器
TFX 0.26.0 及更高版本实验性地支持使用 自定义容器映像 作为 Dataflow 工作器。
为了使用此功能,您必须
- 构建一个 Docker 映像,其中预先安装了
tfx
以及用户的自定义代码和依赖项。- 对于 (1) 使用
tfx>=0.26
且 (2) 使用 python 3.7 开发其管道的用户,最简单的方法是扩展官方tensorflow/tfx
映像的相应版本
- 对于 (1) 使用
# You can use a build-arg to dynamically pass in the
# version of TFX being used to your Dockerfile.
ARG TFX_VERSION
FROM tensorflow/tfx:${TFX_VERSION}
# COPY your code and dependencies in
- 将构建的映像推送到 Dataflow 使用的项目可访问的容器映像注册表中。
- Google Cloud 用户可以考虑使用 Cloud Build,它可以很好地自动执行上述步骤。
- 提供以下
beam_pipeline_args
beam_pipeline_args.extend([
'--runner=DataflowRunner',
'--project={project-id}',
'--worker_harness_container_image={image-ref}',
'--experiments=use_runner_v2',
])
TODO(b/171733562): 在 Dataflow 成为默认值后,删除 use_runner_v2。
TODO(b/179738639): 在 https://issues.apache.org/jira/browse/BEAM-5440 之后,创建有关如何在本地测试自定义容器的文档
Beam 管道参数
几个 TFX 组件依赖于 Beam 进行分布式数据处理。它们使用 beam_pipeline_args
进行配置,该参数在管道创建期间指定
my_pipeline = Pipeline(
...,
beam_pipeline_args=[...])
TFX 0.30 及更高版本添加了一个接口 with_beam_pipeline_args
,用于扩展每个组件的管道级 beam 参数
example_gen = CsvExampleGen(input_base=data_root).with_beam_pipeline_args([...])