多重变点检测和贝叶斯模型选择

贝叶斯模型选择

在 TensorFlow.org 上查看 在 Google Colab 中运行 在 GitHub 上查看源代码 下载笔记本

导入

import numpy as np
import tensorflow as tf
import tf_keras
import tensorflow_probability as tfp
from tensorflow_probability import distributions as tfd

from matplotlib import pylab as plt
%matplotlib inline
import scipy.stats

任务:具有多个变点的变点检测

考虑一个变点检测任务:事件以随时间变化的速率发生,这受生成数据的某些系统或过程的(未观察到的)状态的突然变化驱动。

例如,我们可能会观察到以下一系列计数

true_rates = [40, 3, 20, 50]
true_durations = [10, 20, 5, 35]

observed_counts = tf.concat(
    [tfd.Poisson(rate).sample(num_steps)
     for (rate, num_steps) in zip(true_rates, true_durations)], axis=0)

plt.plot(observed_counts)
[<matplotlib.lines.Line2D at 0x7f7589bdae10>]

png

这些可能代表数据中心的故障次数、网页的访问者数量、网络链路上的数据包数量等。

请注意,仅从查看数据并不能完全清楚有多少个不同的系统状态。你能说出三个切换点中的每一个发生在哪里吗?

已知状态数

我们首先考虑(可能不切实际的)情况,即未观察到的状态数是先验已知的。在这里,我们假设我们知道有四个潜在状态。

我们将此问题建模为一个切换(非齐次)泊松过程:在每个时间点,发生的事件数量服从泊松分布,而事件的速率由未观察到的系统状态 \(z_t\) 决定

\[x_t \sim \text{Poisson}(\lambda_{z_t})\]

潜在状态是离散的:\(z_t \in \{1, 2, 3, 4\}\),所以 \(\lambda = [\lambda_1, \lambda_2, \lambda_3, \lambda_4]\) 是一个简单的向量,包含每个状态的泊松速率。为了对状态随时间的演变进行建模,我们将定义一个简单的转移模型 \(p(z_t | z_{t-1})\):假设在每个步骤中,我们以某个概率 \(p\) 保留在前一个状态,以概率 \(1-p\) 以均匀随机的方式转移到另一个状态。初始状态也是均匀随机选择的,因此我们有

\[ \begin{align*} z_1 &\sim \text{Categorical}\left(\left\{\frac{1}{4}, \frac{1}{4}, \frac{1}{4}, \frac{1}{4}\right\}\right)\\ z_t | z_{t-1} &\sim \text{Categorical}\left(\left\{\begin{array}{cc}p & \text{if } z_t = z_{t-1} \\ \frac{1-p}{4-1} & \text{otherwise}\end{array}\right\}\right) \end{align*}\]

这些假设对应于具有泊松发射的隐马尔可夫模型。我们可以使用 TFP 中的 tfd.HiddenMarkovModel 对其进行编码。首先,我们定义转移矩阵和初始状态上的均匀先验

num_states = 4
initial_state_logits = tf.zeros([num_states]) # uniform distribution

daily_change_prob = 0.05
transition_probs = tf.fill([num_states, num_states],
                           daily_change_prob / (num_states - 1))
transition_probs = tf.linalg.set_diag(transition_probs,
                                      tf.fill([num_states],
                                              1 - daily_change_prob))

print("Initial state logits:\n{}".format(initial_state_logits))
print("Transition matrix:\n{}".format(transition_probs))
Initial state logits:
[0. 0. 0. 0.]
Transition matrix:
[[0.95       0.01666667 0.01666667 0.01666667]
 [0.01666667 0.95       0.01666667 0.01666667]
 [0.01666667 0.01666667 0.95       0.01666667]
 [0.01666667 0.01666667 0.01666667 0.95      ]]

接下来,我们使用一个可训练的变量来构建 tfd.HiddenMarkovModel 分布,该变量表示与每个系统状态相关的速率。我们在对数空间中参数化速率,以确保它们是正值的。

# Define variable to represent the unknown log rates.
trainable_log_rates = tf.Variable(
  tf.math.log(tf.reduce_mean(observed_counts)) +
  tf.random.stateless_normal([num_states], seed=(42, 42)),
  name='log_rates')

hmm = tfd.HiddenMarkovModel(
  initial_distribution=tfd.Categorical(
      logits=initial_state_logits),
  transition_distribution=tfd.Categorical(probs=transition_probs),
  observation_distribution=tfd.Poisson(log_rate=trainable_log_rates),
  num_steps=len(observed_counts))

最后,我们定义模型的总对数密度,包括速率上的弱信息 LogNormal 先验,并运行优化器来计算观察到的计数数据的最大后验 (MAP) 拟合。

rate_prior = tfd.LogNormal(5, 5)

def log_prob():
 return (tf.reduce_sum(rate_prior.log_prob(tf.math.exp(trainable_log_rates))) +
         hmm.log_prob(observed_counts))

losses = tfp.math.minimize(
    lambda: -log_prob(),
    optimizer=tf_keras.optimizers.Adam(learning_rate=0.1),
    num_steps=100)
plt.plot(losses)
plt.ylabel('Negative log marginal likelihood')
Text(0, 0.5, 'Negative log marginal likelihood')

png

rates = tf.exp(trainable_log_rates)
print("Inferred rates: {}".format(rates))
print("True rates: {}".format(true_rates))
Inferred rates: [ 2.8302798 49.58499   41.928307  17.35112  ]
True rates: [40, 3, 20, 50]

它起作用了!请注意,此模型中的潜在状态仅可识别到排列,因此我们恢复的速率顺序不同,并且存在一些噪声,但总体上它们与实际情况非常吻合。

恢复状态轨迹

现在我们已经拟合了模型,我们可能想要重建模型认为系统在每个时间步处于哪个状态。

这是一个后验推理任务:给定观察到的计数 \(x_{1:T}\) 和模型参数(速率)\(\lambda\),我们想要推断离散潜在变量的序列,遵循后验分布 \(p(z_{1:T} | x_{1:T}, \lambda)\)。在隐马尔可夫模型中,我们可以使用标准的消息传递算法有效地计算此分布的边缘和其它属性。特别是,posterior_marginals 方法将有效地计算(使用前向-后向算法)在每个时间步 \(t\) 处离散潜在状态 \(Z_t\) 上的边缘概率分布 \(p(Z_t = z_t | x_{1:T})\)。

# Runs forward-backward algorithm to compute marginal posteriors.
posterior_dists = hmm.posterior_marginals(observed_counts)
posterior_probs = posterior_dists.probs_parameter().numpy()

绘制后验概率,我们恢复了模型对数据的“解释”:模型认为每个状态在哪些时间点处于活动状态?

def plot_state_posterior(ax, state_posterior_probs, title):
  ln1 = ax.plot(state_posterior_probs, c='blue', lw=3, label='p(state | counts)')
  ax.set_ylim(0., 1.1)
  ax.set_ylabel('posterior probability')
  ax2 = ax.twinx()
  ln2 = ax2.plot(observed_counts, c='black', alpha=0.3, label='observed counts')
  ax2.set_title(title)
  ax2.set_xlabel("time")
  lns = ln1+ln2
  labs = [l.get_label() for l in lns]
  ax.legend(lns, labs, loc=4)
  ax.grid(True, color='white')
  ax2.grid(False)

fig = plt.figure(figsize=(10, 10))
plot_state_posterior(fig.add_subplot(2, 2, 1),
                     posterior_probs[:, 0],
                     title="state 0 (rate {:.2f})".format(rates[0]))
plot_state_posterior(fig.add_subplot(2, 2, 2),
                     posterior_probs[:, 1],
                     title="state 1 (rate {:.2f})".format(rates[1]))
plot_state_posterior(fig.add_subplot(2, 2, 3),
                     posterior_probs[:, 2],
                     title="state 2 (rate {:.2f})".format(rates[2]))
plot_state_posterior(fig.add_subplot(2, 2, 4),
                     posterior_probs[:, 3],
                     title="state 3 (rate {:.2f})".format(rates[3]))
plt.tight_layout()

png

在这种(简单)情况下,我们看到模型通常非常自信:在大多数时间步中,它将几乎所有概率质量分配给四个状态中的一个。幸运的是,这些解释看起来很合理!

我们也可以从每个时间步最有可能的潜在状态相关的速率来可视化这个后验,将概率后验压缩成一个单一的解释。

most_probable_states = hmm.posterior_mode(observed_counts)
most_probable_rates = tf.gather(rates, most_probable_states)
fig = plt.figure(figsize=(10, 4))
ax = fig.add_subplot(1, 1, 1)
ax.plot(most_probable_rates, c='green', lw=3, label='inferred rate')
ax.plot(observed_counts, c='black', alpha=0.3, label='observed counts')
ax.set_ylabel("latent rate")
ax.set_xlabel("time")
ax.set_title("Inferred latent rate over time")
ax.legend(loc=4)
<matplotlib.legend.Legend at 0x7f75849e70f0>

png

未知状态数

在实际问题中,我们可能不知道我们正在建模的系统中“真实”的状态数。这可能并不总是需要关注:如果你不特别关心未知状态的标识,你可以运行一个比模型需要的状态更多的模型,并学习(类似于)一堆实际状态的重复副本。但让我们假设你确实关心推断“真实”的潜在状态数。

我们可以将其视为贝叶斯模型选择的一种情况:我们有一组候选模型,每个模型都有不同的潜在状态数,我们想要选择最有可能生成观测数据的模型。为此,我们计算每个模型下数据的边际似然(我们也可以在模型本身添加先验,但这在本次分析中不是必需的;贝叶斯奥卡姆剃刀被证明足以编码对更简单模型的偏好)。

不幸的是,真正的边际似然,它对离散状态 \(z_{1:T}\) 和(向量)速率参数 \(\lambda\) 进行积分,\(p(x_{1:T}) = \int p(x_{1:T}, z_{1:T}, \lambda) dz d\lambda,\) 对于此模型来说是不可处理的。为了方便起见,我们将使用所谓的“经验贝叶斯”或“II 型最大似然”估计来近似它:而不是完全积分出与每个系统状态相关的(未知)速率参数 \(\lambda\),我们将对其值进行优化

\[\tilde{p}(x_{1:T}) = \max_\lambda \int p(x_{1:T}, z_{1:T}, \lambda) dz\]

这种近似可能会过度拟合,即它会比真实的边际似然更偏好更复杂的模型。我们可以考虑更忠实的近似,例如,优化变分下界,或使用蒙特卡罗估计器,例如退火重要性采样;这些(遗憾的是)超出了本笔记本的范围。(有关贝叶斯模型选择和近似的更多信息,优秀书籍机器学习:概率视角的第 7 章是一个很好的参考。)

原则上,我们可以通过多次运行上面的优化,使用不同的 num_states 值来进行这种模型比较,但这将是一项繁重的工作。在这里,我们将展示如何使用 TFP 的 batch_shape 机制进行向量化,并行考虑多个模型。

转移矩阵和初始状态先验:我们不再构建单个模型描述,而是构建一个转移矩阵和先验 logits 的批次,每个候选模型对应一个,直到 max_num_states。为了方便批处理,我们需要确保所有计算具有相同的“形状”:这必须对应于我们将拟合的最大模型的维度。为了处理较小的模型,我们可以将其描述“嵌入”到状态空间的最顶层维度中,有效地将剩余的维度视为从未使用过的虚拟状态。

max_num_states = 10

def build_latent_state(num_states, max_num_states, daily_change_prob=0.05):

  # Give probability exp(-100) ~= 0 to states outside of the current model.
  active_states_mask = tf.concat([tf.ones([num_states]),
                                  tf.zeros([max_num_states - num_states])],
                                 axis=0)
  initial_state_logits = -100. * (1 - active_states_mask)

  # Build a transition matrix that transitions only within the current
  # `num_states` states.
  transition_probs = tf.fill([num_states, num_states],
                             0. if num_states == 1
                             else daily_change_prob / (num_states - 1))  
  padded_transition_probs = tf.eye(max_num_states) + tf.pad(
      tf.linalg.set_diag(transition_probs,
                         tf.fill([num_states], - daily_change_prob)),
      paddings=[(0, max_num_states - num_states),
                (0, max_num_states - num_states)])

  return initial_state_logits, padded_transition_probs

# For each candidate model, build the initial state prior and transition matrix.
batch_initial_state_logits = []
batch_transition_probs = []
for num_states in range(1, max_num_states+1):
  initial_state_logits, transition_probs = build_latent_state(
      num_states=num_states,
      max_num_states=max_num_states)
  batch_initial_state_logits.append(initial_state_logits)
  batch_transition_probs.append(transition_probs)

batch_initial_state_logits = tf.stack(batch_initial_state_logits)
batch_transition_probs = tf.stack(batch_transition_probs)
print("Shape of initial_state_logits: {}".format(batch_initial_state_logits.shape))
print("Shape of transition probs: {}".format(batch_transition_probs.shape))
print("Example initial state logits for num_states==3:\n{}".format(batch_initial_state_logits[2, :]))
print("Example transition_probs for num_states==3:\n{}".format(batch_transition_probs[2, :, :]))
Shape of initial_state_logits: (10, 10)
Shape of transition probs: (10, 10, 10)
Example initial state logits for num_states==3:
[  -0.   -0.   -0. -100. -100. -100. -100. -100. -100. -100.]
Example transition_probs for num_states==3:
[[0.95  0.025 0.025 0.    0.    0.    0.    0.    0.    0.   ]
 [0.025 0.95  0.025 0.    0.    0.    0.    0.    0.    0.   ]
 [0.025 0.025 0.95  0.    0.    0.    0.    0.    0.    0.   ]
 [0.    0.    0.    1.    0.    0.    0.    0.    0.    0.   ]
 [0.    0.    0.    0.    1.    0.    0.    0.    0.    0.   ]
 [0.    0.    0.    0.    0.    1.    0.    0.    0.    0.   ]
 [0.    0.    0.    0.    0.    0.    1.    0.    0.    0.   ]
 [0.    0.    0.    0.    0.    0.    0.    1.    0.    0.   ]
 [0.    0.    0.    0.    0.    0.    0.    0.    1.    0.   ]
 [0.    0.    0.    0.    0.    0.    0.    0.    0.    1.   ]]

现在我们继续进行与上面类似的操作。这次,我们将在 trainable_rates 中使用额外的批次维度,分别为每个正在考虑的模型拟合速率。

trainable_log_rates = tf.Variable(
    tf.fill([batch_initial_state_logits.shape[0], max_num_states],
            tf.math.log(tf.reduce_mean(observed_counts))) + 
     tf.random.stateless_normal([1, max_num_states], seed=(42, 42)),
     name='log_rates')

hmm = tfd.HiddenMarkovModel(
  initial_distribution=tfd.Categorical(
      logits=batch_initial_state_logits),
  transition_distribution=tfd.Categorical(probs=batch_transition_probs),
  observation_distribution=tfd.Poisson(log_rate=trainable_log_rates),
  num_steps=len(observed_counts))
print("Defined HMM with batch shape: {}".format(hmm.batch_shape))
Defined HMM with batch shape: (10,)

在计算总的对数概率时,我们要注意只对每个模型组件实际使用的速率先验进行求和。

rate_prior = tfd.LogNormal(5, 5)

def log_prob():
  prior_lps = rate_prior.log_prob(tf.math.exp(trainable_log_rates))
  prior_lp = tf.stack(
      [tf.reduce_sum(prior_lps[i, :i+1]) for i in range(max_num_states)])
  return prior_lp + hmm.log_prob(observed_counts)

现在我们优化我们构建的批次目标,同时拟合所有候选模型。

losses = tfp.math.minimize(
    lambda: -log_prob(),
    optimizer=tf_keras.optimizers.Adam(0.1),
    num_steps=100)
plt.plot(losses)
plt.ylabel('Negative log marginal likelihood')
Text(0, 0.5, 'Negative log marginal likelihood')

png

num_states = np.arange(1, max_num_states+1)
plt.plot(num_states, -losses[-1])
plt.ylim([-400, -200])
plt.ylabel("marginal likelihood $\\tilde{p}(x)$")
plt.xlabel("number of latent states")
plt.title("Model selection on latent states")
Text(0.5, 1.0, 'Model selection on latent states')

png

检查似然,我们看到(近似的)边际似然倾向于偏好三状态模型。这似乎是合理的——“真实”模型有四个状态,但仅从数据来看,很难排除三状态解释。

我们还可以提取每个候选模型的拟合速率。

rates = tf.exp(trainable_log_rates)
for i, learned_model_rates in enumerate(rates):
  print("rates for {}-state model: {}".format(i+1, learned_model_rates[:i+1]))
rates for 1-state model: [32.968506]
rates for 2-state model: [ 5.789209 47.948917]
rates for 3-state model: [ 2.841977 48.057507 17.958897]
rates for 4-state model: [ 2.8302798 49.585037  41.928406  17.351114 ]
rates for 5-state model: [17.399694  77.83679   41.975216  49.62771    2.8256145]
rates for 6-state model: [41.63677   77.20768   49.570934  49.557076  17.630419   2.8713436]
rates for 7-state model: [41.711704  76.405945  49.581184  49.561283  17.451889   2.8722699
 17.43608  ]
rates for 8-state model: [41.771793 75.41323  49.568714 49.591846 17.2523   17.247969 17.231388
  2.830598]
rates for 9-state model: [41.83378   74.50916   49.619488  49.622494   2.8369408 17.254414
 17.21532    2.5904858 17.252514 ]
rates for 10-state model: [4.1886074e+01 7.3912338e+01 4.1940136e+01 4.9652588e+01 2.8485537e+00
 1.7433832e+01 6.7564294e-02 1.9590002e+00 1.7430998e+01 7.8838937e-02]

并绘制每个模型对数据的解释。

most_probable_states = hmm.posterior_mode(observed_counts)
fig = plt.figure(figsize=(14, 12))
for i, learned_model_rates in enumerate(rates):
  ax = fig.add_subplot(4, 3, i+1)
  ax.plot(tf.gather(learned_model_rates, most_probable_states[i]), c='green', lw=3, label='inferred rate')
  ax.plot(observed_counts, c='black', alpha=0.3, label='observed counts')
  ax.set_ylabel("latent rate")
  ax.set_xlabel("time")
  ax.set_title("{}-state model".format(i+1))
  ax.legend(loc=4)
plt.tight_layout()

png

很容易看出,一、二和(更微妙地)三状态模型提供了不充分的解释!有趣的是,所有超过四个状态的模型都提供了本质上相同的解释!这可能是因为我们的“数据”相对干净,留下了很少的替代解释空间;在更混乱的现实世界数据上,我们预计更高容量的模型将对数据提供越来越好的拟合,在某个折衷点,改进的拟合将被模型复杂度所抵消。

扩展

本笔记本中的模型可以以多种方式直接扩展。例如

  • 允许潜在状态具有不同的概率(某些状态可能是常见的,而另一些状态可能是罕见的)。
  • 允许潜在状态之间存在非均匀转移(例如,学习机器崩溃通常会接着系统重启,通常会接着一段时间的良好性能等等)。
  • 其他发射模型,例如 NegativeBinomial 用于对计数数据中的不同离散度进行建模,或连续分布,例如 Normal 用于实值数据。