使用 TFX 和 Vertex Pipelines 进行 Vertex AI 训练和服务

此基于笔记本的教程将创建一个 TFX 管道并运行它,该管道使用 Vertex AI 训练服务训练 ML 模型,并将其发布到 Vertex AI 以供服务。

此笔记本基于我们在 Vertex Pipelines 简单 TFX 管道教程 中构建的 TFX 管道。如果您尚未阅读该教程,请在继续使用此笔记本之前阅读它。

您可以使用 AutoML 在 Vertex AI 上训练模型,也可以使用自定义训练。在自定义训练中,您可以选择许多不同的机器类型来为您的训练作业提供动力,启用分布式训练,使用超参数调整,并使用 GPU 加速。

您还可以通过将训练后的模型部署到 Vertex AI 模型并创建端点来提供预测请求。

在本教程中,我们将使用 Vertex AI 训练(使用自定义作业)在 TFX 管道中训练模型。我们还将使用 Vertex AI 部署模型以提供预测请求。

此笔记本旨在在 Google ColabAI Platform 笔记本 上运行。如果您没有使用其中之一,只需单击上面的“在 Google Colab 中运行”按钮即可。

设置

如果您已完成 Vertex Pipelines 简单 TFX 管道教程,您将拥有一个可用的 GCP 项目和一个 GCS 存储桶,这些对于本教程来说就足够了。如果您错过了该教程,请先阅读它。

安装 Python 包

我们将安装所需的 Python 包,包括 TFX 和 KFP,以编写 ML 管道并将作业提交到 Vertex Pipelines。

# Use the latest version of pip.
pip install --upgrade pip
pip install --upgrade "tfx[kfp]<2"

您是否重新启动了运行时?

如果您使用的是 Google Colab,第一次运行上面的单元格时,您必须通过单击上面的“重新启动运行时”按钮或使用“运行时 > 重新启动运行时...”菜单来重新启动运行时。这是因为 Colab 加载包的方式。

如果您不在 Colab 上,可以使用以下单元格重新启动运行时。

# docs_infra: no_execute
import sys
if not 'google.colab' in sys.modules:
  # Automatically restart kernel after installs
  import IPython
  app = IPython.Application.instance()
  app.kernel.do_shutdown(True)

登录 Google 以使用此笔记本

如果您在 Colab 上运行此笔记本,请使用您的用户帐户进行身份验证

import sys
if 'google.colab' in sys.modules:
  from google.colab import auth
  auth.authenticate_user()

如果您在 AI Platform 笔记本上,请在运行下一部分之前使用 Google Cloud 进行身份验证,方法是在终端窗口(您可以通过菜单中的文件 > 新建打开)中运行

gcloud auth login

在终端窗口中(您可以通过菜单中的文件 > 新建打开)。您只需要在每个笔记本实例上执行一次。

检查包版本。

import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))
2024-05-08 09:16:21.420852: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-08 09:16:21.420896: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-08 09:16:21.422493: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
TensorFlow version: 2.15.1
TFX version: 1.15.0
KFP version: 1.8.22

设置变量

我们将设置一些用于自定义以下管道的变量。需要以下信息

  • GCP 项目 ID。请参阅 确定您的项目 ID
  • 运行管道的 GCP 区域。有关 Vertex Pipelines 可用区域的更多信息,请参阅 Vertex AI 位置指南
  • 用于存储管道输出的 Google Cloud Storage 存储桶。

在运行以下单元格之前,在其中输入所需的值.

GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS
GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS
GCS_BUCKET_NAME = ''          # <--- ENTER THIS

if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):
    from absl import logging
    logging.error('Please set all required parameters.')
ERROR:absl:Please set all required parameters.

设置 gcloud 以使用您的项目。

gcloud config set project {GOOGLE_CLOUD_PROJECT}
ERROR: (gcloud.config.set) argument VALUE: Must be specified.
Usage: gcloud config set SECTION/PROPERTY VALUE [optional flags]
  optional flags may be  --help | --installation

For detailed information on this command and its flags, run:
  gcloud config set --help
PIPELINE_NAME = 'penguin-vertex-training'

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))
PIPELINE_ROOT: gs:///pipeline_root/penguin-vertex-training

准备示例数据

我们将使用与Palmer Penguins 数据集相同的简单 TFX 管道教程

此数据集中有四个数值特征,它们已被归一化为范围 [0,1]。我们将构建一个分类模型,用于预测企鹅的species

我们需要制作数据集的副本。由于 TFX ExampleGen 从目录读取输入,因此我们需要创建一个目录并将数据集复制到 GCS 上的该目录中。

gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/".

快速查看 CSV 文件。

gsutil cat {DATA_ROOT}/penguins_processed.csv | head
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///data/penguin-vertex-training/penguins_processed.csv".

创建管道

我们的管道将与我们在用于 Vertex Pipelines 的简单 TFX 管道教程中创建的管道非常相似。管道将包含三个组件:CsvExampleGen、Trainer 和 Pusher。但我们将使用特殊的 Trainer 和 Pusher 组件。Trainer 组件将把训练工作负载迁移到 Vertex AI,而 Pusher 组件将把训练后的 ML 模型发布到 Vertex AI,而不是文件系统。

TFX 提供了一个特殊的Trainer,用于将训练作业提交到 Vertex AI Training 服务。我们只需在扩展模块中使用Trainer,而不是标准的Trainer 组件,以及一些必需的 GCP 参数。

在本教程中,我们将首先仅使用 CPU 运行 Vertex AI Training 作业,然后使用 GPU。

TFX 还提供了一个特殊的Pusher,用于将模型上传到Vertex AI 模型Pusher 将创建Vertex AI 端点资源,以提供在线预测。请参阅Vertex AI 文档,详细了解 Vertex AI 提供的在线预测。

编写模型代码。

模型本身与简单 TFX 管道教程中的模型几乎相同。

我们将添加_get_distribution_strategy() 函数,该函数创建一个TensorFlow 分布式策略,并在run_fn 中使用它,以便在 GPU 可用时使用 MirroredStrategy。

_trainer_module_file = 'penguin_trainer.py'
%%writefile {_trainer_module_file}

# Copied from https://tensorflowcn.cn/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://tensorflowcn.cn/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
Writing penguin_trainer.py

将模块文件复制到 GCS,管道组件可以从该位置访问。

否则,您可能需要构建一个包含模块文件的容器映像,并使用该映像运行管道和 AI Platform Training 作业。

gsutil cp {_trainer_module_file} {MODULE_ROOT}/
InvalidUrlError: Cloud URL scheme should be followed by colon and two slashes: "://". Found: "gs:///pipeline_module/penguin-vertex-training/".

编写管道定义

我们将定义一个函数来创建 TFX 管道。它与简单 TFX 管道教程中的三个组件相同,但我们在 GCP 扩展模块中使用了TrainerPusher 组件。

tfx.extensions.google_cloud_ai_platform.Trainer 的行为类似于常规的Trainer,但它只是将模型训练的计算迁移到云端。它在 Vertex AI Training 服务中启动一个自定义作业,而编排系统中的训练器组件将等待 Vertex AI Training 作业完成。

tfx.extensions.google_cloud_ai_platform.Pusher 使用训练后的模型创建 Vertex AI 模型和 Vertex AI 端点。

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, use_gpu: bool) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

  components = [
      example_gen,
      trainer,
      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components)

在 Vertex Pipelines 上运行管道。

我们将使用 Vertex Pipelines 运行管道,就像我们在用于 Vertex Pipelines 的简单 TFX 管道教程中所做的那样。

# docs_infra: no_execute
import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # We will use CPUs only for now.
        use_gpu=False))

可以使用 google-cloud-aiplatform 包中的 Google Cloud aiplatform 客户端提交生成的定义文件。

# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

现在,您可以访问上面输出中的链接,或访问 Google Cloud Console 中的“Vertex AI > Pipelines”,查看进度。

使用预测请求进行测试

管道完成后,您将在“Vertex AI > 端点”中的某个端点处找到一个已部署的模型。我们需要知道端点的 ID,才能向新端点发送预测请求。这与我们上面输入的端点名称不同。您可以在 Google Cloud Console 中的端点页面上找到该 ID,它看起来像一个很长的数字。

在运行它之前,请在下面设置 ENDPOINT_ID。

ENDPOINT_ID=''     # <--- ENTER THIS
if not ENDPOINT_ID:
    from absl import logging
    logging.error('Please set the endpoint id.')
ERROR:absl:Please set the endpoint id.

我们使用相同的 aiplatform 客户端向端点发送请求。我们将发送一个用于企鹅物种分类的预测请求。输入是我们使用的四个特征,模型将返回三个值,因为我们的模型为每个物种输出一个值。

例如,以下特定示例在索引“2”处具有最大值,并将打印“2”。

# docs_infra: no_execute
import numpy as np

# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }
# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

# Set data values for the prediction request.
# Our model expects 4 feature inputs and produces 3 output values for each
# species. Note that the output is logit value rather than probabilities.
# See the model code to understand input / output structure.
instances = [{
    'culmen_length_mm':[0.71],
    'culmen_depth_mm':[0.38],
    'flipper_length_mm':[0.98],
    'body_mass_g': [0.78],
}]

endpoint = client.endpoint_path(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_REGION,
    endpoint=ENDPOINT_ID,
)
# Send a prediction request and get response.
response = client.predict(endpoint=endpoint, instances=instances)

# Uses argmax to find the index of the maximum value.
print('species:', np.argmax(response.predictions[0]))

有关在线预测的详细信息,请访问 Google Cloud Console 中的端点页面。您可以在其中找到有关发送示例请求和更多资源链接的指南。

使用 GPU 运行管道

Vertex AI 支持使用各种机器类型进行训练,包括对 GPU 的支持。请参阅机器规格参考,了解可用的选项。

我们已经定义了管道以支持 GPU 训练。我们只需将use_gpu 标志设置为 True。然后,将创建一个包含一个 NVIDIA_TESLA_K80 的机器规格的管道,我们的模型训练代码将使用tf.distribute.MirroredStrategy

请注意,use_gpu 标志不是 Vertex 或 TFX API 的一部分。它仅用于在本教程中控制训练代码。

# docs_infra: no_execute
runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_root=DATA_ROOT,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        # Updated: Use GPUs. We will use a NVIDIA_TESLA_K80 and 
        # the model code will use tf.distribute.MirroredStrategy.
        use_gpu=True))

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

现在,您可以访问上面输出中的链接,或访问 Google Cloud Console 中的“Vertex AI > Pipelines”,查看进度。

清理

在本教程中,您创建了 Vertex AI 模型和端点。请删除这些资源,以避免任何不必要的费用。方法是转到端点,首先从端点取消部署模型。然后,您可以分别删除端点和模型。