此笔记本将教您如何使用 MoveNet 和 TensorFlow Lite 训练姿态分类模型。结果是一个新的 TensorFlow Lite 模型,它接受 MoveNet 模型的输出作为输入,并输出姿态分类,例如瑜伽姿势的名称。
此笔记本中的过程包括 3 个部分
- 第 1 部分:将姿态分类训练数据预处理为 CSV 文件,该文件指定 MoveNet 模型检测到的地标(身体关键点),以及地面实况姿态标签。
- 第 2 部分:构建和训练一个姿态分类模型,该模型以 CSV 文件中的地标坐标作为输入,并输出预测的标签。
- 第 3 部分:将姿态分类模型转换为 TFLite。
默认情况下,此笔记本使用带有标记瑜伽姿势的图像数据集,但我们在第 1 部分中还包含了一个部分,您可以在其中上传自己的姿势图像数据集。
在 TensorFlow.org 上查看 | 在 Google Colab 中运行 | 在 GitHub 上查看源代码 | 下载笔记本 | 查看 TF Hub 模型 |
准备
在本节中,您将导入必要的库并定义几个函数,以将训练图像预处理为包含地标坐标和地面实况标签的 CSV 文件。
这里没有可观察到的变化,但您可以展开隐藏的代码单元格以查看我们将稍后调用的某些函数的实现。
如果您只想创建 CSV 文件而不了解所有细节,只需运行本节并继续进行第 1 部分。
pip install -q opencv-python
import csv
import cv2
import itertools
import numpy as np
import pandas as pd
import os
import sys
import tempfile
import tqdm
from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow import keras
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
使用 MoveNet 运行姿态估计的代码
使用 MoveNet 运行姿态估计的函数
用于可视化姿态估计结果的函数。
加载图像、检测姿态地标并将它们保存到 CSV 文件中的代码
(可选)尝试 Movenet 姿态估计逻辑的代码片段
第 1 部分:预处理输入图像
因为我们姿态分类器的输入是 MoveNet 模型的输出地标,所以我们需要通过运行标记图像通过 MoveNet 来生成训练数据集,然后将所有地标数据和地面实况标签捕获到 CSV 文件中。
我们为本教程提供的这个数据集是一个 CG 生成的瑜伽姿势数据集。它包含多个 CG 生成的模型做 5 种不同瑜伽姿势的图像。该目录已分为 train
数据集和 test
数据集。
因此,在本节中,我们将下载瑜伽数据集并通过 MoveNet 运行它,以便我们可以将所有地标捕获到 CSV 文件中... 但是,将我们的瑜伽数据集馈送到 MoveNet 并生成此 CSV 文件大约需要 15 分钟。因此,作为替代方案,您可以通过将下面的 is_skip_step_1
参数设置为 True 来下载瑜伽数据集的预先存在的 CSV 文件。这样,您将跳过此步骤,而是下载将在此预处理步骤中创建的相同 CSV 文件。
另一方面,如果您想使用自己的图像数据集训练姿态分类器,则需要上传您的图像并运行此预处理步骤(将 is_skip_step_1
保持为 False)——按照以下说明上传您自己的姿势数据集。
(可选)上传您自己的姿势数据集
如果您想使用您自己的标记姿势训练姿态分类器(它们可以是任何姿势,而不仅仅是瑜伽姿势),请按照以下步骤操作
将上面的
use_custom_dataset
选项设置为 True。准备一个包含您图像数据集的文件夹的归档文件(ZIP、TAR 或其他)。该文件夹必须包含按以下方式排序的姿势图像。
如果您已将数据集分为训练集和测试集,则将
dataset_is_split
设置为 True。也就是说,您的图像文件夹必须包含“train”和“test”目录,如下所示yoga_poses/ |__ train/ |__ downdog/ |______ 00000128.jpg |______ ... |__ test/ |__ downdog/ |______ 00000181.jpg |______ ...
或者,如果您的数据集尚未拆分,则将
dataset_is_split
设置为 False,我们将根据指定的拆分分数将其拆分。也就是说,您上传的图像文件夹应如下所示yoga_poses/ |__ downdog/ |______ 00000128.jpg |______ 00000181.jpg |______ ... |__ goddess/ |______ 00000243.jpg |______ 00000306.jpg |______ ...
单击左侧的文件选项卡(文件夹图标),然后单击上传到会话存储(文件图标)。
选择您的归档文件,然后等待它上传完成,然后再继续。
编辑以下代码块以指定您的归档文件和图像目录的名称。(默认情况下,我们期望一个 ZIP 文件,因此如果您使用的是其他格式的归档文件,则还需要修改该部分。)
现在运行笔记本的其余部分。
if use_custom_dataset:
# ATTENTION:
# You must edit these two lines to match your archive and images folder name:
# !tar -xf YOUR_DATASET_ARCHIVE_NAME.tar
!unzip -q YOUR_DATASET_ARCHIVE_NAME.zip
dataset_in = 'YOUR_DATASET_DIR_NAME'
# You can leave the rest alone:
if not os.path.isdir(dataset_in):
raise Exception("dataset_in is not a valid directory")
if dataset_is_split:
IMAGES_ROOT = dataset_in
else:
dataset_out = 'split_' + dataset_in
split_into_train_test(dataset_in, dataset_out, test_split=0.2)
IMAGES_ROOT = dataset_out
下载瑜伽数据集
if not is_skip_step_1 and not use_custom_dataset:
!wget -O yoga_poses.zip http://download.tensorflow.org/data/pose_classification/yoga_poses.zip
!unzip -q yoga_poses.zip -d yoga_cg
IMAGES_ROOT = "yoga_cg"
预处理 TRAIN
数据集
if not is_skip_step_1:
images_in_train_folder = os.path.join(IMAGES_ROOT, 'train')
images_out_train_folder = 'poses_images_out_train'
csvs_out_train_path = 'train_data.csv'
preprocessor = MoveNetPreprocessor(
images_in_folder=images_in_train_folder,
images_out_folder=images_out_train_folder,
csvs_out_path=csvs_out_train_path,
)
preprocessor.process(per_pose_class_limit=None)
预处理 TEST
数据集
if not is_skip_step_1:
images_in_test_folder = os.path.join(IMAGES_ROOT, 'test')
images_out_test_folder = 'poses_images_out_test'
csvs_out_test_path = 'test_data.csv'
preprocessor = MoveNetPreprocessor(
images_in_folder=images_in_test_folder,
images_out_folder=images_out_test_folder,
csvs_out_path=csvs_out_test_path,
)
preprocessor.process(per_pose_class_limit=None)
第二部分:训练一个姿态分类模型,该模型以地标坐标作为输入,并输出预测的标签。
您将构建一个 TensorFlow 模型,该模型接收地标坐标并预测输入图像中人物执行的姿态类别。该模型包含两个子模型
- 子模型 1 从检测到的地标坐标计算姿态嵌入(又称特征向量)。
- 子模型 2 将姿态嵌入通过几个
Dense
层以预测姿态类别。
然后,您将根据在第一部分中预处理的数据集训练模型。
(可选)如果您没有运行第一部分,请下载预处理的数据集
# Download the preprocessed CSV files which are the same as the output of step 1
if is_skip_step_1:
!wget -O train_data.csv http://download.tensorflow.org/data/pose_classification/yoga_train_data.csv
!wget -O test_data.csv http://download.tensorflow.org/data/pose_classification/yoga_test_data.csv
csvs_out_train_path = 'train_data.csv'
csvs_out_test_path = 'test_data.csv'
is_skipped_step_1 = True
将预处理的 CSV 加载到 TRAIN
和 TEST
数据集中。
def load_pose_landmarks(csv_path):
"""Loads a CSV created by MoveNetPreprocessor.
Returns:
X: Detected landmark coordinates and scores of shape (N, 17 * 3)
y: Ground truth labels of shape (N, label_count)
classes: The list of all class names found in the dataset
dataframe: The CSV loaded as a Pandas dataframe features (X) and ground
truth labels (y) to use later to train a pose classification model.
"""
# Load the CSV file
dataframe = pd.read_csv(csv_path)
df_to_process = dataframe.copy()
# Drop the file_name columns as you don't need it during training.
df_to_process.drop(columns=['file_name'], inplace=True)
# Extract the list of class names
classes = df_to_process.pop('class_name').unique()
# Extract the labels
y = df_to_process.pop('class_no')
# Convert the input features and labels into the correct format for training.
X = df_to_process.astype('float64')
y = keras.utils.to_categorical(y)
return X, y, classes, dataframe
加载并拆分原始 TRAIN
数据集,将其拆分为 TRAIN
(85% 的数据)和 VALIDATE
(剩余的 15%)。
# Load the train data
X, y, class_names, _ = load_pose_landmarks(csvs_out_train_path)
# Split training data (X, y) into (X_train, y_train) and (X_val, y_val)
X_train, X_val, y_train, y_val = train_test_split(X, y,
test_size=0.15)
# Load the test data
X_test, y_test, _, df_test = load_pose_landmarks(csvs_out_test_path)
定义函数,将姿态地标转换为姿态嵌入(又称特征向量),用于姿态分类
接下来,通过以下步骤将地标坐标转换为特征向量:
- 将姿态中心移动到原点。
- 缩放姿态,使姿态大小变为 1
- 将这些坐标展平成特征向量
然后使用此特征向量训练基于神经网络的姿态分类器。
def get_center_point(landmarks, left_bodypart, right_bodypart):
"""Calculates the center point of the two given landmarks."""
left = tf.gather(landmarks, left_bodypart.value, axis=1)
right = tf.gather(landmarks, right_bodypart.value, axis=1)
center = left * 0.5 + right * 0.5
return center
def get_pose_size(landmarks, torso_size_multiplier=2.5):
"""Calculates pose size.
It is the maximum of two values:
* Torso size multiplied by `torso_size_multiplier`
* Maximum distance from pose center to any pose landmark
"""
# Hips center
hips_center = get_center_point(landmarks, BodyPart.LEFT_HIP,
BodyPart.RIGHT_HIP)
# Shoulders center
shoulders_center = get_center_point(landmarks, BodyPart.LEFT_SHOULDER,
BodyPart.RIGHT_SHOULDER)
# Torso size as the minimum body size
torso_size = tf.linalg.norm(shoulders_center - hips_center)
# Pose center
pose_center_new = get_center_point(landmarks, BodyPart.LEFT_HIP,
BodyPart.RIGHT_HIP)
pose_center_new = tf.expand_dims(pose_center_new, axis=1)
# Broadcast the pose center to the same size as the landmark vector to
# perform substraction
pose_center_new = tf.broadcast_to(pose_center_new,
[tf.size(landmarks) // (17*2), 17, 2])
# Dist to pose center
d = tf.gather(landmarks - pose_center_new, 0, axis=0,
name="dist_to_pose_center")
# Max dist to pose center
max_dist = tf.reduce_max(tf.linalg.norm(d, axis=0))
# Normalize scale
pose_size = tf.maximum(torso_size * torso_size_multiplier, max_dist)
return pose_size
def normalize_pose_landmarks(landmarks):
"""Normalizes the landmarks translation by moving the pose center to (0,0) and
scaling it to a constant pose size.
"""
# Move landmarks so that the pose center becomes (0,0)
pose_center = get_center_point(landmarks, BodyPart.LEFT_HIP,
BodyPart.RIGHT_HIP)
pose_center = tf.expand_dims(pose_center, axis=1)
# Broadcast the pose center to the same size as the landmark vector to perform
# substraction
pose_center = tf.broadcast_to(pose_center,
[tf.size(landmarks) // (17*2), 17, 2])
landmarks = landmarks - pose_center
# Scale the landmarks to a constant pose size
pose_size = get_pose_size(landmarks)
landmarks /= pose_size
return landmarks
def landmarks_to_embedding(landmarks_and_scores):
"""Converts the input landmarks into a pose embedding."""
# Reshape the flat input into a matrix with shape=(17, 3)
reshaped_inputs = keras.layers.Reshape((17, 3))(landmarks_and_scores)
# Normalize landmarks 2D
landmarks = normalize_pose_landmarks(reshaped_inputs[:, :, :2])
# Flatten the normalized landmark coordinates into a vector
embedding = keras.layers.Flatten()(landmarks)
return embedding
定义用于姿态分类的 Keras 模型
我们的 Keras 模型接收检测到的姿态地标,然后计算姿态嵌入并预测姿态类别。
# Define the model
inputs = tf.keras.Input(shape=(51))
embedding = landmarks_to_embedding(inputs)
layer = keras.layers.Dense(128, activation=tf.nn.relu6)(embedding)
layer = keras.layers.Dropout(0.5)(layer)
layer = keras.layers.Dense(64, activation=tf.nn.relu6)(layer)
layer = keras.layers.Dropout(0.5)(layer)
outputs = keras.layers.Dense(len(class_names), activation="softmax")(layer)
model = keras.Model(inputs, outputs)
model.summary()
model.compile(
optimizer='adam',
loss='categorical_crossentropy',
metrics=['accuracy']
)
# Add a checkpoint callback to store the checkpoint that has the highest
# validation accuracy.
checkpoint_path = "weights.best.hdf5"
checkpoint = keras.callbacks.ModelCheckpoint(checkpoint_path,
monitor='val_accuracy',
verbose=1,
save_best_only=True,
mode='max')
earlystopping = keras.callbacks.EarlyStopping(monitor='val_accuracy',
patience=20)
# Start training
history = model.fit(X_train, y_train,
epochs=200,
batch_size=16,
validation_data=(X_val, y_val),
callbacks=[checkpoint, earlystopping])
# Visualize the training history to see whether you're overfitting.
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['TRAIN', 'VAL'], loc='lower right')
plt.show()
# Evaluate the model using the TEST dataset
loss, accuracy = model.evaluate(X_test, y_test)
绘制混淆矩阵,以更好地了解模型性能
def plot_confusion_matrix(cm, classes,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Blues):
"""Plots the confusion matrix."""
if normalize:
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
print("Normalized confusion matrix")
else:
print('Confusion matrix, without normalization')
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=55)
plt.yticks(tick_marks, classes)
fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.tight_layout()
# Classify pose in the TEST dataset using the trained model
y_pred = model.predict(X_test)
# Convert the prediction result to class name
y_pred_label = [class_names[i] for i in np.argmax(y_pred, axis=1)]
y_true_label = [class_names[i] for i in np.argmax(y_test, axis=1)]
# Plot the confusion matrix
cm = confusion_matrix(np.argmax(y_test, axis=1), np.argmax(y_pred, axis=1))
plot_confusion_matrix(cm,
class_names,
title ='Confusion Matrix of Pose Classification Model')
# Print the classification report
print('\nClassification Report:\n', classification_report(y_true_label,
y_pred_label))
(可选)调查错误预测
您可以查看来自 TEST
数据集的错误预测的姿态,以查看是否可以提高模型精度。
if is_skip_step_1:
raise RuntimeError('You must have run step 1 to run this cell.')
# If step 1 was skipped, skip this step.
IMAGE_PER_ROW = 3
MAX_NO_OF_IMAGE_TO_PLOT = 30
# Extract the list of incorrectly predicted poses
false_predict = [id_in_df for id_in_df in range(len(y_test)) \
if y_pred_label[id_in_df] != y_true_label[id_in_df]]
if len(false_predict) > MAX_NO_OF_IMAGE_TO_PLOT:
false_predict = false_predict[:MAX_NO_OF_IMAGE_TO_PLOT]
# Plot the incorrectly predicted images
row_count = len(false_predict) // IMAGE_PER_ROW + 1
fig = plt.figure(figsize=(10 * IMAGE_PER_ROW, 10 * row_count))
for i, id_in_df in enumerate(false_predict):
ax = fig.add_subplot(row_count, IMAGE_PER_ROW, i + 1)
image_path = os.path.join(images_out_test_folder,
df_test.iloc[id_in_df]['file_name'])
image = cv2.imread(image_path)
plt.title("Predict: %s; Actual: %s"
% (y_pred_label[id_in_df], y_true_label[id_in_df]))
plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
plt.show()
第三部分:将姿态分类模型转换为 TensorFlow Lite
您将把 Keras 姿态分类模型转换为 TensorFlow Lite 格式,以便您可以将其部署到移动应用程序、Web 浏览器和边缘设备。在转换模型时,您将应用 动态范围量化,将姿态分类 TensorFlow Lite 模型大小减少约 4 倍,而精度损失微不足道。
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
print('Model size: %dKB' % (len(tflite_model) / 1024))
with open('pose_classifier.tflite', 'wb') as f:
f.write(tflite_model)
然后,您将编写标签文件,其中包含从类别索引到人类可读类别名称的映射。
with open('pose_labels.txt', 'w') as f:
f.write('\n'.join(class_names))
由于您已应用量化来减小模型大小,因此让我们评估量化的 TFLite 模型,以检查精度下降是否可以接受。
def evaluate_model(interpreter, X, y_true):
"""Evaluates the given TFLite model and return its accuracy."""
input_index = interpreter.get_input_details()[0]["index"]
output_index = interpreter.get_output_details()[0]["index"]
# Run predictions on all given poses.
y_pred = []
for i in range(len(y_true)):
# Pre-processing: add batch dimension and convert to float32 to match with
# the model's input data format.
test_image = X[i: i + 1].astype('float32')
interpreter.set_tensor(input_index, test_image)
# Run inference.
interpreter.invoke()
# Post-processing: remove batch dimension and find the class with highest
# probability.
output = interpreter.tensor(output_index)
predicted_label = np.argmax(output()[0])
y_pred.append(predicted_label)
# Compare prediction results with ground truth labels to calculate accuracy.
y_pred = keras.utils.to_categorical(y_pred)
return accuracy_score(y_true, y_pred)
# Evaluate the accuracy of the converted TFLite model
classifier_interpreter = tf.lite.Interpreter(model_content=tflite_model)
classifier_interpreter.allocate_tensors()
print('Accuracy of TFLite model: %s' %
evaluate_model(classifier_interpreter, X_test, y_test))
现在,您可以下载 TFLite 模型(pose_classifier.tflite
)和标签文件(pose_labels.txt
)来对自定义姿态进行分类。有关如何使用 TFLite 姿态分类模型的端到端示例,请参阅 Android 和 Python/Raspberry Pi 示例应用程序。
zip pose_classifier.zip pose_labels.txt pose_classifier.tflite
# Download the zip archive if running on Colab.
try:
from google.colab import files
files.download('pose_classifier.zip')
except:
pass