This tutorial demonstrates how to load and preprocess AVI video data using the UCF101 human action dataset. Once you have preprocessed the data, it can be used for tasks such as video classification/recognition, captioning, or clustering. The original dataset contains realistic action videos collected from YouTube, spanning 101 categories such as playing cello, brushing teeth, and applying makeup. You will learn how to:
- Load the data from a zip file.
- Read sequences of frames out of the video files.
- Visualize the video data.
- Wrap the frame generator in a tf.data.Dataset.
This video loading and preprocessing tutorial is the first part in a series of TensorFlow video tutorials. Here are the other three tutorials:
- Build a 3D CNN model for video classification: note that this tutorial uses a (2+1)D CNN that decomposes the spatial and temporal aspects of 3D data; if you are working with volumetric data such as an MRI scan, consider using a 3D CNN instead of a (2+1)D CNN.
- MoViNet for streaming action recognition: get familiar with the MoViNet models that are available on TF Hub.
- Transfer learning for video classification with MoViNet: this tutorial explains how to take a pre-trained video classification model trained on a different dataset and use it with the UCF-101 dataset.
Setup
Begin by installing and importing some necessary libraries, including: remotezip to inspect the contents of a ZIP file, tqdm to use a progress bar, OpenCV to process video files, and tensorflow_docs for embedding data in a Jupyter notebook.
# The way this tutorial uses the `TimeDistributed` layer requires TF>=2.10
pip install -U "tensorflow>=2.10.0"
pip install remotezip tqdm opencv-python
pip install -q git+https://github.com/tensorflow/docs
import tqdm
import random
import pathlib
import itertools
import collections
import os
import cv2
import numpy as np
import remotezip as rz
import tensorflow as tf
# Some modules to display an animation using imageio.
import imageio
from IPython import display
from urllib import request
from tensorflow_docs.vis import embed
Download a subset of the UCF101 dataset
The UCF101 dataset contains 101 categories of different video actions, primarily used in action recognition. You will use a subset of these categories in this demo.
URL = 'https://storage.googleapis.com/thumos14_files/UCF101_videos.zip'
The URL above points to a zip file with the UCF 101 dataset. Create a function that uses the remotezip library to examine the contents of the zip file at that URL:
def list_files_from_zip_url(zip_url):
  """ List the files in each class of the dataset given a URL with the zip file.

    Args:
      zip_url: A URL from which the files can be extracted.

    Returns:
      List of files in each of the classes.
  """
  files = []
  # Name the handle `zip_file` to avoid shadowing the built-in `zip`.
  with rz.RemoteZip(zip_url) as zip_file:
    for zip_info in zip_file.infolist():
      files.append(zip_info.filename)
  return files
files = list_files_from_zip_url(URL)
files = [f for f in files if f.endswith('.avi')]
files[:10]
Begin with a few videos and a limited number of classes for training. After running the code block above, notice that the class name is included in the filename of each video.
Define the get_class function that retrieves the class name from a filename. Then, create a function called get_files_per_class that converts the list of all files (files above) into a dictionary listing the files for each class:
def get_class(fname):
  """ Retrieve the name of the class given a filename.

    Args:
      fname: Name of the file in the UCF101 dataset.

    Returns:
      Class that the file belongs to.
  """
  return fname.split('_')[-3]

def get_files_per_class(files):
  """ Retrieve the files that belong to each class.

    Args:
      files: List of files in the dataset.

    Returns:
      Dictionary of class names (key) and files (values).
  """
  files_for_class = collections.defaultdict(list)
  for fname in files:
    class_name = get_class(fname)
    files_for_class[class_name].append(fname)
  return files_for_class
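As a quick sanity check, get_class relies purely on the underscore-delimited UCF101 naming scheme: the class name is the third underscore-separated token from the end of the path. A minimal sketch, using a hypothetical UCF101-style path:
# Hypothetical UCF101-style path, purely for illustration.
print(get_class('UCF101/v_ApplyEyeMakeup_g01_c01.avi'))  # ApplyEyeMakeup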
Once you have the list of files per class, you can choose how many classes you would like to use and how many videos you would like per class in order to create your dataset.
NUM_CLASSES = 10
FILES_PER_CLASS = 50
files_for_class = get_files_per_class(files)
classes = list(files_for_class.keys())
print('Num classes:', len(classes))
print('Num videos for class[0]:', len(files_for_class[classes[0]]))
Create a new function called select_subset_of_classes that selects a subset of the classes present within the dataset and a particular number of files per class:
def select_subset_of_classes(files_for_class, classes, files_per_class):
  """ Create a dictionary with the class name and a subset of the files in that class.

    Args:
      files_for_class: Dictionary of class names (key) and files (values).
      classes: List of classes.
      files_per_class: Number of files per class of interest.

    Returns:
      Dictionary with class as key and list of specified number of video files in that class.
  """
  files_subset = dict()

  for class_name in classes:
    class_files = files_for_class[class_name]
    files_subset[class_name] = class_files[:files_per_class]

  return files_subset
files_subset = select_subset_of_classes(files_for_class, classes[:NUM_CLASSES], FILES_PER_CLASS)
list(files_subset.keys())
Define helper functions that split the videos into training, validation, and test sets. The videos are downloaded from a URL with the zip file and placed into their respective subdirectories.
def download_from_zip(zip_url, to_dir, file_names):
  """ Download the contents of the zip file from the zip URL.

    Args:
      zip_url: A URL with a zip file containing data.
      to_dir: A directory to download data to.
      file_names: Names of files to download.
  """
  with rz.RemoteZip(zip_url) as zip_file:
    for fn in tqdm.tqdm(file_names):
      class_name = get_class(fn)
      zip_file.extract(fn, str(to_dir / class_name))
      unzipped_file = to_dir / class_name / fn

      # Flatten the extracted path so the video sits directly in its class directory.
      fn = pathlib.Path(fn).parts[-1]
      output_file = to_dir / class_name / fn
      unzipped_file.rename(output_file)
The following function returns the remaining data that hasn't already been placed into a subset of data. It allows you to place that remaining data in the next specified subset of data.
def split_class_lists(files_for_class, count):
  """ Returns the list of files belonging to a subset of data as well as the remainder of
    files that need to be downloaded.

    Args:
      files_for_class: Files belonging to a particular class of data.
      count: Number of files to download.

    Returns:
      Files belonging to the subset of data and dictionary of the remainder of files that
      need to be downloaded.
  """
  split_files = []
  remainder = {}
  for cls in files_for_class:
    split_files.extend(files_for_class[cls][:count])
    remainder[cls] = files_for_class[cls][count:]
  return split_files, remainder
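To see how the two return values fit together (with made-up class names and file lists), taking count = 2 files from each class leaves everything past the first two files in the remainder dictionary, ready to be assigned to the next split:
# Made-up example data, purely for illustration.
demo = {'ClassA': ['a1', 'a2', 'a3'], 'ClassB': ['b1', 'b2']}
taken, rest = split_class_lists(demo, 2)
print(taken)  # ['a1', 'a2', 'b1', 'b2']
print(rest)   # {'ClassA': ['a3'], 'ClassB': []}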
The following download_ucf_101_subset function allows you to download a subset of the UCF101 dataset and split it into training, validation, and test sets. You can specify the number of classes that you would like to use. The splits argument allows you to pass in a dictionary in which the keys are the names of the subsets (example: "train") and the values are the number of videos you would like per class:
def download_ucf_101_subset(zip_url, num_classes, splits, download_dir):
  """ Download a subset of the UCF101 dataset and split them into various parts, such as
    training, validation, and test.

    Args:
      zip_url: A URL with a ZIP file with the data.
      num_classes: Number of labels.
      splits: Dictionary specifying the training, validation, test, etc. (key) division of data
              (value is number of files per split).
      download_dir: Directory to download data to.

    Return:
      Mapping of the directories containing the subsections of data.
  """
  files = list_files_from_zip_url(zip_url)
  # Skip entries that do not include a filename (e.g. bare directory paths).
  # Filtering into a new list avoids mutating the list while iterating over it.
  files = [f for f in files if len(os.path.normpath(f).split(os.sep)) > 2]

  files_for_class = get_files_per_class(files)

  classes = list(files_for_class.keys())[:num_classes]

  for cls in classes:
    random.shuffle(files_for_class[cls])

  # Only use the number of classes you want in the dictionary
  files_for_class = {x: files_for_class[x] for x in classes}

  dirs = {}
  for split_name, split_count in splits.items():
    print(split_name, ":")
    split_dir = download_dir / split_name
    split_files, files_for_class = split_class_lists(files_for_class, split_count)
    download_from_zip(zip_url, split_dir, split_files)
    dirs[split_name] = split_dir

  return dirs
download_dir = pathlib.Path('./UCF101_subset/')
subset_paths = download_ucf_101_subset(URL,
num_classes = NUM_CLASSES,
splits = {"train": 30, "val": 10, "test": 10},
download_dir = download_dir)
After downloading the data, you should now have a copy of a subset of the UCF101 dataset. Run the following code to print the total number of videos across all of your data subsets.
video_count_train = len(list(download_dir.glob('train/*/*.avi')))
video_count_val = len(list(download_dir.glob('val/*/*.avi')))
video_count_test = len(list(download_dir.glob('test/*/*.avi')))
video_total = video_count_train + video_count_val + video_count_test
print(f"Total videos: {video_total}")
You can also preview the directory of data files now.
find ./UCF101_subset
Create frames from each video file
The frames_from_video_file function splits a video into frames, reads a randomly chosen span of n_frames out of the video file, and returns them as a NumPy array. To reduce memory and computation overhead, choose a **small** number of frames. In addition, pick the **same** number of frames from each video, which makes it easier to work on batches of data. For example, with n_frames = 10 and the default frame_step = 15 used below, a sampled clip spans 1 + (10 - 1) * 15 = 136 consecutive source frames.
def format_frames(frame, output_size):
  """
    Pad and resize an image from a video.

    Args:
      frame: Image that needs to be resized and padded.
      output_size: Pixel size of the output frame image.

    Return:
      Formatted frame with padding of specified output size.
  """
  frame = tf.image.convert_image_dtype(frame, tf.float32)
  frame = tf.image.resize_with_pad(frame, *output_size)
  return frame
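As a quick check of the padding behavior, you can pass format_frames a synthetic frame (not one from the dataset): a tall 100 x 50 frame is letterboxed to the requested square size rather than stretched:
# Synthetic frame, purely for illustration.
dummy_frame = tf.zeros([100, 50, 3], dtype=tf.uint8)
print(format_frames(dummy_frame, (224, 224)).shape)  # (224, 224, 3)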
def frames_from_video_file(video_path, n_frames, output_size = (224,224), frame_step = 15):
  """
    Creates frames from each video file present for each category.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image.
      frame_step: Number of source frames to advance between sampled frames.

    Return:
      A NumPy array of frames in the shape of (n_frames, height, width, channels).
  """
  # Read each video frame by frame
  result = []
  src = cv2.VideoCapture(str(video_path))

  # Cast to int: OpenCV returns the frame count as a float.
  video_length = int(src.get(cv2.CAP_PROP_FRAME_COUNT))

  need_length = 1 + (n_frames - 1) * frame_step

  if need_length > video_length:
    start = 0
  else:
    # Pick a random start index so that the sampled clip fits inside the video.
    max_start = video_length - need_length
    start = random.randint(0, max_start)

  src.set(cv2.CAP_PROP_POS_FRAMES, start)
  # ret is a boolean indicating whether read was successful, frame is the image itself
  ret, frame = src.read()
  result.append(format_frames(frame, output_size))

  for _ in range(n_frames - 1):
    for _ in range(frame_step):
      ret, frame = src.read()
    if ret:
      frame = format_frames(frame, output_size)
      result.append(frame)
    else:
      # Pad with blank frames if the video ends early.
      result.append(np.zeros_like(result[0]))
  src.release()
  # OpenCV decodes frames as BGR; reorder the channel axis to RGB.
  result = np.array(result)[..., [2, 1, 0]]

  return result
Visualize video data
The frames_from_video_file function returns a set of frames as a NumPy array. Try using this function on a new video from Wikimedia by Patrick Gillett:
curl -O https://upload.wikimedia.org/wikipedia/commons/8/86/End_of_a_jam.ogv
video_path = "End_of_a_jam.ogv"
sample_video = frames_from_video_file(video_path, n_frames = 10)
sample_video.shape
def to_gif(images):
  converted_images = np.clip(images * 255, 0, 255).astype(np.uint8)
  imageio.mimsave('./animation.gif', converted_images, fps=10)
  return embed.embed_file('./animation.gif')
to_gif(sample_video)
In addition to examining this video, you can also display the UCF-101 data. To do this, run the following code:
# docs-infra: no-execute
ucf_sample_video = frames_from_video_file(next(subset_paths['train'].glob('*/*.avi')), 50)
to_gif(ucf_sample_video)
Next, define the FrameGenerator class in order to create an iterable object that can feed data into the TensorFlow data pipeline. The generator (__call__) function yields the frame array produced by frames_from_video_file together with the integer label associated with the set of frames.
class FrameGenerator:
  def __init__(self, path, n_frames, training = False):
    """ Returns a set of frames with their associated label.

      Args:
        path: Video file paths.
        n_frames: Number of frames.
        training: Boolean to determine if training dataset is being created.
    """
    self.path = path
    self.n_frames = n_frames
    self.training = training
    self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
    self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

  def get_files_and_class_names(self):
    video_paths = list(self.path.glob('*/*.avi'))
    classes = [p.parent.name for p in video_paths]
    return video_paths, classes

  def __call__(self):
    video_paths, classes = self.get_files_and_class_names()

    pairs = list(zip(video_paths, classes))

    if self.training:
      random.shuffle(pairs)

    for path, name in pairs:
      video_frames = frames_from_video_file(path, self.n_frames)
      label = self.class_ids_for_name[name] # Encode labels
      yield video_frames, label
Test out the FrameGenerator object before wrapping it as a TensorFlow Dataset object. In addition, for the training dataset, make sure you enable training mode so that the data will be shuffled.
fg = FrameGenerator(subset_paths['train'], 10, training=True)
frames, label = next(fg())
print(f"Shape: {frames.shape}")
print(f"Label: {label}")
Finally, create a TensorFlow data input pipeline. The pipeline that you create from the generator object allows you to feed the data into your deep learning model. In this video pipeline, each element is a single set of frames and its associated label.
# Create the training set
output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
tf.TensorSpec(shape = (), dtype = tf.int16))
train_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['train'], 10, training=True),
output_signature = output_signature)
Check that the labels are shuffled.
for frames, labels in train_ds.take(10):
  print(labels)
# Create the validation set
val_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['val'], 10),
output_signature = output_signature)
# Print the shapes of the data
train_frames, train_labels = next(iter(train_ds))
print(f'Shape of training set of frames: {train_frames.shape}')
print(f'Shape of training labels: {train_labels.shape}')
val_frames, val_labels = next(iter(val_ds))
print(f'Shape of validation set of frames: {val_frames.shape}')
print(f'Shape of validation labels: {val_labels.shape}')
Configure the dataset for performance
Use buffered prefetching so that you can yield data from disk without I/O becoming blocking. Two important functions to use while loading data are:
- Dataset.cache: keeps the sets of frames in memory after they are loaded off disk during the first epoch. This function ensures that the dataset does not become a bottleneck while training your model. If your dataset is too large to fit into memory, you can also use this method to create a performant on-disk cache.
- Dataset.prefetch: overlaps data preprocessing and model execution while training. Refer to Better performance with tf.data for details.
AUTOTUNE = tf.data.AUTOTUNE
train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size = AUTOTUNE)
val_ds = val_ds.cache().shuffle(1000).prefetch(buffer_size = AUTOTUNE)
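If you want to see the effect of Dataset.cache, a rough timing sketch follows (the time_full_pass helper below is hypothetical, not part of the tutorial pipeline). The second pass should be noticeably faster, because the cache is only finalized after a complete iteration and then serves the frames from memory:
import time

def time_full_pass(ds):
  # Iterate the entire dataset once and measure the wall-clock time.
  start = time.perf_counter()
  for _ in ds:
    pass
  return time.perf_counter() - start

print('First pass :', time_full_pass(train_ds))   # fills the cache
print('Second pass:', time_full_pass(train_ds))   # served from the in-memory cache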
To prepare the data to be fed into the model, use batching as shown below. Notice that when working with video data, such as AVI files, the data should be shaped as a five-dimensional object with the dimensions [batch_size, number_of_frames, height, width, channels]. In comparison, an image would have four dimensions: [batch_size, height, width, channels].
train_ds = train_ds.batch(2)
val_ds = val_ds.batch(2)
train_frames, train_labels = next(iter(train_ds))
print(f'Shape of training set of frames: {train_frames.shape}')
print(f'Shape of training labels: {train_labels.shape}')
val_frames, val_labels = next(iter(val_ds))
print(f'Shape of validation set of frames: {val_frames.shape}')
print(f'Shape of validation labels: {val_labels.shape}')
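You can also confirm the batched structure directly from the pipeline's element_spec. Since the frame dimensions were left unspecified and the final batch may be smaller than the rest, every leading dimension is expected to show up as None:
# Inspect the (frames, label) structure of one batch element.
print(train_ds.element_spec)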
Next steps
Now that you have created a TensorFlow Dataset of video frames with their labels, you can use it with a deep learning model. The following classification model, which uses a pre-trained EfficientNet, trains to high accuracy in a few minutes:
net = tf.keras.applications.EfficientNetB0(include_top = False)
net.trainable = False
model = tf.keras.Sequential([
    tf.keras.layers.Rescaling(scale=255),
    tf.keras.layers.TimeDistributed(net),
    tf.keras.layers.Dense(10),
    tf.keras.layers.GlobalAveragePooling3D()
])

model.compile(optimizer = 'adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True),
              metrics=['accuracy'])

model.fit(train_ds,
          epochs = 10,
          validation_data = val_ds,
          callbacks = tf.keras.callbacks.EarlyStopping(patience = 2, monitor = 'val_loss'))
To learn more about working with video data in TensorFlow, check out the following tutorials: