TensorFlow 排名管道简介

TL;DR:使用 TensorFlow 排名管道减少构建、训练和服务 TensorFlow 排名模型的样板代码;根据用例和资源,为大规模排名应用程序使用适当的分布式策略。

简介

TensorFlow 排名管道包含一系列数据处理、模型构建、训练和服务流程,让您可以使用数据日志构建、训练和服务可扩展的神经网络排名模型,且只需付出最小的努力。当系统扩展时,管道效率最高。一般而言,如果您的模型在单台机器上运行需要 10 分钟或更长时间,请考虑使用此管道框架来分配负载并加快处理速度。

TensorFlow 排名管道已在分布式系统(1K+ CPU 和 100+ GPU 和 TPU)上使用大数据(TB 级)和大模型(1 亿次浮点运算)进行大规模实验和生产,且运行稳定。一旦使用 model.fit 在小部分数据上证明了 TensorFlow 模型,建议使用管道进行超参数扫描、持续训练和其他大规模情况。

排名管道

在 TensorFlow 中,构建、训练和服务排名模型的典型管道包括以下典型步骤。

  • 定义模型结构
    • 创建输入;
    • 创建预处理层;
    • 创建神经网络架构;
  • 训练模型
    • 从数据日志中生成训练和验证数据集;
    • 使用适当的超参数准备模型
      • 优化器;
      • 排名损失;
      • 排名指标;
    • 配置分布式策略以在多个设备上进行训练。
    • 配置回调以进行各种簿记。
    • 导出模型以供服务;
  • 提供模型
    • 确定服务时的 data 格式;
    • 选择并加载训练好的模型;
    • 使用加载的模型进行处理。

TensorFlow 排名管道的主要目标之一是减少步骤中的样板代码,例如数据集加载和预处理、列表数据和逐点评分函数的兼容性以及模型导出。另一个重要目标是强制执行许多本质上相关的进程的一致设计,例如,模型输入必须与训练数据集和服务时的 data 格式兼容。

使用指南

通过上述所有设计,启动 TF 排名模型将分为以下步骤,如图 1 所示。

Diagram of TensorFlow Ranking Pipeline
图 1:TensorFlow 排名类的图表以及使用 TF 排名管道训练排名模型的步骤。绿色模块可以针对您的排名模型进行定制。

使用分布式神经网络的示例

在此示例中,您将利用内置的 tfr.keras.model.FeatureSpecInputCreatortfr.keras.pipeline.SimpleDatasetBuildertfr.keras.pipeline.SimplePipeline,这些模块采用 feature_spec 来一致地定义模型输入和数据集服务器中的输入特征。可以在分布式排名教程中找到逐步演练的笔记本版本。

首先为上下文和示例特征定义 feature_spec

context_feature_spec = {}
example_feature_spec = {
    'custom_features_{}'.format(i + 1):
    tf.io.FixedLenFeature(shape=(1,), dtype=tf.float32, default_value=0.0)
    for i in range(10)
}
label_spec = ('utility', tf.io.FixedLenFeature(
    shape=(1,), dtype=tf.float32, default_value=-1))

按照图 1 中说明的步骤进行操作

根据 feature_spec 定义 input_creator

input_creator = tfr.keras.model.FeatureSpecInputCreator(
    context_feature_spec, example_feature_spec)

然后为同一组输入特征定义预处理特征转换。

def log1p(tensor):
    return tf.math.log1p(tensor * tf.sign(tensor)) * tf.sign(tensor)
preprocessor = {
    'custom_features_{}'.format(i + 1): log1p
    for i in range(10)
}

使用内置前馈 DNN 模型定义评分器。

dnn_scorer = tfr.keras.model.DNNScorer(
    hidden_layer_dims=[1024, 512, 256],
    output_units=1,
    activation=tf.nn.relu,
    use_batch_norm=True,
    batch_norm_moment=0.99,
    dropout=0.4)

使用 input_creatorpreprocessorscorer 创建 model_builder

model_builder = tfr.keras.model.ModelBuilder(
    input_creator=input_creator,
    preprocessor=preprocessor,
    scorer=dnn_scorer,
    mask_feature_name='__list_mask__',
    name='web30k_dnn_model')

现在为 dataset_builder 设置超参数。

dataset_hparams = tfr.keras.pipeline.DatasetHparams(
    train_input_pattern='/path/to/MSLR-WEB30K-ELWC/train-*',
    valid_input_pattern='/path/to/MSLR-WEB30K-ELWC/vali-*',
    train_batch_size=128,
    valid_batch_size=128,
    list_size=200,
    dataset_reader=tf.data.RecordIODataset,
    convert_labels_to_binary=False)

创建 dataset_builder

tfr.keras.pipeline.SimpleDatasetBuilder(
    context_feature_spec=context_feature_spec,
    example_feature_spec=example_feature_spec,
    mask_feature_name='__list_mask__',
    label_spec=label_spec,
    hparams=dataset_hparams)

同时为管道设置超参数。

pipeline_hparams = tfr.keras.pipeline.PipelineHparams(
    model_dir='/tmp/web30k_dnn_model',
    num_epochs=100,
    num_train_steps=100000,
    num_valid_steps=100,
    loss='softmax_loss',
    loss_reduction=tf.losses.Reduction.AUTO,
    optimizer='adam',
    learning_rate=0.0001,
    steps_per_execution=100,
    export_best_model=True,
    strategy='MirroredStrategy',
    tpu=None)

创建 ranking_pipeline 并训练。

ranking_pipeline = tfr.keras.pipeline.SimplePipeline(
    model_builder=model_builder,
    dataset_builder=dataset_builder,
    hparams=pipeline_hparams,
)
ranking_pipeline.train_and_validate()

TensorFlow 排名管道设计

TensorFlow 排名管道通过样板代码节省工程时间,同时通过覆盖和子类化允许灵活定制。为了实现此目的,管道引入了可定制类 tfr.keras.model.AbstractModelBuildertfr.keras.pipeline.AbstractDatasetBuildertfr.keras.pipeline.AbstractPipeline 来设置 TensorFlow 排名管道。

Design of TensorFlow Ranking Pipeline classes
图 2:TensorFlow 排名管道类的整体设计。

ModelBuilder

与构建 Keras 模型相关的样板代码集成在 AbstractModelBuilder 中,该代码传递给 AbstractPipeline,并在管道内调用以在策略范围内构建模型。这在图 1 中有所展示。类方法在抽象基类中定义。

class AbstractModelBuilder:
  def __init__(self, mask_feature_name, name):

  @abstractmethod
  def create_inputs(self):
    // To create tf.keras.Input. Abstract method, to be overridden.
    ...
  @abstractmethod
  def preprocess(self, context_inputs, example_inputs, mask):
    // To preprocess input features. Abstract method, to be overridden.
    ...
  @abstractmethod
  def score(self, context_features, example_features, mask):
    // To score based on preprocessed features. Abstract method, to be overridden.
    ...
  def build(self):
    context_inputs, example_inputs, mask = self.create_inputs()
    context_features, example_features = self.preprocess(
        context_inputs, example_inputs, mask)
    logits = self.score(context_features, example_features, mask)
    return tf.keras.Model(inputs=..., outputs=logits, name=self._name)

您可以直接对 AbstractModelBuilder 进行子类化,并用具体方法覆盖以进行定制,例如

class MyModelBuilder(AbstractModelBuilder):
  def create_inputs(self, ...):
  ...

同时,您应该在类初始化中将 ModelBuilder 与输入特征、预处理转换和指定为函数输入 input_creatorpreprocessorscorer 的评分函数一起使用,而不是进行子类化。

class ModelBuilder(AbstractModelBuilder):
  def __init__(self, input_creator, preprocessor, scorer, mask_feature_name, name):
  ...

为了减少创建这些输入的样板代码,函数类 tfr.keras.model.InputCreator 适用于 input_creatortfr.keras.model.Preprocessor 适用于 preprocessortfr.keras.model.Scorer 适用于 scorer,以及具体子类 tfr.keras.model.FeatureSpecInputCreatortfr.keras.model.TypeSpecInputCreatortfr.keras.model.PreprocessorWithSpectfr.keras.model.UnivariateScorertfr.keras.model.DNNScorertfr.keras.model.GAMScorer。这些类应涵盖大多数常见用例。

请注意,这些函数类是 Keras 类,因此无需序列化。建议通过子类化来对其进行自定义。

DatasetBuilder

DatasetBuilder 类收集与数据集相关的样板代码。数据会传递给 Pipeline,并调用以提供训练和验证数据集,并为已保存模型定义服务签名。如图 1 所示,DatasetBuilder 方法在 tfr.keras.pipeline.AbstractDatasetBuilder 基类中定义,

class AbstractDatasetBuilder:

  @abstractmethod
  def build_train_dataset(self, *arg, **kwargs):
    // To return the training dataset.
    ...
  @abstractmethod
  def build_valid_dataset(self, *arg, **kwargs):
    // To return the validation dataset.
    ...
  @abstractmethod
  def build_signatures(self, *arg, **kwargs):
    // To build the signatures to export saved model.
    ...

在具体的 DatasetBuilder 类中,您必须实现 build_train_datasetsbuild_valid_datasetsbuild_signatures

还提供了一个从 feature_spec 创建数据集的具体类

class BaseDatasetBuilder(AbstractDatasetBuilder):

  def __init__(self, context_feature_spec, example_feature_spec,
               training_only_example_spec,
               mask_feature_name, hparams,
               training_only_context_spec=None):
    // Specify label and weight specs in training_only_example_spec.
    ...
  def _features_and_labels(self, features):
    // To split the labels and weights from input features.
    ...

  def _build_dataset(self, ...):
    return tfr.data.build_ranking_dataset(
        context_feature_spec+training_only_context_spec,
        example_feature_spec+training_only_example_spec, mask_feature_name, ...)

  def build_train_dataset(self):
    return self._build_dataset(...)

  def build_valid_dataset(self):
    return self._build_dataset(...)

  def build_signatures(self, model):
    return saved_model.Signatures(model, context_feature_spec,
                                  example_feature_spec, mask_feature_name)()

DatasetBuilder 中使用的 hparamstfr.keras.pipeline.DatasetHparams 数据类中指定。

Pipeline

排名管道基于 tfr.keras.pipeline.AbstractPipeline

class AbstractPipeline:

  @abstractmethod
  def build_loss(self):
    // Returns a tf.keras.losses.Loss or a dict of Loss. To be overridden.
    ...
  @abstractmethod
  def build_metrics(self):
    // Returns a list of evaluation metrics. To be overridden.
    ...
  @abstractmethod
  def build_weighted_metrics(self):
    // Returns a list of weighted metrics. To be overridden.
    ...
  @abstractmethod
  def train_and_validate(self, *arg, **kwargs):
    // Main function to run the training pipeline. To be overridden.
    ...

还提供了一个具体的管道类,该类使用与 model.fit 兼容的不同 tf.distribute.strategy 训练模型

class ModelFitPipeline(AbstractPipeline):

  def __init__(self, model_builder, dataset_builder, hparams):
    ...
  def build_callbacks(self):
    // Builds callbacks used in model.fit. Override for customized usage.
    ...
  def export_saved_model(self, model, export_to, checkpoint=None):
    if checkpoint:
      model.load_weights(checkpoint)
    model.save(export_to, signatures=dataset_builder.build_signatures(model))

  def train_and_validate(self, verbose=0):
    with self._strategy.scope():
      model = model_builder.build()
      model.compile(
          optimizer,
          loss=self.build_loss(),
          metrics=self.build_metrics(),
          loss_weights=self.hparams.loss_weights,
          weighted_metrics=self.build_weighted_metrics())
      train_dataset, valid_dataset = (
          dataset_builder.build_train_dataset(),
          dataset_builder.build_valid_dataset())
      model.fit(
          x=train_dataset,
          validation_data=valid_dataset,
          callbacks=self.build_callbacks(),
          verbose=verbose)
      self.export_saved_model(model, export_to=model_output_dir)

tfr.keras.pipeline.ModelFitPipeline 中使用的 hparamstfr.keras.pipeline.PipelineHparams 数据类中指定。此 ModelFitPipeline 类足以应付大多数 TF Ranking 用例。客户端可以轻松地对其进行子类化以满足特定目的。

分布式策略支持

请参阅 分布式训练 以详细了解 TensorFlow 支持的分布式策略。目前,TensorFlow Ranking 流水线支持 tf.distribute.MirroredStrategy(默认)、tf.distribute.TPUStrategytf.distribute.MultiWorkerMirroredStrategytf.distribute.ParameterServerStrategy。镜像策略与大多数单机系统兼容。请将 strategy 设置为 None 以表示没有分布式策略。

一般来说,MirroredStrategy 适用于大多数具有 CPU 和 GPU 选项的设备上的相对较小的模型。MultiWorkerMirroredStrategy 适用于不适合一个工作进程的大型模型。ParameterServerStrategy 执行异步训练,并要求有多个工作进程可用。TPUStrategy 在有 TPU 时非常适合大型模型和大数据,但是,它在可以处理的张量形状方面灵活性较低。

常见问题解答

  1. 使用 RankingPipeline 的最少组件集

    请参见上面的 示例代码

  2. 如果我有自己的 Keras model

    要使用 tf.distribute 策略进行训练,需要在 strategy.scope() 下定义所有可训练变量来构建 model。因此,请将模型包装在 ModelBuilder 中,如下所示:

class MyModelBuilder(AbstractModelBuilder):
  def __init__(self, model, context_feature_names, example_feature_names,
               mask_feature_name, name):
    super().__init__(mask_feature_name, name)
    self._model = model
    self._context_feature_names = context_feature_names
    self._example_feature_names = example_feature_names

  def create_inputs(self):
    inputs = self._model.input
    context_inputs = {inputs[name] for name in self._context_feature_names}
    example_inputs = {inputs[name] for name in self._example_feature_names}
    mask = inputs[self._mask_feature_name]
    return context_inputs, example_inputs, mask

  def preprocess(self, context_inputs, example_inputs, mask):
    return context_inputs, example_inputs, mask

  def score(self, context_features, example_features, mask):
    inputs = dict(
        list(context_features.items()) + list(example_features.items()) +
        [(self._mask_feature_name, mask)])
    return self._model(inputs)

model_builder = MyModelBuilder(model, context_feature_names, example_feature_names,
                               mask_feature_name, "my_model")

然后将此 model_builder 馈送到管道中以进行进一步的训练。