检索模型 通常用于从数百万甚至数亿个候选中筛选出少数几个最佳候选。为了能够对用户的上下文和行为做出反应,它们需要能够在毫秒内动态地完成此操作。
近似最近邻搜索 (ANN) 是使这成为可能的技术。在本教程中,我们将展示如何使用 ScaNN(最先进的最近邻检索包)将 TFRS 检索无缝扩展到数百万个项目。
什么是 ScaNN?
ScaNN 是 Google Research 的一个库,它在大型规模上执行密集向量相似性搜索。给定一个候选嵌入数据库,ScaNN 以一种允许在推理时快速搜索的方式对这些嵌入进行索引。ScaNN 使用最先进的向量压缩技术和精心实施的算法来实现最佳的速度-精度权衡。它可以大大优于暴力搜索,同时在精度方面几乎没有损失。
构建一个基于 ScaNN 的模型
为了在 TFRS 中试用 ScaNN,我们将构建一个简单的 MovieLens 检索模型,就像我们在 基本检索 教程中所做的那样。如果您已经完成了该教程,那么本节内容将很熟悉,可以安全地跳过。
首先,安装 TFRS 和 TensorFlow 数据集
pip install -q tensorflow-recommenders
pip install -q --upgrade tensorflow-datasets
我们还需要安装 scann
:它是 TFRS 的可选依赖项,因此需要单独安装。
pip install -q scann
from typing import Dict, Text
import os
import pprint
import tempfile
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs
# Load the MovieLens 100K data.
ratings = tfds.load(
# Get the ratings data.
ratings = (ratings
# Retain only the fields we need.
.map(lambda x: {"user_id": x["user_id"], "movie_title": x["movie_title"]})
# Cache for efficiency.
# Get the movies data.
movies = tfds.load("movielens/100k-movies", split="train")
movies = (movies
# Retain only the fields we need.
.map(lambda x: x["movie_title"])
# Cache for efficiency.
user_ids = ratings.map(lambda x: x["user_id"])
unique_movie_titles = np.unique(np.concatenate(list(movies.batch(1000))))
unique_user_ids = np.unique(np.concatenate(list(user_ids.batch(1000))))
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)
train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)
就像在 基本检索 教程中一样,我们构建了一个简单的双塔模型。
class MovielensModel(tfrs.Model):
def __init__(self):
embedding_dimension = 32
# Set up a model for representing movies.
self.movie_model = tf.keras.Sequential([
vocabulary=unique_movie_titles, mask_token=None),
# We add an additional embedding to account for unknown tokens.
tf.keras.layers.Embedding(len(unique_movie_titles) + 1, embedding_dimension)
# Set up a model for representing users.
self.user_model = tf.keras.Sequential([
vocabulary=unique_user_ids, mask_token=None),
# We add an additional embedding to account for unknown tokens.
tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
# Set up a task to optimize the model and compute metrics.
self.task = tfrs.tasks.Retrieval(
.map(lambda title: (title, self.movie_model(title)))
def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
# We pick out the user features and pass them into the user model.
user_embeddings = self.user_model(features["user_id"])
# And pick out the movie features and pass them into the movie model,
# getting embeddings back.
positive_movie_embeddings = self.movie_model(features["movie_title"])
# The task computes the loss and the metrics.
return self.task(
compute_metrics=not training
TFRS 模型只是一个 Keras 模型。我们可以对其进行编译
model = MovielensModel()
model.fit(train.batch(8192), epochs=3)
Epoch 1/3 10/10 [==============================] - 2s 96ms/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 69832.4673 - regularization_loss: 0.0000e+00 - total_loss: 69832.4673 Epoch 2/3 10/10 [==============================] - 1s 16ms/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 67497.9411 - regularization_loss: 0.0000e+00 - total_loss: 67497.9411 Epoch 3/3 10/10 [==============================] - 1s 15ms/step - factorized_top_k/top_1_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_5_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_10_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_50_categorical_accuracy: 0.0000e+00 - factorized_top_k/top_100_categorical_accuracy: 0.0000e+00 - loss: 66323.0760 - regularization_loss: 0.0000e+00 - total_loss: 66323.0760 <keras.callbacks.History at 0x7f30f00e1280>
model.evaluate(test.batch(8192), return_dict=True)
3/3 [==============================] - 6s 1s/step - factorized_top_k/top_1_categorical_accuracy: 0.0013 - factorized_top_k/top_5_categorical_accuracy: 0.0099 - factorized_top_k/top_10_categorical_accuracy: 0.0219 - factorized_top_k/top_50_categorical_accuracy: 0.1248 - factorized_top_k/top_100_categorical_accuracy: 0.2322 - loss: 49472.8535 - regularization_loss: 0.0000e+00 - total_loss: 49472.8535 {'factorized_top_k/top_1_categorical_accuracy': 0.0013000000035390258, 'factorized_top_k/top_5_categorical_accuracy': 0.009949999861419201, 'factorized_top_k/top_10_categorical_accuracy': 0.021900000050663948, 'factorized_top_k/top_50_categorical_accuracy': 0.12484999746084213, 'factorized_top_k/top_100_categorical_accuracy': 0.23215000331401825, 'loss': 28276.328125, 'regularization_loss': 0, 'total_loss': 28276.328125}
在 TFRS 中,这是通过 BruteForce
brute_force = tfrs.layers.factorized_top_k.BruteForce(model.user_model)
movies.batch(128).map(lambda title: (title, model.movie_model(title)))
创建并填充候选(通过 index
# Get predictions for user 42.
_, titles = brute_force(np.array(["42"]), k=3)
print(f"Top recommendations: {titles[0]}")
Top recommendations: [b'Angels in the Outfield (1994)' b"Kid in King Arthur's Court, A (1995)" b'Bedknobs and Broomsticks (1971)']
在不到 1000 部电影的小型数据集上,这非常快
%timeit _, titles = brute_force(np.array(["42"]), k=3)
1.65 ms ± 6.42 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
但是,如果我们有更多候选 - 数百万而不是数千,会发生什么?
# Construct a dataset of movies that's 1,000 times larger. We
# do this by adding several million dummy movie titles to the dataset.
lots_of_movies = tf.data.Dataset.concatenate(
movies.batch(4096).repeat(1_000).map(lambda x: tf.zeros_like(x))
# We also add lots of dummy embeddings by randomly perturbing
# the estimated embeddings for real movies.
lots_of_movies_embeddings = tf.data.Dataset.concatenate(
.map(lambda x: model.movie_model(x))
.map(lambda x: x * tf.random.uniform(tf.shape(x)))
我们可以在这个更大的数据集上构建一个 BruteForce
brute_force_lots = tfrs.layers.factorized_top_k.BruteForce()
tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
_, titles = brute_force_lots(model.user_model(np.array(["42"])), k=3)
print(f"Top recommendations: {titles[0]}")
Top recommendations: [b'Angels in the Outfield (1994)' b"Kid in King Arthur's Court, A (1995)" b'Bedknobs and Broomsticks (1971)']
但它们需要更长的时间。对于包含 100 万部电影的候选集,暴力预测变得相当缓慢
%timeit _, titles = brute_force_lots(model.user_model(np.array(["42"])), k=3)
4.03 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
随着候选数量的增长,所需的时间线性增长:对于 1000 万个候选,提供最佳候选将花费 250 毫秒。对于实时服务来说,这显然太慢了。
在 TFRS 中使用 ScaNN 是通过 tfrs.layers.factorized_top_k.ScaNN
层完成的。它遵循与其他 top k 层相同的接口
scann = tfrs.layers.factorized_top_k.ScaNN(
tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
<tensorflow_recommenders.layers.factorized_top_k.ScaNN at 0x7f30f00bebb0>
_, titles = scann(model.user_model(np.array(["42"])), k=3)
print(f"Top recommendations: {titles[0]}")
Top recommendations: [b'Angels in the Outfield (1994)' b"Kid in King Arthur's Court, A (1995)" b'Bedknobs and Broomsticks (1971)']
%timeit _, titles = scann(model.user_model(np.array(["42"])), k=3)
22.4 ms ± 44 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
在这种情况下,我们可以在大约 2 毫秒内从大约 100 万个电影集中检索出前 3 部电影:比通过暴力搜索计算最佳候选快 15 倍。对于更大的数据集,近似方法的优势会更大。
当使用近似 top K 检索机制(如 ScaNN)时,检索速度通常是以精度为代价的。为了了解这种权衡,重要的是在使用 ScaNN 时测量模型的评估指标,并将它们与基线进行比较。
幸运的是,TFRS 使这变得很容易。我们只需使用 ScaNN 覆盖检索任务上的指标,重新编译模型并运行评估即可。
# Override the existing streaming candidate source.
model.task.factorized_metrics = tfrs.metrics.FactorizedTopK(
candidates=tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
# Need to recompile the model for the changes to take effect.
%time baseline_result = model.evaluate(test.batch(8192), return_dict=True, verbose=False)
CPU times: user 24min 23s, sys: 2min, total: 26min 23s Wall time: 3min 35s
我们可以使用 ScaNN 做同样的事情
model.task.factorized_metrics = tfrs.metrics.FactorizedTopK(
# We can use a much bigger batch size here because ScaNN evaluation
# is more memory efficient.
%time scann_result = model.evaluate(test.batch(8192), return_dict=True, verbose=False)
CPU times: user 15.6 s, sys: 633 ms, total: 16.3 s Wall time: 1.95 s
基于 ScaNN 的评估快得多。对于更大的数据集,这种优势会更大,因此对于大型数据集,始终运行基于 ScaNN 的评估可能是明智之举,以提高模型开发速度。
print(f"Brute force top-100 accuracy: {baseline_result['factorized_top_k/top_100_categorical_accuracy']:.2f}")
print(f"ScaNN top-100 accuracy: {scann_result['factorized_top_k/top_100_categorical_accuracy']:.2f}")
Brute force top-100 accuracy: 0.15 ScaNN top-100 accuracy: 0.14
这表明在这个人工数据集上,近似值几乎没有损失。一般来说,所有近似方法都表现出速度-精度权衡。要更深入地了解这一点,您可以查看 Erik Bernhardsson 的 ANN 基准测试.
基于 ScaNN
的模型已完全集成到 TensorFlow 模型中,因此提供服务与提供任何其他 TensorFlow 模型一样容易。
我们可以将其保存为 SavedModel
# We re-index the ScaNN layer to include the user embeddings in the same model.
# This way we can give the saved model raw features and get valid predictions
# back.
scann = tfrs.layers.factorized_top_k.ScaNN(model.user_model, num_reordering_candidates=1000)
tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
# Need to call it to set the shapes.
_ = scann(np.array(["42"]))
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "model")
loaded = tf.saved_model.load(path)
_, titles = loaded(tf.constant(["42"]))
print(f"Top recommendations: {titles[0][:3]}")
Top recommendations: [b'Angels in the Outfield (1994)' b"Kid in King Arthur's Court, A (1995)" b'Rudy (1993)']
生成的模型可以在任何安装了 TensorFlow 和 ScaNN 的 Python 服务中使用。
它也可以使用 TensorFlow Serving 的自定义版本进行服务,该版本以 Docker 容器的形式提供,可在 Docker Hub 上获取。您也可以从 Dockerfile 中自行构建镜像。
调整 ScaNN
现在让我们深入了解如何调整 ScaNN 层以获得更好的性能/准确性权衡。为了有效地做到这一点,我们首先需要衡量基线性能和准确性。
从上面可以看出,我们已经测量了模型处理单个(非批处理)查询的延迟(尽管需要注意的是,相当一部分延迟来自模型的非 ScaNN 组件)。
现在我们需要调查 ScaNN 的准确性,我们通过召回率来衡量。召回率@k 为 x% 表示,如果我们使用暴力检索来检索真正的前 k 个邻居,并将这些结果与使用 ScaNN 检索前 k 个邻居的结果进行比较,则 ScaNN 结果的 x% 位于真正的暴力检索结果中。让我们计算当前 ScaNN 搜索器的召回率。
首先,我们需要生成暴力、地面真值前 k 个结果
# Process queries in groups of 1000; processing them all at once with brute force
# may lead to out-of-memory errors, because processing a batch of q queries against
# a size-n dataset takes O(nq) space with brute force.
titles_ground_truth = tf.concat([
brute_force_lots(queries, k=10)[1] for queries in
test.batch(1000).map(lambda x: model.user_model(x["user_id"]))
], axis=0)
我们的变量 titles_ground_truth
现在包含暴力检索返回的前 10 部电影推荐。现在我们可以计算使用 ScaNN 时相同的推荐结果
# Get all user_id's as a 1d tensor of strings
test_flat = np.concatenate(list(test.map(lambda x: x["user_id"]).batch(1000).as_numpy_iterator()), axis=0)
# ScaNN is much more memory efficient and has no problem processing the whole
# batch of 20000 queries at once.
_, titles = scann(test_flat, k=10)
接下来,我们定义计算召回率的函数。对于每个查询,它计算暴力结果和 ScaNN 结果的交集中有多少个结果,并将此数除以暴力结果的数量。所有查询的此数量的平均值就是我们的召回率。
def compute_recall(ground_truth, approx_results):
return np.mean([
len(np.intersect1d(truth, approx)) / len(truth)
for truth, approx in zip(ground_truth, approx_results)
这给了我们当前 ScaNN 配置的基线召回率@10
print(f"Recall: {compute_recall(titles_ground_truth, titles):.3f}")
Recall: 0.938
%timeit -n 1000 scann(np.array(["42"]), k=10)
21.9 ms ± 30.5 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
为此,我们需要一个模型来了解 ScaNN 的调整旋钮如何影响性能。我们当前的模型使用 ScaNN 的树-AH 算法。该算法对嵌入数据库(“树”)进行分区,然后使用 AH 对这些分区中最有希望的分区进行评分,AH 是一种高度优化的近似距离计算例程。
TensorFlow Recommenders 的 ScaNN Keras 层的默认参数设置 num_leaves=100
和 num_leaves_to_search=10
。这意味着我们的数据库被划分为 100 个不相交的子集,并且使用 AH 对这 100 个子集中最具希望的 10 个子集进行评分。这意味着 10/100=10% 的数据集正在使用 AH 进行搜索。
和 num_leaves_to_search=100
,我们也会使用 AH 搜索 10% 的数据集。但是,与之前的设置相比,我们将搜索的 10% 将包含更高质量的候选者,因为更高的 num_leaves
因此,毫不奇怪,使用 num_leaves=1000
和 num_leaves_to_search=100
scann2 = tfrs.layers.factorized_top_k.ScaNN(
tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
_, titles2 = scann2(test_flat, k=10)
print(f"Recall: {compute_recall(titles_ground_truth, titles2):.3f}")
Recall: 0.974
从 100 个分区中选择前 10 个,而 scann2
从 1000 个分区中选择前 100 个。后者可能更昂贵,因为它涉及查看 10 倍的分区数量。
%timeit -n 1000 scann2(np.array(["42"]), k=10)
22 ms ± 32.4 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
一般来说,调整 ScaNN 搜索就是选择合适的权衡。每个单独的参数更改通常不会使搜索既更快又更准确;我们的目标是调整参数以在这些相互冲突的目标之间进行最佳权衡。
在延迟方面付出了一些代价,但显著提高了 scann
让我们尝试使用 AH 搜索 70/1000=7% 的数据集,并且只对最终的 400 个候选者进行重新评分
scann3 = tfrs.layers.factorized_top_k.ScaNN(
tf.data.Dataset.zip((lots_of_movies, lots_of_movies_embeddings))
_, titles3 = scann3(test_flat, k=10)
print(f"Recall: {compute_recall(titles_ground_truth, titles3):.3f}")
Recall: 0.969
在 scann
的基础上提供了大约 3% 的绝对召回率增益,同时还提供了更低的延迟
%timeit -n 1000 scann3(np.array(["42"]), k=10)
21.9 ms ± 26.3 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
这些旋钮可以进一步调整,以优化准确性-性能帕累托前沿的不同点。ScaNN 的算法可以在各种召回率目标范围内实现最先进的性能。
ScaNN 使用先进的矢量量化技术和高度优化的实现来实现其结果。矢量量化领域有着悠久的历史,并采用了各种方法。ScaNN 当前的量化技术在 这篇论文 中有详细介绍,该论文发表在 ICML 2020 上。该论文还与 这篇博文 一起发布,该博文对我们的技术进行了高级概述。
我们 ICML 2020 论文的参考文献中提到了许多相关的量化技术,其他与 ScaNN 相关的研究列在 http://sanjivk.com/ 上