Avro 数据集 API

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

概述

Avro 数据集 API 的目的是将 Avro 格式的数据本机加载到 TensorFlow 中,作为 TensorFlow 数据集。Avro 是一个数据序列化系统,类似于 Protocol Buffers。它广泛用于 Apache Hadoop 中,在 Hadoop 中,它既可以提供持久数据的序列化格式,又可以提供 Hadoop 节点之间通信的线路格式。Avro 数据是一种面向行的紧凑二进制数据格式。它依赖于存储为单独 JSON 文件的架构。有关 Avro 格式和架构声明的规范,请参阅 官方手册

设置程序包

安装所需的 tensorflow-io 程序包

pip install tensorflow-io

导入程序包

import tensorflow as tf
import tensorflow_io as tfio

验证 tf 和 tfio 导入

print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0
tensorflow version: 2.5.0

用法

浏览数据集

为了本教程的目的,我们下载示例 Avro 数据集。

下载示例 Avro 文件

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avro
ls -l train.avro
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1268      0 --:--:-- --:--:-- --:--:--  1268
100   369  100   369    0     0   1255      0 --:--:-- --:--:-- --:--:--  1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro

下载示例 Avro 文件的相应架构文件

curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avsc
ls -l train.avsc
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   151  100   151    0     0   1247      0 --:--:-- --:--:-- --:--:--  1247
100   271  100   271    0     0    780      0 --:--:-- --:--:-- --:--:--   780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc

在上述示例中,基于 mnist 数据集创建了一个测试 Avro 数据集。TFRecord 格式的原始 mnist 数据集是从 TF 命名数据集 中生成的。但是,mnist 数据集作为演示数据集来说太大。为了简单起见,对大部分数据进行了修剪,仅保留了前几个记录。此外,对原始 mnist 数据集中的 image 字段进行了额外的修剪,并将其映射到 Avro 中的 features 字段。因此,avro 文件 train.avro 有 4 条记录,每条记录有 3 个字段:features(一个 int 数组)、label(一个 int 或 null)和 dataType(一个枚举)。要查看解码后的 train.avro(注意 原始 avro 数据文件 对于人类来说不可读,因为 avro 是一个压缩格式)

安装读取 Avro 文件所需的软件包

pip install avro

以人类可读格式读取和打印 Avro 文件

from avro.io import DatumReader
from avro.datafile import DataFileReader

import json

def print_avro(avro_file, max_record_num=None):
    if max_record_num is not None and max_record_num <= 0:
        return

    with open(avro_file, 'rb') as avro_handler:
        reader = DataFileReader(avro_handler, DatumReader())
        record_count = 0
        for record in reader:
            record_count = record_count+1
            print(record)
            if max_record_num is not None and record_count == max_record_num:
               break

print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}

train.avro 的模式由 train.avsc 表示,它是一个 JSON 格式的文件。要查看 train.avsc

def print_schema(avro_schema_file):
    with open(avro_schema_file, 'r') as handle:
        parsed = json.load(handle)
    print(json.dumps(parsed, indent=4, sort_keys=True))

print_schema('train.avsc')
{
    "fields": [
        {
            "name": "features",
            "type": {
                "items": "int",
                "type": "array"
            }
        },
        {
            "name": "label",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "dataType",
            "type": {
                "name": "dataTypes",
                "symbols": [
                    "TRAINING",
                    "VALIDATION"
                ],
                "type": "enum"
            }
        }
    ],
    "name": "ImageDataset",
    "type": "record"
}

准备数据集

使用 Avro 数据集 API 将 train.avro 加载为 TensorFlow 数据集

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
    'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
    'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              num_epochs=1)

for record in dataset:
    print(record['features[*]'])
    print(record['label'])
    print(record['dataType'])
    print("--------------------")
SparseTensor(indices=tf.Tensor(
[[0 0]
 [0 1]
 [0 2]
 [0 3]
 [0 4]
 [1 0]
 [1 1]
 [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64))
tf.Tensor([-100    2    3], shape=(3,), dtype=int32)
tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string)
--------------------
SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64))
tf.Tensor([4], shape=(1,), dtype=int32)
tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string)
--------------------

上述示例将 train.avro 转换为 tensorflow 数据集。数据集的每个元素都是一个字典,其键是特征名称,值是转换后的稀疏或稠密张量。例如,它将 featureslabeldataType 字段分别转换为 VarLenFeature(SparseTensor)、FixedLenFeature(DenseTensor) 和 FixedLenFeature(DenseTensor)。由于批次大小为 3,因此它将 train.avro 中的 3 条记录强制转换为结果数据集中的一个元素。对于 train.avro 中第一条记录(其标签为 null),avro 读取器会用指定的默认值 (-100) 替换它。在此示例中,train.avro 中共有 4 条记录。由于批次大小为 3,因此结果数据集包含 3 个元素,最后一个元素的批次大小为 1。但是,如果批次大小小于批次大小,用户也可以通过启用 drop_final_batch 来丢弃最后一个批次。例如

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

还可以通过增加 avro 解析/读取并行性来增加 num_parallel_reads,从而加快 Avro 数据处理。

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              num_parallel_reads=16,
                                                              batch_size=3,
                                                              drop_final_batch=True,
                                                              num_epochs=1)

for record in dataset:
    print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100,    2,    3], dtype=int32)>}

有关 make_avro_record_dataset 的详细用法,请参阅 API 文档

使用 Avro 数据集训练 tf.keras 模型

现在,让我们逐步了解一个基于 mnist 数据集使用 Avro 数据集训练 tf.keras 模型的端到端示例。

使用 Avro 数据集 API 将 train.avro 加载为 TensorFlow 数据集

features = {
    'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}

schema = tf.io.gfile.GFile('train.avsc').read()

dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
                                                              reader_schema=schema,
                                                              features=features,
                                                              shuffle=False,
                                                              batch_size=1,
                                                              num_epochs=1)

定义一个简单的 keras 模型

def build_and_compile_cnn_model():
    model = tf.keras.Sequential()
    model.compile(optimizer='sgd', loss='mse')
    return model

model = build_and_compile_cnn_model()

使用 Avro 数据集训练 keras 模型

model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>

avro 数据集可以将任何 avro 数据解析并强制转换为 TensorFlow 张量,包括记录中的记录、映射、数组、分支和枚举。解析信息作为映射传递到 avro 数据集实现中,其中键编码如何解析数据值,编码如何将数据强制转换为 TensorFlow 张量——决定原始类型(例如布尔值、整数、长整数、浮点数、双精度浮点数、字符串)以及张量类型(例如稀疏或密集)。提供了 TensorFlow 解析器类型(见表 1)和原始类型强制转换(表 2)的列表。

表 1 受支持的 TensorFlow 解析器类型

TensorFlow 解析器类型 TensorFlow 张量 说明
tf.FixedLenFeature([], tf.int32) 密集张量 解析固定长度特征;即所有行具有相同的不变元素数量,例如只有一个元素或一个数组,该数组始终对每一行具有相同数量的元素
tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) 稀疏张量 解析稀疏特征,其中每一行具有可变长度的索引和值列表。“index_key”标识索引。“value_key”标识值。“dtype”是数据类型。“size”是每个索引项的预期最大索引值
tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) 稀疏张量 解析可变长度特征;这意味着每一行数据可以具有可变数量的元素,例如第 1 行有 5 个元素,第 2 行有 7 个元素

表 2 从 Avro 类型到 TensorFlow 类型支持的转换

Avro 原始类型 TensorFlow 原始类型
布尔值:二进制值 tf.bool
字节:8 位无符号字节序列 tf.string
双精度:双精度 64 位 IEEE 浮点数 tf.float64
枚举:枚举类型 tf.string 使用符号名称
浮点数:单精度 32 位 IEEE 浮点数 tf.float32
整数:32 位有符号整数 tf.int32
长整数:64 位有符号整数 tf.int64
空值:无值 使用默认值
字符串:unicode 字符序列 tf.string

Avro 数据集 API 的一组综合示例在 测试中提供。