This tutorial shows how to train a TensorFlow Recommenders ranking model as a TFX pipeline.
In this notebook-based tutorial, we will create and run a TFX pipeline that trains a ranking model to predict movie ratings using TensorFlow Recommenders (TFRS). The pipeline will consist of three essential TFX components: ExampleGen, Trainer, and Pusher. It covers a minimal ML workflow: importing data, training a model, and exporting the trained TFRS ranking model.
Setup
We first need to install the TFX Python package and download the dataset which we will use for our model.
Upgrade Pip
To avoid upgrading Pip in a system when running locally, check to make sure that we are running in Colab. Local systems can of course be upgraded separately.
import sys
if 'google.colab' in sys.modules:
  !pip install --upgrade pip
Install TFX
!pip install -U tfx
!pip install -U tensorflow-recommenders
Did you restart the runtime?
If you are using Google Colab, the first time that you run the cell above, you must restart the runtime by clicking the "RESTART RUNTIME" button above or using the "Runtime > Restart runtime ..." menu. This is because of the way that Colab loads packages.
Check the TensorFlow and TFX versions.
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
2022-12-14 13:03:42.352068: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered 2022-12-14 13:03:43.224089: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory 2022-12-14 13:03:43.224183: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory 2022-12-14 13:03:43.224193: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly. TensorFlow version: 2.10.1 TFX version: 1.11.0
Set up variables
There are some variables used to define a pipeline. You can customize these variables as you want. By default, all output from the pipeline will be generated under the current directory. In this tutorial, we will use a hardcoded schema instead of generating one with the SchemaGen component.
import os
PIPELINE_NAME = 'TFRS-ranking'
# Directory where MovieLens 100K rating data lives
DATA_ROOT = os.path.join('data', PIPELINE_NAME)
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level.
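For reference, the schema could instead be inferred inside the pipeline rather than hardcoded. The following is a minimal sketch of how that might look with the StatisticsGen and SchemaGen components; these components are not used in this tutorial, and example_gen refers to a CsvExampleGen like the one defined later in the pipeline.
# Sketch only (not used in this tutorial): infer a schema from data
# statistics instead of hardcoding one in the trainer module.
statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'])
schema_gen = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)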
Prepare example data
Because TFX does not currently support the TensorFlow Datasets API, we will download the MovieLens 100K dataset manually for use in our TFX pipeline.
There are four numeric features in this dataset:
- userId
- movieId
- rating
- timestamp
We will build a ranking model that predicts the rating of movies. We will not use the timestamp feature.
Because TFX ExampleGen reads inputs from a directory, we need to create a directory and copy the dataset to it.
!wget https://files.grouplens.org/datasets/movielens/ml-100k.zip
!mkdir -p {DATA_ROOT}
!unzip ml-100k.zip
!echo 'userId,movieId,rating,timestamp' > {DATA_ROOT}/ratings.csv
!sed 's/\t/,/g' ml-100k/u.data >> {DATA_ROOT}/ratings.csv
--2022-12-14 13:03:46-- https://files.grouplens.org/datasets/movielens/ml-100k.zip Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152 Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 4924029 (4.7M) [application/zip] Saving to: ‘ml-100k.zip’ ml-100k.zip 100%[===================>] 4.70M 31.0MB/s in 0.2s 2022-12-14 13:03:47 (31.0 MB/s) - ‘ml-100k.zip’ saved [4924029/4924029] Archive: ml-100k.zip creating: ml-100k/ inflating: ml-100k/allbut.pl inflating: ml-100k/mku.sh inflating: ml-100k/README inflating: ml-100k/u.data inflating: ml-100k/u.genre inflating: ml-100k/u.info inflating: ml-100k/u.item inflating: ml-100k/u.occupation inflating: ml-100k/u.user inflating: ml-100k/u1.base inflating: ml-100k/u1.test inflating: ml-100k/u2.base inflating: ml-100k/u2.test inflating: ml-100k/u3.base inflating: ml-100k/u3.test inflating: ml-100k/u4.base inflating: ml-100k/u4.test inflating: ml-100k/u5.base inflating: ml-100k/u5.test inflating: ml-100k/ua.base inflating: ml-100k/ua.test inflating: ml-100k/ub.base inflating: ml-100k/ub.test
Take a quick look at the CSV file.
!head {DATA_ROOT}/ratings.csv
userId,movieId,rating,timestamp 196,242,3,881250949 186,302,3,891717742 22,377,1,878887116 244,51,2,880606923 166,346,1,886397596 298,474,4,884182806 115,265,2,881171488 253,465,5,891628467 305,451,3,886324817
You should be able to see the four values. For example, the first example means that user '196' gave a rating of 3 to movie '242'.
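As a quick sanity check (not part of the original tutorial), you can verify the ID ranges with pandas, assuming it is available in the environment; the ranges match the vocabulary sizes (943 users and 1682 movies) hardcoded in the model code below.
import pandas as pd

# Load the CSV we just created and inspect the ID ranges.
ratings_df = pd.read_csv(os.path.join(DATA_ROOT, 'ratings.csv'))
print(ratings_df['userId'].min(), ratings_df['userId'].max())    # 1 943
print(ratings_df['movieId'].min(), ratings_df['movieId'].max())  # 1 1682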
Create a pipeline
TFX pipelines are defined using Python APIs. We will define a pipeline which consists of the following three components:
- CsvExampleGen: Reads in data files and converts them to the TFX internal format for further processing. There are multiple ExampleGens for various formats. In this tutorial, we will use CsvExampleGen, which takes CSV file input.
- Trainer: Trains an ML model. The Trainer component requires model definition code from users. You can use TensorFlow APIs to specify how to train a model and save it in the SavedModel format.
- Pusher: Copies the trained model outside of the TFX pipeline. The Pusher component can be thought of as a deployment process of the trained ML model.
Before actually defining the pipeline, we need to write model code for the Trainer component first.
Write model training code
We will build a simple ranking model to predict movie ratings. This model training code will be saved in a separate file.
In this tutorial, we will use the Generic Trainer of TFX, which supports Keras-based models. You need to write a Python file containing the run_fn function, which is the entrypoint of the Trainer component.
_trainer_module_file = 'tfrs_ranking_trainer.py'
The ranking model we use is almost identical to the one in the basic ranking tutorial. The only difference is that we use movie IDs instead of movie titles in the candidate tower.
%%writefile {_trainer_module_file}
from typing import Dict, List, Text

import numpy as np
import tensorflow as tf
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_recommenders as tfrs
from tensorflow_transform.tf_metadata import schema_utils
from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = ['userId', 'movieId']
_LABEL_KEY = 'rating'

_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


class RankingModel(tf.keras.Model):

  def __init__(self):
    super().__init__()
    embedding_dimension = 32

    # MovieLens 100K has 943 users and 1682 movies; the IDs are converted
    # to strings to serve as lookup vocabularies.
    unique_user_ids = np.array(range(943)).astype(str)
    unique_movie_ids = np.array(range(1682)).astype(str)

    # Compute embeddings for users.
    self.user_embeddings = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(1,), name='userId', dtype=tf.int64),
        tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
        tf.keras.layers.StringLookup(
            vocabulary=unique_user_ids, mask_token=None),
        tf.keras.layers.Embedding(
            len(unique_user_ids) + 1, embedding_dimension)
    ])

    # Compute embeddings for movies.
    self.movie_embeddings = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(1,), name='movieId', dtype=tf.int64),
        tf.keras.layers.Lambda(lambda x: tf.as_string(x)),
        tf.keras.layers.StringLookup(
            vocabulary=unique_movie_ids, mask_token=None),
        tf.keras.layers.Embedding(
            len(unique_movie_ids) + 1, embedding_dimension)
    ])

    # Compute predictions.
    self.ratings = tf.keras.Sequential([
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1)
    ])

  def call(self, inputs):
    user_id, movie_id = inputs
    user_embedding = self.user_embeddings(user_id)
    movie_embedding = self.movie_embeddings(movie_id)
    return self.ratings(tf.concat([user_embedding, movie_embedding], axis=2))


class MovielensModel(tfrs.models.Model):

  def __init__(self):
    super().__init__()
    self.ranking_model: tf.keras.Model = RankingModel()
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
        loss=tf.keras.losses.MeanSquaredError(),
        metrics=[tf.keras.metrics.RootMeanSquaredError()])

  def call(self, features: Dict[str, tf.Tensor]) -> tf.Tensor:
    return self.ranking_model((features['userId'], features['movieId']))

  def compute_loss(self,
                   features: Dict[Text, tf.Tensor],
                   training=False) -> tf.Tensor:
    # The dataset built in _input_fn yields (features, label) tuples, so
    # index 0 holds the feature dict and index 1 holds the rating labels.
    labels = features[1]
    rating_predictions = self(features[0])

    # The task computes the loss and the metrics.
    return self.task(labels=labels, predictions=rating_predictions)


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 256) -> tf.data.Dataset:
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  return MovielensModel()


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files, fn_args.data_accessor, schema, batch_size=8192)
  eval_dataset = _input_fn(
      fn_args.eval_files, fn_args.data_accessor, schema, batch_size=4096)

  model = _build_keras_model()
  model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      epochs=3,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  model.save(fn_args.serving_model_dir)
Writing tfrs_ranking_trainer.py
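Because the module is a plain Python file, you can optionally smoke-test the model class outside the pipeline first. This quick check is not part of the original tutorial; it builds an untrained model and scores one dummy (user, movie) pair.
import tfrs_ranking_trainer

# Build the model and compute one (untrained) rating prediction.
test_model = tfrs_ranking_trainer.MovielensModel()
test_pred = test_model({
    'userId': tf.constant([[42]], dtype=tf.int64),
    'movieId': tf.constant([[15]], dtype=tf.int64),
})
print(test_pred.numpy())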
Now you have completed all of the preparation steps to build a TFX pipeline.
Write a pipeline definition
We define a function to create a TFX pipeline. A Pipeline object represents a TFX pipeline, which can be run using one of the pipeline orchestration systems that TFX supports.
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates a three component pipeline with TFX."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=12),
      eval_args=tfx.proto.EvalArgs(num_steps=24))

  # Pushes the model to a filesystem destination.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # Following three components will be included in the pipeline.
  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)
Run the pipeline
TFX supports multiple orchestrators to run pipelines. In this tutorial, we will use LocalDagRunner, which is included in the TFX Python package and runs pipelines in a local environment.
Now we create a LocalDagRunner and pass a Pipeline object created from the function we already defined.
The pipeline runs directly, and you can see the logs for the progress of the pipeline, including ML model training.
tfx.orchestration.LocalDagRunner().run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=_trainer_module_file,
        serving_model_dir=SERVING_MODEL_DIR,
        metadata_path=METADATA_PATH))
INFO:absl:Generating ephemeral wheel package for '/tmpfs/src/temp/docs/examples/tfrs_ranking_trainer.py' (including modules: ['tfrs_ranking_trainer']). INFO:absl:User module package has hash fingerprint version bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c. INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '/tmpfs/tmp/tmpbks2_kfl/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmpfs/tmp/tmpbvugqpwn', '--dist-dir', '/tmpfs/tmp/tmpfqmceeau'] /tmpfs/src/tf_docs_env/lib/python3.9/site-packages/setuptools/command/install.py:34: SetuptoolsDeprecationWarning: setup.py install is deprecated. Use build and pip and other standards-based tools. warnings.warn( INFO:absl:Successfully built user code wheel distribution at 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'; target user module is 'tfrs_ranking_trainer'. INFO:absl:Full user module path is 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' INFO:absl:Using deployment config: executor_specs { key: "CsvExampleGen" value { beam_executable_spec { python_executor_spec { class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor" } } } } executor_specs { key: "Pusher" value { python_class_executable_spec { class_path: "tfx.components.pusher.executor.Executor" } } } executor_specs { key: "Trainer" value { python_class_executable_spec { class_path: "tfx.components.trainer.executor.GenericExecutor" } } } custom_driver_specs { key: "CsvExampleGen" value { python_class_executable_spec { class_path: "tfx.components.example_gen.driver.FileBasedDriver" } } } metadata_connection_config { database_connection_config { sqlite { filename_uri: "metadata/TFRS-ranking/metadata.db" connection_mode: READWRITE_OPENCREATE } } } INFO:absl:Using connection config: sqlite { filename_uri: "metadata/TFRS-ranking/metadata.db" connection_mode: READWRITE_OPENCREATE } INFO:absl:Component CsvExampleGen is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "data/TFRS-ranking" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "Trainer" execution_options { caching_options { } } running bdist_wheel running build running build_py creating build creating build/lib copying tfrs_ranking_trainer.py -> build/lib installing to /tmpfs/tmp/tmpbvugqpwn running install running install_lib copying build/lib/tfrs_ranking_trainer.py -> /tmpfs/tmp/tmpbvugqpwn running install_egg_info running egg_info creating tfx_user_code_Trainer.egg-info writing tfx_user_code_Trainer.egg-info/PKG-INFO writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt' Copying tfx_user_code_Trainer.egg-info to /tmpfs/tmp/tmpbvugqpwn/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3.9.egg-info running install_scripts creating /tmpfs/tmp/tmpbvugqpwn/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL creating '/tmpfs/tmp/tmpfqmceeau/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' and adding '/tmpfs/tmp/tmpbvugqpwn' to it adding 'tfrs_ranking_trainer.py' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/METADATA' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/WHEEL' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/top_level.txt' adding 'tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c.dist-info/RECORD' removing /tmpfs/tmp/tmpbvugqpwn INFO:absl:MetadataStore with DB connection initialized INFO:absl:[CsvExampleGen] Resolved inputs: ({},) INFO:absl:select span and version = (0, None) INFO:absl:latest span and version = (0, None) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 1 
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}), exec_properties={'output_file_format': 5, 'input_config': '{\n "splits": [\n {\n "name": "single_split",\n "pattern": "*"\n }\n ]\n}', 'output_data_format': 6, 'output_config': '{\n "split_config": {\n "splits": [\n {\n "hash_buckets": 2,\n "name": "train"\n },\n {\n "hash_buckets": 1,\n "name": "eval"\n }\n ]\n }\n}', 'input_base': 'data/TFRS-ranking', 'span': 0, 'version': None, 'input_fingerprint': 'split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027'}, execution_output_uri='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/CsvExampleGen/.system/executor_execution/1/.temp/', pipeline_node=node_info { type { name: "tfx.components.example_gen.csv_example_gen.component.CsvExampleGen" } id: "CsvExampleGen" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } } outputs { outputs { key: "examples" value { artifact_spec { type { name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET } } } } } parameters { parameters { key: "input_base" value { field_value { string_value: "data/TFRS-ranking" } } } parameters { key: "input_config" value { field_value { string_value: "{\n \"splits\": [\n {\n \"name\": \"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}" } } } parameters { key: "output_config" value { field_value { string_value: "{\n \"split_config\": {\n \"splits\": [\n {\n \"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n \"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n }\n}" } } } parameters { key: "output_data_format" value { field_value { int_value: 6 } } } parameters { key: "output_file_format" value { field_value { int_value: 5 } } } } downstream_nodes: "Trainer" execution_options { caching_options { } } , pipeline_info=id: "TFRS-ranking" , pipeline_run_id='2022-12-14T13:03:48.504573') INFO:absl:Generating examples. WARNING:apache_beam.runners.interactive.interactive_environment:Dependencies required for Interactive Beam PCollection visualization are not available, please use: `pip install apache-beam[interactive]` to install necessary dependencies to enable all data visualization features. INFO:absl:Processing input csv data data/TFRS-ranking/* to TFExample. WARNING:apache_beam.io.tfrecordio:Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be. INFO:absl:Examples generated. 
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 1 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "span" value { int_value: 0 } } , artifact_type: name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}) for execution 1 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component CsvExampleGen is finished. INFO:absl:Component Trainer is running. INFO:absl:Running launcher for node_info { type { name: "tfx.components.trainer.component.Trainer" base_type: TRAIN } id: "Trainer" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "model" value { artifact_spec { type { name: "Model" base_type: MODEL } } } } outputs { key: "model_run" value { artifact_spec { type { name: "ModelRun" } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "eval_args" value { field_value { string_value: "{\n \"num_steps\": 24\n}" } } } parameters { key: "module_path" value { field_value { string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl" } } } parameters { key: "train_args" value { field_value { string_value: "{\n \"num_steps\": 12\n}" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "Pusher" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. 
INFO:absl:[Trainer] Resolved inputs: ({'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023043350 last_update_time_since_epoch: 1671023043350 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 2 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=2, input_dict={'examples': [Artifact(artifact: id: 1 type_id: 15 uri: "pipelines/TFRS-ranking/CsvExampleGen/examples/1" properties { key: "split_names" value { string_value: "[\"train\", \"eval\"]" } } custom_properties { key: "file_format" value { string_value: "tfrecords_gzip" } } custom_properties { key: "input_fingerprint" value { string_value: "split:single_split,num_files:1,total_bytes:1979205,xor_checksum:1671023027,sum_checksum:1671023027" } } custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "payload_format" value { string_value: "FORMAT_TF_EXAMPLE" } } custom_properties { key: "span" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023043350 last_update_time_since_epoch: 1671023043350 , artifact_type: id: 15 name: "Examples" properties { key: "span" value: INT } properties { key: "split_names" value: STRING } properties { key: "version" value: INT } base_type: DATASET )]}, output_dict=defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2" , artifact_type: name: "ModelRun" )], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2" , artifact_type: name: "Model" base_type: MODEL )]}), exec_properties={'train_args': '{\n "num_steps": 12\n}', 'eval_args': '{\n "num_steps": 24\n}', 'custom_config': 'null', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'}, execution_output_uri='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Trainer/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/Trainer/.system/executor_execution/2/.temp/', pipeline_node=node_info { type { name: "tfx.components.trainer.component.Trainer" base_type: TRAIN } id: "Trainer" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { 
string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } } inputs { inputs { key: "examples" value { channels { producer_node_query { id: "CsvExampleGen" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.CsvExampleGen" } } } artifact_query { type { name: "Examples" base_type: DATASET } } output_key: "examples" } min_count: 1 } } } outputs { outputs { key: "model" value { artifact_spec { type { name: "Model" base_type: MODEL } } } } outputs { key: "model_run" value { artifact_spec { type { name: "ModelRun" } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "eval_args" value { field_value { string_value: "{\n \"num_steps\": 24\n}" } } } parameters { key: "module_path" value { field_value { string_value: "tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl" } } } parameters { key: "train_args" value { field_value { string_value: "{\n \"num_steps\": 12\n}" } } } } upstream_nodes: "CsvExampleGen" downstream_nodes: "Pusher" execution_options { caching_options { } } , pipeline_info=id: "TFRS-ranking" , pipeline_run_id='2022-12-14T13:03:48.504573') INFO:absl:Train on the 'train' split when train_args.splits is not set. INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set. INFO:absl:udf_utils.get_fn {'train_args': '{\n "num_steps": 12\n}', 'eval_args': '{\n "num_steps": 24\n}', 'custom_config': 'null', 'module_path': 'tfrs_ranking_trainer@pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'} 'run_fn' INFO:absl:Installing 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl' to a temporary directory. INFO:absl:Executing: ['/tmpfs/src/tf_docs_env/bin/python', '-m', 'pip', 'install', '--target', '/tmpfs/tmp/tmpkwsj4cmv', 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'] Processing ./pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl INFO:absl:Successfully installed 'pipelines/TFRS-ranking/_wheels/tfx_user_code_Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c-py3-none-any.whl'. INFO:absl:Training model. INFO:absl:Feature movieId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. Installing collected packages: tfx-user-code-Trainer Successfully installed tfx-user-code-Trainer-0.0+bc2b229701436196f12efdbb4e880e3fa05103599208d98a05325f064775f21c INFO:absl:Feature movieId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature movieId has a shape dim { size: 1 } . 
Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature movieId has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature rating has a shape dim { size: 1 } . Setting to DenseTensor. INFO:absl:Feature userId has a shape dim { size: 1 } . Setting to DenseTensor. Epoch 1/3 12/12 [==============================] - 4s 247ms/step - root_mean_squared_error: 1.9485 - loss: 3.6361 - regularization_loss: 0.0000e+00 - total_loss: 3.6361 - val_root_mean_squared_error: 1.2136 - val_loss: 1.4467 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.4467 Epoch 2/3 12/12 [==============================] - 2s 198ms/step - root_mean_squared_error: 1.1657 - loss: 1.3525 - regularization_loss: 0.0000e+00 - total_loss: 1.3525 - val_root_mean_squared_error: 1.1173 - val_loss: 1.2615 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2615 Epoch 3/3 12/12 [==============================] - 2s 212ms/step - root_mean_squared_error: 1.1121 - loss: 1.2330 - regularization_loss: 0.0000e+00 - total_loss: 1.2330 - val_root_mean_squared_error: 1.0983 - val_loss: 1.2303 - val_regularization_loss: 0.0000e+00 - val_total_loss: 1.2303 WARNING:absl:Function `_wrapped_model` contains input name(s) movieId, userId with unsupported characters which will be renamed to movieid, userid in the SavedModel. WARNING:absl:Found untraced functions such as ranking_layer_call_fn, ranking_layer_call_and_return_conditional_losses while saving (showing 2 of 2). These functions will not be directly callable after loading. INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets INFO:tensorflow:Assets written to: pipelines/TFRS-ranking/Trainer/model/2/Format-Serving/assets INFO:absl:Training complete. Model written to pipelines/TFRS-ranking/Trainer/model/2/Format-Serving. ModelRun written to pipelines/TFRS-ranking/Trainer/model_run/2 INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 2 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model_run/2" , artifact_type: name: "ModelRun" )], 'model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Trainer/model/2" , artifact_type: name: "Model" base_type: MODEL )]}) for execution 2 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component Trainer is finished. INFO:absl:Component Pusher is running. 
INFO:absl:Running launcher for node_info { type { name: "tfx.components.pusher.component.Pusher" base_type: DEPLOY } id: "Pusher" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Pusher" } } } } inputs { inputs { key: "model" value { channels { producer_node_query { id: "Trainer" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } artifact_query { type { name: "Model" base_type: MODEL } } output_key: "model" } } } } outputs { outputs { key: "pushed_model" value { artifact_spec { type { name: "PushedModel" base_type: MODEL } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "push_destination" value { field_value { string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/TFRS-ranking\"\n }\n}" } } } } upstream_nodes: "Trainer" execution_options { caching_options { } } INFO:absl:MetadataStore with DB connection initialized WARNING:absl:ArtifactQuery.property_predicate is not supported. INFO:absl:[Pusher] Resolved inputs: ({'model': [Artifact(artifact: id: 3 type_id: 18 uri: "pipelines/TFRS-ranking/Trainer/model/2" custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023059840 last_update_time_since_epoch: 1671023059840 , artifact_type: id: 18 name: "Model" base_type: MODEL )]},) INFO:absl:MetadataStore with DB connection initialized INFO:absl:Going to run a new execution 3 INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=3, input_dict={'model': [Artifact(artifact: id: 3 type_id: 18 uri: "pipelines/TFRS-ranking/Trainer/model/2" custom_properties { key: "is_external" value { int_value: 0 } } custom_properties { key: "state" value { string_value: "published" } } custom_properties { key: "tfx_version" value { string_value: "1.11.0" } } state: LIVE create_time_since_epoch: 1671023059840 last_update_time_since_epoch: 1671023059840 , artifact_type: id: 18 name: "Model" base_type: MODEL )]}, output_dict=defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3" , artifact_type: name: "PushedModel" base_type: MODEL )]}), exec_properties={'push_destination': '{\n "filesystem": {\n "base_directory": "serving_model/TFRS-ranking"\n }\n}', 'custom_config': 'null'}, execution_output_uri='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/executor_output.pb', stateful_working_dir='pipelines/TFRS-ranking/Pusher/.system/stateful_working_dir/2022-12-14T13:03:48.504573', tmp_dir='pipelines/TFRS-ranking/Pusher/.system/executor_execution/3/.temp/', pipeline_node=node_info { type { name: "tfx.components.pusher.component.Pusher" base_type: DEPLOY } id: "Pusher" } contexts { contexts { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } contexts { type { name: "pipeline_run" } name { field_value { string_value: 
"2022-12-14T13:03:48.504573" } } } contexts { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Pusher" } } } } inputs { inputs { key: "model" value { channels { producer_node_query { id: "Trainer" } context_queries { type { name: "pipeline" } name { field_value { string_value: "TFRS-ranking" } } } context_queries { type { name: "pipeline_run" } name { field_value { string_value: "2022-12-14T13:03:48.504573" } } } context_queries { type { name: "node" } name { field_value { string_value: "TFRS-ranking.Trainer" } } } artifact_query { type { name: "Model" base_type: MODEL } } output_key: "model" } } } } outputs { outputs { key: "pushed_model" value { artifact_spec { type { name: "PushedModel" base_type: MODEL } } } } } parameters { parameters { key: "custom_config" value { field_value { string_value: "null" } } } parameters { key: "push_destination" value { field_value { string_value: "{\n \"filesystem\": {\n \"base_directory\": \"serving_model/TFRS-ranking\"\n }\n}" } } } } upstream_nodes: "Trainer" execution_options { caching_options { } } , pipeline_info=id: "TFRS-ranking" , pipeline_run_id='2022-12-14T13:03:48.504573') WARNING:absl:Pusher is going to push the model without validation. Consider using Evaluator or InfraValidator in your pipeline. INFO:absl:Model version: 1671023059 INFO:absl:Model written to serving path serving_model/TFRS-ranking/1671023059. INFO:absl:Model pushed to pipelines/TFRS-ranking/Pusher/pushed_model/3. INFO:absl:Cleaning up stateless execution info. INFO:absl:Execution 3 succeeded. INFO:absl:Cleaning up stateful execution info. INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'pushed_model': [Artifact(artifact: uri: "pipelines/TFRS-ranking/Pusher/pushed_model/3" , artifact_type: name: "PushedModel" base_type: MODEL )]}) for execution 3 INFO:absl:MetadataStore with DB connection initialized INFO:absl:Component Pusher is finished.
You should see "INFO:absl:Component Pusher is finished." at the end of the logs if the pipeline finished successfully, because the Pusher component is the last component of the pipeline.
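You can also confirm the push by querying ML Metadata (MLMD) directly. A minimal sketch, assuming the SQLite metadata DB configured in METADATA_PATH earlier:
from ml_metadata import metadata_store

# Connect to the same MLMD database the pipeline wrote to.
store = metadata_store.MetadataStore(
    tfx.orchestration.metadata.sqlite_metadata_connection_config(
        METADATA_PATH))

# Each successful Pusher run records a 'PushedModel' artifact.
for artifact in store.get_artifacts_by_type('PushedModel'):
  print(artifact.uri)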
The Pusher component pushes the trained model to SERVING_MODEL_DIR, which is the serving_model/TFRS-ranking directory if you did not change the variables in the previous steps. You can see the result from the file browser in the left-side panel in Colab, or by using the following command:
# List files in created model directory.
!ls -R {SERVING_MODEL_DIR}
serving_model/TFRS-ranking: 1671023059 serving_model/TFRS-ranking/1671023059: assets keras_metadata.pb saved_model.pb variables serving_model/TFRS-ranking/1671023059/assets: serving_model/TFRS-ranking/1671023059/variables: variables.data-00000-of-00001 variables.index
Now we can test the ranking model by computing a prediction for a user and a movie:
import glob
# Load the latest model for testing
loaded = tf.saved_model.load(
    max(glob.glob(os.path.join(SERVING_MODEL_DIR, '*/')),
        key=os.path.getmtime))
print(loaded({'userId': [[42]], 'movieId': [[15]]}).numpy())
[[[3.6946948]]]
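To use the model for actual ranking, you can score several candidate movies for one user and sort them by predicted rating. The following is a small sketch building on the loaded SavedModel above, with arbitrary candidate IDs chosen for illustration:
# Score a few candidate movies for user 42 and rank them.
candidate_movie_ids = [15, 42, 100, 255]
scores = loaded({
    'userId': [[42]] * len(candidate_movie_ids),
    'movieId': [[movie_id] for movie_id in candidate_movie_ids],
}).numpy().flatten()
for movie_id, score in sorted(
    zip(candidate_movie_ids, scores), key=lambda pair: -pair[1]):
  print(f'movieId={movie_id}: predicted rating {score:.2f}')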
This concludes the TensorFlow Recommenders + TFX tutorial.