Better ML Engineering with ML Metadata


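Suppose you set up a production ML pipeline to classify penguins. The pipeline ingests your training data, trains and evaluates a model, and pushes it to production.

However, when you later use this model on a larger dataset that contains different kinds of penguins, the model does not behave as expected and starts misclassifying species.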

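At this point, you want to know:

  • What is the most efficient way to debug the model when the only available artifact is the model in production?
  • Which training dataset was used to train the model?
  • Which training run led to this erroneous model?
  • Where are the model evaluation results?
  • Where do you begin debugging?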

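ML Metadata (MLMD) is a library that uses the metadata associated with your ML models to help you answer these questions and more. A helpful analogy is to think of this metadata as the equivalent of logging in software development. MLMD lets you reliably track the artifacts and lineage associated with the various components of your ML pipeline.

In this tutorial, you set up a TFX pipeline to create a model that classifies penguins into three species based on body mass, culmen length and depth, and flipper length. You then use MLMD to track the lineage of the pipeline components.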

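TFX Pipelines in Colab

Colab is a lightweight development environment that differs significantly from a production environment. In production, you may have various pipeline components, such as data ingestion, transformation, model training, and run histories, spread across multiple distributed systems. For this tutorial, be aware that orchestration and metadata storage differ significantly from production: both are handled locally within Colab. For more detail, see the TFX documentation on Colab.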

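Setup

First, we install and import the necessary packages, set up paths, and download the data.

Upgrade Pip

To avoid upgrading Pip on a system when running locally, check that we are running in Colab. Local systems can of course be upgraded separately.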

try:
  import colab
  !pip install --upgrade pip
except ImportError:
  pass

Install and import TFX

pip install -q tfx

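Import packages

Did you restart the runtime?

If you are using Google Colab, the first time you run the cell above you must restart the runtime, either by clicking the "RESTART RUNTIME" button above or by using the "Runtime > Restart runtime ..." menu. This is because of the way Colab loads packages.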

import os
import tempfile
import urllib.request
import pandas as pd

import tensorflow_model_analysis as tfma
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

Check the TFX and MLMD versions.

from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import ml_metadata as mlmd
print('MLMD version: {}'.format(mlmd.__version__))
TFX version: 1.15.0
MLMD version: 1.15.0

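Download the dataset

In this Colab, we use the Palmer Penguins dataset, which can be found on GitHub. We processed the dataset by leaving out any incomplete records, dropping the island and sex columns, and converting the labels to int32. The dataset contains 334 records of body mass, culmen length and depth, and flipper length. You use this data to classify penguins into one of three species.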
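The preprocessing described above can be sketched with the standard library alone. This is a minimal illustration, not the script that produced `penguins_processed.csv`: the raw column names, the sample rows, and the species-to-integer mapping are all assumptions made for the example.

```python
import csv
import io

# Hypothetical raw rows, written for illustration only.
# An empty field marks a missing value.
RAW_CSV = """species,island,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g,sex
Adelie,Torgersen,39.1,18.7,181,3750,MALE
Adelie,Torgersen,,,,,
Gentoo,Biscoe,46.1,13.2,211,4500,FEMALE
Chinstrap,Dream,46.5,17.9,192,3500,FEMALE
"""

DROPPED_COLUMNS = ("island", "sex")
# Assumed label encoding; the real dataset may number the species differently.
SPECIES_TO_ID = {"Adelie": 0, "Chinstrap": 1, "Gentoo": 2}


def preprocess(raw_text):
    """Drops incomplete rows and unused columns, and encodes the label as an int."""
    processed = []
    for row in csv.DictReader(io.StringIO(raw_text)):
        if any(value == "" for value in row.values()):
            continue  # Skip records with any missing field.
        for column in DROPPED_COLUMNS:
            del row[column]
        row["species"] = SPECIES_TO_ID[row["species"]]
        processed.append(row)
    return processed


rows = preprocess(RAW_CSV)
print(len(rows), sorted(rows[0]))
```

The incomplete second record is dropped, and each remaining row keeps only the four numeric features plus the integer label.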

DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_root = tempfile.mkdtemp(prefix='tfx-data')
_data_filepath = os.path.join(_data_root, "penguins_processed.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)
('/tmpfs/tmp/tfx-data4bx2jr3d/penguins_processed.csv',
 <http.client.HTTPMessage at 0x7f82e19047c0>)

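Create the InteractiveContext

To run TFX components interactively in this notebook, create an InteractiveContext. The InteractiveContext uses a temporary directory with an ephemeral MLMD database instance. Note that calls to InteractiveContext are no-ops outside the Colab environment.

In general, it is good practice to group similar pipeline runs under a Context.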

interactive_context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le as root for pipeline outputs.
WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/metadata.sqlite.
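Because the ephemeral store is an ordinary SQLite file, its contents can be inspected with Python's built-in `sqlite3` module. The helper below is a sketch: `list_tables` is a name invented here, and the table layout of an MLMD database is an internal detail that may change between versions, so treat this as a debugging aid rather than a supported interface. The demonstration runs against a throwaway database so that the snippet works without TFX installed.

```python
import os
import sqlite3
import tempfile


def list_tables(db_path):
    """Returns the names of all tables in the SQLite file at db_path."""
    connection = sqlite3.connect(db_path)
    try:
        cursor = connection.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")
        return [name for (name,) in cursor]
    finally:
        connection.close()


# Demonstration on a throwaway database with one hypothetical table.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.sqlite")
demo_connection = sqlite3.connect(demo_path)
demo_connection.execute("CREATE TABLE Example (id INTEGER)")
demo_connection.commit()
demo_connection.close()

print(list_tables(demo_path))  # ['Example']
```

In this notebook, passing `interactive_context.metadata_connection_config.sqlite.filename_uri` as `db_path` would list the tables of the MLMD database instead.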

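Construct the TFX Pipeline

A TFX pipeline consists of several components that perform different aspects of the ML workflow. In this notebook, you create and run the ExampleGen, StatisticsGen, SchemaGen, and Trainer components, and use the Evaluator and Pusher components to evaluate and push the trained model.

Refer to the components tutorial for more information on TFX pipeline components.

Instantiate and run the ExampleGen Component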

example_gen = tfx.components.CsvExampleGen(input_base=_data_root)
interactive_context.run(example_gen)

Instantiate and run the StatisticsGen Component

statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])
interactive_context.run(statistics_gen)

Instantiate and run the SchemaGen Component

infer_schema = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
interactive_context.run(infer_schema)

Instantiate and run the Trainer Component

# Define the module file for the Trainer component
trainer_module_file = 'penguin_trainer.py'
%%writefile {trainer_module_file}

# Define the training algorithm for the Trainer module file
import os
from typing import List, Text

import tensorflow as tf
from tensorflow import keras

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

from tensorflow_metadata.proto.v0 import schema_pb2

# Features used for classification: culmen length and depth, flipper length,
# and body mass. The label to predict is species.

_LABEL_KEY = 'species'

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]


def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema, batch_size: int) -> tf.data.Dataset:
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY), schema).repeat()


def _build_keras_model():
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  d = keras.layers.Dense(8, activation='relu')(d)
  d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)
  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])
  return model


def run_fn(fn_args: tfx.components.FnArgs):
  schema = schema_pb2.Schema()
  tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema)
  train_dataset = _input_fn(
      fn_args.train_files, fn_args.data_accessor, schema, batch_size=10)
  eval_dataset = _input_fn(
      fn_args.eval_files, fn_args.data_accessor, schema, batch_size=10)
  model = _build_keras_model()
  model.fit(
      train_dataset,
      epochs=int(fn_args.train_steps / 20),
      steps_per_epoch=20,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

Run the Trainer component.

trainer = tfx.components.Trainer(
    module_file=os.path.abspath(trainer_module_file),
    examples=example_gen.outputs['examples'],
    schema=infer_schema.outputs['schema'],
    train_args=tfx.proto.TrainArgs(num_steps=100),
    eval_args=tfx.proto.EvalArgs(num_steps=50))
interactive_context.run(trainer)
Epoch 1/5
20/20 [==============================] - 2s 17ms/step - loss: 0.9629 - sparse_categorical_accuracy: 0.7000 - val_loss: 0.8934 - val_sparse_categorical_accuracy: 0.7600
Epoch 2/5
20/20 [==============================] - 0s 9ms/step - loss: 0.7868 - sparse_categorical_accuracy: 0.7650 - val_loss: 0.7069 - val_sparse_categorical_accuracy: 0.7700
Epoch 3/5
20/20 [==============================] - 0s 9ms/step - loss: 0.5864 - sparse_categorical_accuracy: 0.8150 - val_loss: 0.5397 - val_sparse_categorical_accuracy: 0.7800
Epoch 4/5
20/20 [==============================] - 0s 10ms/step - loss: 0.4492 - sparse_categorical_accuracy: 0.8150 - val_loss: 0.4520 - val_sparse_categorical_accuracy: 0.7800
Epoch 5/5
20/20 [==============================] - 0s 9ms/step - loss: 0.4016 - sparse_categorical_accuracy: 0.7900 - val_loss: 0.3730 - val_sparse_categorical_accuracy: 0.8200
INFO:tensorflow:Assets written to: /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/Trainer/model/4/Format-Serving/assets
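The `Epoch 1/5` lines in the output follow from the arithmetic in `run_fn`: the 100 training steps requested in `TrainArgs` are divided into epochs of 20 steps each. A small worked check, using only values that appear in the code above:

```python
train_steps = 100      # TrainArgs(num_steps=100)
steps_per_epoch = 20   # Hard-coded in run_fn
batch_size = 10        # Passed to _input_fn in run_fn

epochs = int(train_steps / steps_per_epoch)
examples_per_epoch = steps_per_epoch * batch_size

print(epochs)              # 5
print(examples_per_epoch)  # 200
```

Because `_input_fn` calls `.repeat()`, the dataset does not run out partway through an epoch, whatever the number of underlying records.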

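Evaluate and push the model

Use the Evaluator component to evaluate and "bless" the model, then use the Pusher component to push the model to a serving directory.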

_serving_model_dir = os.path.join(tempfile.mkdtemp(),
                                  'serving_model/penguins_classification')
eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(label_key='species', signature_name='serving_default')
    ],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='SparseCategoricalAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.6})))
        ])
    ],
    slicing_specs=[tfma.SlicingSpec()])
evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    schema=infer_schema.outputs['schema'],
    eval_config=eval_config)
interactive_context.run(evaluator)
pusher = tfx.components.Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
interactive_context.run(pusher)
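The `value_threshold` in the `eval_config` above is what decides whether the Evaluator blesses the model, and the Pusher pushes the model only when it has been blessed. The check can be pictured as follows. This is a simplified sketch, not TFMA's implementation: `is_blessed` is a hypothetical name, and treating the bound as inclusive is an assumption.

```python
def is_blessed(metric_value, lower_bound=0.6):
    """Returns True when the metric meets the lower bound (assumed inclusive)."""
    return metric_value >= lower_bound


print(is_blessed(0.82))  # True
print(is_blessed(0.55))  # False
```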

Running the TFX pipeline populates the MLMD database. In the next section, you use the MLMD API to query this database for metadata.

Query the MLMD Database

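The MLMD database stores three types of metadata:

  • Metadata about the pipeline and the lineage information associated with the pipeline components
  • Metadata about the artifacts generated during the pipeline run
  • Metadata about the executions of the pipeline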
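One way to picture these three kinds of metadata is as plain records that refer to one another. The classes below are a toy model for illustration only: they are not the MLMD API, every name in them is invented here, and the Trainer's schema input is left out to keep the example short.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyArtifact:
    id: int
    type_name: str
    uri: str


@dataclass
class ToyExecution:
    id: int
    type_name: str
    input_ids: List[int] = field(default_factory=list)
    output_ids: List[int] = field(default_factory=list)


@dataclass
class ToyPipelineRun:
    name: str
    artifacts: List[ToyArtifact] = field(default_factory=list)
    executions: List[ToyExecution] = field(default_factory=list)


# Hypothetical records for one training step.
examples = ToyArtifact(1, "Examples", "./CsvExampleGen/examples/1")
model = ToyArtifact(2, "Model", "./Trainer/model/4")
training = ToyExecution(4, "Trainer", input_ids=[examples.id], output_ids=[model.id])
run = ToyPipelineRun("interactive", [examples, model], [training])

# Lineage question: what did the execution that produced the model consume?
producers = [e for e in run.executions if model.id in e.output_ids]
print(producers[0].input_ids)  # [1]
```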

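A typical production pipeline serves multiple models as new data arrives. When you encounter erroneous results in served models, you can query the MLMD database to isolate the erroneous models. You can then trace the lineage of the pipeline components that correspond to these models to debug them.

Set up the metadata (MD) store with the InteractiveContext defined previously to query the MLMD database.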

connection_config = interactive_context.metadata_connection_config
store = mlmd.MetadataStore(connection_config)

# All TFX artifacts are stored in the base directory
base_dir = connection_config.sqlite.filename_uri.split('metadata.sqlite')[0]

Create some helper functions to view the data from the MD store.

def display_types(types):
  # Helper function to render dataframes for the artifact and execution types
  table = {'id': [], 'name': []}
  for a_type in types:
    table['id'].append(a_type.id)
    table['name'].append(a_type.name)
  return pd.DataFrame(data=table)


def display_artifacts(store, artifacts):
  # Helper function to render dataframes for the input artifacts
  table = {'artifact id': [], 'type': [], 'uri': []}
  for a in artifacts:
    table['artifact id'].append(a.id)
    artifact_type = store.get_artifact_types_by_id([a.type_id])[0]
    table['type'].append(artifact_type.name)
    table['uri'].append(a.uri.replace(base_dir, './'))
  return pd.DataFrame(data=table)


def display_properties(store, node):
  # Helper function to render dataframes for artifact and execution properties
  table = {'property': [], 'value': []}
  for k, v in node.properties.items():
    table['property'].append(k)
    table['value'].append(
        v.string_value if v.HasField('string_value') else v.int_value)
  for k, v in node.custom_properties.items():
    table['property'].append(k)
    table['value'].append(
        v.string_value if v.HasField('string_value') else v.int_value)
  return pd.DataFrame(data=table)
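`display_properties` assumes each value is either a string or an integer. MLMD property values are protobuf messages whose fields sit in a oneof, so a more general reader can ask the message which field is set. The sketch below assumes the oneof is named `value`; `read_value` and the `FakeValue` stand-in (used so the snippet runs without MLMD installed) are hypothetical.

```python
def read_value(value):
    """Returns whichever field of the `value` oneof is set, or None if none is."""
    field_name = value.WhichOneof('value')
    return getattr(value, field_name) if field_name else None


class FakeValue:
    """Stand-in mimicking the part of a protobuf value message used above."""

    def __init__(self, **fields):
        self._fields = fields
        for name, val in fields.items():
            setattr(self, name, val)

    def WhichOneof(self, oneof_name):
        # Reports the single populated field, like a protobuf oneof would.
        return next(iter(self._fields), None)


print(read_value(FakeValue(double_value=0.82)))     # 0.82
print(read_value(FakeValue(string_value='train')))  # train
print(read_value(FakeValue()))                      # None
```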

First, query the MD store for a list of all its stored ArtifactTypes.

display_types(store.get_artifact_types())

Next, query all PushedModel artifacts.

pushed_models = store.get_artifacts_by_type("PushedModel")
display_artifacts(store, pushed_models)

Query the MD store for the latest pushed model. This tutorial has only one pushed model.

pushed_model = pushed_models[-1]
display_properties(store, pushed_model)
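Indexing with `[-1]` relies on the order in which the store returns artifacts. When several models have been pushed, selecting by creation time is more explicit. The sketch below assumes each artifact exposes a `create_time_since_epoch` timestamp, as MLMD artifact records do; `latest_artifact` is a hypothetical helper, and `SimpleNamespace` objects with made-up IDs and timestamps stand in for real artifacts.

```python
from types import SimpleNamespace


def latest_artifact(artifacts):
    """Returns the artifact with the greatest creation timestamp."""
    return max(artifacts, key=lambda a: a.create_time_since_epoch)


# Made-up stand-ins, deliberately out of chronological order.
stand_ins = [
    SimpleNamespace(id=7, create_time_since_epoch=1714473200000),
    SimpleNamespace(id=9, create_time_since_epoch=1714473900000),
    SimpleNamespace(id=8, create_time_since_epoch=1714473500000),
]
print(latest_artifact(stand_ins).id)  # 9
```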

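One of the first steps in debugging a pushed model is to find out which trained model was pushed and which training data was used to train it.

MLMD provides traversal APIs for walking the lineage graph, which you can use to analyze the model's lineage.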

def get_one_hop_parent_artifacts(store, artifacts):
  # Get a list of artifacts within a 1-hop of the artifacts of interest
  artifact_ids = [artifact.id for artifact in artifacts]
  executions_ids = set(
      event.execution_id
      for event in store.get_events_by_artifact_ids(artifact_ids)
      if event.type == mlmd.proto.Event.OUTPUT)
  artifacts_ids = set(
      event.artifact_id
      for event in store.get_events_by_execution_ids(executions_ids)
      if event.type == mlmd.proto.Event.INPUT)
  return [artifact for artifact in store.get_artifacts_by_id(artifacts_ids)]
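`get_one_hop_parent_artifacts` moves one step upstream: from an artifact to the executions that output it, then to the inputs of those executions. Repeating that step until no new artifacts appear yields the full upstream lineage. The sketch below shows the idea on an in-memory event list; the tuples, IDs, and both helper functions are invented for illustration and do not call MLMD.

```python
from collections import deque

# (artifact_id, execution_id, kind) triples standing in for MLMD events.
EVENTS = [
    (1, 1, 'OUTPUT'),  # ExampleGen produced examples (1)
    (1, 2, 'INPUT'),   # StatisticsGen consumed examples
    (2, 2, 'OUTPUT'),  # StatisticsGen produced statistics (2)
    (2, 3, 'INPUT'),   # SchemaGen consumed statistics
    (3, 3, 'OUTPUT'),  # SchemaGen produced schema (3)
    (1, 4, 'INPUT'),   # Trainer consumed examples
    (3, 4, 'INPUT'),   # Trainer consumed schema
    (4, 4, 'OUTPUT'),  # Trainer produced model (4)
]


def one_hop_parents(artifact_id):
    """Returns the inputs of every execution that output the given artifact."""
    producers = {e for a, e, kind in EVENTS if a == artifact_id and kind == 'OUTPUT'}
    return {a for a, e, kind in EVENTS if e in producers and kind == 'INPUT'}


def upstream_artifact_ids(artifact_id):
    """Breadth-first walk over one-hop parents; returns all ancestor IDs."""
    seen = set()
    queue = deque([artifact_id])
    while queue:
        current = queue.popleft()
        for parent in one_hop_parents(current):
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return seen


print(sorted(upstream_artifact_ids(4)))  # [1, 2, 3]
```

The `seen` set keeps the walk from revisiting artifacts that feed several components, such as the examples consumed by both StatisticsGen and Trainer here.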

Query the parent artifacts of the pushed model.

parent_artifacts = get_one_hop_parent_artifacts(store, [pushed_model])
display_artifacts(store, parent_artifacts)

Query the properties of the model.

exported_model = parent_artifacts[0]
display_properties(store, exported_model)

Query the upstream artifacts of the model.

model_parents = get_one_hop_parent_artifacts(store, [exported_model])
display_artifacts(store, model_parents)

Get the training data the model was trained with.

used_data = model_parents[0]
display_properties(store, used_data)

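Now that you have the training data the model was trained with, query the database again to find the training step (execution). Query the MD store for a list of the registered execution types.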

display_types(store.get_execution_types())

The training step is the ExecutionType named tfx.components.trainer.component.Trainer. Traverse the MD store to get the trainer run that corresponds to the pushed model.

def find_producer_execution(store, artifact):
  executions_ids = set(
      event.execution_id
      for event in store.get_events_by_artifact_ids([artifact.id])
      if event.type == mlmd.proto.Event.OUTPUT)
  return store.get_executions_by_id(executions_ids)[0]

trainer = find_producer_execution(store, exported_model)
display_properties(store, trainer)

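Summary

In this tutorial, you learned how to use MLMD to trace the lineage of your TFX pipeline components and resolve issues.

To learn more about how to use MLMD, check out these additional resources: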