在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看 | 下载笔记本 |
此笔记本演示了一种使用 TFCO 库创建和优化约束问题的方法。当我们发现模型在数据的不同切片中表现不佳时,这种方法可以帮助改进模型,我们可以使用 公平性指标 来识别这些切片。Google 的第二项 AI 原则指出,我们的技术应避免创建或强化不公平的偏见,我们相信这种技术可以帮助在某些情况下提高模型的公平性。具体来说,此笔记本将
- 使用
tf.keras
和大规模 CelebFaces 属性 (CelebA) 数据集,训练一个简单的无约束神经网络模型来检测图像中的人是否在微笑。 - 使用公平性指标评估模型在不同年龄组中的性能,使用常用的公平性指标。
- 设置一个简单的约束优化问题,以实现跨年龄组的更公平的性能。
- 重新训练现在受约束的模型并再次评估性能,确保我们选择的公平性指标有所改善。
上次更新时间:2020 年 2 月 11 日
安装
此笔记本是在 Colaboratory 中创建的,连接到 Python 3 Google Compute Engine 后端。如果您希望在其他环境中托管此笔记本,那么只要您在下面的单元格中包含所有必需的包,您就不应该遇到任何重大问题。
请注意,您第一次运行 pip 安装时,可能会要求您重新启动运行时,因为预安装的包已过时。完成此操作后,将使用正确的包。
Pip 安装
请注意,根据您运行下方单元格的时间,您可能会收到有关 Colab 中 TensorFlow 的默认版本即将切换到 TensorFlow 2.X 的警告。您可以安全地忽略该警告,因为此笔记本旨在与 TensorFlow 1.X 和 2.X 兼容。
导入模块
此外,我们添加了一些特定于公平性指标的导入,我们将使用这些指标来评估和可视化模型的性能。
与公平性指标相关的导入
虽然 TFCO 与急切执行和图形执行兼容,但此笔记本假设急切执行默认情况下已启用,就像在 TensorFlow 2.x 中一样。为了确保不会出现任何问题,将在下面的单元格中启用急切执行。
启用急切执行并打印版本
CelebA 数据集
CelebA 是一个大规模人脸属性数据集,包含超过 200,000 张名人图像,每张图像都带有 40 个属性注释(例如发型、时尚配饰、面部特征等)和 5 个地标位置(眼睛、嘴巴和鼻子位置)。有关更多详细信息,请查看 论文。在所有者的许可下,我们将此数据集存储在 Google Cloud Storage 上,并且主要通过 TensorFlow Datasets(tfds
) 访问它。
在此笔记本中
- 我们的模型将尝试对图像中的主体是否在微笑进行分类,如“微笑”属性* 所示。
- 图像将从 218x178 调整大小到 28x28,以减少训练时的执行时间和内存。
- 我们的模型性能将在不同年龄组中进行评估,使用二进制“年轻”属性。在本笔记本中,我们将称之为“年龄组”。
* 虽然关于此数据集的标签方法的信息很少,但我们将假设“微笑”属性是通过主体脸上愉悦、友善或开心的表情来确定的。在本案例研究中,我们将这些标签视为真实情况。
gcs_base_dir = "gs://celeb_a_dataset/"
celeb_a_builder = tfds.builder("celeb_a", data_dir=gcs_base_dir, version='2.0.0')
celeb_a_builder.download_and_prepare()
num_test_shards_dict = {'0.3.0': 4, '2.0.0': 2} # Used because we download the test dataset separately
version = str(celeb_a_builder.info.version)
print('Celeb_A dataset version: %s' % version)
测试数据集辅助函数
注意事项
在继续之前,在使用 CelebA 时需要考虑以下几点
- 虽然原则上此笔记本可以使用任何面部图像数据集,但选择 CelebA 是因为它包含公众人物的公共领域图像。
- CelebA 中的所有属性注释都作为二进制类别进行操作。例如,“年轻”属性(由数据集标签器确定)在图像中表示为存在或不存在。
- CelebA 的分类不反映人类属性的真实多样性。
- 在本笔记本中,包含“年轻”属性的特征被称为“年龄组”,其中图像中存在“年轻”属性被标记为“年轻”年龄组的成员,而不存在“年轻”属性被标记为“非年轻”年龄组的成员。这些是根据假设做出的,因为这些信息在 原始论文 中没有提到。
- 因此,在本笔记本中训练的模型的性能与 CelebA 作者对属性的操作和注释方式有关。
- 此模型不应用于商业目的,因为这将违反CelebA 的非商业研究协议。
设置输入函数
后续单元格将有助于简化输入管道并可视化性能。
首先,我们定义一些与数据相关的变量并定义必要的预处理函数。
定义变量
定义预处理函数
然后,我们构建在 colab 中其余部分需要的數據函数。
# Train data returning either 2 or 3 elements (the third element being the group)
def celeb_a_train_data_wo_group(batch_size):
celeb_a_train_data = celeb_a_builder.as_dataset(split='train').shuffle(1024).repeat().batch(batch_size).map(preprocess_input_dict)
return celeb_a_train_data.map(get_image_and_label)
def celeb_a_train_data_w_group(batch_size):
celeb_a_train_data = celeb_a_builder.as_dataset(split='train').shuffle(1024).repeat().batch(batch_size).map(preprocess_input_dict)
return celeb_a_train_data.map(get_image_label_and_group)
# Test data for the overall evaluation
celeb_a_test_data = celeb_a_builder.as_dataset(split='test').batch(1).map(preprocess_input_dict).map(get_image_label_and_group)
# Copy test data locally to be able to read it into tfma
copy_test_files_to_local()
构建一个简单的 DNN 模型
由于此笔记本重点关注 TFCO,我们将组装一个简单的、无约束的 tf.keras.Sequential
模型。
我们可以通过增加一些复杂性(例如,更多密集连接层、探索不同的激活函数、增加图像大小)来大大提高模型性能,但这可能会分散对演示使用 Keras 时 TFCO 库易用性的目标。出于这个原因,模型将保持简单 - 但鼓励您探索这个空间。
def create_model():
# For this notebook, accuracy will be used to evaluate performance.
METRICS = [
tf.keras.metrics.BinaryAccuracy(name='accuracy')
]
# The model consists of:
# 1. An input layer that represents the 28x28x3 image flatten.
# 2. A fully connected layer with 64 units activated by a ReLU function.
# 3. A single-unit readout layer to output real-scores instead of probabilities.
model = keras.Sequential([
keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3), name='image'),
keras.layers.Dense(64, activation='relu'),
keras.layers.Dense(1, activation=None)
])
# TFCO by default uses hinge loss — and that will also be used in the model.
model.compile(
optimizer=tf.keras.optimizers.Adam(0.001),
loss='hinge',
metrics=METRICS)
return model
我们还定义了一个函数来设置种子以确保可重复的结果。请注意,此 colab 旨在作为教育工具,不具备经过精心调整的生产管道的稳定性。在不设置种子的情况下运行可能会导致结果不同。
def set_seeds():
np.random.seed(121212)
tf.compat.v1.set_random_seed(212121)
公平指标辅助函数
在训练我们的模型之前,我们定义了一些辅助函数,这些函数将使我们能够通过公平指标评估模型的性能。
首先,我们创建一个辅助函数来保存训练后的模型。
def save_model(model, subdir):
base_dir = tempfile.mkdtemp(prefix='saved_models')
model_location = os.path.join(base_dir, subdir)
model.save(model_location, save_format='tf')
return model_location
接下来,我们定义用于预处理数据的函数,以便将其正确传递给 TFMA。
用于
最后,我们定义一个函数来评估 TFMA 中的结果。
def get_eval_results(model_location, eval_subdir):
base_dir = tempfile.mkdtemp(prefix='saved_eval_results')
tfma_eval_result_path = os.path.join(base_dir, eval_subdir)
eval_config_pbtxt = """
model_specs {
label_key: "%s"
}
metrics_specs {
metrics {
class_name: "FairnessIndicators"
config: '{ "thresholds": [0.22, 0.5, 0.75] }'
}
metrics {
class_name: "ExampleCount"
}
}
slicing_specs {}
slicing_specs { feature_keys: "%s" }
options {
compute_confidence_intervals { value: False }
disabled_outputs{values: "analysis"}
}
""" % (LABEL_KEY, GROUP_KEY)
eval_config = text_format.Parse(eval_config_pbtxt, tfma.EvalConfig())
eval_shared_model = tfma.default_eval_shared_model(
eval_saved_model_path=model_location, tags=[tf.saved_model.SERVING])
schema_pbtxt = """
tensor_representation_group {
key: ""
value {
tensor_representation {
key: "%s"
value {
dense_tensor {
column_name: "%s"
shape {
dim { size: 28 }
dim { size: 28 }
dim { size: 3 }
}
}
}
}
}
}
feature {
name: "%s"
type: FLOAT
}
feature {
name: "%s"
type: FLOAT
}
feature {
name: "%s"
type: BYTES
}
""" % (IMAGE_KEY, IMAGE_KEY, IMAGE_KEY, LABEL_KEY, GROUP_KEY)
schema = text_format.Parse(schema_pbtxt, schema_pb2.Schema())
coder = tf_example_record.TFExampleBeamRecord(
physical_format='inmem', schema=schema,
raw_record_column_name=tfma.ARROW_INPUT_COLUMN)
tensor_adapter_config = tensor_adapter.TensorAdapterConfig(
arrow_schema=coder.ArrowSchema(),
tensor_representations=coder.TensorRepresentations())
# Run the fairness evaluation.
with beam.Pipeline() as pipeline:
_ = (
tfds_as_pcollection(pipeline, 'celeb_a', 'test')
| 'ExamplesToRecordBatch' >> coder.BeamSource()
| 'ExtractEvaluateAndWriteResults' >>
tfma.ExtractEvaluateAndWriteResults(
eval_config=eval_config,
eval_shared_model=eval_shared_model,
output_path=tfma_eval_result_path,
tensor_adapter_config=tensor_adapter_config)
)
return tfma.load_eval_result(output_path=tfma_eval_result_path)
训练和评估无约束模型
现在模型已定义,输入管道已到位,我们现在可以训练我们的模型了。为了减少执行时间和内存量,我们将通过将数据切片成小批次并仅进行几次重复迭代来训练模型。
请注意,在 TensorFlow < 2.0.0 中运行此笔记本可能会导致 np.where
的弃用警告。安全地忽略此警告,因为 TensorFlow 在 2.X 中通过使用 tf.where
代替 np.where
来解决此问题。
BATCH_SIZE = 32
# Set seeds to get reproducible results
set_seeds()
model_unconstrained = create_model()
model_unconstrained.fit(celeb_a_train_data_wo_group(BATCH_SIZE), epochs=5, steps_per_epoch=1000)
在测试数据上评估模型应该会得到一个略高于 85% 的最终准确率得分。对于一个没有微调的简单模型来说还不错。
print('Overall Results, Unconstrained')
celeb_a_test_data = celeb_a_builder.as_dataset(split='test').batch(1).map(preprocess_input_dict).map(get_image_label_and_group)
results = model_unconstrained.evaluate(celeb_a_test_data)
但是,跨年龄组评估的性能可能会暴露出一些缺陷。
为了进一步探索这一点,我们使用公平指标(通过 TFMA)评估模型。特别是,我们有兴趣了解在评估假阳性率时,“年轻”和“非年轻”类别之间是否存在显著的性能差距。
假阳性错误是指模型错误地预测了正类的情况。在这种情况下,假阳性结果是指真实情况是名人“不微笑”的图像,而模型预测为“微笑”。由此,假阳性率(在上面的可视化中使用)是测试准确性的衡量标准。虽然这在当前情况下是一个相对平凡的错误,但假阳性错误有时会导致更严重的行为。例如,垃圾邮件分类器中的假阳性错误会导致用户错过重要的电子邮件。
model_location = save_model(model_unconstrained, 'model_export_unconstrained')
eval_results_unconstrained = get_eval_results(model_location, 'eval_results_unconstrained')
如上所述,我们专注于假阳性率。当前版本的公平指标 (0.1.2) 默认选择假阴性率。在运行下面的行之后,取消选中 false_negative_rate 并选中 false_positive_rate 以查看我们感兴趣的指标。
tfma.addons.fairness.view.widget_view.render_fairness_indicator(eval_results_unconstrained)
如上结果所示,我们确实看到了“年轻”和“非年轻”类别之间的**不成比例的差距**。
这就是 TFCO 可以通过将假阳性率限制在更可接受的标准内来提供帮助的地方。
约束模型设置
如 TFCO 库 中所述,有一些辅助程序可以使约束问题更容易。
tfco.rate_context()
- 这将用于构建每个年龄组类别的约束。tfco.RateMinimizationProblem()
- 此处要最小化的速率表达式将是受年龄组影响的假阳性率。换句话说,现在性能将根据年龄组的假阳性率与整个数据集的假阳性率之间的差异进行评估。为了演示,将假阳性率小于或等于 5% 设置为约束。tfco.ProxyLagrangianOptimizerV2()
- 这是将实际解决速率约束问题的辅助程序。
下面的单元格将调用这些辅助程序来设置具有公平约束的模型训练。
# The batch size is needed to create the input, labels and group tensors.
# These tensors are initialized with all 0's. They will eventually be assigned
# the batch content to them. A large batch size is chosen so that there are
# enough number of "Young" and "Not Young" examples in each batch.
set_seeds()
model_constrained = create_model()
BATCH_SIZE = 32
# Create input tensor.
input_tensor = tf.Variable(
np.zeros((BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, 3), dtype="float32"),
name="input")
# Create labels and group tensors (assuming both labels and groups are binary).
labels_tensor = tf.Variable(
np.zeros(BATCH_SIZE, dtype="float32"), name="labels")
groups_tensor = tf.Variable(
np.zeros(BATCH_SIZE, dtype="float32"), name="groups")
# Create a function that returns the applied 'model' to the input tensor
# and generates constrained predictions.
def predictions():
return model_constrained(input_tensor)
# Create overall context and subsetted context.
# The subsetted context contains subset of examples where group attribute < 1
# (i.e. the subset of "Not Young" celebrity images).
# "groups_tensor < 1" is used instead of "groups_tensor == 0" as the former
# would be a comparison on the tensor value, while the latter would be a
# comparison on the Tensor object.
context = tfco.rate_context(predictions, labels=lambda:labels_tensor)
context_subset = context.subset(lambda:groups_tensor < 1)
# Setup list of constraints.
# In this notebook, the constraint will just be: FPR to less or equal to 5%.
constraints = [tfco.false_positive_rate(context_subset) <= 0.05]
# Setup rate minimization problem: minimize overall error rate s.t. constraints.
problem = tfco.RateMinimizationProblem(tfco.error_rate(context), constraints)
# Create constrained optimizer and obtain train_op.
# Separate optimizers are specified for the objective and constraints
optimizer = tfco.ProxyLagrangianOptimizerV2(
optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
constraint_optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
num_constraints=problem.num_constraints)
# A list of all trainable variables is also needed to use TFCO.
var_list = (model_constrained.trainable_weights + list(problem.trainable_variables) +
optimizer.trainable_variables())
模型现在已设置好,可以接受跨年龄组的假阳性率约束进行训练。
现在,由于约束模型的最后一次迭代不一定是在定义的约束方面表现最好的模型,因此 TFCO 库配备了 tfco.find_best_candidate_index()
,它可以帮助从每次迭代后找到的迭代中选择最佳迭代。将 tfco.find_best_candidate_index()
视为一个额外的启发式方法,它根据准确性和公平约束(在本例中,跨年龄组的假阳性率)分别对每个结果进行排名,相对于训练数据。这样,它可以搜索总体准确率和公平约束之间的更好权衡。
以下单元格将开始带有约束的训练,同时还会找到每次迭代中表现最好的模型。
# Obtain train set batches.
NUM_ITERATIONS = 100 # Number of training iterations.
SKIP_ITERATIONS = 10 # Print training stats once in this many iterations.
# Create temp directory for saving snapshots of models.
temp_directory = tempfile.mktemp()
os.mkdir(temp_directory)
# List of objective and constraints across iterations.
objective_list = []
violations_list = []
# Training iterations.
iteration_count = 0
for (image, label, group) in celeb_a_train_data_w_group(BATCH_SIZE):
# Assign current batch to input, labels and groups tensors.
input_tensor.assign(image)
labels_tensor.assign(label)
groups_tensor.assign(group)
# Run gradient update.
optimizer.minimize(problem, var_list=var_list)
# Record objective and violations.
objective = problem.objective()
violations = problem.constraints()
sys.stdout.write(
"\r Iteration %d: Hinge Loss = %.3f, Max. Constraint Violation = %.3f"
% (iteration_count + 1, objective, max(violations)))
# Snapshot model once in SKIP_ITERATIONS iterations.
if iteration_count % SKIP_ITERATIONS == 0:
objective_list.append(objective)
violations_list.append(violations)
# Save snapshot of model weights.
model_constrained.save_weights(
temp_directory + "/celeb_a_constrained_" +
str(iteration_count / SKIP_ITERATIONS) + ".h5")
iteration_count += 1
if iteration_count >= NUM_ITERATIONS:
break
# Choose best model from recorded iterates and load that model.
best_index = tfco.find_best_candidate_index(
np.array(objective_list), np.array(violations_list))
model_constrained.load_weights(
temp_directory + "/celeb_a_constrained_" + str(best_index) + ".0.h5")
# Remove temp directory.
os.system("rm -r " + temp_directory)
在应用约束后,我们再次使用公平指标评估结果。
model_location = save_model(model_constrained, 'model_export_constrained')
eval_result_constrained = get_eval_results(model_location, 'eval_results_constrained')
与我们之前使用公平指标时一样,取消选中 false_negative_rate 并选中 false_positive_rate 以查看我们感兴趣的指标。
请注意,为了公平地比较我们模型的两个版本,使用设置总体假阳性率大致相等的阈值非常重要。这确保我们正在查看实际的变化,而不是仅仅模型的转变,这相当于简单地移动阈值边界。在我们的例子中,将无约束模型在 0.5 处与约束模型在 0.22 处进行比较,可以对模型进行公平的比较。
eval_results_dict = {
'constrained': eval_result_constrained,
'unconstrained': eval_results_unconstrained,
}
tfma.addons.fairness.view.widget_view.render_fairness_indicator(multi_eval_results=eval_results_dict)
借助 TFCO 表达更复杂要求作为速率约束的能力,我们帮助此模型以对整体性能影响很小的方式实现了更理想的结果。当然,还有改进的空间,但至少 TFCO 能够找到一个尽可能接近满足约束并最大程度地减少组之间差异的模型。