Suppose you set up a production ML pipeline to classify penguins. The pipeline ingests your training data, trains and evaluates a model, and pushes the model to production.
However, when you later try to use this model with a larger dataset that contains different kinds of penguins, you observe that your model does not behave as expected and starts classifying the species incorrectly.
At this point, you want to know:
- What is the most effective way to debug the model when the only available artifact is the model in production?
- Which training dataset was used to train the model?
- Which training run led to this erroneous model?
- Where are the model evaluation results?
- Where do you begin debugging?
ML Metadata (MLMD) is a library that leverages the metadata associated with ML models to help you answer these questions and more. A helpful analogy is to think of this metadata as the equivalent of logging in software development. MLMD enables you to reliably track the artifacts and lineage associated with the various components of your ML pipeline.
In this tutorial, you set up a TFX pipeline to create a model that classifies penguins into three species based on body mass, culmen length and depth, and flipper length. You then use MLMD to track the lineage of the pipeline components.
TFX pipelines in Colab
Colab is a lightweight development environment that differs significantly from a production environment. In production, you may have various pipeline components such as data ingestion, transformation, model training, and run histories spread across multiple, distributed systems. For this tutorial, you should be aware that significant differences exist in orchestration and metadata storage - both are handled locally within Colab. Learn more about TFX in Colab here.
Setup
First, we install and import the necessary packages, set up paths, and download the data.
Upgrade Pip
To avoid upgrading Pip in a system when running locally, check that we are running in Colab. Local systems can of course be upgraded separately.
try:
  import colab
  !pip install --upgrade pip
except:
  pass
Install and import TFX
pip install -q tfx
Import packages
Did you restart the runtime?
If you are using Google Colab, the first time that you run the cell above, you must restart the runtime by clicking the "RESTART RUNTIME" button above or using the "Runtime > Restart runtime ..." menu. This is because of the way that Colab loads packages.
import os
import tempfile
import urllib
import pandas as pd
import tensorflow_model_analysis as tfma
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
Check the TFX and MLMD versions.
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import ml_metadata as mlmd
print('MLMD version: {}'.format(mlmd.__version__))
TFX version: 1.15.0 MLMD version: 1.15.0
Download the dataset
In this Colab, we use the Palmer Penguins dataset, which can be found on Github. We processed the dataset by removing any incomplete records, dropping the island and sex columns, and converting the labels to int32. The dataset contains 334 records of the body mass, culmen length and depth, and flipper length of penguins. You will use this data to classify penguins into one of three species.
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_root = tempfile.mkdtemp(prefix='tfx-data')
_data_filepath = os.path.join(_data_root, "penguins_processed.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)
('/tmpfs/tmp/tfx-data4bx2jr3d/penguins_processed.csv', <http.client.HTTPMessage at 0x7f82e19047c0>)
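As a quick optional sanity check (not part of the original tutorial flow), you can preview the downloaded data with pandas, which was imported earlier:
# Optional: preview the first few records. The columns should match the
# features described above plus the 'species' label.
pd.read_csv(_data_filepath).head()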
Create the InteractiveContext
To run TFX components interactively in this notebook, create an InteractiveContext. The InteractiveContext uses a temporary directory with an ephemeral MLMD database instance. Note that calls to InteractiveContext are no-ops outside the Colab environment.
In general, it is a good practice to group similar pipeline runs under a Context.
interactive_context = InteractiveContext()
WARNING:absl:InteractiveContext pipeline_root argument not provided: using temporary directory /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le as root for pipeline outputs. WARNING:absl:InteractiveContext metadata_connection_config not provided: using SQLite ML Metadata database at /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/metadata.sqlite.
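The warnings above show that, by default, the pipeline outputs and the MLMD SQLite database land in a fresh temporary directory and disappear with the runtime. If you would rather keep the metadata around for later inspection, you can pin both to known locations. A minimal sketch, assuming you want a persistent store; the persistent_context name and path are illustrative only:
# Sketch only - pin the pipeline root and MLMD database to chosen paths
# instead of auto-generated temporary directories.
from tfx.orchestration import metadata

_my_pipeline_root = os.path.join(os.getcwd(), 'pipeline_root')  # illustrative path
persistent_context = InteractiveContext(
    pipeline_root=_my_pipeline_root,
    metadata_connection_config=metadata.sqlite_metadata_connection_config(
        os.path.join(_my_pipeline_root, 'metadata.sqlite')))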
Construct the TFX pipeline
A TFX pipeline consists of several components that perform different aspects of the ML workflow. In this notebook, you create and run the ExampleGen, StatisticsGen, SchemaGen, and Trainer components, and use the Evaluator and Pusher components to evaluate and push the trained model.
Refer to the components tutorial for more information on TFX pipeline components.
Instantiate and run the ExampleGen component
example_gen = tfx.components.CsvExampleGen(input_base=_data_root)
interactive_context.run(example_gen)
WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.
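Optionally, you can check where ExampleGen materialized the examples; in an InteractiveContext the output channel can be resolved with .get():
# Optional: resolve the output channel and print the artifact's splits and URI.
examples_artifact = example_gen.outputs['examples'].get()[0]
print(examples_artifact.split_names, examples_artifact.uri)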
Instantiate and run the StatisticsGen component
statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])
interactive_context.run(statistics_gen)
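Optionally, the InteractiveContext can render component outputs inline; the same call also works for the schema produced in the next step:
# Optional: visualize the computed dataset statistics inline in the notebook.
interactive_context.show(statistics_gen.outputs['statistics'])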
Instantiate and run the SchemaGen component
infer_schema = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
interactive_context.run(infer_schema)
Instantiate and run the Trainer component
# Define the module file for the Trainer component
trainer_module_file = 'penguin_trainer.py'
%%writefile {trainer_module_file}
# Define the training algorithm for the Trainer module file
import os
from typing import List, Text
import tensorflow as tf
from tensorflow import keras
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2
# Features used for classification - culmen length and depth, flipper length,
# and body mass. The label is the penguin species.

_LABEL_KEY = 'species'

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]


def _input_fn(file_pattern: List[Text],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema, batch_size: int) -> tf.data.Dataset:
  # Build a repeating tf.data.Dataset of (features, label) batches.
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY), schema).repeat()


def _build_keras_model():
  # A simple DNN: one input per feature, two hidden layers, 3-class logits.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  d = keras.layers.Dense(8, activation='relu')(d)
  d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)
  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])
  return model


def run_fn(fn_args: tfx.components.FnArgs):
  # Entry point invoked by the TFX Trainer component.
  schema = schema_pb2.Schema()
  tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema)
  train_dataset = _input_fn(
      fn_args.train_files, fn_args.data_accessor, schema, batch_size=10)
  eval_dataset = _input_fn(
      fn_args.eval_files, fn_args.data_accessor, schema, batch_size=10)
  model = _build_keras_model()
  model.fit(
      train_dataset,
      epochs=int(fn_args.train_steps / 20),
      steps_per_epoch=20,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py
Run the Trainer component.
trainer = tfx.components.Trainer(
    module_file=os.path.abspath(trainer_module_file),
    examples=example_gen.outputs['examples'],
    schema=infer_schema.outputs['schema'],
    train_args=tfx.proto.TrainArgs(num_steps=100),
    eval_args=tfx.proto.EvalArgs(num_steps=50))
interactive_context.run(trainer)
running bdist_wheel running build running build_py creating build creating build/lib copying penguin_trainer.py -> build/lib installing to /tmpfs/tmp/tmp2bjhph4h running install running install_lib copying build/lib/penguin_trainer.py -> /tmpfs/tmp/tmp2bjhph4h running install_egg_info running egg_info creating tfx_user_code_Trainer.egg-info writing tfx_user_code_Trainer.egg-info/PKG-INFO writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' Copying tfx_user_code_Trainer.egg-info to /tmpfs/tmp/tmp2bjhph4h/tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4-py3.9.egg-info running install_scripts creating /tmpfs/tmp/tmp2bjhph4h/tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/WHEEL creating '/tmpfs/tmp/tmp1r3ydm1_/tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4-py3-none-any.whl' and adding '/tmpfs/tmp/tmp2bjhph4h' to it adding 'penguin_trainer.py' adding 'tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/METADATA' adding 'tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/WHEEL' adding 'tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/top_level.txt' adding 'tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4.dist-info/RECORD' removing /tmpfs/tmp/tmp2bjhph4h /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/setuptools/_distutils/cmd.py:66: SetuptoolsDeprecationWarning: setup.py install is deprecated. !! ******************************************************************************** Please avoid running ``setup.py`` directly. Instead, use pypa/build, pypa/installer or other standards-based tools. See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details. ******************************************************************************** !! self.initialize_options() Processing /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/_wheels/tfx_user_code_Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4-py3-none-any.whl Installing collected packages: tfx-user-code-Trainer Successfully installed tfx-user-code-Trainer-0.0+fef7c4ed90dc336ca26daee59d65660cf8da5fa988b2ca0c89df2f558fda10f4 WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tfx_bsl/tfxio/tf_example_record.py:343: parse_example_dataset (from tensorflow.python.data.experimental.ops.parsing_ops) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.data.Dataset.map(tf.io.parse_example(...))` instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tfx_bsl/tfxio/tf_example_record.py:343: parse_example_dataset (from tensorflow.python.data.experimental.ops.parsing_ops) is deprecated and will be removed in a future version. Instructions for updating: Use `tf.data.Dataset.map(tf.io.parse_example(...))` instead. 
Epoch 1/5 WARNING: All log messages before absl::InitializeLog() is called are written to STDERR I0000 00:00:1714473175.420733 172568 device_compiler.h:186] Compiled cluster using XLA! This line is logged at most once for the lifetime of the process. 20/20 [==============================] - 2s 17ms/step - loss: 0.9629 - sparse_categorical_accuracy: 0.7000 - val_loss: 0.8934 - val_sparse_categorical_accuracy: 0.7600 Epoch 2/5 20/20 [==============================] - 0s 9ms/step - loss: 0.7868 - sparse_categorical_accuracy: 0.7650 - val_loss: 0.7069 - val_sparse_categorical_accuracy: 0.7700 Epoch 3/5 20/20 [==============================] - 0s 9ms/step - loss: 0.5864 - sparse_categorical_accuracy: 0.8150 - val_loss: 0.5397 - val_sparse_categorical_accuracy: 0.7800 Epoch 4/5 20/20 [==============================] - 0s 10ms/step - loss: 0.4492 - sparse_categorical_accuracy: 0.8150 - val_loss: 0.4520 - val_sparse_categorical_accuracy: 0.7800 Epoch 5/5 20/20 [==============================] - 0s 9ms/step - loss: 0.4016 - sparse_categorical_accuracy: 0.7900 - val_loss: 0.3730 - val_sparse_categorical_accuracy: 0.8200 INFO:tensorflow:Assets written to: /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/Trainer/model/4/Format-Serving/assets INFO:tensorflow:Assets written to: /tmpfs/tmp/tfx-interactive-2024-04-30T10_32_43.981209-5usg33le/Trainer/model/4/Format-Serving/assets
Evaluate and push the model
Use the Evaluator component to evaluate and "bless" the model, and the Pusher component to push the model to a serving directory.
_serving_model_dir = os.path.join(tempfile.mkdtemp(),
                                  'serving_model/penguins_classification')
eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(label_key='species', signature_name='serving_default')
    ],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(
                class_name='SparseCategoricalAccuracy',
                threshold=tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.6})))
        ])
    ],
    slicing_specs=[tfma.SlicingSpec()])
evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    schema=infer_schema.outputs['schema'],
    eval_config=eval_config)
interactive_context.run(evaluator)
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_model_analysis/writers/metrics_plots_and_validations_writer.py:112: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version. Instructions for updating: Use eager execution and: `tf.data.TFRecordDataset(path)` WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/tensorflow_model_analysis/writers/metrics_plots_and_validations_writer.py:112: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version. Instructions for updating: Use eager execution and: `tf.data.TFRecordDataset(path)`
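Since the accuracy threshold was set at 0.6 and the trained model scored above it, the Evaluator should bless the model. If you want to verify this before pushing, you can inspect the blessing artifact; TFX typically writes a BLESSED (or NOT_BLESSED) marker file into its directory, though the exact layout may vary by version:
# Optional: inspect the blessing artifact directory for the marker file.
blessing_uri = evaluator.outputs['blessing'].get()[0].uri
print(os.listdir(blessing_uri))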
pusher = tfx.components.Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
interactive_context.run(pusher)
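You can confirm the push by listing the serving directory; the Pusher typically writes the model under a timestamped version subdirectory (the exact layout can vary by TFX version):
# Optional: walk the serving directory to see the pushed model version(s).
for root, dirs, files in os.walk(_serving_model_dir):
  print(root)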
Running the TFX pipeline populates the MLMD database. In the next section, you use the MLMD API to query this database for metadata information.
Query the MLMD database
The MLMD database stores three types of metadata:
- Metadata about the pipeline and lineage information associated with the pipeline components
- Metadata about artifacts that were generated during the pipeline run
- Metadata about the executions of the pipeline
A typical production-environment pipeline serves multiple models as new data arrives. When you encounter erroneous results in served models, you can query the MLMD database to isolate the erroneous models. You can then trace the lineage of the pipeline components that correspond to these models to debug your models.
Set up the metadata (MD) store with the InteractiveContext defined previously to query the MLMD database.
connection_config = interactive_context.metadata_connection_config
store = mlmd.MetadataStore(connection_config)
# All TFX artifacts are stored in the base directory
base_dir = connection_config.sqlite.filename_uri.split('metadata.sqlite')[0]
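Mirroring the three metadata categories listed above, the store exposes top-level query methods for artifacts, executions, and contexts. A quick sketch of what the pipeline runs recorded:
# Counts of the metadata recorded by the pipeline runs above.
print('artifacts:', len(store.get_artifacts()))
print('executions:', len(store.get_executions()))
print('contexts:', len(store.get_contexts()))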
Create some helper functions to view the data from the MD store.
def display_types(types):
  # Helper function to render dataframes for the artifact and execution types
  table = {'id': [], 'name': []}
  for a_type in types:
    table['id'].append(a_type.id)
    table['name'].append(a_type.name)
  return pd.DataFrame(data=table)


def display_artifacts(store, artifacts):
  # Helper function to render dataframes for the input artifacts
  table = {'artifact id': [], 'type': [], 'uri': []}
  for a in artifacts:
    table['artifact id'].append(a.id)
    artifact_type = store.get_artifact_types_by_id([a.type_id])[0]
    table['type'].append(artifact_type.name)
    table['uri'].append(a.uri.replace(base_dir, './'))
  return pd.DataFrame(data=table)


def display_properties(store, node):
  # Helper function to render dataframes for artifact and execution properties
  table = {'property': [], 'value': []}
  for k, v in node.properties.items():
    table['property'].append(k)
    table['value'].append(
        v.string_value if v.HasField('string_value') else v.int_value)
  for k, v in node.custom_properties.items():
    table['property'].append(k)
    table['value'].append(
        v.string_value if v.HasField('string_value') else v.int_value)
  return pd.DataFrame(data=table)
First, query the MD store for a list of all its stored ArtifactTypes.
display_types(store.get_artifact_types())
Next, query all PushedModel artifacts.
pushed_models = store.get_artifacts_by_type("PushedModel")
display_artifacts(store, pushed_models)
Query the MD store for the latest pushed model. This tutorial has only one pushed model.
pushed_model = pushed_models[-1]
display_properties(store, pushed_model)
Debugging a pushed model starts with looking at which trained model was pushed and which training data was used to train that model.
MLMD provides traversal APIs to walk through the lineage graph, which you can use to analyze the model lineage.
def get_one_hop_parent_artifacts(store, artifacts):
  # Get a list of artifacts within a 1-hop of the artifacts of interest
  artifact_ids = [artifact.id for artifact in artifacts]
  executions_ids = set(
      event.execution_id
      for event in store.get_events_by_artifact_ids(artifact_ids)
      if event.type == mlmd.proto.Event.OUTPUT)
  artifacts_ids = set(
      event.artifact_id
      for event in store.get_events_by_execution_ids(executions_ids)
      if event.type == mlmd.proto.Event.INPUT)
  return [artifact for artifact in store.get_artifacts_by_id(artifacts_ids)]
Query the parent artifacts for the pushed model.
parent_artifacts = get_one_hop_parent_artifacts(store, [pushed_model])
display_artifacts(store, parent_artifacts)
Query the properties for the model.
exported_model = parent_artifacts[0]
display_properties(store, exported_model)
Query the upstream artifacts for the model.
model_parents = get_one_hop_parent_artifacts(store, [exported_model])
display_artifacts(store, model_parents)
Get the training data that the model trained with.
used_data = model_parents[0]
display_properties(store, used_data)
Now that you have the training data the model trained with, query the database again to find the training step (execution). Query the MD store for a list of the registered execution types.
display_types(store.get_execution_types())
The training step is the ExecutionType named tfx.components.trainer.component.Trainer. Traverse the MD store to get the trainer run that corresponds to the pushed model.
def find_producer_execution(store, artifact):
  # Find the execution that produced the given artifact via its OUTPUT events.
  executions_ids = set(
      event.execution_id
      for event in store.get_events_by_artifact_ids([artifact.id])
      if event.type == mlmd.proto.Event.OUTPUT)
  return store.get_executions_by_id(executions_ids)[0]
trainer = find_producer_execution(store, exported_model)
display_properties(store, trainer)
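The same event-traversal pattern works in the opposite direction as well: given the trainer execution, you can list every artifact it consumed through its INPUT events. A short sketch reusing the helpers defined above:
# List all artifacts consumed by the trainer execution (examples, schema, ...).
input_artifact_ids = set(
    event.artifact_id
    for event in store.get_events_by_execution_ids([trainer.id])
    if event.type == mlmd.proto.Event.INPUT)
display_artifacts(store, store.get_artifacts_by_id(input_artifact_ids))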
Summary
In this tutorial, you learned how to leverage MLMD to trace the lineage of your TFX pipeline components and resolve issues.
To learn more about how to use MLMD, check out these additional resources: