本指南介绍如何使用 TFX API 构建完全自定义组件。通过定义组件规范、执行器和组件接口类,完全自定义组件让您可以构建组件。此方法让您可以根据需要复用和扩展标准组件。
如果您是 TFX 管道的新手,请详细了解 TFX 管道的核心概念。
自定义执行器或自定义组件
如果只需要自定义处理逻辑,而组件的输入、输出和执行属性与现有组件相同,那么自定义执行器就足够了。当任何输入、输出或执行属性与任何现有 TFX 组件不同时,则需要完全自定义组件。
如何创建自定义组件?
开发完全自定义组件需要
- 为新组件定义一组输入和输出工件规范。特别是,输入工件的类型应与生成工件的组件的输出工件类型一致,并且输出工件的类型应与消耗工件的组件的输入工件类型一致(如果存在)。
- 新组件所需的非工件执行参数。
ComponentSpec
ComponentSpec
类通过定义组件的输入和输出工件以及用于组件执行的参数来定义组件契约。它有三个部分
- INPUTS:传递到组件执行器的输入工件的类型化参数的字典。通常,输入工件是上游组件的输出,因此共享相同的类型。
- OUTPUTS:组件生成的输出工件的类型化参数的字典。
- PARAMETERS:将传递到组件执行器的其他 ExecutionParameter 项的字典。这些是非工件参数,我们希望在管道 DSL 中灵活地定义它们并传递到执行中。
以下是一个 ComponentSpec 示例
class HelloComponentSpec(types.ComponentSpec):
"""ComponentSpec for Custom TFX Hello World Component."""
PARAMETERS = {
# These are parameters that will be passed in the call to
# create an instance of this component.
'name': ExecutionParameter(type=Text),
}
INPUTS = {
# This will be a dictionary with input artifacts, including URIs
'input_data': ChannelParameter(type=standard_artifacts.Examples),
}
OUTPUTS = {
# This will be a dictionary which this component will populate
'output_data': ChannelParameter(type=standard_artifacts.Examples),
}
执行器
接下来,为新组件编写执行器代码。基本上,需要使用被覆盖的 Do
函数创建 base_executor.BaseExecutor
的新子类。在 Do
函数中,参数 input_dict
、output_dict
和 exec_properties
映射到分别在 ComponentSpec 中定义的 INPUTS
、OUTPUTS
和 PARAMETERS
。对于 exec_properties
,可以通过字典查找直接获取值。对于 input_dict
和 output_dict
中的工件,artifact_utils 类中提供了可用于获取工件实例或工件 URI 的便捷函数。
class Executor(base_executor.BaseExecutor):
"""Executor for HelloComponent."""
def Do(self, input_dict: Dict[Text, List[types.Artifact]],
output_dict: Dict[Text, List[types.Artifact]],
exec_properties: Dict[Text, Any]) -> None:
...
split_to_instance = {}
for artifact in input_dict['input_data']:
for split in json.loads(artifact.split_names):
uri = artifact_utils.get_split_uri([artifact], split)
split_to_instance[split] = uri
for split, instance in split_to_instance.items():
input_dir = instance
output_dir = artifact_utils.get_split_uri(
output_dict['output_data'], split)
for filename in tf.io.gfile.listdir(input_dir):
input_uri = os.path.join(input_dir, filename)
output_uri = os.path.join(output_dir, filename)
io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)
单元测试自定义执行器
可以类似于 此示例 为自定义执行器创建单元测试。
组件接口
现在最复杂的部分已经完成,下一步是将这些部分组装到组件接口中,以使该组件能够在管道中使用。有几个步骤
- 使组件接口成为
base_component.BaseComponent
的子类 - 使用之前定义的
ComponentSpec
类为类变量SPEC_CLASS
赋值 - 使用之前定义的 Executor 类为类变量
EXECUTOR_SPEC
赋值 - 通过使用函数的参数来构造 ComponentSpec 类的实例并使用该值调用具有可选名称的超级函数来定义
__init__()
构造函数
创建组件实例时,base_component.BaseComponent
类中的类型检查逻辑将被调用,以确保传入的参数与 ComponentSpec
类中定义的类型信息兼容。
from tfx.types import standard_artifacts
from hello_component import executor
class HelloComponent(base_component.BaseComponent):
"""Custom TFX Hello World Component."""
SPEC_CLASS = HelloComponentSpec
EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)
def __init__(self,
input_data: types.Channel = None,
output_data: types.Channel = None,
name: Optional[Text] = None):
if not output_data:
examples_artifact = standard_artifacts.Examples()
examples_artifact.split_names = input_data.get()[0].split_names
output_data = channel_utils.as_channel([examples_artifact])
spec = HelloComponentSpec(input_data=input_data,
output_data=output_data, name=name)
super(HelloComponent, self).__init__(spec=spec)
组装到 TFX 管道中
最后一步是将新的自定义组件插入 TFX 管道。除了添加新组件的实例,还需要执行以下操作
- 正确连接新组件的上游和下游组件。这可以通过在新组件中引用上游组件的输出,并在下游组件中引用新组件的输出来完成
- 在构建管道时将新组件实例添加到组件列表中。
以下示例重点介绍了上述更改。可以在 TFX GitHub 存储库 中找到完整示例。
def _create_pipeline():
...
example_gen = CsvExampleGen(input_base=examples)
hello = component.HelloComponent(
input_data=example_gen.outputs['examples'], name='HelloWorld')
statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
...
return pipeline.Pipeline(
...
components=[example_gen, hello, statistics_gen, ...],
...
)
部署完全自定义组件
除了代码更改之外,所有新添加的部分(ComponentSpec
、Executor
、组件接口)都必须在管道运行环境中可访问,才能正确运行管道。