加载视频数据

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

本教程演示了如何使用 AVI 视频数据加载和预处理 UCF101 人体动作数据集。预处理数据后,可用于视频分类/识别、字幕或聚类等任务。原始数据集包含从 YouTube 收集的真实动作视频,包含 101 个类别,包括弹奏大提琴、刷牙和化妆。您将学习如何

  • 从 zip 文件加载数据。

  • 从视频文件中读取帧序列。

  • 可视化视频数据。

  • 将帧生成器包装在 tf.data.Dataset 中。

本视频加载和预处理教程是 TensorFlow 视频教程系列的第一部分。以下是另外三个教程

设置

首先安装并导入一些必要的库,包括:remotezip 用于检查 ZIP 文件的内容,tqdm 用于使用进度条,OpenCV 用于处理视频文件,以及 tensorflow_docs 用于将数据嵌入 Jupyter 笔记本中。

# The way this tutorial uses the `TimeDistributed` layer requires TF>=2.10
pip install -U "tensorflow>=2.10.0"
pip install remotezip tqdm opencv-python
pip install -q git+https://github.com/tensorflow/docs
import tqdm
import random
import pathlib
import itertools
import collections

import os
import cv2
import numpy as np
import remotezip as rz

import tensorflow as tf

# Some modules to display an animation using imageio.
import imageio
from IPython import display
from urllib import request
from tensorflow_docs.vis import embed

下载 UCF101 数据集的子集

UCF101 数据集 包含 101 类不同的视频动作,主要用于动作识别。在本演示中,您将使用这些类别的一个子集。

URL = 'https://storage.googleapis.com/thumos14_files/UCF101_videos.zip'

上面的 URL 包含一个包含 UCF 101 数据集的 zip 文件。创建一个使用 remotezip 库检查该 URL 中 zip 文件内容的函数

def list_files_from_zip_url(zip_url):
  """ List the files in each class of the dataset given a URL with the zip file.

    Args:
      zip_url: A URL from which the files can be extracted from.

    Returns:
      List of files in each of the classes.
  """
  files = []
  with rz.RemoteZip(zip_url) as zip:
    for zip_info in zip.infolist():
      files.append(zip_info.filename)
  return files
files = list_files_from_zip_url(URL)
files = [f for f in files if f.endswith('.avi')]
files[:10]

从几个视频和有限数量的类别开始训练。在运行上面的代码块后,请注意类名包含在每个视频的文件名中。

定义 get_class 函数,该函数从文件名中检索类名。然后,创建一个名为 get_files_per_class 的函数,该函数将所有文件列表(上面的 files)转换为一个字典,列出每个类的文件

def get_class(fname):
  """ Retrieve the name of the class given a filename.

    Args:
      fname: Name of the file in the UCF101 dataset.

    Returns:
      Class that the file belongs to.
  """
  return fname.split('_')[-3]
def get_files_per_class(files):
  """ Retrieve the files that belong to each class.

    Args:
      files: List of files in the dataset.

    Returns:
      Dictionary of class names (key) and files (values). 
  """
  files_for_class = collections.defaultdict(list)
  for fname in files:
    class_name = get_class(fname)
    files_for_class[class_name].append(fname)
  return files_for_class

获得每个类的文件列表后,您可以选择要使用的类别数量以及每个类别要使用的视频数量,以创建数据集。

NUM_CLASSES = 10
FILES_PER_CLASS = 50
files_for_class = get_files_per_class(files)
classes = list(files_for_class.keys())
print('Num classes:', len(classes))
print('Num videos for class[0]:', len(files_for_class[classes[0]]))

创建一个名为 select_subset_of_classes 的新函数,该函数选择数据集中存在的类的一个子集,以及每个类特定的文件数量

def select_subset_of_classes(files_for_class, classes, files_per_class):
  """ Create a dictionary with the class name and a subset of the files in that class.

    Args:
      files_for_class: Dictionary of class names (key) and files (values).
      classes: List of classes.
      files_per_class: Number of files per class of interest.

    Returns:
      Dictionary with class as key and list of specified number of video files in that class.
  """
  files_subset = dict()

  for class_name in classes:
    class_files = files_for_class[class_name]
    files_subset[class_name] = class_files[:files_per_class]

  return files_subset
files_subset = select_subset_of_classes(files_for_class, classes[:NUM_CLASSES], FILES_PER_CLASS)
list(files_subset.keys())

定义辅助函数,将视频拆分为训练集、验证集和测试集。视频从包含 zip 文件的 URL 下载,并放置到各自的子目录中。

def download_from_zip(zip_url, to_dir, file_names):
  """ Download the contents of the zip file from the zip URL.

    Args:
      zip_url: A URL with a zip file containing data.
      to_dir: A directory to download data to.
      file_names: Names of files to download.
  """
  with rz.RemoteZip(zip_url) as zip:
    for fn in tqdm.tqdm(file_names):
      class_name = get_class(fn)
      zip.extract(fn, str(to_dir / class_name))
      unzipped_file = to_dir / class_name / fn

      fn = pathlib.Path(fn).parts[-1]
      output_file = to_dir / class_name / fn
      unzipped_file.rename(output_file)

以下函数返回尚未放置到数据子集中的剩余数据。它允许您将剩余数据放置到下一个指定的数据子集中。

def split_class_lists(files_for_class, count):
  """ Returns the list of files belonging to a subset of data as well as the remainder of
    files that need to be downloaded.

    Args:
      files_for_class: Files belonging to a particular class of data.
      count: Number of files to download.

    Returns:
      Files belonging to the subset of data and dictionary of the remainder of files that need to be downloaded.
  """
  split_files = []
  remainder = {}
  for cls in files_for_class:
    split_files.extend(files_for_class[cls][:count])
    remainder[cls] = files_for_class[cls][count:]
  return split_files, remainder

以下 download_ucf_101_subset 函数允许您下载 UCF101 数据集的子集,并将其拆分为训练集、验证集和测试集。您可以指定要使用的类别数量。 splits 参数允许您传入一个字典,其中键值是子集的名称(例如:“train”)以及您希望每个类别拥有的视频数量。

def download_ucf_101_subset(zip_url, num_classes, splits, download_dir):
  """ Download a subset of the UCF101 dataset and split them into various parts, such as
    training, validation, and test.

    Args:
      zip_url: A URL with a ZIP file with the data.
      num_classes: Number of labels.
      splits: Dictionary specifying the training, validation, test, etc. (key) division of data 
              (value is number of files per split).
      download_dir: Directory to download data to.

    Return:
      Mapping of the directories containing the subsections of data.
  """
  files = list_files_from_zip_url(zip_url)
  for f in files:
    path = os.path.normpath(f)
    tokens = path.split(os.sep)
    if len(tokens) <= 2:
      files.remove(f) # Remove that item from the list if it does not have a filename

  files_for_class = get_files_per_class(files)

  classes = list(files_for_class.keys())[:num_classes]

  for cls in classes:
    random.shuffle(files_for_class[cls])

  # Only use the number of classes you want in the dictionary
  files_for_class = {x: files_for_class[x] for x in classes}

  dirs = {}
  for split_name, split_count in splits.items():
    print(split_name, ":")
    split_dir = download_dir / split_name
    split_files, files_for_class = split_class_lists(files_for_class, split_count)
    download_from_zip(zip_url, split_dir, split_files)
    dirs[split_name] = split_dir

  return dirs
download_dir = pathlib.Path('./UCF101_subset/')
subset_paths = download_ucf_101_subset(URL,
                                       num_classes = NUM_CLASSES,
                                       splits = {"train": 30, "val": 10, "test": 10},
                                       download_dir = download_dir)

下载数据后,您现在应该拥有 UCF101 数据集子集的副本。运行以下代码以打印所有数据子集中的视频总数。

video_count_train = len(list(download_dir.glob('train/*/*.avi')))
video_count_val = len(list(download_dir.glob('val/*/*.avi')))
video_count_test = len(list(download_dir.glob('test/*/*.avi')))
video_total = video_count_train + video_count_val + video_count_test
print(f"Total videos: {video_total}")

您现在也可以预览数据文件目录。

find ./UCF101_subset

从每个视频文件中创建帧

frames_from_video_file 函数将视频拆分为帧,从视频文件中读取随机选择的 n_frames 跨度,并将其作为 NumPy array 返回。为了减少内存和计算开销,请选择一个 **较小** 的帧数。此外,从每个视频中选择 **相同** 的帧数,这使得更容易处理数据批次。

def format_frames(frame, output_size):
  """
    Pad and resize an image from a video.

    Args:
      frame: Image that needs to resized and padded. 
      output_size: Pixel size of the output frame image.

    Return:
      Formatted frame with padding of specified output size.
  """
  frame = tf.image.convert_image_dtype(frame, tf.float32)
  frame = tf.image.resize_with_pad(frame, *output_size)
  return frame
def frames_from_video_file(video_path, n_frames, output_size = (224,224), frame_step = 15):
  """
    Creates frames from each video file present for each category.

    Args:
      video_path: File path to the video.
      n_frames: Number of frames to be created per video file.
      output_size: Pixel size of the output frame image.

    Return:
      An NumPy array of frames in the shape of (n_frames, height, width, channels).
  """
  # Read each video frame by frame
  result = []
  src = cv2.VideoCapture(str(video_path))  

  video_length = src.get(cv2.CAP_PROP_FRAME_COUNT)

  need_length = 1 + (n_frames - 1) * frame_step

  if need_length > video_length:
    start = 0
  else:
    max_start = video_length - need_length
    start = random.randint(0, max_start + 1)

  src.set(cv2.CAP_PROP_POS_FRAMES, start)
  # ret is a boolean indicating whether read was successful, frame is the image itself
  ret, frame = src.read()
  result.append(format_frames(frame, output_size))

  for _ in range(n_frames - 1):
    for _ in range(frame_step):
      ret, frame = src.read()
    if ret:
      frame = format_frames(frame, output_size)
      result.append(frame)
    else:
      result.append(np.zeros_like(result[0]))
  src.release()
  result = np.array(result)[..., [2, 1, 0]]

  return result

可视化视频数据

frames_from_video_file 函数返回一组帧作为 NumPy 数组。尝试在 Patrick Gillett 的 维基百科 上的新的视频上使用此函数

curl -O https://upload.wikimedia.org/wikipedia/commons/8/86/End_of_a_jam.ogv
video_path = "End_of_a_jam.ogv"
sample_video = frames_from_video_file(video_path, n_frames = 10)
sample_video.shape
def to_gif(images):
  converted_images = np.clip(images * 255, 0, 255).astype(np.uint8)
  imageio.mimsave('./animation.gif', converted_images, fps=10)
  return embed.embed_file('./animation.gif')
to_gif(sample_video)

除了检查此视频之外,您还可以显示 UCF-101 数据。为此,请运行以下代码

# docs-infra: no-execute
ucf_sample_video = frames_from_video_file(next(subset_paths['train'].glob('*/*.avi')), 50)
to_gif(ucf_sample_video)

接下来,定义 FrameGenerator 类,以便创建一个可迭代对象,该对象可以将数据馈送到 TensorFlow 数据管道。生成器(__call__)函数生成由 frames_from_video_file 生成的帧数组,以及与帧集关联的标签的独热编码向量。

class FrameGenerator:
  def __init__(self, path, n_frames, training = False):
    """ Returns a set of frames with their associated label. 

      Args:
        path: Video file paths.
        n_frames: Number of frames. 
        training: Boolean to determine if training dataset is being created.
    """
    self.path = path
    self.n_frames = n_frames
    self.training = training
    self.class_names = sorted(set(p.name for p in self.path.iterdir() if p.is_dir()))
    self.class_ids_for_name = dict((name, idx) for idx, name in enumerate(self.class_names))

  def get_files_and_class_names(self):
    video_paths = list(self.path.glob('*/*.avi'))
    classes = [p.parent.name for p in video_paths] 
    return video_paths, classes

  def __call__(self):
    video_paths, classes = self.get_files_and_class_names()

    pairs = list(zip(video_paths, classes))

    if self.training:
      random.shuffle(pairs)

    for path, name in pairs:
      video_frames = frames_from_video_file(path, self.n_frames) 
      label = self.class_ids_for_name[name] # Encode labels
      yield video_frames, label

在将 FrameGenerator 对象包装为 TensorFlow Dataset 对象之前,先测试一下它。此外,对于训练数据集,请确保启用训练模式,以便对数据进行洗牌。

fg = FrameGenerator(subset_paths['train'], 10, training=True)

frames, label = next(fg())

print(f"Shape: {frames.shape}")
print(f"Label: {label}")

最后,创建一个 TensorFlow 数据输入管道。您从生成器对象创建的此管道允许您将数据馈送到深度学习模型。在此视频管道中,每个元素都是一组帧及其关联的标签。

# Create the training set
output_signature = (tf.TensorSpec(shape = (None, None, None, 3), dtype = tf.float32),
                    tf.TensorSpec(shape = (), dtype = tf.int16))
train_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['train'], 10, training=True),
                                          output_signature = output_signature)

检查标签是否已洗牌。

for frames, labels in train_ds.take(10):
  print(labels)
# Create the validation set
val_ds = tf.data.Dataset.from_generator(FrameGenerator(subset_paths['val'], 10),
                                        output_signature = output_signature)
# Print the shapes of the data
train_frames, train_labels = next(iter(train_ds))
print(f'Shape of training set of frames: {train_frames.shape}')
print(f'Shape of training labels: {train_labels.shape}')

val_frames, val_labels = next(iter(val_ds))
print(f'Shape of validation set of frames: {val_frames.shape}')
print(f'Shape of validation labels: {val_labels.shape}')

配置数据集以提高性能

使用缓冲预取,以便您可以从磁盘生成数据,而不会让 I/O 成为阻塞。在加载数据时要使用的两个重要函数是

  • Dataset.cache:在第一个 epoch 中从磁盘加载帧集后,将它们保存在内存中。此函数确保数据集在训练模型时不会成为瓶颈。如果您的数据集太大而无法放入内存,您也可以使用此方法来创建高性能的磁盘缓存。

  • Dataset.prefetch:在训练期间重叠数据预处理和模型执行。有关详细信息,请参阅 使用 tf.data 提高性能

AUTOTUNE = tf.data.AUTOTUNE

train_ds = train_ds.cache().shuffle(1000).prefetch(buffer_size = AUTOTUNE)
val_ds = val_ds.cache().shuffle(1000).prefetch(buffer_size = AUTOTUNE)

要准备要馈送到模型中的数据,请使用如下所示的批处理。请注意,在处理视频数据(例如 AVI 文件)时,数据应以五维对象的形式进行整形。这些维度如下:[batch_size, number_of_frames, height, width, channels]。相比之下,图像将具有四个维度:[batch_size, height, width, channels]。下图说明了视频数据的形状是如何表示的。

Video data shape

train_ds = train_ds.batch(2)
val_ds = val_ds.batch(2)

train_frames, train_labels = next(iter(train_ds))
print(f'Shape of training set of frames: {train_frames.shape}')
print(f'Shape of training labels: {train_labels.shape}')

val_frames, val_labels = next(iter(val_ds))
print(f'Shape of validation set of frames: {val_frames.shape}')
print(f'Shape of validation labels: {val_labels.shape}')

后续步骤

现在您已经创建了一个包含视频帧及其标签的 TensorFlow Dataset,您可以将其与深度学习模型一起使用。以下分类模型使用预训练的 EfficientNet 在几分钟内训练到高精度

net = tf.keras.applications.EfficientNetB0(include_top = False)
net.trainable = False

model = tf.keras.Sequential([
    tf.keras.layers.Rescaling(scale=255),
    tf.keras.layers.TimeDistributed(net),
    tf.keras.layers.Dense(10),
    tf.keras.layers.GlobalAveragePooling3D()
])

model.compile(optimizer = 'adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits = True),
              metrics=['accuracy'])

model.fit(train_ds, 
          epochs = 10,
          validation_data = val_ds,
          callbacks = tf.keras.callbacks.EarlyStopping(patience = 2, monitor = 'val_loss'))

要了解有关在 TensorFlow 中使用视频数据的更多信息,请查看以下教程