在 TensorFlow.org 上查看
|
在 Google Colab 中运行
|
在 GitHub 上查看源代码
|
下载笔记本
|
概述
Avro 数据集 API 的目的是将 Avro 格式的数据本机加载到 TensorFlow 中,作为 TensorFlow 数据集。Avro 是一个数据序列化系统,类似于 Protocol Buffers。它广泛用于 Apache Hadoop 中,在 Hadoop 中,它既可以提供持久数据的序列化格式,又可以提供 Hadoop 节点之间通信的线路格式。Avro 数据是一种面向行的紧凑二进制数据格式。它依赖于存储为单独 JSON 文件的架构。有关 Avro 格式和架构声明的规范,请参阅 官方手册。
设置程序包
安装所需的 tensorflow-io 程序包
pip install tensorflow-io
导入程序包
import tensorflow as tf
import tensorflow_io as tfio
验证 tf 和 tfio 导入
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
tensorflow-io version: 0.18.0 tensorflow version: 2.5.0
用法
浏览数据集
为了本教程的目的,我们下载示例 Avro 数据集。
下载示例 Avro 文件
curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avrols -l train.avro
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 151 100 151 0 0 1268 0 --:--:-- --:--:-- --:--:-- 1268
100 369 100 369 0 0 1255 0 --:--:-- --:--:-- --:--:-- 1255
-rw-rw-r-- 1 kbuilder kokoro 369 May 25 22:23 train.avro
下载示例 Avro 文件的相应架构文件
curl -OL https://github.com/tensorflow/io/raw/master/docs/tutorials/avro/train.avscls -l train.avsc
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 151 100 151 0 0 1247 0 --:--:-- --:--:-- --:--:-- 1247
100 271 100 271 0 0 780 0 --:--:-- --:--:-- --:--:-- 780
-rw-rw-r-- 1 kbuilder kokoro 271 May 25 22:23 train.avsc
在上述示例中,基于 mnist 数据集创建了一个测试 Avro 数据集。TFRecord 格式的原始 mnist 数据集是从 TF 命名数据集 中生成的。但是,mnist 数据集作为演示数据集来说太大。为了简单起见,对大部分数据进行了修剪,仅保留了前几个记录。此外,对原始 mnist 数据集中的 image 字段进行了额外的修剪,并将其映射到 Avro 中的 features 字段。因此,avro 文件 train.avro 有 4 条记录,每条记录有 3 个字段:features(一个 int 数组)、label(一个 int 或 null)和 dataType(一个枚举)。要查看解码后的 train.avro(注意 原始 avro 数据文件 对于人类来说不可读,因为 avro 是一个压缩格式)
安装读取 Avro 文件所需的软件包
pip install avro
以人类可读格式读取和打印 Avro 文件
from avro.io import DatumReader
from avro.datafile import DataFileReader
import json
def print_avro(avro_file, max_record_num=None):
if max_record_num is not None and max_record_num <= 0:
return
with open(avro_file, 'rb') as avro_handler:
reader = DataFileReader(avro_handler, DatumReader())
record_count = 0
for record in reader:
record_count = record_count+1
print(record)
if max_record_num is not None and record_count == max_record_num:
break
print_avro(avro_file='train.avro')
{'features': [0, 0, 0, 1, 4], 'label': None, 'dataType': 'TRAINING'}
{'features': [0, 0], 'label': 2, 'dataType': 'TRAINING'}
{'features': [0], 'label': 3, 'dataType': 'VALIDATION'}
{'features': [1], 'label': 4, 'dataType': 'VALIDATION'}
而 train.avro 的模式由 train.avsc 表示,它是一个 JSON 格式的文件。要查看 train.avsc
def print_schema(avro_schema_file):
with open(avro_schema_file, 'r') as handle:
parsed = json.load(handle)
print(json.dumps(parsed, indent=4, sort_keys=True))
print_schema('train.avsc')
{
"fields": [
{
"name": "features",
"type": {
"items": "int",
"type": "array"
}
},
{
"name": "label",
"type": [
"int",
"null"
]
},
{
"name": "dataType",
"type": {
"name": "dataTypes",
"symbols": [
"TRAINING",
"VALIDATION"
],
"type": "enum"
}
}
],
"name": "ImageDataset",
"type": "record"
}
准备数据集
使用 Avro 数据集 API 将 train.avro 加载为 TensorFlow 数据集
features = {
'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32),
'label': tf.io.FixedLenFeature(shape=[], dtype=tf.int32, default_value=-100),
'dataType': tf.io.FixedLenFeature(shape=[], dtype=tf.string)
}
schema = tf.io.gfile.GFile('train.avsc').read()
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
batch_size=3,
num_epochs=1)
for record in dataset:
print(record['features[*]'])
print(record['label'])
print(record['dataType'])
print("--------------------")
SparseTensor(indices=tf.Tensor( [[0 0] [0 1] [0 2] [0 3] [0 4] [1 0] [1 1] [2 0]], shape=(8, 2), dtype=int64), values=tf.Tensor([0 0 0 1 4 0 0 0], shape=(8,), dtype=int32), dense_shape=tf.Tensor([3 5], shape=(2,), dtype=int64)) tf.Tensor([-100 2 3], shape=(3,), dtype=int32) tf.Tensor([b'TRAINING' b'TRAINING' b'VALIDATION'], shape=(3,), dtype=string) -------------------- SparseTensor(indices=tf.Tensor([[0 0]], shape=(1, 2), dtype=int64), values=tf.Tensor([1], shape=(1,), dtype=int32), dense_shape=tf.Tensor([1 1], shape=(2,), dtype=int64)) tf.Tensor([4], shape=(1,), dtype=int32) tf.Tensor([b'VALIDATION'], shape=(1,), dtype=string) --------------------
上述示例将 train.avro 转换为 tensorflow 数据集。数据集的每个元素都是一个字典,其键是特征名称,值是转换后的稀疏或稠密张量。例如,它将 features、label、dataType 字段分别转换为 VarLenFeature(SparseTensor)、FixedLenFeature(DenseTensor) 和 FixedLenFeature(DenseTensor)。由于批次大小为 3,因此它将 train.avro 中的 3 条记录强制转换为结果数据集中的一个元素。对于 train.avro 中第一条记录(其标签为 null),avro 读取器会用指定的默认值 (-100) 替换它。在此示例中,train.avro 中共有 4 条记录。由于批次大小为 3,因此结果数据集包含 3 个元素,最后一个元素的批次大小为 1。但是,如果批次大小小于批次大小,用户也可以通过启用 drop_final_batch 来丢弃最后一个批次。例如
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
batch_size=3,
drop_final_batch=True,
num_epochs=1)
for record in dataset:
print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f97656423d0>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100, 2, 3], dtype=int32)>}
还可以通过增加 avro 解析/读取并行性来增加 num_parallel_reads,从而加快 Avro 数据处理。
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
num_parallel_reads=16,
batch_size=3,
drop_final_batch=True,
num_epochs=1)
for record in dataset:
print(record)
{'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f9765693990>, 'dataType': <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'TRAINING', b'TRAINING', b'VALIDATION'], dtype=object)>, 'label': <tf.Tensor: shape=(3,), dtype=int32, numpy=array([-100, 2, 3], dtype=int32)>}
有关 make_avro_record_dataset 的详细用法,请参阅 API 文档。
使用 Avro 数据集训练 tf.keras 模型
现在,让我们逐步了解一个基于 mnist 数据集使用 Avro 数据集训练 tf.keras 模型的端到端示例。
使用 Avro 数据集 API 将 train.avro 加载为 TensorFlow 数据集
features = {
'features[*]': tfio.experimental.columnar.VarLenFeatureWithRank(dtype=tf.int32)
}
schema = tf.io.gfile.GFile('train.avsc').read()
dataset = tfio.experimental.columnar.make_avro_record_dataset(file_pattern=['train.avro'],
reader_schema=schema,
features=features,
shuffle=False,
batch_size=1,
num_epochs=1)
定义一个简单的 keras 模型
def build_and_compile_cnn_model():
model = tf.keras.Sequential()
model.compile(optimizer='sgd', loss='mse')
return model
model = build_and_compile_cnn_model()
使用 Avro 数据集训练 keras 模型
model.fit(x=dataset, epochs=1, steps_per_epoch=1, verbose=1)
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f94b00645d0>}
Consider rewriting this model with the Functional API.
WARNING:tensorflow:Layers in a Sequential model should only have a single input tensor, but we receive a <class 'dict'> input: {'features[*]': <tensorflow.python.framework.sparse_tensor.SparseTensor object at 0x7f976476ca90>}
Consider rewriting this model with the Functional API.
1/1 [==============================] - 0s 60ms/step - loss: 0.0000e+00
<tensorflow.python.keras.callbacks.History at 0x7f94ec08c6d0>
avro 数据集可以将任何 avro 数据解析并强制转换为 TensorFlow 张量,包括记录中的记录、映射、数组、分支和枚举。解析信息作为映射传递到 avro 数据集实现中,其中键编码如何解析数据值,编码如何将数据强制转换为 TensorFlow 张量——决定原始类型(例如布尔值、整数、长整数、浮点数、双精度浮点数、字符串)以及张量类型(例如稀疏或密集)。提供了 TensorFlow 解析器类型(见表 1)和原始类型强制转换(表 2)的列表。
表 1 受支持的 TensorFlow 解析器类型
| TensorFlow 解析器类型 | TensorFlow 张量 | 说明 |
|---|---|---|
| tf.FixedLenFeature([], tf.int32) | 密集张量 | 解析固定长度特征;即所有行具有相同的不变元素数量,例如只有一个元素或一个数组,该数组始终对每一行具有相同数量的元素 |
| tf.SparseFeature(index_key=['key_1st_index', 'key_2nd_index'], value_key='key_value', dtype=tf.int64, size=[20, 50]) | 稀疏张量 | 解析稀疏特征,其中每一行具有可变长度的索引和值列表。“index_key”标识索引。“value_key”标识值。“dtype”是数据类型。“size”是每个索引项的预期最大索引值 |
| tfio.experimental.columnar.VarLenFeatureWithRank([],tf.int64) | 稀疏张量 | 解析可变长度特征;这意味着每一行数据可以具有可变数量的元素,例如第 1 行有 5 个元素,第 2 行有 7 个元素 |
表 2 从 Avro 类型到 TensorFlow 类型支持的转换
| Avro 原始类型 | TensorFlow 原始类型 |
|---|---|
| 布尔值:二进制值 | tf.bool |
| 字节:8 位无符号字节序列 | tf.string |
| 双精度:双精度 64 位 IEEE 浮点数 | tf.float64 |
| 枚举:枚举类型 | tf.string 使用符号名称 |
| 浮点数:单精度 32 位 IEEE 浮点数 | tf.float32 |
| 整数:32 位有符号整数 | tf.int32 |
| 长整数:64 位有符号整数 | tf.int64 |
| 空值:无值 | 使用默认值 |
| 字符串:unicode 字符序列 | tf.string |
Avro 数据集 API 的一组综合示例在 测试中提供。
在 TensorFlow.org 上查看
在 Google Colab 中运行
在 GitHub 上查看源代码
下载笔记本