文本生成联合学习

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

本教程基于 图像分类联合学习 教程中的概念,并展示了联合学习的其他几种有用方法。

特别是,我们加载了一个先前训练的 Keras 模型,并使用联合训练在(模拟)分散数据集上对其进行微调。这在实践中非常重要,原因有几个。使用序列化模型的能力使联合学习与其他 ML 方法的混合变得容易。此外,这允许使用越来越多的预训练模型——例如,从头开始训练语言模型很少有必要,因为现在有许多预训练模型广泛可用(参见,例如,TF Hub)。相反,从预训练模型开始,并使用联合学习对其进行微调,以适应特定应用程序的分散数据的特定特征,更有意义。

在本教程中,我们从一个生成 ASCII 字符的 RNN 开始,并通过联合学习对其进行微调。我们还展示了如何将最终权重反馈到原始 Keras 模型,从而允许使用标准工具轻松评估和生成文本。

pip install --quiet --upgrade tensorflow-federated
import collections
import functools
import os
import time

import numpy as np
import tensorflow as tf
import tensorflow_federated as tff

np.random.seed(0)

# Test that TFF is working:
tff.federated_computation(lambda: 'Hello, World!')()
b'Hello, World!'

加载预训练模型

我们加载一个模型,该模型是在遵循 TensorFlow 教程 使用 RNN 和急切执行进行文本生成 后预训练的。但是,我们不是在 莎士比亚全集 上进行训练,而是在查尔斯·狄更斯的 双城记圣诞颂歌 的文本上预训练的模型。

除了扩展词汇量之外,我们没有修改原始教程,因此这个初始模型不是最先进的,但它会产生合理的预测,足以满足我们的教程目的。最终模型使用 tf.keras.models.save_model(include_optimizer=False) 保存。

在本教程中,我们将使用联合学习来微调这个模型以用于莎士比亚,使用 TFF 提供的联合版本的数据。

生成词汇查找表

# A fixed vocabularly of ASCII chars that occur in the works of Shakespeare and Dickens:
vocab = list('dhlptx@DHLPTX $(,048cgkoswCGKOSW[_#\'/37;?bfjnrvzBFJNRVZ"&*.26:\naeimquyAEIMQUY]!%)-159\r')

# Creating a mapping from unique characters to indices
char2idx = {u:i for i, u in enumerate(vocab)}
idx2char = np.array(vocab)

加载预训练模型并生成一些文本

def load_model(batch_size):
  urls = {
      1: 'https://storage.googleapis.com/tff-models-public/dickens_rnn.batch1.kerasmodel',
      8: 'https://storage.googleapis.com/tff-models-public/dickens_rnn.batch8.kerasmodel'}
  assert batch_size in urls, 'batch_size must be in ' + str(urls.keys())
  url = urls[batch_size]
  local_file = tf.keras.utils.get_file(os.path.basename(url), origin=url)  
  return tf.keras.models.load_model(local_file, compile=False)
def generate_text(model, start_string):
  # From https://tensorflowcn.cn/tutorials/sequences/text_generation
  num_generate = 200
  input_eval = [char2idx[s] for s in start_string]
  input_eval = tf.expand_dims(input_eval, 0)
  text_generated = []
  temperature = 1.0

  model.reset_states()
  for i in range(num_generate):
    predictions = model(input_eval)
    predictions = tf.squeeze(predictions, 0)
    predictions = predictions / temperature
    predicted_id = tf.random.categorical(
        predictions, num_samples=1)[-1, 0].numpy()
    input_eval = tf.expand_dims([predicted_id], 0)
    text_generated.append(idx2char[predicted_id])

  return (start_string + ''.join(text_generated))
# Text generation requires a batch_size=1 model.
keras_model_batch1 = load_model(batch_size=1)
print(generate_text(keras_model_batch1, 'What of TensorFlow Federated, you ask? '))
Downloading data from https://storage.googleapis.com/tff-models-public/dickens_rnn.batch1.kerasmodel
16193984/16193984 [==============================] - 0s 0us/step
What of TensorFlow Federated, you ask? Same yee you? Have I so,
often games in a man who rode one knee over his friend, with the
stone faces of the dread prisoners, dud a tender mastery. They
are not alive is infirmed us--to ever resume

加载和预处理联合莎士比亚数据

tff.simulation.datasets 包提供了各种数据集,这些数据集被分成“客户端”,每个客户端对应于可能参与联合学习的特定设备上的数据集。

这些数据集提供了逼真的非 IID 数据分布,在模拟中复制了在真实分散数据上进行训练的挑战。此数据的某些预处理是使用 Leaf 项目 (github) 中的工具完成的。

train_data, test_data = tff.simulation.datasets.shakespeare.load_data()

shakespeare.load_data() 提供的数据集由一系列字符串 Tensors 组成,每个字符串对应于莎士比亚戏剧中特定角色说的一行。客户端键由戏剧名称和角色名称连接而成,例如 MUCH_ADO_ABOUT_NOTHING_OTHELLO 对应于戏剧《皆大欢喜》中奥赛罗的角色的台词。请注意,在真实的联合学习场景中,客户端永远不会通过 ID 识别或跟踪,但对于模拟来说,使用带键的数据集很有用。

例如,这里我们可以查看来自《李尔王》的一些数据

# Here the play is "The Tragedy of King Lear" and the character is "King".
raw_example_dataset = train_data.create_tf_dataset_for_client(
    'THE_TRAGEDY_OF_KING_LEAR_KING')
# To allow for future extensions, each entry x
# is an OrderedDict with a single key 'snippets' which contains the text.
for x in raw_example_dataset.take(2):
  print(x['snippets'])
tf.Tensor(b'', shape=(), dtype=string)
tf.Tensor(b'What?', shape=(), dtype=string)

现在,我们使用 tf.data.Dataset 变换来准备这些数据,以便训练上面加载的 char RNN。

# Input pre-processing parameters
SEQ_LENGTH = 100
BATCH_SIZE = 8
BUFFER_SIZE = 100  # For dataset shuffling
# Construct a lookup table to map string chars to indexes,
# using the vocab loaded above:
table = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(
        keys=vocab, values=tf.constant(list(range(len(vocab))),
                                       dtype=tf.int64)),
    default_value=0)


def to_ids(x):
  s = tf.reshape(x['snippets'], shape=[1])
  chars = tf.strings.bytes_split(s).values
  ids = table.lookup(chars)
  return ids


def split_input_target(chunk):
  input_text = tf.map_fn(lambda x: x[:-1], chunk)
  target_text = tf.map_fn(lambda x: x[1:], chunk)
  return (input_text, target_text)


def preprocess(dataset):
  return (
      # Map ASCII chars to int64 indexes using the vocab
      dataset.map(to_ids)
      # Split into individual chars
      .unbatch()
      # Form example sequences of SEQ_LENGTH +1
      .batch(SEQ_LENGTH + 1, drop_remainder=True)
      # Shuffle and form minibatches
      .shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
      # And finally split into (input, target) tuples,
      # each of length SEQ_LENGTH.
      .map(split_input_target))

请注意,在生成原始序列和以上批次时,为了简便起见,我们使用 drop_remainder=True。这意味着任何文本字符数少于 (SEQ_LENGTH + 1) * BATCH_SIZE 的字符(客户端)将拥有空数据集。解决此问题的一种典型方法是用特殊标记填充批次,然后屏蔽损失以不考虑填充标记。

这会使示例变得稍微复杂,因此在本教程中,我们只使用完整批次,如 标准教程 中所述。但是,在联邦设置中,这个问题更为严重,因为许多用户可能拥有小型数据集。

现在,我们可以预处理我们的 raw_example_dataset 并检查类型。

example_dataset = preprocess(raw_example_dataset)
print(example_dataset.element_spec)
(TensorSpec(shape=(8, 100), dtype=tf.int64, name=None), TensorSpec(shape=(8, 100), dtype=tf.int64, name=None))

编译模型并在预处理数据上进行测试。

我们加载了一个未编译的 Keras 模型,但为了运行 keras_model.evaluate,我们需要用损失和指标对其进行编译。我们还将在其中编译一个优化器,它将用作联邦学习中的设备上优化器。

原始教程没有字符级准确率(预测中最高概率放在正确下一个字符上的比例)。这是一个有用的指标,因此我们添加了它。但是,我们需要为此定义一个新的指标类,因为我们的预测具有等级 3(每个 BATCH_SIZE * SEQ_LENGTH 预测的 logits 向量),而 SparseCategoricalAccuracy 仅期望等级 2 预测。

class FlattenedCategoricalAccuracy(tf.keras.metrics.SparseCategoricalAccuracy):

  def __init__(self, name='accuracy', dtype=tf.float32):
    super().__init__(name, dtype=dtype)

  def update_state(self, y_true, y_pred, sample_weight=None):
    y_true = tf.reshape(y_true, [-1, 1])
    y_pred = tf.reshape(y_pred, [-1, len(vocab), 1])
    return super().update_state(y_true, y_pred, sample_weight)

现在,我们可以编译一个模型,并在我们的 example_dataset 上对其进行评估。

BATCH_SIZE = 8  # The training and eval batch size for the rest of this tutorial.
keras_model = load_model(batch_size=BATCH_SIZE)
keras_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[FlattenedCategoricalAccuracy()])

# Confirm that loss is much lower on Shakespeare than on random data
loss, accuracy = keras_model.evaluate(example_dataset.take(5), verbose=0)
print(
    'Evaluating on an example Shakespeare character: {a:3f}'.format(a=accuracy))

# As a sanity check, we can construct some completely random data, where we expect
# the accuracy to be essentially random:
random_guessed_accuracy = 1.0 / len(vocab)
print('Expected accuracy for random guessing: {a:.3f}'.format(
    a=random_guessed_accuracy))
random_indexes = np.random.randint(
    low=0, high=len(vocab), size=1 * BATCH_SIZE * (SEQ_LENGTH + 1))
data = collections.OrderedDict(
    snippets=tf.constant(
        ''.join(np.array(vocab)[random_indexes]), shape=[1, 1]))
random_dataset = preprocess(tf.data.Dataset.from_tensor_slices(data))
loss, accuracy = keras_model.evaluate(random_dataset, steps=10, verbose=0)
print('Evaluating on completely random data: {a:.3f}'.format(a=accuracy))
Downloading data from https://storage.googleapis.com/tff-models-public/dickens_rnn.batch8.kerasmodel
16193984/16193984 [==============================] - 0s 0us/step
Evaluating on an example Shakespeare character: 0.45.000
Expected accuracy for random guessing: 0.012
Evaluating on completely random data: 0.011

使用联邦学习微调模型。

TFF 将所有 TensorFlow 计算序列化,以便它们可以在非 Python 环境中运行(即使目前只有在 Python 中实现的模拟运行时可用)。即使我们在急切模式下运行(TF 2.0),TFF 目前通过在 "with tf.Graph.as_default()" 语句的上下文中构建必要的操作来序列化 TensorFlow 计算。因此,我们需要提供一个函数,TFF 可以使用它将我们的模型引入它控制的图中。我们按如下方式执行此操作。

# Clone the keras_model inside `create_tff_model()`, which TFF will
# call to produce a new copy of the model inside the graph that it will 
# serialize. Note: we want to construct all the necessary objects we'll need 
# _inside_ this method.
def create_tff_model():
  # TFF uses an `input_spec` so it knows the types and shapes
  # that your model expects.
  input_spec = example_dataset.element_spec
  keras_model_clone = tf.keras.models.clone_model(keras_model)
  return tff.learning.models.from_keras_model(
      keras_model_clone,
      input_spec=input_spec,
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[FlattenedCategoricalAccuracy()])

现在,我们已准备好构建一个联邦平均迭代过程,我们将使用它来改进模型(有关联邦平均算法的详细信息,请参阅论文 Communication-Efficient Learning of Deep Networks from Decentralized Data)。

我们使用一个已编译的 Keras 模型在每轮联邦训练后执行标准(非联邦)评估。这在进行模拟联邦学习并存在标准测试数据集时,对于研究目的非常有用。

在现实的生产环境中,可以使用相同的技术将使用联邦学习训练的模型在集中式基准数据集上进行评估,以进行测试或质量保证目的。

# This command builds all the TensorFlow graphs and serializes them: 
fed_avg = tff.learning.algorithms.build_weighted_fed_avg(
    model_fn=create_tff_model,
    client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.5))

这是最简单的循环,我们在单个客户端的单个批次上对联邦平均进行一轮运行。

state = fed_avg.initialize()
result = fed_avg.next(state, [example_dataset.take(5)])
state = result.state
train_metrics = result.metrics['client_work']['train']
print('loss={l:.3f}, accuracy={a:.3f}'.format(
    l=train_metrics['loss'], a=train_metrics['accuracy']))
loss=4.399, accuracy=0.139

现在,让我们编写一个稍微有趣一些的训练和评估循环。

为了使此模拟仍然运行得相对快,我们在每轮中对相同的三个客户端进行训练,只考虑每个客户端的两个小批量。

def data(client, source=train_data):
  return preprocess(source.create_tf_dataset_for_client(client)).take(5)


clients = [
    'ALL_S_WELL_THAT_ENDS_WELL_CELIA', 'MUCH_ADO_ABOUT_NOTHING_OTHELLO',
]

train_datasets = [data(client) for client in clients]

# We concatenate the test datasets for evaluation with Keras by creating a 
# Dataset of Datasets, and then identity flat mapping across all the examples.
test_dataset = tf.data.Dataset.from_tensor_slices(
    [data(client, test_data) for client in clients]).flat_map(lambda x: x)

fed_avg.initialize() 生成的模型的初始状态基于 Keras 模型的随机初始化器,而不是加载的权重,因为 clone_model() 不会克隆权重。要从预训练模型开始训练,我们直接从加载的模型中设置服务器状态中的模型权重。

NUM_ROUNDS = 5

# The state of the FL server, containing the model and optimization state.
state = fed_avg.initialize()

# Load our pre-trained Keras model weights into the global model state.
pre_trained_weights = tff.learning.models.ModelWeights(
    trainable=[v.numpy() for v in keras_model.trainable_weights],
    non_trainable=[v.numpy() for v in keras_model.non_trainable_weights]
)
state = fed_avg.set_model_weights(state, pre_trained_weights)


def keras_evaluate(state, round_num):
  # Take our global model weights and push them back into a Keras model to
  # use its standard `.evaluate()` method.
  keras_model = load_model(batch_size=BATCH_SIZE)
  keras_model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[FlattenedCategoricalAccuracy()])
  model_weights = fed_avg.get_model_weights(state)
  model_weights.assign_weights_to(keras_model)
  loss, accuracy = keras_model.evaluate(example_dataset, steps=2, verbose=0)
  print('\tEval: loss={l:.3f}, accuracy={a:.3f}'.format(l=loss, a=accuracy))


for round_num in range(NUM_ROUNDS):
  print('Round {r}'.format(r=round_num))
  keras_evaluate(state, round_num)
  result = fed_avg.next(state, train_datasets)
  state = result.state
  train_metrics = result.metrics['client_work']['train']
  print('\tTrain: loss={l:.3f}, accuracy={a:.3f}'.format(
      l=train_metrics['loss'], a=train_metrics['accuracy']))

print('Final evaluation')
keras_evaluate(state, NUM_ROUNDS + 1)
Round 0
    Eval: loss=3.171, accuracy=0.428
    Train: loss=4.309, accuracy=0.098
Round 1
    Eval: loss=4.188, accuracy=0.185
    Train: loss=4.037, accuracy=0.223
Round 2
    Eval: loss=3.948, accuracy=0.200
    Train: loss=3.797, accuracy=0.228
Round 3
    Eval: loss=3.826, accuracy=0.179
    Train: loss=3.662, accuracy=0.219
Round 4
    Eval: loss=3.723, accuracy=0.171
    Train: loss=3.440, accuracy=0.245
Final evaluation
    Eval: loss=3.599, accuracy=0.181

使用默认更改,我们没有进行足够的训练来产生很大差异,但如果你在更多莎士比亚数据上进行更长时间的训练,你应该会看到使用更新后的模型生成的文本风格的差异。

# Set our newly trained weights back in the originally created model.
keras_model_batch1.set_weights([v.numpy() for v in keras_model.weights])
# Text generation requires batch_size=1
print(generate_text(keras_model_batch1, 'What of TensorFlow Federated, you ask? '))
What of TensorFlow Federated, you ask? She will be
heard of; or whether they recovered her faltering place, that a great mark of
being so final dark and distrustner the dearer to the chin, all
staftly towards him, or trot's in foot thro

建议的扩展

本教程只是第一步!以下是一些关于如何尝试扩展此笔记本的思路。

  • 编写一个更现实的训练循环,在其中随机抽取要训练的客户端。
  • 在客户端数据集上使用 ".repeat(NUM_EPOCHS)" 来尝试多次本地训练周期(例如,如 McMahan 等人 所述)。另请参阅 用于图像分类的联邦学习,它执行此操作。
  • 更改 compile() 命令以尝试在客户端上使用不同的优化算法。
  • 尝试 server_optimizer 参数到 build_weighted_fed_avg 以尝试在服务器上应用模型更新的不同算法。