某些数据集太大,无法在一台机器上处理。 tfds
支持使用 Apache Beam 在多台机器上生成数据。
本文档分为两个部分
- 适用于想要生成现有 Beam 数据集的用户
- 适用于想要创建新的 Beam 数据集的开发者
生成 Beam 数据集
以下是生成 Beam 数据集的不同示例,包括在云端或本地生成。
在 Google Cloud Dataflow 上
要使用 Google Cloud Dataflow 运行管道并利用分布式计算,请先按照 快速入门说明 进行操作。
环境设置完成后,您可以使用 GCS 上的数据目录运行 tfds build
CLI,并为 --beam_pipeline_options
标志指定 所需选项。
为了更轻松地启动脚本,建议使用实际的 GCP/GCS 设置和要生成的数据集的值定义以下变量
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
GCP_PROJECT=my-project-id
GCS_BUCKET=gs://my-gcs-bucket
然后,您需要创建一个文件,告诉 Dataflow 在工作节点上安装 tfds
echo "tensorflow_datasets[$DATASET_NAME]" > /tmp/beam_requirements.txt
如果您使用的是 tfds-nightly
,请确保从 tfds-nightly
中回显,以防数据集自上次发布以来已更新。
echo "tfds-nightly[$DATASET_NAME]" > /tmp/beam_requirements.txt
如果您使用的是 TFDS 库中未包含的其他依赖项,请按照 管理 Python 管道依赖项的说明 进行操作。
最后,您可以使用以下命令启动作业
tfds build $DATASET_NAME/$DATASET_CONFIG \
--data_dir=$GCS_BUCKET/tensorflow_datasets \
--beam_pipeline_options=\
"runner=DataflowRunner,project=$GCP_PROJECT,job_name=$DATASET_NAME-gen,"\
"staging_location=$GCS_BUCKET/binaries,temp_location=$GCS_BUCKET/temp,"\
"requirements_file=/tmp/beam_requirements.txt"
本地
要使用 默认 Apache Beam 运行器 在本地运行脚本(必须将所有数据放入内存),命令与其他数据集相同
tfds build my_dataset
使用 Apache Flink
要使用 Apache Flink 运行管道,您可以阅读 官方文档。请确保您的 Beam 符合 Flink 版本兼容性
为了更轻松地启动脚本,建议使用实际的 Flink 设置和要生成的数据集的值定义以下变量
DATASET_NAME=<dataset-name>
DATASET_CONFIG=<dataset-config>
FLINK_CONFIG_DIR=<flink-config-directory>
FLINK_VERSION=<flink-version>
要在嵌入式 Flink 集群上运行,您可以使用以下命令启动作业
tfds build $DATASET_NAME/$DATASET_CONFIG \
--beam_pipeline_options=\
"runner=FlinkRunner,flink_version=$FLINK_VERSION,flink_conf_dir=$FLINK_CONFIG_DIR"
使用自定义脚本
要在 Beam 上生成数据集,API 与其他数据集相同。您可以使用 DownloadConfig
的 beam_options
(和 beam_runner
)参数自定义 beam.Pipeline
。
# If you are running on Dataflow, Spark,..., you may have to set-up runtime
# flags. Otherwise, you can leave flags empty [].
flags = ['--runner=DataflowRunner', '--project=<project-name>', ...]
# `beam_options` (and `beam_runner`) will be forwarded to `beam.Pipeline`
dl_config = tfds.download.DownloadConfig(
beam_options=beam.options.pipeline_options.PipelineOptions(flags=flags)
)
data_dir = 'gs://my-gcs-bucket/tensorflow_datasets'
builder = tfds.builder('wikipedia/20190301.en', data_dir=data_dir)
builder.download_and_prepare(download_config=dl_config)
实现 Beam 数据集
先决条件
要编写 Apache Beam 数据集,您应该熟悉以下概念
- 熟悉
tfds
数据集创建指南,因为大多数内容仍然适用于 Beam 数据集。 - 使用 Beam 编程指南 了解 Apache Beam 的入门知识。
- 如果您想使用 Cloud Dataflow 生成数据集,请阅读 Google Cloud 文档 和 Apache Beam 依赖项指南。
说明
如果您熟悉 数据集创建指南,添加 Beam 数据集只需要修改 _generate_examples
函数。该函数应返回一个 beam 对象,而不是一个生成器
非 Beam 数据集
def _generate_examples(self, path):
for f in path.iterdir():
yield _process_example(f)
Beam 数据集
def _generate_examples(self, path):
return (
beam.Create(path.iterdir())
| beam.Map(_process_example)
)
所有其他内容都可以完全相同,包括测试。
一些额外的注意事项
- 使用
tfds.core.lazy_imports
导入 Apache Beam。通过使用延迟依赖项,用户仍然可以在生成数据集后读取数据集,而无需安装 Beam。 - 注意 Python 闭包。运行管道时,
beam.Map
和beam.DoFn
函数使用pickle
序列化并发送到所有工作节点。如果状态需要在工作节点之间共享,请不要在beam.PTransform
中使用可变对象。 - 由于
tfds.core.DatasetBuilder
使用 pickle 进行序列化,因此在数据创建期间对tfds.core.DatasetBuilder
进行的修改将在工作节点上被忽略(例如,无法在_split_generators
中设置self.info.metadata['offset'] = 123
,并在工作节点中像beam.Map(lambda x: x + self.info.metadata['offset'])
那样访问它)。 - 如果您需要在拆分之间共享一些管道步骤,可以向
_split_generator
添加一个额外的pipeline: beam.Pipeline
关键字参数,并控制整个生成管道。请参阅tfds.core.GeneratorBasedBuilder
的_generate_examples
文档。
示例
以下是一个 Beam 数据集的示例。
class DummyBeamDataset(tfds.core.GeneratorBasedBuilder):
VERSION = tfds.core.Version('1.0.0')
def _info(self):
return self.dataset_info_from_configs(
features=tfds.features.FeaturesDict({
'image': tfds.features.Image(shape=(16, 16, 1)),
'label': tfds.features.ClassLabel(names=['dog', 'cat']),
}),
)
def _split_generators(self, dl_manager):
...
return {
'train': self._generate_examples(file_dir='path/to/train_data/'),
'test': self._generate_examples(file_dir='path/to/test_data/'),
}
def _generate_examples(self, file_dir: str):
"""Generate examples as dicts."""
beam = tfds.core.lazy_imports.apache_beam
def _process_example(filename):
# Use filename as key
return filename, {
'image': os.path.join(file_dir, filename),
'label': filename.split('.')[1], # Extract label: "0010102.dog.jpeg"
}
return (
beam.Create(tf.io.gfile.listdir(file_dir))
| beam.Map(_process_example)
)
运行您的管道
要运行管道,请查看上面的部分。
tfds build my_dataset --register_checksums
使用 TFDS 作为输入的管道
如果您想创建一个以 TFDS 数据集为源的 Beam 管道,可以使用 tfds.beam.ReadFromTFDS
builder = tfds.builder('my_dataset')
_ = (
pipeline
| tfds.beam.ReadFromTFDS(builder, split='train')
| beam.Map(tfds.as_numpy)
| ...
)
它将并行处理数据集的每个分片。