在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 |
本教程是两部分系列的第二部分,演示了如何使用 联邦核心 (FC) 在 TFF 中实现自定义类型的联邦算法,联邦核心 (FC) 是 联邦学习 (FL) 层 (tff.learning
) 的基础。
我们建议您先阅读本系列的 第一部分,该部分介绍了此处使用的一些关键概念和编程抽象。
本系列的第二部分使用第一部分中介绍的机制来实现联邦训练和评估算法的简单版本。
我们建议您查看 图像分类 和 文本生成 教程,以了解 TFF 联邦学习 API 的更高级和更温和的介绍,因为它们将帮助您将我们在此处描述的概念置于上下文中。
开始之前
在开始之前,请尝试运行以下“Hello World”示例,以确保您的环境已正确设置。如果它不起作用,请参阅 安装 指南以获取说明。
pip install --quiet --upgrade tensorflow-federated
import collections
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
@tff.federated_computation
def hello_world():
return 'Hello, World!'
hello_world()
b'Hello, World!'
实现联邦平均
与 用于图像分类的联邦学习 一样,我们将使用 MNIST 示例,但由于这是作为低级教程而设计的,我们将绕过 Keras API 和 tff.simulation
,编写原始模型代码,并从头开始构建联邦数据集。
准备联邦数据集
为了演示,我们将模拟一个场景,其中我们拥有来自 10 个用户的數據,每个用户都贡献了识别不同数字的知识。这与 i.i.d. 尽可能地不同。
首先,让我们加载标准 MNIST 数据
mnist_train, mnist_test = tf.keras.datasets.mnist.load_data()
[(x.dtype, x.shape) for x in mnist_train]
[(dtype('uint8'), (60000, 28, 28)), (dtype('uint8'), (60000,))]
数据以 Numpy 数组的形式提供,一个包含图像,另一个包含数字标签,两者都以第一个维度遍历单个示例。让我们编写一个辅助函数,以与我们将联邦序列馈送到 TFF 计算的方式兼容的方式对其进行格式化,即作为列表的列表 - 外部列表遍历用户(数字),内部列表遍历每个客户端序列中的数据批次。按照惯例,我们将每个批次结构化为名为 x
和 y
的一对张量,每个张量都具有领先的批次维度。同时,我们还将每个图像展平为一个 784 个元素的向量,并将其中的像素重新缩放到 0..1
范围内,这样我们就不必在模型逻辑中使用数据转换来使代码混乱。
NUM_EXAMPLES_PER_USER = 1000
BATCH_SIZE = 100
def get_data_for_digit(source, digit):
output_sequence = []
all_samples = [i for i, d in enumerate(source[1]) if d == digit]
for i in range(0, min(len(all_samples), NUM_EXAMPLES_PER_USER), BATCH_SIZE):
batch_samples = all_samples[i:i + BATCH_SIZE]
output_sequence.append({
'x':
np.array([source[0][i].flatten() / 255.0 for i in batch_samples],
dtype=np.float32),
'y':
np.array([source[1][i] for i in batch_samples], dtype=np.int32)
})
return output_sequence
federated_train_data = [get_data_for_digit(mnist_train, d) for d in range(10)]
federated_test_data = [get_data_for_digit(mnist_test, d) for d in range(10)]
作为快速健全性检查,让我们查看第五个客户端(对应于数字 5
)贡献的最后一批数据的 Y
张量。
federated_train_data[5][-1]['y']
array([5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5], dtype=int32)
为了确保,让我们也查看该批次的最后一个元素对应的图像。
from matplotlib import pyplot as plt
plt.imshow(federated_train_data[5][-1]['x'][-1].reshape(28, 28), cmap='gray')
plt.grid(False)
plt.show()
关于 TensorFlow 和 TFF 的结合
在本教程中,为了简洁起见,我们立即使用 tff.tensorflow.computation
装饰引入 TensorFlow 逻辑的函数。但是,对于更复杂的逻辑,这不是我们推荐的模式。调试 TensorFlow 本身可能已经是一个挑战,而调试 TensorFlow 在完全序列化后重新导入必然会丢失一些元数据并限制交互性,从而使调试更加困难。
因此,我们强烈建议将复杂的 TF 逻辑编写为独立的 Python 函数(即,不使用 tff.tensorflow.computation
装饰)。这样,可以使用 TF 最佳实践和工具(如急切模式)开发和测试 TensorFlow 逻辑,然后在将计算序列化到 TFF 之前(例如,通过使用 Python 函数作为参数调用 tff.tensorflow.computation
)。
定义损失函数
现在我们有了数据,让我们定义一个损失函数,可以用于训练。首先,让我们将输入类型定义为一个 TFF 具名元组。由于数据批次的尺寸可能会有所不同,我们将批次维度设置为 None
来表示该维度的尺寸未知。
BATCH_SPEC = collections.OrderedDict(
x=tf.TensorSpec(shape=[None, 784], dtype=tf.float32),
y=tf.TensorSpec(shape=[None], dtype=tf.int32))
BATCH_TYPE = tff.types.tensorflow_to_type(BATCH_SPEC)
str(BATCH_TYPE)
'<x=float32[?,784],y=int32[?]>'
你可能想知道为什么我们不能只定义一个普通的 Python 类型。回想一下我们在 第一部分 中的讨论,我们解释了虽然我们可以使用 Python 表达 TFF 计算的逻辑,但在幕后,TFF 计算 *不是* Python。上面定义的符号 BATCH_TYPE
代表一个抽象的 TFF 类型规范。重要的是要区分这种 *抽象* 的 TFF 类型和具体的 Python *表示* 类型,例如,容器,如 dict
或 collections.namedtuple
,它们可用于在 Python 函数体中表示 TFF 类型。与 Python 不同,TFF 只有一个抽象类型构造函数 tff.StructType
用于元组状容器,其元素可以单独命名或不命名。此类型也用于对计算的形式参数进行建模,因为 TFF 计算在形式上只能声明一个参数和一个结果 - 你很快就会看到这方面的例子。
现在让我们定义模型参数的 TFF 类型,同样作为 *权重* 和 *偏差* 的 TFF 具名元组。
MODEL_SPEC = collections.OrderedDict(
weights=tf.TensorSpec(shape=[784, 10], dtype=tf.float32),
bias=tf.TensorSpec(shape=[10], dtype=tf.float32))
MODEL_TYPE = tff.types.tensorflow_to_type(MODEL_SPEC)
print(MODEL_TYPE)
<weights=float32[784,10],bias=float32[10]>
有了这些定义,现在我们可以为给定模型定义单个批次的损失。请注意在 @tf.function
装饰器内部使用 @tff.tensorflow.computation
装饰器。这使我们能够使用类似 Python 语义的 TF 进行编写,即使我们在 tf.Graph
上下文中,该上下文由 tff.tensorflow.computation
装饰器创建。
# NOTE: `forward_pass` is defined separately from `batch_loss` so that it can
# be later called from within another tf.function. Necessary because a
# @tf.function decorated method cannot invoke a @tff.tensorflow.computation.
@tf.function
def forward_pass(model, batch):
predicted_y = tf.nn.softmax(
tf.matmul(batch['x'], model['weights']) + model['bias'])
return -tf.reduce_mean(
tf.reduce_sum(
tf.one_hot(batch['y'], 10) * tf.math.log(predicted_y), axis=[1]))
@tff.tensorflow.computation(MODEL_TYPE, BATCH_TYPE)
def batch_loss(model, batch):
return forward_pass(model, batch)
正如预期的那样,计算 batch_loss
返回给定模型和单个数据批次的 float32
损失。请注意,MODEL_TYPE
和 BATCH_TYPE
如何被合并到形式参数的 2 元组中;你可以将 batch_loss
的类型识别为 (<MODEL_TYPE,BATCH_TYPE> -> float32)
。
str(batch_loss.type_signature)
'(<model=<weights=float32[784,10],bias=float32[10]>,batch=<x=float32[?,784],y=int32[?]>> -> float32)'
作为健全性检查,让我们构建一个初始模型,该模型填充了零,并计算上面可视化的数据批次的损失。
initial_model = collections.OrderedDict(
weights=np.zeros([784, 10], dtype=np.float32),
bias=np.zeros([10], dtype=np.float32))
sample_batch = federated_train_data[5][-1]
batch_loss(initial_model, sample_batch)
2.3025851
请注意,我们使用定义为 dict
的初始模型来馈送 TFF 计算,即使定义它的 Python 函数体将模型参数作为 model['weight']
和 model['bias']
来使用。对 batch_loss
的调用的参数不会简单地传递给该函数体。
当我们调用 batch_loss
时会发生什么?batch_loss
的 Python 函数体已在上面定义它的单元格中被跟踪和序列化。TFF 在计算定义时充当 batch_loss
的调用者,并在调用 batch_loss
时充当目标。在这两种角色中,TFF 都充当 TFF 的抽象类型系统和 Python 表示类型之间的桥梁。在调用时,TFF 将接受大多数标准 Python 容器类型(dict
、list
、tuple
、collections.namedtuple
等)作为抽象 TFF 元组的具体表示。此外,虽然如上所述,TFF 计算在形式上只接受一个参数,但你可以在参数类型为元组的情况下使用熟悉的 Python 调用语法,使用位置参数和/或关键字参数 - 它按预期工作。
单个批次的梯度下降
现在,让我们定义一个计算,该计算使用此损失函数执行梯度下降的单个步骤。请注意,在定义此函数时,我们使用 batch_loss
作为子组件。你可以在另一个计算的函数体内部调用使用 tff.tensorflow.computation
构造的计算,尽管通常这不是必需的 - 如上所述,由于序列化会丢失一些调试信息,因此对于更复杂的计算,最好在没有 tff.tensorflow.computation
装饰器的情况下编写和测试所有 TensorFlow。
@tff.tensorflow.computation(MODEL_TYPE, BATCH_TYPE, np.float32)
def batch_train(initial_model, batch, learning_rate):
# Define a group of model variables and set them to `initial_model`. Must
# be defined outside the @tf.function.
model_vars = collections.OrderedDict([
(name, tf.Variable(name=name, initial_value=value))
for name, value in initial_model.items()
])
optimizer = tf.keras.optimizers.SGD(learning_rate)
@tf.function
def _train_on_batch(model_vars, batch):
# Perform one step of gradient descent using loss from `batch_loss`.
with tf.GradientTape() as tape:
loss = forward_pass(model_vars, batch)
grads = tape.gradient(loss, model_vars)
optimizer.apply_gradients(
zip(tf.nest.flatten(grads), tf.nest.flatten(model_vars)))
return model_vars
return _train_on_batch(model_vars, batch)
str(batch_train.type_signature)
'(<initial_model=<weights=float32[784,10],bias=float32[10]>,batch=<x=float32[?,784],y=int32[?]>,learning_rate=float32> -> <weights=float32[784,10],bias=float32[10]>)'
当你调用用 tff.tensorflow.computation
装饰的 Python 函数在另一个此类函数的函数体内部时,内部 TFF 计算的逻辑将被嵌入(本质上是内联)到外部计算的逻辑中。如上所述,如果你正在编写这两个计算,最好将内部函数(在本例中为 batch_loss
)设为常规 Python 或 tf.function
,而不是 tff.tensorflow.computation
。但是,这里我们说明了在另一个内部调用一个 tff.tensorflow.computation
基本上按预期工作。这可能是必要的,例如,如果你没有定义 batch_loss
的 Python 代码,而只有它的序列化 TFF 表示。
现在,让我们将此函数应用几次到初始模型,看看损失是否会降低。
model = initial_model
losses = []
for _ in range(5):
model = batch_train(model, sample_batch, 0.1)
losses.append(batch_loss(model, sample_batch))
losses
[0.19690025, 0.13176318, 0.101132266, 0.08273812, 0.0703014]
一系列本地数据的梯度下降
现在,由于 batch_train
似乎可以工作,让我们编写一个类似的训练函数 local_train
,它使用来自一个用户的全部批次序列,而不是单个批次。新的计算现在需要使用 tff.SequenceType(BATCH_TYPE)
,而不是 BATCH_TYPE
。
LOCAL_DATA_TYPE = tff.SequenceType(BATCH_TYPE)
@tff.federated_computation(MODEL_TYPE, np.float32, LOCAL_DATA_TYPE)
def local_train(initial_model, learning_rate, all_batches):
# Reduction function to apply to each batch.
@tff.federated_computation((MODEL_TYPE, np.float32), BATCH_TYPE)
def batch_fn(model_with_lr, batch):
model, lr = model_with_lr
return batch_train(model, batch, lr), lr
trained_model, _ = tff.sequence_reduce(
all_batches, (initial_model, learning_rate), batch_fn
)
return trained_model
str(local_train.type_signature)
'(<initial_model=<weights=float32[784,10],bias=float32[10]>,learning_rate=float32,all_batches=<x=float32[?,784],y=int32[?]>*> -> <weights=float32[784,10],bias=float32[10]>)'
这段简短的代码中包含许多细节,让我们逐一分析。
首先,虽然我们本可以在 TensorFlow 中完全实现此逻辑,并依赖于 tf.data.Dataset.reduce
来处理序列,就像我们之前做的那样,但这次我们选择在粘合语言中表达逻辑,作为 tff.federated_computation
。我们使用了联邦运算符 tff.sequence_reduce
来执行约简。
运算符 tff.sequence_reduce
的使用方式与 tf.data.Dataset.reduce
类似。你可以将其视为与 tf.data.Dataset.reduce
本质上相同,但用于联邦计算内部,正如你可能记得的那样,联邦计算不能包含 TensorFlow 代码。它是一个模板运算符,具有一个形式参数 3 元组,该元组由 T
类型元素的 *序列*、某种类型 U
的约简的初始状态(我们将抽象地将其称为 *零*)以及类型 (<U,T> -> U)
的 *约简运算符* 组成,该运算符通过处理单个元素来改变约简的状态。结果是约简的最终状态,在按顺序处理所有元素后获得。在我们的示例中,约简的状态是在数据的前缀上训练的模型,元素是数据批次。
其次,请注意,我们再次使用了一个计算(batch_train
)作为另一个计算(local_train
)的组件,但不是直接使用。我们不能将其用作约简运算符,因为它需要一个额外的参数 - 学习率。为了解决这个问题,我们定义了一个嵌入式联邦计算 batch_fn
,它将其函数体绑定到 local_train
的参数 learning_rate
。允许以这种方式定义的子计算捕获其父级的形式参数,只要子计算没有在其父级的函数体之外被调用。你可以将这种模式视为 Python 中 functools.partial
的等效项。
以这种方式捕获 learning_rate
的实际意义是,当然,在所有批次中使用相同的学习率值。
现在,让我们尝试在来自同一个用户的全部数据序列上使用新定义的本地训练函数,该用户贡献了样本批次(数字 5
)。
locally_trained_model = local_train(initial_model, 0.1, federated_train_data[5])
它工作了吗?为了回答这个问题,我们需要实现评估。
本地评估
以下是一种通过将所有数据批次的损失加起来来实现本地评估的方法(我们也可以计算平均值;我们将将其留作读者的练习)。
@tff.federated_computation(MODEL_TYPE, LOCAL_DATA_TYPE)
def local_eval(model, all_batches):
@tff.tensorflow.computation((MODEL_TYPE, np.float32), BATCH_TYPE)
def accumulate_evaluation(model_and_accumulator, batch):
model, accumulator = model_and_accumulator
return model, accumulator + batch_loss(model, batch)
_, total_loss = tff.sequence_reduce(
all_batches, (model, 0.0), accumulate_evaluation
)
return total_loss
str(local_eval.type_signature)
'(<model=<weights=float32[784,10],bias=float32[10]>,all_batches=<x=float32[?,784],y=int32[?]>*> -> float32)'
同样,这段代码中还有一些新元素,让我们逐一分析。
首先,我们使用了两个新的联邦运算符来处理序列:tff.sequence_map
,它接受一个 *映射函数* T->U
和一个 T
的 *序列*,并发出一个 U
的序列,该序列通过逐点应用映射函数获得,以及 tff.sequence_sum
,它只将所有元素加起来。在这里,我们将每个数据批次映射到一个损失值,然后将得到的损失值加起来以计算总损失。
请注意,我们本可以使用 tff.sequence_reduce
,但这并不是最佳选择 - 约简过程本质上是顺序的,而映射和求和可以并行计算。在有选择的情况下,最好坚持使用不限制实现选择的运算符,这样当我们的 TFF 计算在将来被编译以部署到特定环境时,就可以充分利用所有可能的机会,以实现更快、更可扩展、更节省资源的执行。
其次,请注意,就像在 local_train
中一样,我们需要的组件函数(batch_loss
)接受的参数比联邦运算符(tff.sequence_map
)预期的参数更多,因此我们再次定义了一个部分,这次通过直接将 lambda
包装为 tff.federated_computation
来进行内联。使用包装器与函数内联作为参数是使用 tff.tensorflow.computation
将 TensorFlow 逻辑嵌入 TFF 的推荐方法。
现在,让我们看看我们的训练是否成功。
print('initial_model loss =', local_eval(initial_model,
federated_train_data[5]))
print('locally_trained_model loss =',
local_eval(locally_trained_model, federated_train_data[5]))
initial_model loss = 23.025854 locally_trained_model loss = 0.43484688
确实,损失降低了。但如果我们在另一个用户的數據上进行评估会发生什么?
print('initial_model loss =', local_eval(initial_model,
federated_train_data[0]))
print('locally_trained_model loss =',
local_eval(locally_trained_model, federated_train_data[0]))
initial_model loss = 23.025854 locally_trained_model loss = 74.50075
正如预期,情况变得更糟。模型被训练来识别5
,并且从未见过0
。这就引出了一个问题——本地训练如何影响模型从全局角度来看的质量?
联邦评估
这是我们旅程中我们最终回到联邦类型和联邦计算的地方——我们一开始就讨论的主题。这里有一对 TFF 类型定义,用于表示源自服务器的模型和保留在客户端上的数据。
SERVER_MODEL_TYPE = tff.FederatedType(MODEL_TYPE, tff.SERVER)
CLIENT_DATA_TYPE = tff.FederatedType(LOCAL_DATA_TYPE, tff.CLIENTS)
有了迄今为止介绍的所有定义,用 TFF 表达联邦评估只是一行代码——我们将模型分发到客户端,让每个客户端在其本地数据部分上调用本地评估,然后平均损失。以下是一种编写此代码的方法。
@tff.federated_computation(SERVER_MODEL_TYPE, CLIENT_DATA_TYPE)
def federated_eval(model, data):
return tff.federated_mean(
tff.federated_map(local_eval, [tff.federated_broadcast(model), data]))
我们已经看到了tff.federated_mean
和 tff.federated_map
在更简单场景中的示例,从直观层面上看,它们按预期工作,但这段代码中还有更多内容,所以让我们仔细看一下。
首先,让我们分解一下“让每个客户端在其本地数据部分上调用本地评估”部分。您可能还记得前面的部分,local_eval
的类型签名形式为 (<MODEL_TYPE, LOCAL_DATA_TYPE> -> float32)
。
联邦运算符 tff.federated_map
是一个模板,它接受一个参数,该参数是一个 2 元组,包含类型为 T->U
的映射函数和类型为 {T}@CLIENTS
的联邦值(即,其成员组成与映射函数的参数类型相同),并返回类型为 {U}@CLIENTS
的结果。
由于我们将 local_eval
作为映射函数提供,以便在每个客户端的基础上应用,因此第二个参数应该是联邦类型 {<MODEL_TYPE, LOCAL_DATA_TYPE>}@CLIENTS
,即,在前面部分的命名法中,它应该是一个联邦元组。每个客户端应该将 local_eval
的完整参数集作为成员组成。相反,我们提供了一个 2 元素的 Python list
。这里发生了什么?
实际上,这是 TFF 中隐式类型转换的一个示例,类似于您可能在其他地方遇到的隐式类型转换,例如,当您将 int
提供给接受 float
的函数时。隐式转换目前很少使用,但我们计划在 TFF 中使其更加普遍,作为减少样板代码的一种方式。
在这种情况下应用的隐式转换是形式为 {<X,Y>}@Z
的联邦元组与联邦值的元组 <{X}@Z,{Y}@Z>
之间的等效性。虽然从形式上讲,这两个是不同的类型签名,但从程序员的角度来看,Z
中的每个设备都包含两个数据单元 X
和 Y
。这里发生的事情与 Python 中的 zip
类似,实际上,我们提供了一个运算符 tff.federated_zip
,它允许您显式地执行此类转换。当 tff.federated_map
遇到元组作为第二个参数时,它会简单地为您调用 tff.federated_zip
。
鉴于上述内容,您现在应该能够识别表达式 tff.federated_broadcast(model)
表示 TFF 类型为 {MODEL_TYPE}@CLIENTS
的值,以及 data
表示 TFF 类型为 {LOCAL_DATA_TYPE}@CLIENTS
(或简称为 CLIENT_DATA_TYPE
)的值,这两个值通过隐式 tff.federated_zip
过滤在一起,形成 tff.federated_map
的第二个参数。
运算符 tff.federated_broadcast
,正如您所期望的那样,只是将数据从服务器传输到客户端。
现在,让我们看看我们的本地训练如何影响系统中的平均损失。
print('initial_model loss =', federated_eval(initial_model,
federated_train_data))
print('locally_trained_model loss =',
federated_eval(locally_trained_model, federated_train_data))
initial_model loss = 23.025852 locally_trained_model loss = 54.43263
实际上,正如预期的那样,损失增加了。为了改进所有用户的模型,我们需要在所有人的数据上进行训练。
联邦训练
实现联邦训练最简单的方法是本地训练,然后平均模型。这使用我们已经讨论过的相同构建块和模式,如下所示。
SERVER_FLOAT_TYPE = tff.FederatedType(np.float32, tff.SERVER)
@tff.federated_computation(SERVER_MODEL_TYPE, SERVER_FLOAT_TYPE,
CLIENT_DATA_TYPE)
def federated_train(model, learning_rate, data):
return tff.federated_mean(
tff.federated_map(local_train, [
tff.federated_broadcast(model),
tff.federated_broadcast(learning_rate), data
]))
请注意,在 tff.learning
提供的联邦平均的完整功能实现中,我们更喜欢平均模型增量,而不是平均模型,原因有很多,例如,能够裁剪更新范数,用于压缩等。
让我们看看训练是否有效,方法是运行几轮训练,并比较训练前后平均损失。
model = initial_model
learning_rate = 0.1
for round_num in range(5):
model = federated_train(model, learning_rate, federated_train_data)
learning_rate = learning_rate * 0.9
loss = federated_eval(model, federated_train_data)
print('round {}, loss={}'.format(round_num, loss))
round 0, loss=21.60552406311035 round 1, loss=20.365678787231445 round 2, loss=19.27480125427246 round 3, loss=18.31110954284668 round 4, loss=17.457256317138672
为了完整起见,现在让我们也运行测试数据,以确认我们的模型泛化良好。
print('initial_model test loss =',
federated_eval(initial_model, federated_test_data))
print('trained_model test loss =', federated_eval(model, federated_test_data))
initial_model test loss = 22.795593 trained_model test loss = 17.278767
本教程到此结束。
当然,我们简化的示例没有反映在更现实的场景中需要做的一些事情——例如,我们没有计算除损失之外的指标。我们鼓励您研究 联邦平均的实现,它位于 tff.learning
中,作为一个更完整的示例,以及作为展示我们想要鼓励的一些编码实践的方式。