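In this notebook-based tutorial, we will create and run a TFX pipeline that builds a simple classification model and analyzes its performance across multiple runs. This notebook builds on the TFX pipeline from the Simple TFX Pipeline Tutorial. If you have not read that tutorial yet, please read it before proceeding with this notebook.
We will add an Evaluator component to the pipeline created in the previous tutorial. The Evaluator component performs deep analysis of your models and compares a new model against a baseline to determine whether it is "good enough". It is implemented using the TensorFlow Model Analysis library.
First, we need to install the TFX Python package and download the dataset that we will use for our model.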
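Upgrade Pip
To avoid upgrading Pip on a system when running locally, we first check whether we are running in Colab. Local systems can of course be upgraded separately.
try:
  import colab
  !pip install --upgrade pip
except ImportError:
  pass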
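The check above relies on an import succeeding only inside Colab. As a small alternative sketch (not the tutorial's own method), the helper below tests whether the `google.colab` module has already been loaded into the interpreter. The function name `running_in_colab` is hypothetical and introduced only for illustration.

```python
import sys


def running_in_colab() -> bool:
    """Return True when the google.colab module is already loaded."""
    # Assumption: Colab kernels import google.colab at startup, so its
    # presence in sys.modules is used here as the signal.
    return "google.colab" in sys.modules


if running_in_colab():
    print("Running in Colab; upgrading pip here is safe.")
else:
    print("Not running in Colab; leaving the local pip untouched.")
```

A boolean helper like this can guard any other Colab-only setup step without wrapping it in a try/except.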
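Install TFX
!pip install -U tfx
If you are using Google Colab, the first time you run the cell above you must restart the runtime, either by clicking the "RESTART RUNTIME" button above or by using the "Runtime > Restart runtime ..." menu. This is because of the way Colab loads packages.
Check the TensorFlow and TFX versions.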
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
TensorFlow version: 2.15.1
TFX version: 1.15.0
import os
PIPELINE_NAME = "penguin-tfma"
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level.
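We will use the same Palmer Penguins dataset.
There are four numeric features in this dataset, which were already normalized to the range [0,1]. We will build a classification model that predicts the species of penguins.
Because TFX ExampleGen reads inputs from a directory, we need to create a directory and copy the dataset into it.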
import urllib.request
import tempfile
DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data') # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)
('/tmpfs/tmp/tfx-datakcma5ryu/data.csv', <http.client.HTTPMessage at 0x7fc5d80acb80>)
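Before handing the directory to ExampleGen, it can be useful to confirm that the copied file looks like a CSV with the expected columns. The following is a minimal sketch using only the standard library; the helper name `summarize_csv` is hypothetical and not part of TFX.

```python
import csv
from typing import List, Tuple


def summarize_csv(path: str) -> Tuple[List[str], int]:
    """Return the header columns and the number of data rows in a CSV file."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader, [])           # first row holds the column names
        row_count = sum(1 for _ in reader)  # remaining rows are data
    return header, row_count


# Example usage (assumes the download above has completed):
# columns, rows = summarize_csv(_data_filepath)
# print(columns, rows)
```

If the header does not contain the `species` label column and the four feature columns, the Trainer's feature spec will not match the data.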
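We will add an Evaluator component to the pipeline we created in the Simple TFX Pipeline Tutorial.
An Evaluator component requires input data from an ExampleGen component, a model from a Trainer component, and a tfma.EvalConfig object. It produces two kinds of output artifacts, ModelEvaluation and ModelBlessing. ModelEvaluation contains the detailed evaluation result, which can be further investigated and visualized with the TFMA library. ModelBlessing contains a boolean result indicating whether the model passed the given criteria, and it can be used as a signal in later components such as Pusher.
We will use the same model code as in the Simple TFX Pipeline Tutorial.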
_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}
# Copied from https://tensorflowcn.cn/tfx/tutorials/tfx/penguin_simple
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils
from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options
from tensorflow_metadata.proto.v0 import schema_pb2
_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'
# Since we're not generating or creating a schema, we will instead create
# a feature spec. Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
       for feature in _FEATURE_KEYS},
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}
def _input_fn(file_pattern: List[str],
              data_accessor: DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()
def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://tensorflowcn.cn/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10


# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also be derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files, fn_args.data_accessor, schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files, fn_args.data_accessor, schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py
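We will define a function to create a TFX pipeline. In addition to the Evaluator component mentioned above, we will add one more node called Resolver. To check whether a new model is better than the previous one, we need to compare it against a previously published model, called the baseline. ML Metadata (MLMD) tracks all previous artifacts of the pipeline, and Resolver can find the latest blessed model (a model that passed the Evaluator successfully) in MLMD using a strategy class called LatestBlessedModelStrategy.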
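To make the idea of "latest blessed model" concrete, here is a conceptual sketch in plain Python. It is not how TFX or MLMD implement the strategy; the `ModelRecord` class, its fields, and the assumption that a larger id means a newer model are all hypothetical and exist only for illustration.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ModelRecord:
    """Hypothetical stand-in for a model artifact tracked by a metadata store."""
    model_id: int   # assumed to increase with each pipeline run
    blessed: bool   # whether an evaluator approved this model


def latest_blessed(models: List[ModelRecord]) -> Optional[ModelRecord]:
    """Return the newest blessed model, or None when no model was blessed."""
    blessed = [m for m in models if m.blessed]
    if not blessed:
        return None  # e.g. the very first run: there is no baseline yet
    return max(blessed, key=lambda m: m.model_id)


history = [ModelRecord(1, True), ModelRecord(2, False), ModelRecord(3, True)]
print(latest_blessed(history))
```

The `None` case corresponds to the first pipeline run, where no baseline can be resolved and the comparison against a baseline is skipped.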
import tensorflow_model_analysis as tfma
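def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a five component penguin pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

  # NEW: Get the latest blessed model for Evaluator.
  model_resolver = tfx.dsl.Resolver(
      strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
      model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
      model_blessing=tfx.dsl.Channel(
          type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
              'latest_blessed_model_resolver')

  # NEW: Uses TFMA to compute evaluation statistics over features of a model and
  # perform quality validation of a candidate model (compared to a baseline).
  eval_config = tfma.EvalConfig(
      model_specs=[tfma.ModelSpec(label_key='species')],
      slicing_specs=[
          # An empty slice spec means the overall slice, i.e. the whole dataset.
          tfma.SlicingSpec(),
          # Calculate metrics for each penguin species.
          tfma.SlicingSpec(feature_keys=['species']),
      ],
      metrics_specs=[
          tfma.MetricsSpec(per_slice_thresholds={
              'sparse_categorical_accuracy':
                  tfma.PerSliceMetricThresholds(thresholds=[
                      tfma.PerSliceMetricThreshold(
                          slicing_specs=[tfma.SlicingSpec()],
                          threshold=tfma.MetricThreshold(
                              value_threshold=tfma.GenericValueThreshold(
                                  lower_bound={'value': 0.6}),
                              # Change threshold will be ignored if there is no
                              # baseline model resolved from MLMD (first run).
                              change_threshold=tfma.GenericChangeThreshold(
                                  direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                                  absolute={'value': -1e-10})))
                  ]),
          })
      ])
  evaluator = tfx.components.Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      eval_config=eval_config)

  # Checks whether the model passed the validation steps and pushes the model
  # to a file destination if check passed.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'],  # Pass an evaluation result.
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      trainer,
      # Following two components were added to the pipeline.
      model_resolver,
      evaluator,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)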
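We need to supply the following information to the Evaluator via eval_config:
- Additional metrics to configure (if you want more metrics than those defined in the model).
- Slices to configure.
- Model validation thresholds to verify, if validation is to be included.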
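Because SparseCategoricalAccuracy was already included in the model.compile() call, it will be included in the analysis automatically, so we do not add any additional metrics here. SparseCategoricalAccuracy will also be used to decide whether the model is good enough.
We compute the metrics for the whole dataset and for each penguin species. SlicingSpec defines how the declared metrics are aggregated.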
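As a rough illustration of what slicing means, the sketch below computes accuracy once over all examples and once per label value, mirroring the overall slice and the per-species slice. It is plain Python rather than TFMA, the function name `accuracy_by_slice` is hypothetical, and the labels and predictions are made-up values used only for demonstration.

```python
from collections import defaultdict
from typing import Dict, List


def accuracy_by_slice(labels: List[int], predictions: List[int]) -> Dict[str, float]:
    """Compute accuracy for the overall slice and for each species label."""
    totals = defaultdict(int)
    correct = defaultdict(int)
    for label, pred in zip(labels, predictions):
        # Every example counts toward the overall slice and its own species slice.
        for key in ("Overall", f"species:{label}"):
            totals[key] += 1
            correct[key] += int(label == pred)
    return {key: correct[key] / totals[key] for key in totals}


# Hypothetical labels and predictions, for illustration only.
result = accuracy_by_slice([0, 0, 1, 2, 2], [0, 1, 1, 2, 2])
for slice_name, accuracy in result.items():
    print(slice_name, accuracy)
```

A model can look fine on the overall slice while one species slice is much worse, which is why per-slice metrics are worth inspecting.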
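A new model must pass two thresholds: an absolute threshold of 0.6, and a relative threshold requiring that it not score lower than the baseline model (the allowed change is at least -1e-10). When you run the pipeline for the first time, the change_threshold is ignored and only the value_threshold is checked. If you run the pipeline more than once, the Resolver finds a model from a previous run and uses it as the baseline for the comparison.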
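The two checks can be approximated in a few lines of plain Python. This is only a sketch of the logic described above for a higher-is-better metric, not TFMA's implementation; the function name `passes_thresholds` is hypothetical, and treating both comparisons as inclusive is an assumption.

```python
from typing import Optional


def passes_thresholds(candidate: float,
                      baseline: Optional[float] = None,
                      lower_bound: float = 0.6,
                      min_change: float = -1e-10) -> bool:
    """Approximate the value and change threshold checks for one metric."""
    # Value threshold: the candidate must reach the absolute lower bound.
    if candidate < lower_bound:
        return False
    # Change threshold: skipped when no baseline was resolved (first run).
    if baseline is None:
        return True
    # Higher is better: the difference to the baseline must not fall
    # below min_change (assumed inclusive).
    return (candidate - baseline) >= min_change


print(passes_thresholds(0.94))                 # first run, no baseline
print(passes_thresholds(0.94, baseline=0.95))  # worse than the baseline
```

With a slightly negative `min_change`, a candidate that exactly matches the baseline still passes, while any measurable regression fails.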
See the Evaluator component guide for more information.
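We will use LocalDagRunner to run the pipeline locally.
tfx.orchestration.LocalDagRunner().run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=_trainer_module_file,
        serving_model_dir=SERVING_MODEL_DIR,
        metadata_path=METADATA_PATH))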
INFO:absl:Full user module path is 'penguin_trainer@pipelines/penguin-tfma/_wheels/tfx_user_code_Trainer-0.0+1e19049dced0ccb21e0af60dae1c6e0ef09b63d1ff0e370d7f699920c2735703-py3-none-any.whl' INFO:absl:Component CsvExampleGen is running. INFO:absl:Generating examples. INFO:absl:Processing input csv data /tmpfs/tmp/tfx-datakcma5ryu/* to TFExample. INFO:absl:Examples generated. INFO:absl:Execution 1 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-tfma/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1715159548,sum_checksum:1715159548" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 1 INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component latest_blessed_model_resolver is running. INFO:absl:[latest_blessed_model_resolver] Resolved inputs: ({'model_blessing': [], 'model': []},) INFO:absl:Component latest_blessed_model_resolver is finished. INFO:absl:Component Trainer is running. INFO:absl:Train on the 'train' split when train_args.splits is not set. INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set. INFO:absl:Installing 'pipelines/penguin-tfma/_wheels/tfx_user_code_Trainer-0.0+1e19049dced0ccb21e0af60dae1c6e0ef09b63d1ff0e370d7f699920c2735703-py3-none-any.whl' to a temporary directory. INFO:absl:Successfully installed 'pipelines/penguin-tfma/_wheels/tfx_user_code_Trainer-0.0+1e19049dced0ccb21e0af60dae1c6e0ef09b63d1ff0e370d7f699920c2735703-py3-none-any.whl'. INFO:absl:Training model. 
INFO:absl:Model: "model" INFO:absl:__________________________________________________________________________________________________ INFO:absl: Layer (type) Output Shape Param # Connected to INFO:absl:================================================================================================== INFO:absl:Total params: 139 (556.00 Byte) INFO:absl:Trainable params: 139 (556.00 Byte) INFO:absl:Non-trainable params: 0 (0.00 Byte) INFO:absl:__________________________________________________________________________________________________ 100/100 [==============================] - 2s 5ms/step - loss: 0.4620 - sparse_categorical_accuracy: 0.8455 - val_loss: 0.1694 - val_sparse_categorical_accuracy: 0.9400 INFO:absl:Training complete. Model written to pipelines/penguin-tfma/Trainer/model/3/Format-Serving. ModelRun written to pipelines/penguin-tfma/Trainer/model_run/3 INFO:absl:Execution 3 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model': [Artifact(artifact: uri: "pipelines/penguin-tfma/Trainer/model/3" , artifact_type: name: "Model" base_type: MODEL )], 'model_run': [Artifact(artifact: uri: "pipelines/penguin-tfma/Trainer/model_run/3" , artifact_type: name: "ModelRun" )]}) for execution 3 INFO:absl:Component Trainer is finished. INFO:absl:Component Evaluator is running. INFO:absl:Request was made to ignore the baseline ModelSpec and any change thresholds. This is likely because a baseline model was not provided INFO:absl:Using pipelines/penguin-tfma/Trainer/model/3/Format-Serving as model. INFO:absl:The 'example_splits' parameter is not set, using 'eval' split. INFO:absl:Evaluating model. INFO:absl:Evaluation complete. Results written to pipelines/penguin-tfma/Evaluator/evaluation/4. INFO:absl:Checking validation results. INFO:absl:Blessing result True written to pipelines/penguin-tfma/Evaluator/blessing/4. INFO:absl:Execution 4 succeeded. 
INFO:absl:Cleaning up stateless execution info. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'evaluation': [Artifact(artifact: uri: "pipelines/penguin-tfma/Evaluator/evaluation/4" , artifact_type: name: "ModelEvaluation" )], 'blessing': [Artifact(artifact: uri: "pipelines/penguin-tfma/Evaluator/blessing/4" , artifact_type: name: "ModelBlessing" )]}) for execution 4 INFO:absl:Component Evaluator is finished. INFO:absl:Component Pusher is running. INFO:absl:Model version: 1715159563 INFO:absl:Model written to serving path serving_model/penguin-tfma/1715159563. INFO:absl:Blessing result True written to pipelines/penguin-tfma/Evaluator/blessing/4.
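Alternatively, you can manually check the output directory where the generated artifacts are stored. If you browse to pipelines/penguin-tfma/Evaluator/blessing/ with a file browser, you will see a file named BLESSED or NOT_BLESSED, depending on the evaluation result.
If the blessing result is False, Pusher will refuse to push the model to the serving_model_dir.
You can run the pipeline again, possibly with different evaluation configs. Even if you run the pipeline with exactly the same config and dataset, the trained model may differ slightly because of the inherent randomness of model training, which can lead to a NOT_BLESSED model.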
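The manual check described above can also be scripted. The sketch below looks for the marker file in the newest subdirectory of the blessing output; it assumes that each execution writes into a numerically named subdirectory (as in `blessing/4` in the log above), and the helper name `latest_blessing_status` is hypothetical.

```python
from pathlib import Path
from typing import Optional


def latest_blessing_status(blessing_root: str) -> Optional[str]:
    """Return 'BLESSED' or 'NOT_BLESSED' for the newest execution directory."""
    root = Path(blessing_root)
    if not root.is_dir():
        return None
    # Assumption: execution outputs live in subdirectories named by number.
    runs = [d for d in root.iterdir() if d.is_dir() and d.name.isdigit()]
    if not runs:
        return None
    latest = max(runs, key=lambda d: int(d.name))
    for marker in ("BLESSED", "NOT_BLESSED"):
        if (latest / marker).exists():
            return marker
    return None


# Example usage with the path from this tutorial:
# print(latest_blessing_status('pipelines/penguin-tfma/Evaluator/blessing'))
```

Comparing directory names as integers keeps run 12 ordered after run 4, which a plain string sort would not.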
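You can use TFMA to investigate and visualize the evaluation result in the ModelEvaluation artifact.
You can use the MLMD APIs to locate these outputs programmatically. First, we will define some utility functions to search for the output artifacts that were just produced.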
from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib
# TODO(b/171447278): Move these functions into the TFX library.
def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e: e.last_update_time_since_epoch)
  return execution_lib.get_output_artifacts(metadata, latest_execution.id)
We can find the latest execution of the Evaluator component and get its output artifacts.
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  evaluator_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
                                          'Evaluator')
  eval_artifact = evaluator_output[standard_component_specs.EVALUATION_KEY][0]
INFO:absl:MetadataStore with DB connection initialized
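The Evaluator always returns one evaluation artifact, which we can visualize using the TensorFlow Model Analysis library. For example, the following code renders the accuracy metrics for each penguin species.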
import tensorflow_model_analysis as tfma
eval_result = tfma.load_eval_result(eval_artifact.uri)
tfma.view.render_slicing_metrics(eval_result, slicing_column='species')
SlicingMetricsViewer(config={'weightedExamplesColumn': 'example_count'}, data=[{'slice': 'species:0', 'metrics…
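If you choose 'sparse_categorical_accuracy' in the Show drop-down list, you can see the accuracy values per species. You may want to add more slices and check whether your model performs well across all distributions and whether there is any possible bias.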
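The same per-slice values can be read without the interactive viewer. The sketch below flattens a list of (slice key, metrics) pairs into a simple dictionary. It assumes the nested layout output name, then sub key, then metric name, then `{'doubleValue': ...}`, which is how `eval_result.slicing_metrics` is commonly structured; treat that layout as an assumption and verify it against your TFMA version. The helper name `metric_by_slice` and the sample data are hypothetical.

```python
from typing import Any, Dict, List, Tuple

SliceKey = Tuple[Tuple[str, Any], ...]


def metric_by_slice(slicing_metrics: List[Tuple[SliceKey, Dict]],
                    metric_name: str) -> Dict[str, float]:
    """Collect one metric per slice from a nested slicing-metrics structure."""
    values = {}
    for slice_key, metrics in slicing_metrics:
        # An empty slice key denotes the overall slice.
        name = ", ".join(f"{k}:{v}" for k, v in slice_key) or "Overall"
        for sub_keys in metrics.values():          # output name level
            for metric_map in sub_keys.values():   # sub key level
                if metric_name in metric_map:
                    values[name] = metric_map[metric_name]["doubleValue"]
    return values


# Hypothetical data shaped like the assumed layout, for illustration only.
sample = [
    ((), {"": {"": {"sparse_categorical_accuracy": {"doubleValue": 0.9}}}}),
    ((("species", 0),), {"": {"": {"sparse_categorical_accuracy": {"doubleValue": 1.0}}}}),
]
print(metric_by_slice(sample, "sparse_categorical_accuracy"))

# Possible usage with the result loaded above (assumption, not verified here):
# metric_by_slice(eval_result.slicing_metrics, 'sparse_categorical_accuracy')
```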
Learn more about model analysis in the TensorFlow Model Analysis library tutorial.
You can find more resources at https://tensorflowcn.cn/tfx/tutorials
