TFF for Federated Learning Research: Model and Update Compression


In this tutorial, we use the EMNIST dataset to demonstrate how to enable lossy compression algorithms to reduce communication cost in the Federated Averaging algorithm using the tff.learning API. For more details on the Federated Averaging algorithm, see the paper Communication-Efficient Learning of Deep Networks from Decentralized Data.

Before we start

Before we start, please run the following to make sure that your environment is correctly set up. If you don't see a greeting, please refer to the Installation guide for instructions.

pip install --quiet --upgrade tensorflow-federated
pip install --quiet --upgrade tensorflow-model-optimization
%load_ext tensorboard

import functools

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

Verify that TFF is working.

@tff.federated_computation
def hello_world():
  return 'Hello, World!'

hello_world()
b'Hello, World!'

Preparing the input data

In this section we load and preprocess the EMNIST dataset included in TFF. Please check out the Federated Learning for Image Classification tutorial for more details about the EMNIST dataset.

# This value only applies to the EMNIST dataset; consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418

CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500

emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
    only_digits=True)

def reshape_emnist_element(element):
  return (tf.expand_dims(element['pixels'], axis=-1), element['label'])

def preprocess_train_dataset(dataset):
  """Preprocessing function for the EMNIST training dataset."""
  return (dataset
          # Shuffle according to the largest client dataset
          .shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
          # Repeat to do multiple local epochs
          .repeat(CLIENT_EPOCHS_PER_ROUND)
          # Batch to a fixed client batch size
          .batch(CLIENT_BATCH_SIZE, drop_remainder=False)
          # Preprocessing step
          .map(reshape_emnist_element))

emnist_train = emnist_train.preprocess(preprocess_train_dataset)
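
As an optional sanity check (an addition here, not part of the original tutorial), you can inspect the federated data and the element structure that the preprocessing above produces for one client; the particular client id is arbitrary:

# Peek at the federated training data: how many simulated clients there are,
# and what one preprocessed client dataset looks like. Elements should be
# batched (pixels, label) pairs with pixels of shape (batch, 28, 28, 1).
print(f'Number of training clients: {len(emnist_train.client_ids)}')
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])
print(example_dataset.element_spec)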

Defining a model

Here we define a Keras model based on the original FedAvg CNN, and then wrap the Keras model in an instance of tff.learning.models.VariableModel so that it can be consumed by TFF.

Note that we need a function that produces a model, rather than simply a model directly. In addition, the function cannot just capture a pre-constructed model; it must create the model in the context in which it is called. The reason is that TFF is designed to go to devices, and it needs control over when resources are constructed so that they can be captured and packaged.

def create_original_fedavg_cnn_model(only_digits=True):
  """The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format = 'channels_last'

  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)

  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])

  return model

# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to 
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0]).element_spec

def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.models.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

Training the model and outputting training metrics

Now we are ready to construct a Federated Averaging algorithm and train the defined model on the EMNIST dataset.

First we need to build a Federated Averaging algorithm using the tff.learning.algorithms.build_weighted_fed_avg API.

federated_averaging = tff.learning.algorithms.build_weighted_fed_avg(
    model_fn=tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
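
Since TFF is strongly typed, you can inspect exactly what one round of the constructed algorithm consumes and produces by printing its type signature. This is an optional check, not part of the original tutorial:

# Print the federated type signature of a single round: the server state and
# client datasets it takes as input, and the new state and metrics it returns.
print(federated_averaging.next.type_signature.formatted_representation())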

Now let's run the Federated Averaging algorithm. From TFF's perspective, the execution of a federated learning algorithm looks like this:

  1. Initialize the algorithm and get the initial server state. The server state contains the information necessary to perform the algorithm. Recall that, since TFF is functional, this state includes both any optimizer state the algorithm uses (e.g. momentum terms) as well as the model parameters themselves; these will be passed as arguments to and returned as results of TFF computations.
  2. Execute the algorithm round by round. In each round, a new server state is returned as the result of each client training the model on its data. Typically, in one round:
    1. The server broadcasts the model to all participating clients.
    2. Each client performs work based on the model and its own data.
    3. The server aggregates all the models to produce a server state which contains a new model.

For more details, please see the Custom Federated Algorithms, Part 2: Implementing Federated Averaging tutorial.
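
As a minimal sketch of steps 1 and 2 above (the train helper below wraps the same two calls in a loop and logs metrics), assuming federated_train_data is a list of client tf.data.Dataset objects:

# Step 1: initialize the algorithm to get the initial server state.
state = federated_averaging.initialize()
# Step 2: run a single round on the client datasets and unpack the result.
result = federated_averaging.next(state, federated_train_data)
state = result.state
metrics = result.metrics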

Training metrics are written to the TensorBoard directory for display after the training.

def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
  """Trains the federated averaging process and output metrics."""

  # Initialize the Federated Averaging algorithm to get the initial server state.
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      # Sample clients participating in this round.
      sampled_clients = np.random.choice(
          emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # Create a list of `tf.data.Dataset` instances from the data of sampled clients.
      sampled_train_data = [
          emnist_train.create_tf_dataset_for_client(client)
          for client in sampled_clients
      ]
      # Run one round of the algorithm based on the server state and client data
      # and output the new state and metrics.
      result = federated_averaging_process.next(state, sampled_train_data)
      state = result.state
      train_metrics = result.metrics['client_work']['train']

      # Add metrics to Tensorboard.
      for name, value in train_metrics.items():
        tf.summary.scalar(name, value, step=round_num)
      summary_writer.flush()
      print(f'round {round_num:2d}, train_metrics={train_metrics}')

# Clean the log directory to avoid conflicts.
try:
  tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
  pass  # Path doesn't exist

# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)

train(federated_averaging_process=federated_averaging, num_rounds=10,
      num_clients_per_round=10, summary_writer=summary_writer)
round  0, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.092454836), ('loss', 2.310193), ('num_examples', 941), ('num_batches', 51)]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit
round  1, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.10029791), ('loss', 2.3102622), ('num_examples', 1007), ('num_batches', 55)]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.25Mibit
round  2, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.10710711), ('loss', 2.3048222), ('num_examples', 999), ('num_batches', 54)]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit
round  3, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.1061061), ('loss', 2.3066027), ('num_examples', 999), ('num_batches', 55)]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit
round  4, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.1287594), ('loss', 2.2999024), ('num_examples', 1064), ('num_batches', 58)]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit
round  5, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.13529412), ('loss', 2.2994456), ('num_examples', 1020), ('num_batches', 55)]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit
round  6, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.124045804), ('loss', 2.2947247), ('num_examples', 1048), ('num_batches', 57)]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit
round  7, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.14217557), ('loss', 2.290349), ('num_examples', 1048), ('num_batches', 57)]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit
round  8, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.14641434), ('loss', 2.290953), ('num_examples', 1004), ('num_batches', 56)]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit
round  9, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.1695238), ('loss', 2.2859888), ('num_examples', 1050), ('num_batches', 57)]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit

Launch TensorBoard with the root log directory specified above to display the training metrics. It can take a few seconds for the data to load. Apart from loss and accuracy, we also output the amount of broadcasted and aggregated data. Broadcasted data refers to tensors the server pushes to each client, while aggregated data refers to tensors each client returns to the server.

%tensorboard --logdir /tmp/logs/scalars/ --port=0
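
The uncompressed numbers above can be roughly sanity-checked from the model size: with float32 weights, each of the 10 clients per round receives and returns about (number of parameters) × 32 bits. The back-of-envelope check below is an optional addition, not part of the original tutorial; it should roughly match the ~508 Mibit per round reported in the logs.

# Estimate uncompressed traffic per round, assuming float32 weights and
# 10 participating clients per round.
keras_model = create_original_fedavg_cnn_model()
num_params = keras_model.count_params()
bits_per_round = num_params * 32 * 10
print(f'{num_params} parameters -> about {bits_per_round / 2**20:.2f} Mibit per round')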

Building a custom aggregation function

Now let's implement a function to use lossy compression algorithms on aggregated data. We will use TFF's API to create a tff.aggregators.AggregationFactory for this. While researchers may often want to implement their own (which can be done via the tff.aggregators API), we will use a built-in method for doing so, specifically tff.learning.compression_aggregator.

It is important to note that this aggregator does not apply compression to the entire model at once. Rather, it applies compression only to those variables in the model that are sufficiently large. Generally, small variables such as biases are more sensitive to inaccuracy, and, being relatively small, the potential communication savings are also relatively small.
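
For reference, a roughly equivalent aggregator can be composed by hand from the tff.aggregators building blocks. The sketch below applies uniform 8-bit quantization only to tensors larger than a size threshold; the values quantization_bits=8 and threshold=20000 are illustrative assumptions, and the defaults used internally by tff.learning.compression_aggregator may differ.

# A hand-composed compressed mean: quantize to 8 bits any tensor with more
# than 20000 elements, leave smaller tensors (e.g. biases) untouched, and
# compute a weighted mean of the results.
compressed_mean = tff.aggregators.MeanFactory(
    tff.aggregators.EncodedSumFactory.quantize_above_threshold(
        quantization_bits=8, threshold=20000))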

compression_aggregator = tff.learning.compression_aggregator()
isinstance(compression_aggregator, tff.aggregators.WeightedAggregationFactory)
True

As shown above, the compression aggregator is a weighted aggregation factory, which means that it involves weighted aggregation (in contrast to aggregators intended for differential privacy, which are often unweighted).

This aggregation factory can be directly plugged into FedAvg via its model_aggregator argument.

federated_averaging_with_compression = tff.learning.algorithms.build_weighted_fed_avg(
    tff_model_fn,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
    server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
    model_aggregator=compression_aggregator)

Training the model again

Now let's run the new Federated Averaging algorithm.

logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
    logdir_for_compression)

train(federated_averaging_process=federated_averaging_with_compression, 
      num_rounds=10,
      num_clients_per_round=10,
      summary_writer=summary_writer_for_compression)
round  0, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.087804876), ('loss', 2.3126457), ('num_examples', 1025), ('num_batches', 55)]), broadcasted_bits=507.62Mibit, aggregated_bits=146.47Mibit
round  1, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.073267326), ('loss', 2.3111901), ('num_examples', 1010), ('num_batches', 56)]), broadcasted_bits=1015.24Mibit, aggregated_bits=292.93Mibit
round  2, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.08925144), ('loss', 2.3071017), ('num_examples', 1042), ('num_batches', 57)]), broadcasted_bits=1.49Gibit, aggregated_bits=439.40Mibit
round  3, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.07985144), ('loss', 2.3061485), ('num_examples', 1077), ('num_batches', 59)]), broadcasted_bits=1.98Gibit, aggregated_bits=585.86Mibit
round  4, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.11947791), ('loss', 2.302166), ('num_examples', 996), ('num_batches', 55)]), broadcasted_bits=2.48Gibit, aggregated_bits=732.33Mibit
round  5, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.12195122), ('loss', 2.2997446), ('num_examples', 984), ('num_batches', 54)]), broadcasted_bits=2.97Gibit, aggregated_bits=878.79Mibit
round  6, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.10429448), ('loss', 2.2997215), ('num_examples', 978), ('num_batches', 55)]), broadcasted_bits=3.47Gibit, aggregated_bits=1.00Gibit
round  7, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.16857143), ('loss', 2.2961135), ('num_examples', 1050), ('num_batches', 56)]), broadcasted_bits=3.97Gibit, aggregated_bits=1.14Gibit
round  8, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.1399177), ('loss', 2.2942808), ('num_examples', 972), ('num_batches', 54)]), broadcasted_bits=4.46Gibit, aggregated_bits=1.29Gibit
round  9, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.14202899), ('loss', 2.2972558), ('num_examples', 1035), ('num_batches', 57)]), broadcasted_bits=4.96Gibit, aggregated_bits=1.43Gibit

Launch TensorBoard again to compare the training metrics between the two runs.

As you can see in TensorBoard, there is a significant reduction between the original and compression curves in the aggregated_bits plots, while the curves are quite similar in the loss and sparse_categorical_accuracy plots.

In conclusion, we implemented a compression algorithm that achieves performance similar to the original Federated Averaging algorithm while significantly reducing communication cost.

%tensorboard --logdir /tmp/logs/scalars/ --port=0

Exercises

To implement a custom compression algorithm and apply it to the training loop, you can:

  1. Implement a new compression algorithm as a subclass of tff.aggregators.MeanFactory (a skeleton illustrating the required aggregation plumbing is sketched after this list).
  2. Perform training with your compression algorithm and see if it does better than the algorithm above.
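
To get started on the first exercise, the sketch below shows the plumbing a custom aggregator needs: an unweighted factory that zeroes out small-magnitude entries before summation, which could then be wrapped in tff.aggregators.MeanFactory and passed as model_aggregator. The class name, threshold value, and sparsification rule are all hypothetical, and zeroing values by itself does not reduce the bits transmitted; it only marks where a real encoding step would go.

class ThresholdSparsifierFactory(tff.aggregators.UnweightedAggregationFactory):
  """Sums client updates after zeroing entries smaller than `threshold`."""

  def __init__(self, threshold=1e-3):
    self._threshold = threshold

  def create(self, value_type):
    threshold = self._threshold

    @tff.tf_computation(value_type)
    def sparsify(value):
      # Zero out entries with magnitude below the threshold in every tensor
      # of the (possibly nested) model update structure.
      return tf.nest.map_structure(
          lambda x: tf.where(tf.abs(x) < threshold, tf.zeros_like(x), x),
          value)

    @tff.federated_computation()
    def init_fn():
      # This aggregator keeps no state between rounds.
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(init_fn.type_signature.result,
                               tff.types.at_clients(value_type))
    def next_fn(state, value):
      sparse_value = tff.federated_map(sparsify, value)
      summed_value = tff.federated_sum(sparse_value)
      measurements = tff.federated_value((), tff.SERVER)
      return tff.templates.MeasuredProcessOutput(
          state=state, result=summed_value, measurements=measurements)

    return tff.templates.AggregationProcess(init_fn, next_fn)

# To plug it into FedAvg, wrap it so that it computes a weighted mean:
# model_aggregator=tff.aggregators.MeanFactory(
#     value_sum_factory=ThresholdSparsifierFactory())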

Potentially valuable open research questions include: non-uniform quantization, lossless compression such as Huffman coding, and mechanisms for adapting compression based on information from previous training rounds.

Recommended reading