This notebook contains examples of how to author and run Python function components, both within the TFX InteractiveContext and in a locally orchestrated TFX pipeline.
For more context and information, see the Custom Python function components page on the TFX documentation site.
Setup
We first install TFX and import the necessary modules. TFX requires Python 3.
Check the system Python version
import sys
sys.version
'3.9.19 (main, Apr 6 2024, 17:57:55) \n[GCC 9.4.0]'
Upgrade Pip
To avoid upgrading Pip in a system when running locally, check to make sure that we are running in Colab. Local systems can of course be upgraded separately.
try:
  import colab
  !pip install --upgrade pip
except:
  pass
Install TFX
pip install tfx
Did you restart the runtime?
If you are using Google Colab, the first time that you run the cell above, you must restart the runtime (Runtime > Restart runtime ...). This is because of the way that Colab loads packages.
Import packages
We import TFX and check its version.
# Check version
from tfx import v1 as tfx
tfx.__version__
2024-05-08 09:54:22.424419: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-08 09:54:22.424465: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-08 09:54:22.426007: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
'1.15.0'
Custom Python function components
In this section, we will create components from Python functions. We will not be solving any real ML problem here; these simple functions are just used to illustrate the Python function component development process.
For more documentation, see the Python function-based components guide.
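Before the concrete examples below, here is a minimal, hypothetical sketch (not part of the original tutorial) of the general shape of a Python function component: artifact inputs and outputs, as well as primitive execution parameters, are declared through type annotations on the function signature, and the @tfx.dsl.components.component decorator turns the function into a TFX component.
from tfx import v1 as tfx
# Non-public API, used here only for illustration (as in the examples below).
from tfx.types.experimental.simple_artifacts import Dataset

@tfx.dsl.components.component
def MySketchComponent(
    input_data: tfx.dsl.components.InputArtifact[Dataset],    # artifact consumed by the component
    output_data: tfx.dsl.components.OutputArtifact[Dataset],  # artifact produced by the component
    threshold: tfx.dsl.components.Parameter[int] = 10):       # primitive execution parameter
  """Hypothetical component body; the real examples follow below."""
  pass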
Create custom Python components
We begin by writing a function that generates some dummy data. This is written to its own Python module file.
%%writefile my_generator.py
import os
import tensorflow as tf # Used for writing files.
from tfx import v1 as tfx
# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
@tfx.dsl.components.component
def MyGenerator(data: tfx.dsl.components.OutputArtifact[Dataset]):
  """Create a file with dummy data in the output artifact."""
  with tf.io.gfile.GFile(os.path.join(data.uri, 'data_file.txt'), 'w') as f:
    f.write('Dummy data')

  # Set metadata and ensure that it gets passed to downstream components.
  data.set_string_custom_property('my_custom_field', 'my_custom_value')
Writing my_generator.py
Next, we write a second component that uses the dummy data produced. We will just calculate the hash of the data and return it.
%%writefile my_consumer.py
import hashlib
import os
import tensorflow as tf
from tfx import v1 as tfx
# Non-public APIs, just for showcase.
from tfx.types.experimental.simple_artifacts import Dataset
from tfx.types.standard_artifacts import String
@tfx.dsl.components.component
def MyConsumer(data: tfx.dsl.components.InputArtifact[Dataset],
               hash: tfx.dsl.components.OutputArtifact[String],
               algorithm: tfx.dsl.components.Parameter[str] = 'sha256'):
  """Reads the contents of data and calculates its hash."""
  with tf.io.gfile.GFile(
      os.path.join(data.uri, 'data_file.txt'), 'r') as f:
    contents = f.read()
  h = hashlib.new(algorithm)
  h.update(tf.compat.as_bytes(contents))
  hash.value = h.hexdigest()

  # Read a custom property from the input artifact and set it on the output.
  custom_value = data.get_string_custom_property('my_custom_field')
  hash.set_string_custom_property('input_custom_field', custom_value)
Writing my_consumer.py
Run in the notebook with the InteractiveContext
Now, we will demonstrate the usage of our new components in the TFX InteractiveContext.
For more information on what you can do with the TFX notebook InteractiveContext, see the in-notebook TFX Keras Component Tutorial.
from my_generator import MyGenerator
from my_consumer import MyConsumer
Construct the InteractiveContext
# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext. Calls to InteractiveContext are no-ops outside of the
# notebook.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmpfs/tmp/tfx-interactive-2024-05-08T09_54_27.937649-5yvglrqg as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmpfs/tmp/tfx-interactive-2024-05-08T09_54_27.937649-5yvglrqg/metadata.sqlite.
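If you want outputs and metadata to persist across notebook sessions, you can pass the optional arguments mentioned in the comment above. Here is a small sketch, not needed for this tutorial; the directory path is hypothetical.
# A sketch, assuming a writable local directory of your choice. The tutorial
# itself keeps the temporary defaults shown in the warnings above.
import os
from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

MY_PIPELINE_ROOT = '/tmp/my_pipeline_root'  # Hypothetical persistent root directory.
os.makedirs(MY_PIPELINE_ROOT, exist_ok=True)

persistent_context = InteractiveContext(
    pipeline_root=MY_PIPELINE_ROOT,
    metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
        os.path.join(MY_PIPELINE_ROOT, 'metadata.sqlite')))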
Run your components interactively with context.run()
Next, we run our components interactively within the notebook using context.run(). Our consumer component uses the outputs of the generator component.
generator = MyGenerator()
context.run(generator)
consumer = MyConsumer(
    data=generator.outputs['data'],
    algorithm='md5')
context.run(consumer)
After execution, we can inspect the contents of the "hash" output artifact of the consumer component on disk.
tail -v {consumer.outputs['hash'].get()[0].uri}
==> /tmpfs/tmp/tfx-interactive-2024-05-08T09_54_27.937649-5yvglrqg/MyConsumer/hash/2/value <==
0015fe7975d1a2794b59aa12635703f1
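You can also read the same artifact back in Python instead of shelling out. The following is a small sketch based only on the APIs already used above: the String value artifact's URI points at the value file that tail printed, and the custom property was set by MyConsumer.
import hashlib
import tensorflow as tf

hash_artifact = consumer.outputs['hash'].get()[0]
# Read the materialized hash value from the artifact's URI.
with tf.io.gfile.GFile(hash_artifact.uri, 'r') as f:
  print('stored hash:', f.read())
# Read back the custom property that MyConsumer copied from its input artifact.
print('input_custom_field:', hash_artifact.get_string_custom_property('input_custom_field'))
# Sanity check: MyConsumer hashed the generator's file contents ('Dummy data') with md5.
print('expected hash:', hashlib.md5(b'Dummy data').hexdigest())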
That is it: you have now written and executed your own custom components!
Write a pipeline definition
Next, we will author a pipeline using these same components. While using the InteractiveContext within a notebook works well for experimentation, defining a pipeline lets you deploy your pipeline on local or remote runners for production usage.
Here, we will demonstrate usage of the LocalDagRunner running on your local machine. For production execution, the Airflow or Kubeflow runners may be more suitable.
Construct your pipeline
import os
import tempfile
from tfx import v1 as tfx
# Select a persistent TFX root directory to store your output artifacts.
# For demonstration purposes only, we use a temporary directory.
PIPELINE_ROOT = tempfile.mkdtemp()
# Select a pipeline name so that multiple runs of the same logical pipeline
# can be grouped.
PIPELINE_NAME = "function-based-pipeline"
# We use a ML Metadata configuration that uses a local SQLite database in
# the pipeline root directory. Other backends for ML Metadata are available
# for production usage.
METADATA_CONNECTION_CONFIG = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    os.path.join(PIPELINE_ROOT, 'metadata.sqlite'))

def function_based_pipeline():
  # Here, we construct our generator and consumer components in the same way.
  generator = MyGenerator()
  consumer = MyConsumer(
      data=generator.outputs['data'],
      algorithm='md5')

  return tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      components=[generator, consumer],
      metadata_connection_config=METADATA_CONNECTION_CONFIG)
my_pipeline = function_based_pipeline()
Run your pipeline with the LocalDagRunner
tfx.orchestration.LocalDagRunner().run(my_pipeline)
WARNING:absl:ArtifactQuery.property_predicate is not supported.
We can inspect the output artifacts generated by this pipeline execution.
find {PIPELINE_ROOT}
/tmpfs/tmp/tmpcu4s98j0
/tmpfs/tmp/tmpcu4s98j0/MyGenerator
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/data
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/data/1
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/data/1/data_file.txt
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/.system
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/.system/stateful_working_dir
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/.system/executor_execution
/tmpfs/tmp/tmpcu4s98j0/MyGenerator/.system/executor_execution/1
/tmpfs/tmp/tmpcu4s98j0/metadata.sqlite
/tmpfs/tmp/tmpcu4s98j0/MyConsumer
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/.system
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/.system/stateful_working_dir
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/.system/executor_execution
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/.system/executor_execution/2
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/hash
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/hash/2
/tmpfs/tmp/tmpcu4s98j0/MyConsumer/hash/2/value
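Because the pipeline recorded its run in the SQLite ML Metadata database under PIPELINE_ROOT, you can also query MLMD directly. The following is a minimal sketch, not part of the original tutorial, that uses the (non-public) tfx.orchestration.metadata.Metadata handle to list the registered artifacts.
from tfx.orchestration.metadata import Metadata

with Metadata(METADATA_CONNECTION_CONFIG) as metadata_handler:
  # List every artifact the pipeline run registered, with its type name and URI.
  for artifact in metadata_handler.store.get_artifacts():
    artifact_type = metadata_handler.store.get_artifact_types_by_id([artifact.type_id])[0]
    print(artifact_type.name, artifact.uri)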
You have now written your own custom components and orchestrated their execution on the LocalDagRunner! For next steps, check out additional tutorials and guides on the TFX website.