In this tutorial, we use the EMNIST dataset to demonstrate how to enable lossy compression algorithms to reduce the communication cost of the Federated Averaging algorithm using the tff.learning API. For more details on the Federated Averaging algorithm, see the paper Communication-Efficient Learning of Deep Networks from Decentralized Data.
Before we start
Before we start, please run the following to make sure that your environment is correctly set up. If you don't see a greeting, please refer to the Installation guide for instructions.
pip install --quiet --upgrade tensorflow-federated
pip install --quiet --upgrade tensorflow-model-optimization
%load_ext tensorboard
import functools
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
Verify that TFF is working.
@tff.federated_computation
def hello_world():
  return 'Hello, World!'
hello_world()
b'Hello, World!'
Preparing the input data
In this section we load and preprocess the EMNIST dataset included with TFF. Please check out the Federated Learning for Image Classification tutorial for more details about the EMNIST dataset.
# This value only applies to EMNIST dataset, consider choosing appropriate
# values if switching to other datasets.
MAX_CLIENT_DATASET_SIZE = 418
CLIENT_EPOCHS_PER_ROUND = 1
CLIENT_BATCH_SIZE = 20
TEST_BATCH_SIZE = 500
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data(
only_digits=True)
def reshape_emnist_element(element):
  return (tf.expand_dims(element['pixels'], axis=-1), element['label'])
def preprocess_train_dataset(dataset):
  """Preprocessing function for the EMNIST training dataset."""
  return (dataset
          # Shuffle according to the largest client dataset
          .shuffle(buffer_size=MAX_CLIENT_DATASET_SIZE)
          # Repeat to do multiple local epochs
          .repeat(CLIENT_EPOCHS_PER_ROUND)
          # Batch to a fixed client batch size
          .batch(CLIENT_BATCH_SIZE, drop_remainder=False)
          # Preprocessing step
          .map(reshape_emnist_element))
emnist_train = emnist_train.preprocess(preprocess_train_dataset)
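If you want to confirm that the preprocessing produced what you expect, a quick peek at one client's preprocessed dataset (purely an optional sanity check, not required by the rest of the tutorial) looks like this:

# Optional sanity check: inspect the first batch of one preprocessed client dataset.
sample_dataset = emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[0])
sample_batch = next(iter(sample_dataset))
print(sample_batch[0].shape, sample_batch[1].shape)  # e.g. (20, 28, 28, 1) and (20,)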
Defining a model
Here we define a Keras model based on the original FedAvg CNN, and then wrap the Keras model in an instance of tff.learning.models.VariableModel so that it can be consumed by TFF.
Note that we need a function which produces a model, instead of simply a model directly. In addition, the function cannot just capture a pre-constructed model; it must create the model in the context in which it is called. The reason is that TFF is designed to go to devices, and needs control over when resources are constructed so that they can be captured and packaged up.
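As a minimal illustration of the difference (`build_my_model` below is a hypothetical stand-in for any Keras model constructor):

# Hypothetical illustration; `build_my_model` stands in for any Keras model constructor.
#
# Don't do this -- the closure captures a single pre-constructed model:
#   prebuilt_model = build_my_model()
#   model_fn = lambda: tff.learning.models.from_keras_model(prebuilt_model, ...)
#
# Do this instead -- a fresh model is constructed every time the function is called:
#   model_fn = lambda: tff.learning.models.from_keras_model(build_my_model(), ...)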
def create_original_fedavg_cnn_model(only_digits=True):
  """The CNN model used in https://arxiv.org/abs/1602.05629."""
  data_format = 'channels_last'

  max_pool = functools.partial(
      tf.keras.layers.MaxPooling2D,
      pool_size=(2, 2),
      padding='same',
      data_format=data_format)
  conv2d = functools.partial(
      tf.keras.layers.Conv2D,
      kernel_size=5,
      padding='same',
      data_format=data_format,
      activation=tf.nn.relu)

  model = tf.keras.models.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
      conv2d(filters=32),
      max_pool(),
      conv2d(filters=64),
      max_pool(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dense(10 if only_digits else 62),
      tf.keras.layers.Softmax(),
  ])

  return model
# Gets the type information of the input data. TFF is a strongly typed
# functional programming framework, and needs type information about inputs to
# the model.
input_spec = emnist_train.create_tf_dataset_for_client(
emnist_train.client_ids[0]).element_spec
def tff_model_fn():
  keras_model = create_original_fedavg_cnn_model()
  return tff.learning.models.from_keras_model(
      keras_model=keras_model,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
Training the model and outputting training metrics
Now we are ready to construct a Federated Averaging algorithm and train the defined model on the EMNIST dataset.
First we need to build a Federated Averaging algorithm using the tff.learning.algorithms.build_weighted_fed_avg API.
federated_averaging = tff.learning.algorithms.build_weighted_fed_avg(
model_fn=tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0))
Now let's run the Federated Averaging algorithm. The execution of a federated learning algorithm from the perspective of TFF looks like this:
- Initialize the algorithm and get the initial server state. The server state contains the information necessary to perform the algorithm. Recall that, since TFF is functional, this state includes both any optimizer state the algorithm uses (e.g. momentum terms) and the model parameters themselves; these will be passed as arguments and returned as results of TFF computations.
- Execute the algorithm round by round. In each round, a new server state will be returned as the result of each client training the model on its data. Typically, in one round:
  - The server broadcasts the model to all participating clients.
  - Each client performs work based on the model and its own data.
  - The server aggregates all the models to produce a server state which contains a new model.
For more details please see the Custom Federated Algorithms, Part 2: Implementing Federated Averaging tutorial. A minimal sketch of this initialize/next loop is shown below.
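To make the initialize/next contract concrete, the following sketch runs a single round by hand on one client and then reads the global model back out of the server state. It reuses the federated_averaging process and the EMNIST data defined above, and assumes the process exposes a get_model_weights helper for extracting the model from the state:

# Minimal sketch: run one round by hand and inspect the resulting server state.
state = federated_averaging.initialize()
one_client_data = [
    emnist_train.create_tf_dataset_for_client(emnist_train.client_ids[0])
]
result = federated_averaging.next(state, one_client_data)
state = result.state
print(result.metrics['client_work']['train'])
# Read the global model weights back out of the server state (assumed helper).
model_weights = federated_averaging.get_model_weights(state)
print([w.shape for w in model_weights.trainable])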
The training metrics are written to a TensorBoard directory for display after training.
def train(federated_averaging_process, num_rounds, num_clients_per_round, summary_writer):
  """Trains the federated averaging process and outputs metrics."""
  # Initialize the Federated Averaging algorithm to get the initial server state.
  state = federated_averaging_process.initialize()

  with summary_writer.as_default():
    for round_num in range(num_rounds):
      # Sample the clients participating in this round.
      sampled_clients = np.random.choice(
          emnist_train.client_ids,
          size=num_clients_per_round,
          replace=False)
      # Create a list of `tf.data.Dataset` instances from the data of sampled clients.
      sampled_train_data = [
          emnist_train.create_tf_dataset_for_client(client)
          for client in sampled_clients
      ]
      # Run one round of the algorithm based on the server state and client data,
      # and output the new state and metrics.
      result = federated_averaging_process.next(state, sampled_train_data)
      state = result.state
      train_metrics = result.metrics['client_work']['train']

      # Add metrics to TensorBoard.
      for name, value in train_metrics.items():
        tf.summary.scalar(name, value, step=round_num)
      summary_writer.flush()
# Clean the log directory to avoid conflicts.
try:
  tf.io.gfile.rmtree('/tmp/logs/scalars')
except tf.errors.OpError as e:
  pass  # Path doesn't exist.
# Set up the log directory and writer for Tensorboard.
logdir = "/tmp/logs/scalars/original/"
summary_writer = tf.summary.create_file_writer(logdir)
train(federated_averaging_process=federated_averaging, num_rounds=10,
num_clients_per_round=10, summary_writer=summary_writer)
round 0, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.092454836), ('loss', 2.310193), ('num_examples', 941), ('num_batches', 51)]), broadcasted_bits=507.62Mibit, aggregated_bits=507.62Mibit
round 1, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.10029791), ('loss', 2.3102622), ('num_examples', 1007), ('num_batches', 55)]), broadcasted_bits=1015.24Mibit, aggregated_bits=1015.25Mibit
round 2, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.10710711), ('loss', 2.3048222), ('num_examples', 999), ('num_batches', 54)]), broadcasted_bits=1.49Gibit, aggregated_bits=1.49Gibit
round 3, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.1061061), ('loss', 2.3066027), ('num_examples', 999), ('num_batches', 55)]), broadcasted_bits=1.98Gibit, aggregated_bits=1.98Gibit
round 4, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.1287594), ('loss', 2.2999024), ('num_examples', 1064), ('num_batches', 58)]), broadcasted_bits=2.48Gibit, aggregated_bits=2.48Gibit
round 5, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.13529412), ('loss', 2.2994456), ('num_examples', 1020), ('num_batches', 55)]), broadcasted_bits=2.97Gibit, aggregated_bits=2.97Gibit
round 6, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.124045804), ('loss', 2.2947247), ('num_examples', 1048), ('num_batches', 57)]), broadcasted_bits=3.47Gibit, aggregated_bits=3.47Gibit
round 7, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.14217557), ('loss', 2.290349), ('num_examples', 1048), ('num_batches', 57)]), broadcasted_bits=3.97Gibit, aggregated_bits=3.97Gibit
round 8, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.14641434), ('loss', 2.290953), ('num_examples', 1004), ('num_batches', 56)]), broadcasted_bits=4.46Gibit, aggregated_bits=4.46Gibit
round 9, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.1695238), ('loss', 2.2859888), ('num_examples', 1050), ('num_batches', 57)]), broadcasted_bits=4.96Gibit, aggregated_bits=4.96Gibit
Start TensorBoard with the root log directory specified above to display the training metrics. It can take a few seconds for the data to load. Apart from Loss and Accuracy, we also output the amount of broadcasted and aggregated data. Broadcasted data refers to the tensors the server pushes to each client, while aggregated data refers to the tensors each client returns to the server.
%tensorboard --logdir /tmp/logs/scalars/ --port=0
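As a rough cross-check on the broadcasted_bits numbers above, the per-round broadcast cost can be estimated directly from the model size. The sketch below assumes uncompressed 32-bit floats and the 10 clients per round used in this tutorial:

# Back-of-the-envelope estimate of per-round broadcast cost, assuming uncompressed
# 32-bit floats and 10 participating clients per round.
keras_model = create_original_fedavg_cnn_model()
num_params = sum(int(np.prod(v.shape)) for v in keras_model.trainable_variables)
print(f'{num_params} parameters, '
      f'~{10 * num_params * 32 / 2**20:.2f} Mibit broadcast per round')

For this CNN the estimate works out to roughly 500 Mibit per round, consistent with the broadcasted_bits reported for round 0 above.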
Building a custom aggregation function
Now let's implement a function that uses a lossy compression algorithm on aggregated data. We will use TFF's API to create a tff.aggregators.AggregationFactory for this. While researchers may often want to implement their own aggregators (which can be accomplished via the tff.aggregators API), we will use a built-in method for doing so, specifically tff.learning.compression_aggregator.
It is important to note that this aggregator does not apply compression to the entire model at once. Rather, it applies compression only to those variables in the model that are sufficiently large. Generally, small variables such as biases are more sensitive to inaccuracy, and, being relatively small, the potential communication savings are also relatively small.
compression_aggregator = tff.learning.compression_aggregator()
isinstance(compression_aggregator, tff.aggregators.WeightedAggregationFactory)
True
As seen above, the compression aggregator is a weighted aggregation factory, which means that it involves weighted aggregation (in contrast to aggregators meant for differential privacy, which are often unweighted).
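If you want finer control than the built-in helper provides, a similar aggregator can be assembled by hand from tff.aggregators building blocks. The sketch below is one possible composition: uniform quantization applied only to sufficiently large tensors, wrapped into a weighted mean. The quantization_bits and threshold values are illustrative rather than the exact ones used by tff.learning.compression_aggregator.

# One possible hand-built alternative: uniformly quantize tensors with more than
# `threshold` elements before summation, then take a weighted mean. Parameter values
# here are illustrative.
manual_compression_aggregator = tff.aggregators.MeanFactory(
    value_sum_factory=tff.aggregators.EncodedSumFactory.quantize_above_threshold(
        quantization_bits=8, threshold=20000))

Here we continue with the built-in compression_aggregator.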
This compression aggregator can be plugged directly into FedAvg via the model_aggregator argument.
federated_averaging_with_compression = tff.learning.algorithms.build_weighted_fed_avg(
tff_model_fn,
client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0),
model_aggregator=compression_aggregator)
Training the model again
Now let's run the new Federated Averaging algorithm.
logdir_for_compression = "/tmp/logs/scalars/compression/"
summary_writer_for_compression = tf.summary.create_file_writer(
logdir_for_compression)
train(federated_averaging_process=federated_averaging_with_compression,
num_rounds=10,
num_clients_per_round=10,
summary_writer=summary_writer_for_compression)
round 0, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.087804876), ('loss', 2.3126457), ('num_examples', 1025), ('num_batches', 55)]), broadcasted_bits=507.62Mibit, aggregated_bits=146.47Mibit
round 1, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.073267326), ('loss', 2.3111901), ('num_examples', 1010), ('num_batches', 56)]), broadcasted_bits=1015.24Mibit, aggregated_bits=292.93Mibit
round 2, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.08925144), ('loss', 2.3071017), ('num_examples', 1042), ('num_batches', 57)]), broadcasted_bits=1.49Gibit, aggregated_bits=439.40Mibit
round 3, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.07985144), ('loss', 2.3061485), ('num_examples', 1077), ('num_batches', 59)]), broadcasted_bits=1.98Gibit, aggregated_bits=585.86Mibit
round 4, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.11947791), ('loss', 2.302166), ('num_examples', 996), ('num_batches', 55)]), broadcasted_bits=2.48Gibit, aggregated_bits=732.33Mibit
round 5, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.12195122), ('loss', 2.2997446), ('num_examples', 984), ('num_batches', 54)]), broadcasted_bits=2.97Gibit, aggregated_bits=878.79Mibit
round 6, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.10429448), ('loss', 2.2997215), ('num_examples', 978), ('num_batches', 55)]), broadcasted_bits=3.47Gibit, aggregated_bits=1.00Gibit
round 7, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.16857143), ('loss', 2.2961135), ('num_examples', 1050), ('num_batches', 56)]), broadcasted_bits=3.97Gibit, aggregated_bits=1.14Gibit
round 8, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.1399177), ('loss', 2.2942808), ('num_examples', 972), ('num_batches', 54)]), broadcasted_bits=4.46Gibit, aggregated_bits=1.29Gibit
round 9, train_metrics=OrderedDict([('sparse_categorical_accuracy', 0.14202899), ('loss', 2.2972558), ('num_examples', 1035), ('num_batches', 57)]), broadcasted_bits=4.96Gibit, aggregated_bits=1.43Gibit
Start TensorBoard again to compare the training metrics between the two runs.
As you can see in TensorBoard, there is a significant reduction between the original and compression curves in the aggregated_bits plots, while the loss and sparse_categorical_accuracy curves are quite similar.
In conclusion, we implemented a compression algorithm that achieves performance similar to the original Federated Averaging algorithm while significantly reducing communication cost (roughly 3.5x fewer aggregated bits over the ten rounds above).
%tensorboard --logdir /tmp/logs/scalars/ --port=0
Exercises
To implement a custom compression algorithm and apply it to the training loop, you can:
- Implement a new compression algorithm as a subclass of tff.aggregators.MeanFactory (a rough starting skeleton is sketched below).
- Perform training with the compression algorithm and see if it does better than the algorithm above.
Potentially valuable open research questions include: non-uniform quantization, lossless compression such as Huffman coding, and mechanisms for adapting compression based on information from previous training rounds.
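As a starting point for the first exercise, here is a rough skeleton. It implements the lossy transform inside an inner unweighted sum factory and then wraps it in tff.aggregators.MeanFactory; the compress_and_decompress body is only a placeholder for your own algorithm, and the class/variable names are illustrative (see TFF's custom aggregators tutorial for the full contract of these interfaces).

# Rough skeleton for the first exercise. The lossy transform is applied per client
# inside an inner sum factory; the real compression logic is left as a placeholder.
class MyCompressedSumFactory(tff.aggregators.UnweightedAggregationFactory):
  """Sums client values after a (placeholder) compress/decompress round trip."""

  def create(self, value_type):

    @tff.tf_computation(value_type)
    def compress_and_decompress(value):
      # Placeholder: replace with your own lossy compress/decompress round trip.
      return value

    @tff.federated_computation()
    def initialize_fn():
      # This simple aggregator keeps no state across rounds.
      return tff.federated_value((), tff.SERVER)

    @tff.federated_computation(initialize_fn.type_signature.result,
                               tff.type_at_clients(value_type))
    def next_fn(state, value):
      compressed = tff.federated_map(compress_and_decompress, value)
      summed = tff.federated_sum(compressed)
      return tff.templates.MeasuredProcessOutput(
          state=state,
          result=summed,
          measurements=tff.federated_value((), tff.SERVER))

    return tff.templates.AggregationProcess(initialize_fn, next_fn)

# Wrap the custom sum in a weighted mean so it can serve as a model aggregator.
my_compression_aggregator = tff.aggregators.MeanFactory(
    value_sum_factory=MyCompressedSumFactory())

For the second exercise, build a new process with build_weighted_fed_avg using model_aggregator=my_compression_aggregator and rerun the training loop above to compare the curves in TensorBoard.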
Further readings
- Expanding the Reach of Federated Learning by Reducing Client Resource Requirements
- Federated Learning: Strategies for Improving Communication Efficiency
- Section 3.5, Communication and Compression, in Advanced and Open Problems in Federated Learning