实现自定义聚合

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

在本教程中,我们将解释 tff.aggregators 模块背后的设计原则以及实现从客户端到服务器的自定义值聚合的最佳实践。

先决条件。 本教程假设您已经熟悉 联合核心 的基本概念,例如位置 (tff.SERVERtff.CLIENTS),TFF 如何表示计算 (tff.tensorflow.computationtff.federated_computation) 以及它们的类型签名。

pip install --quiet --upgrade tensorflow-federated

设计摘要

在 TFF 中,“聚合”是指将一组值从 tff.CLIENTS 移动以生成 tff.SERVER 上相同类型的聚合值。也就是说,每个单独的客户端值都不需要可用。例如,在联合学习中,客户端模型更新会进行平均以获得聚合模型更新,以应用于服务器上的全局模型。

除了实现此目标的操作符,例如 tff.federated_sum,TFF 还提供 tff.templates.AggregationProcess(一个 有状态过程),它将聚合计算的类型签名形式化,以便它可以推广到比简单求和更复杂的形式。

的主要组件是用于创建 AggregationProcess工厂,这些工厂旨在成为 TFF 中普遍有用且可替换的构建块,具有两个方面

  1. 参数化计算。 聚合是一个独立的构建块,可以插入其他 TFF 模块中,这些模块旨在与 tff.aggregators 一起使用以参数化其必要的聚合。

示例

learning_process = tff.learning.algorithms.build_weighted_fed_avg(
    ...,
    model_aggregator=tff.aggregators.MeanFactory())
  1. 聚合组合。 聚合构建块可以与其他聚合构建块组合以创建更复杂的复合聚合。

示例

secure_mean = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.SecureSumFactory(...))

本教程的其余部分将解释如何实现这两个目标。

聚合过程

我们首先总结 tff.templates.AggregationProcess,然后介绍其创建的工厂模式。

The tff.templates.AggregationProcess 是一个 tff.templates.MeasuredProcess,其类型签名指定用于聚合。特别是,initializenext 函数具有以下类型签名

  • ( -> state_type@SERVER)
  • (<state_type@SERVER, {value_type}@CLIENTS, *> -> <state_type@SERVER, value_type@SERVER, measurements_type@SERVER>)

该状态(类型为 state_type)必须放置在服务器上。The next 函数将状态和要聚合的值(类型为 value_type)作为输入参数,该值放置在客户端上。The * 表示可选的其他输入参数,例如加权平均值中的权重。它返回一个更新后的状态对象、放置在服务器上的相同类型的聚合值以及一些度量。

请注意,在 next 函数的执行之间传递的状态以及旨在报告任何依赖于 next 函数的特定执行的信息的报告度量都可能是空的。但是,它们必须被明确指定,以便 TFF 的其他部分有一个明确的协议来遵循。

其他 TFF 模块(例如 tff.learning 中的模型更新)预计将使用 tff.templates.AggregationProcess 来参数化如何聚合值。但是,究竟聚合哪些值以及它们的类型签名是什么,取决于正在训练的模型和其他细节以及用于训练它的学习算法。

为了使聚合独立于计算的其他方面,我们使用工厂模式——一旦可聚合对象的类型签名可用,我们就会创建适当的 tff.templates.AggregationProcess,方法是调用工厂的 create 方法。因此,只有库作者需要直接处理聚合过程,他们负责创建过程。

聚合过程工厂

有两个用于无权重和加权聚合的抽象基工厂类。它们的 create 方法接受要聚合的值的类型签名,并返回一个 tff.templates.AggregationProcess,用于对这些值进行聚合。

tff.aggregators.UnweightedAggregationFactory 创建的过程接受两个输入参数:(1) 服务器上的状态和 (2) 指定类型 value_type 的值。

一个示例实现是 tff.aggregators.SumFactory

tff.aggregators.WeightedAggregationFactory 创建的过程接受三个输入参数:(1) 服务器上的状态,(2) 指定类型 value_type 的值和 (3) 类型 weight_type 的权重,如工厂用户在调用其 create 方法时指定的那样。

一个示例实现是 tff.aggregators.MeanFactory,它计算加权平均值。

工厂模式是我们实现上面提到的第一个目标的方式;即聚合是一个独立的构建块。例如,当更改哪些模型变量可训练时,复杂的聚合不一定需要更改;表示它的工厂将在使用诸如 tff.learning.algorithms.build_weighted_fed_avg 之类的函数时,使用不同的类型签名进行调用。

组合

回想一下,一般的聚合过程可以封装 (a) 对客户端值进行一些预处理,(b) 将值从客户端移动到服务器,以及 (c) 对服务器上的聚合值进行一些后处理。上面提到的第二个目标,聚合组合,是在 tff.aggregators 模块中通过构建聚合工厂的实现来实现的,这样部分 (b) 就可以委托给另一个聚合工厂。

与其在单个工厂类中实现所有必要的逻辑,不如默认情况下将实现集中在与聚合相关的单个方面。在需要时,这种模式使我们能够一次替换一个构建块。

一个例子是加权 tff.aggregators.MeanFactory。它的实现将客户端提供的值和权重相乘,然后分别对加权值和权重求和,然后在服务器上将加权值的总和除以权重的总和。与其直接使用 tff.federated_sum 运算符来实现求和,不如将求和委托给两个 tff.aggregators.SumFactory 实例。

这种结构使得两个默认求和能够被不同的工厂替换,这些工厂以不同的方式实现求和。例如,tff.aggregators.SecureSumFactory,或者 tff.aggregators.UnweightedAggregationFactory 的自定义实现。反过来,tff.aggregators.MeanFactory 本身可以是另一个工厂(例如 tff.aggregators.clipping_factory)的内部聚合,如果要在平均之前对值进行裁剪。

有关使用 tff.aggregators 模块中现有工厂的组合机制的推荐用法,请参阅之前的 调整推荐的聚合以进行学习 教程。

最佳实践示例

我们将通过实现一个简单的示例任务来详细说明 tff.aggregators 概念,并使其逐渐变得更加通用。另一种学习方法是查看现有工厂的实现。

import collections

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

示例任务不是对 value 求和,而是对 value * 2.0 求和,然后将总和除以 2.0。因此,聚合结果在数学上等效于直接对 value 求和,可以认为它包含三个部分:(1) 客户端缩放 (2) 跨客户端求和 (3) 服务器反缩放。

按照上面解释的设计,逻辑将实现为 tff.aggregators.UnweightedAggregationFactory 的子类,它在给定要聚合的 value_type 时创建适当的 tff.templates.AggregationProcess

最小实现

对于示例任务,必要的计算始终相同,因此无需使用状态。因此它是空的,表示为 tff.federated_value((), tff.SERVER)。目前,测量也是如此。

因此,任务的最小实现如下所示

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.FederatedType(value_type, tff.CLIENTS))
    def next_fn(state, value):
      scaled_value = tff.federated_map(
          tff.tensorflow.computation(lambda x: x * 2.0), value)
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tensorflow.computation(lambda x: x / 2.0), summed_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

可以使用以下代码验证一切是否按预期工作

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(np.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: {output.result}  (expected 8.0)')
Type signatures of the created aggregation process:

  - initialize: ( -> <>@SERVER)
  - next: (<state=<>@SERVER,value={float32}@CLIENTS> -> <state=<>@SERVER,result=float32@SERVER,measurements=<>@SERVER>)

Aggregation result: 8.0  (expected 8.0)

有状态和测量

有状态在 TFF 中被广泛用于表示预计将迭代执行并在每次迭代中发生变化的计算。例如,学习计算的状态包含正在学习的模型的权重。

为了说明如何在聚合计算中使用状态,我们修改了示例任务。不是将 value 乘以 2.0,而是将其乘以迭代索引——聚合执行的次数。

为此,我们需要一种方法来跟踪迭代索引,这可以通过状态的概念来实现。在 initialize_fn 中,不是创建空状态,而是将状态初始化为标量零。然后,可以在 next_fn 中使用状态,分为三个步骤:(1) 增加 1.0,(2) 用于乘以 value,以及 (3) 作为新的更新状态返回。

完成此操作后,您可能会注意到:但是与上面的代码完全相同,可以用来验证一切是否按预期工作。我怎么知道有什么变化?

好问题!这就是测量概念变得有用的地方。通常,测量可以报告与 next 函数的单个执行相关的任何值,这些值可用于监控。在这种情况下,它可以是来自之前示例的 summed_value。也就是说,在“反缩放”步骤之前的那个值,它应该取决于迭代索引。同样,这在实践中不一定有用,但说明了相关机制。

因此,对任务的有状态答案如下所示

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.FederatedType(value_type, tff.CLIENTS))
    def next_fn(state, value):
      new_state = tff.federated_map(
          tff.tensorflow.computation(lambda x: x + 1.0), state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(
          tff.tensorflow.computation(lambda x, y: x * y), (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(
          tff.tensorflow.computation(lambda x, y: x / y), (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

请注意,作为输入进入 next_fnstate 位于服务器上。为了在客户端使用它,首先需要对其进行通信,这可以通过使用 tff.federated_broadcast 运算符来实现。

为了验证一切是否按预期工作,我们现在可以查看报告的 measurements,这些测量应该在每次执行回合中都不同,即使使用相同的 client_data 运行也是如此。

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(np.float32))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}   (expected 8.0 * 1)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 2)')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #3')
print(f'|       Aggregation result: {output.result}   (expected 8.0)')
print(f'| Aggregation measurements: {output.measurements}  (expected 8.0 * 3)')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={float32}@CLIENTS> -> <state=float32@SERVER,result=float32@SERVER,measurements=float32@SERVER>)

| Round #1
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 8.0   (expected 8.0 * 1)

| Round #2
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 16.0  (expected 8.0 * 2)

| Round #3
|       Aggregation result: 8.0   (expected 8.0)
| Aggregation measurements: 24.0  (expected 8.0 * 3)

结构化类型

在联邦学习中训练的模型的模型权重通常表示为张量集合,而不是单个张量。在 TFF 中,这表示为 tff.StructType,并且通常有用的聚合工厂需要能够接受结构化类型。

但是,在上面的示例中,我们只使用了一个 tff.TensorType 对象。如果我们尝试使用之前的工厂使用 tff.StructType([(np.float32, (2,)), (np.float32, (3,))]) 创建聚合过程,我们会得到一个奇怪的错误,因为 TensorFlow 会尝试将 tf.Tensorlist 相乘。

问题在于,我们不是将张量结构乘以常数,而是需要将结构中的每个张量乘以常数。解决此问题的常用方法是在创建的 tff.tensorflow.computation 中使用 tf.nest 模块。

因此,与结构化类型兼容的先前 ExampleTaskFactory 版本如下所示

@tff.tensorflow.computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tensorflow.computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tensorflow.computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def create(self, value_type):
    @tff.federated_computation()
    def initialize_fn():
      return tff.federated_value(0.0, tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.FederatedType(value_type, tff.CLIENTS))
    def next_fn(state, value):
      new_state = tff.federated_map(add_one, state)
      state_at_clients = tff.federated_broadcast(new_state)
      scaled_value = tff.federated_map(scale, (value, state_at_clients))
      summed_value = tff.federated_sum(scaled_value)
      unscaled_value = tff.federated_map(unscale, (summed_value, new_state))
      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=summed_value)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

此示例突出显示了一种在构建 TFF 代码时可能需要遵循的模式。在不处理非常简单的操作时,当 tff.tensorflow.computation(将用作 tff.federated_computation 中的构建块)在单独的位置创建时,代码的可读性会更高。在 tff.federated_computation 中,这些构建块仅使用内在运算符连接。

为了验证它是否按预期工作

client_data = [[[1.0, 2.0], [3.0, 4.0, 5.0]],
               [[1.0, 1.0], [3.0, 0.0, -5.0]]]
factory = ExampleTaskFactory()
aggregation_process = factory.create(
    tff.to_type([(np.float32, (2,)), (np.float32, (3,))]))
print(f'Type signatures of the created aggregation process:\n'
      f'  - initialize: {aggregation_process.initialize.type_signature}\n'
      f'  - next: {aggregation_process.next.type_signature}\n')

state = aggregation_process.initialize()
output = aggregation_process.next(state, client_data)
print(f'Aggregation result: [{output.result[0]}, {output.result[1]}]\n'
      f'          Expected: [[2. 3.], [6. 4. 0.]]')
Type signatures of the created aggregation process:

  - initialize: ( -> float32@SERVER)
  - next: (<state=float32@SERVER,value={<float32[2],float32[3]>}@CLIENTS> -> <state=float32@SERVER,result=<float32[2],float32[3]>@SERVER,measurements=<float32[2],float32[3]>@SERVER>)

Aggregation result: [[2. 3.], [6. 4. 0.]]
          Expected: [[2. 3.], [6. 4. 0.]]

内部聚合

最后一步是可选地启用将实际聚合委托给其他工厂,以便轻松组合不同的聚合技术。

这是通过在我们的 ExampleTaskFactory 构造函数中创建一个可选的 inner_factory 参数来实现的。如果未指定,则使用 tff.aggregators.SumFactory,它应用了上一节中直接使用的 tff.federated_sum 运算符。

当调用 create 时,我们可以首先调用 inner_factorycreate 来使用相同的 value_type 创建内部聚合过程。

initialize_fn 返回的我们进程的状态由两部分组成:由“此”进程创建的状态和新创建的内部进程的状态。

next_fn 的实现不同之处在于实际聚合被委托给内部进程的 next 函数,以及最终输出的组合方式。状态再次由“此”和“内部”状态组成,测量值以类似于 OrderedDict 的方式组成。

以下是这种模式的实现。

@tff.tensorflow.computation()
def scale(value, factor):
  return tf.nest.map_structure(lambda x: x * factor, value)

@tff.tensorflow.computation()
def unscale(value, factor):
  return tf.nest.map_structure(lambda x: x / factor, value)

@tff.tensorflow.computation()
def add_one(value):
  return value + 1.0

class ExampleTaskFactory(tff.aggregators.UnweightedAggregationFactory):

  def __init__(self, inner_factory=None):
    if inner_factory is None:
      inner_factory = tff.aggregators.SumFactory()
    self._inner_factory = inner_factory

  def create(self, value_type):
    inner_process = self._inner_factory.create(value_type)

    @tff.federated_computation()
    def initialize_fn():
      my_state = tff.federated_value(0.0, tff.SERVER)
      inner_state = inner_process.initialize()
      return tff.federated_zip((my_state, inner_state))

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.FederatedType(value_type, tff.CLIENTS))
    def next_fn(state, value):
      my_state, inner_state = state
      my_new_state = tff.federated_map(add_one, my_state)
      my_state_at_clients = tff.federated_broadcast(my_new_state)
      scaled_value = tff.federated_map(scale, (value, my_state_at_clients))

      # Delegation to an inner factory, returning values placed at SERVER.
      inner_output = inner_process.next(inner_state, scaled_value)

      unscaled_value = tff.federated_map(unscale, (inner_output.result, my_new_state))

      new_state = tff.federated_zip((my_new_state, inner_output.state))
      measurements = tff.federated_zip(
          collections.OrderedDict(
              scaled_value=inner_output.result,
              example_task=inner_output.measurements))

      return tff.templates.MeasuredProcessOutput(
          state=new_state, result=unscaled_value, measurements=measurements)

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

当委托给 inner_process.next 函数时,我们得到的返回结构是 tff.templates.MeasuredProcessOutput,它具有相同的三个字段 - stateresultmeasurements。在创建组合聚合过程的整体返回结构时,statemeasurements 字段应通常组合在一起并返回。相反,result 字段对应于要聚合的值,而是“流过”组合的聚合。

state 对象应被视为工厂的实现细节,因此组合可以是任何结构。但是,measurements 对应于将在某个时间点向用户报告的值。因此,我们建议使用 OrderedDict,并使用组合命名,以便清楚地了解报告的指标来自组合中的哪个位置。

还要注意 tff.federated_zip 运算符的使用。由创建的进程控制的 state 对象应为 tff.FederatedType。如果我们改为在 initialize_fn 中返回 (this_state, inner_state),则其返回类型签名将是包含两个 tff.FederatedType 元组的 tff.StructType。使用 tff.federated_ziptff.FederatedType “提升”到顶层。这在 next_fn 中准备要返回的状态和测量值时也类似地使用。

最后,我们可以看到如何在默认内部聚合中使用它

client_data = [1.0, 2.0, 5.0]
factory = ExampleTaskFactory()
aggregation_process = factory.create(tff.TensorType(np.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: ()

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: ()

... 以及使用不同的内部聚合。例如,一个 ExampleTaskFactory

client_data = [1.0, 2.0, 5.0]
# Note the inner delegation can be to any UnweightedAggregaionFactory.
# In this case, each factory creates process that multiplies by the iteration
# index (1, 2, 3, ...), thus their combination multiplies by (1, 4, 9, ...).
factory = ExampleTaskFactory(ExampleTaskFactory())
aggregation_process = factory.create(tff.TensorType(np.float32))
state = aggregation_process.initialize()

output = aggregation_process.next(state, client_data)
print('| Round #1')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')

output = aggregation_process.next(output.state, client_data)
print('\n| Round #2')
print(f'|           Aggregation result: {output.result}   (expected 8.0)')
print(f'| measurements[\'scaled_value\']: {output.measurements["scaled_value"]}')
print(f'| measurements[\'example_task\']: {output.measurements["example_task"]}')
| Round #1
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 8.0
| measurements['example_task']: OrderedDict([('scaled_value', 8.0), ('example_task', ())])

| Round #2
|           Aggregation result: 8.0   (expected 8.0)
| measurements['scaled_value']: 16.0
| measurements['example_task']: OrderedDict([('scaled_value', 32.0), ('example_task', ())])

总结

在本教程中,我们解释了创建通用聚合构建块(表示为聚合工厂)的最佳实践。通用性通过设计意图以两种方式实现

  1. 参数化计算。聚合是一个独立的构建块,可以插入其他 TFF 模块,这些模块旨在与 tff.aggregators 一起使用以参数化其必要的聚合,例如 tff.learning.algorithms.build_weighted_fed_avg
  2. 聚合组合。 聚合构建块可以与其他聚合构建块组合以创建更复杂的复合聚合。