Содержание страницы
Часть 1.
Здравствуйте и вновь добро пожаловать на занятия по теме «Машинное обучение без учителя: скрытые марковские модели на языке Python».
В продолжении предыдущей части, здесь мы напишем код для скрытой марковской модели для непрерывных наблюдений.
Я уже написал часть кода с импортом необходимых библиотек и функций из файла generate_c, а также функцией random_normalized, использовавшейся в дискретной скрытой марковской модели.
import numpy as np
import matplotlib.pyplot as plt
from generate_c import get_signals, big_init, simple_init
from scipy.stats import multivariate_normal as mvn
def random_normalized(d1, d2):
x = np.random.random((d1, d2))
return x / x.sum(axis=1, keepdims=True)
Итак, определим новый класс HMM.
class HMM:
def __init__(self, M, K):
self.M = M
self.K = K
Определим функцию fit.
def fit(self, X, max_iter=30, eps=10e-1):
N = len(X)
D = X[0].shape[1]
self.pi = np.ones(self.M) / self.M
self.A = random_normalized(self.M, self.M)
self.R = np.ones((self.M, self.K)) / self.K
self.mu = np.zeros((self.M, self.K, D))
for i in xrange(self.M):
for k in xrange(self.K):
random_idx = np.random.choice(N)
x = X[random_idx]
random_time_idx = np.random.choice(len(x))
self.mu[i,k] = x[random_time_idx]
self.sigma = np.zeros((self.M, self.K, D, D))
for j in xrange(self.M):
for k in xrange(self.K):
self.sigma[j,k] = np.eye(D)
Переходим к основному циклу.
costs = []
for it in xrange(max_iter):
if it % 1 == 0:
print(“it:”, it)
alphas = []
betas = []
gammas = []
Bs = []
P = np.zeros(N)
for n in xrange(N):
x = X[n]
T = len(x)
B = np.zeros((self.M, T))
component = np.zeros((self.M, self.K, T))
for j in xrange(self.M):
for t in xrange(T):
for k in xrange(self.K):
p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k])
component[j,k,t] = p
B[j,t] += p
Bs.append(B)
Теперь можем перейти к вычислению α и β, используя только что вычисленное B. Сначала α. Тут всё очень похоже на случай с дискретными наблюдениями.
alpha = np.zeros((T, self.M))
alpha[0] = self.pi*B[:,0]
for t in xrange(1, T):
alpha[t] = alpha[t-1].dot(self.A) * B[:,t]
P[n] = alpha[-1].sum()
assert(P[n] <= 1)
alphas.append(alpha)
Теперь β.
beta = np.zeros((T, self.M))
beta[-1] = 1
for t in xrange(T – 2, -1, -1):
beta[t] = self.A.dot(B[:,t+1] * beta[t+1])
betas.append(beta)
Теперь нам нужна новая переменная γ.
gamma = np.zeros((T, self.M, self.K))
for t in xrange(T):
alphabeta = (alphas[n][t,:] * betas[n][t,:]).sum()
for j in xrange(self.M):
factor = alphas[n][t,j] * betas[n][t,j] / alphabeta
for k in xrange(self.K):
gamma[t,j,k] = factor * component[j,k,t] / B[j,t]
gammas.append(gamma)
Справившись с этой частью, можем вычислить ошибку.
cost = np.log(P).sum()
costs.append(cost)
Теперь этап пересчёта всех переменных.
self.pi = np.sum((alphas[n][0] * betas[n][0])/P[n] for n in xrange(N)) / N
a_den = np.zeros((self.M, 1))
a_num = 0
r_num = np.zeros((self.M, self.K))
r_den = np.zeros(self.M)
mu_num = np.zeros((self.M, self.K, D))
sigma_num = np.zeros((self.M, self.K, D, D))
for n in xrange(N):
x = X[n]
T = len(x)
B = Bs[n]
Вычисляем знаменатель для A.
a_den += (alphas[n][:-1] * betas[n][:-1]).sum(axis=0, keepdims=True).T / P[n]
Теперь числитель для A.
a_num_n = np.zeros((self.M, self.M))
for i in xrange(self.M):
for j in xrange(self.M):
for t in xrange(T-1):
a_num_n[i,j] += alphas[n][t,i] * self.A[i,j] * B[j,t+1] * betas[n][t+1,j]
a_num += a_num_n / P[n]
Теперь обновляем компоненты смеси распределений.
r_num_n = np.zeros((self.M, self.K))
r_den_n = np.zeros(self.M)
for j in xrange(self.M):
for k in xrange(self.K):
for t in xrange(T):
r_num_n[j,k] += gamma[t,j,k]
r_den_n[j] += gamma[t,j,k]
r_num += r_num_n / P[n]
r_den += r_den_n / P[n]
Следующее – μ и Σ.
mu_num_n = np.zeros((self.M, self.K, D))
sigma_num_n = np.zeros((self.M, self.K, D, D))
for j in xrange(self.M):
for k in xrange(self.K):
for t in xrange(T):
mu_num_n[j,k] += gamma[t,j,k] * x[t]
sigma_num_n[j,k] += gamma[t,j,k] * np.outer(x[t] – self.mu[j,k], x[t] – self.mu[j,k])
mu_num += mu_num_n / P[n]
sigma_num += sigma_num_n / P[n]
self.A = a_num / a_den
Обновляем значения R, μ и Σ.
for j in xrange(self.M):
for k in xrange(self.K):
self.R[j,k] = r_num[j,k] / r_den[j]
self.mu[j,k] = mu_num[j,k] / r_num[j,k]
self.sigma[j,k] = sigma_num[j,k] / r_num[j,k]
print(“A:”, self.A)
print(“mu:”, self.mu)
print(“sigma:”, self.sigma)
print(“R:”, self.R)
print(“pi:”, self.pi)
plt.plot(costs)
plt.show()
Часть 2.
Следующим рассчитываем правдоподобие. Это очень похоже на то, что мы делали ранее.
def likelihood(self, x):
T = len(x)
alpha = np.zeros((T, self.M))
B = np.zeros((self.M, T))
for j in xrange(self.M):
for t in xrange(T):
for k in xrange(self.K):
p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k])
B[j,t] += p
alpha[0] = self.pi*B[:,0]
for t in xrange(1, T):
alpha[t] = alpha[t-1].dot(self.A) * B[:,t]
return alpha[-1].sum()
Функции likelihood_multi и log_likelihood_multi идентичны прежним, так что просто скопируем их:
def likelihood_multi(self, X):
return np.array([self.likelihood(x) for x in X])
def log_likelihood_multi(self, X):
return np.log(self.likelihood_multi(X))
Далее идёт функция set для проверки истинных параметров.
def set(self, pi, A, R, mu, sigma):
self.pi = pi
self.A = A
self.R = R
self.mu = mu
self.sigma = sigma
M, K = R.shape
self.M = M
self.K = K
Теперь создадим функцию fake_signal с моделью простого сигнала.
def fake_signal(init=simple_init):
signals = get_signals(N=10, T=10, init=init)
hmm = HMM(2, 2)
hmm.fit(signals)
L = hmm.log_likelihood_multi(signals).sum()
print(“LL for fitted params:”, L)
_, _, _, pi, A, R, mu, sigma = init()
hmm.set(pi, A, R, mu, sigma)
L = hmm.log_likelihood_multi(signals).sum()
print(“LL for actual params:”, L)
if __name__ == ‘__main__’:
fake_signal(init=simple_init)
Запустим наш код. Всё выглядит неплохо.
Попробуем теперь с моделью большого сигнала.
if __name__ == ‘__main__’:
fake_signal(init=big_init)
Не получается – у нас некорректное деление. Видимо, P[n] равно нулю. Да и вероятность A оказалась не меньшей или равной единице.
Скрытая марковская модель для непрерывных наблюдений в Theano
Я уже написал часть кода. Мы импортируем библиотеку wave для загрузки звука, а также, разумеется, Theano, Numpy и Matplotlib. Из файла generate_c импортированы функции get_signals и только big_init – функцией для простого сигнала мы пользоваться больше не будем.
import wave
import theano
import theano.tensor as T
import numpy as np
import matplotlib.pyplot as plt
# from theano.sandbox import solve # does not have gradient functionality
from generate_c import get_signals, big_init
def random_normalized(d1, d2):
x = np.random.random((d1, d2))
return x / x.sum(axis=1, keepdims=True)
Далее идёт функция для загрузки wav-файла. В нём просто кто-то сказал «Hello, world!». Сразу же проводится и нормализация путём вычитания среднего значения и деления на стандартное отклонение, поскольку, как обнаружилось, она не работает без нормализации.
def real_signal():
spf = wave.open(‘helloworld.wav’, ‘r’)
#Extract Raw Audio from Wav File
# If you right-click on the file and go to “Get Info”, you can see:
# sampling rate = 16000 Hz
# bits per sample = 16
# The first is quantization in time
# The second is quantization in amplitude
# We also do this for images!
# 2^16 = 65536 is how many different sound levels we have
signal = spf.readframes(-1)
signal = np.fromstring(signal, ‘Int16’)
T = len(signal)
signal = (signal – signal.mean()) / signal.std()
У нас будет 5 состояний и 3 гауссианы. Кроме того, мы переформатируем сигнал, поскольку наш API ожидает множественные последовательности. Поэтому переведём его в формат 1xTx1. Коэффициент обучения равен 10-6, количество итераций равно 20.
hmm = HMM(5, 3)
# signal needs to be of shape N x T(n) x D
hmm.fit(signal.reshape(1, T, 1), learning_rate=10e-6, max_iter=20)
Что касается функции fake_signal, то мы вызываем get_signals и пользуемся аналогичной моделью с 5 состояниями и 3 гауссианами, после чего, как и прежде, выводим значение правдоподобия на экран.
def fake_signal():
signals = get_signals()
hmm = HMM(5, 3)
hmm.fit(signals)
L = hmm.log_likelihood_multi(signals).sum()
print(“LL for fitted params:”, L)
# test in actual params
_, _, _, pi, A, R, mu, sigma = big_init()
hmm.set(pi, A, R, mu, sigma)
L = hmm.log_likelihood_multi(signals).sum()
print(“LL for actual params:”, L)
Итак, приступим к нашей скрытой марковской модели. Определяем класс HMM.
class HMM:
def __init__(self, M, K):
self.M = M
self.K = K
def fit(self, X, learning_rate=10e-3, max_iter=10):
N = len(X)
D = X[0].shape[1]
pi0 = np.ones(self.M) / self.M
A0 = random_normalized(self.M, self.M)
R0 = np.ones((self.M, self.K)) / self.K
mu0 = np.zeros((self.M, self.K, D))
for i in xrange(self.M):
for k in xrange(self.K):
random_idx = np.random.choice(N)
x = X[random_idx]
random_time_idx = np.random.choice(len(x))
mu0[i,k] = x[random_time_idx]
sigma0 = np.zeros((self.M, self.K, D, D))
for j in xrange(self.M):
for k in xrange(self.K):
sigma0[j,k] = np.eye(D)
thx, cost = self.set(pi0, A0, R0, mu0, sigma0)
Теперь главная часть всего этого файла.
def set(self, pi, A, R, mu, sigma):
self.pi = theano.shared(pi)
self.A = theano.shared(A)
self.R = theano.shared(R)
self.mu = theano.shared(mu)
self.sigma = theano.shared(sigma)
M, K = R.shape
self.M = M
self.K = K
D = self.mu.shape[2]
twopiD = (2*np.pi)**D
Определяем переменные и функции Theano.
thx = T.matrix(‘X’)
def mvn_pdf(x, mu, sigma):
k = 1 / T.sqrt(twopiD * T.nlinalg.det(sigma))
e = T.exp(-0.5*(x – mu).T.dot(T.nlinalg.matrix_inverse(sigma).dot(x – mu)))
return k*e
Далее нам понадобится функция gmm_pdf, чтобы использовать написанный выше код. В ней будут определены ещё две функции state_pdfs и component_pdf (рассчитывает B(j, t)).
def gmm_pdf(x):
def state_pdfs(xt):
def component_pdf(j, xt):
Bj_t = 0
for k in xrange(self.K):
Bj_t += self.R[j,k] * mvn_pdf(xt, self.mu[j,k], self.sigma[j,k])
return Bj_t
Используем функцию scan. У нас их тут будет несколько.
Bt, _ = theano.scan(
fn=component_pdf,
sequences=T.arange(self.M),
n_steps=self.M,
outputs_info=None,
non_sequences=[xt],
)
return Bt
B, _ = theano.scan(
fn=state_pdfs,
sequences=x,
n_steps=x.shape[0],
outputs_info=None,
)
return B.T
B = gmm_pdf(thx)
После этого определяем функцию recurrence.
def recurrence(t, old_a, B):
a = old_a.dot(self.A) * B[:, t]
s = a.sum()
return (a / s), s
Ещё одна функция scan.
[alpha, scale], _ = theano.scan(
fn=recurrence,
sequences=T.arange(1, thx.shape[0]),
outputs_info=[self.pi*B[:,0], None],
n_steps=thx.shape[0]-1,
non_sequences=[B],
)
Теперь вычисляем ошибку.
cost = -T.log(scale).sum()
self.cost_op = theano.function(
inputs=[thx],
outputs=cost,
)
return thx, cost
Теперь можно вернуться к функции fit и определить обновления.
updates = [
(self.pi, (self.pi – learning_rate*T.grad(cost, self.pi)) / self.pi.sum()),
(self.A, (self.A – learning_rate*T.grad(cost, self.A)) / self.A.sum(axis=1).dimshuffle(0, ‘x’)),
(self.R, (self.R – learning_rate*T.grad(cost, self.R)) / self.R.sum(axis=1).dimshuffle(0, ‘x’)),
(self.mu, self.mu – learning_rate*T.grad(cost, self.mu)),
(self.sigma, self.sigma – learning_rate*T.grad(cost, self.sigma)),
]
train_op = theano.function(
inputs=[thx],
updates=updates,
)
Переходим к основному циклу.
costs = []
for it in xrange(max_iter):
print(“it:”, it)
for n in xrange(N):
c = self.log_likelihood_multi(X).sum()
print(“c:”, c)
costs.append(c)
train_op(X[n])
И выводим на экран значения всех наших переменных.
print(“A:”, self.A.get_value())
print(“mu:”, self.mu.get_value())
print(“sigma:”, self.sigma.get_value())
print(“R:”, self.R.get_value())
print(“pi:”, self.pi.get_value())
plt.plot(costs)
plt.show()
И последнее – функция логарифма правдоподобия.
def log_likelihood_multi(self, X):
return np.array([self.cost_op(x) for x in X])
Запустим программу и посмотрим, что получится. Сначала – с помощью функции fake_signal.
if __name__ == ‘__main__’:
# real_signal()
fake_signal()
Теперь – с помощью real_signal.
if __name__ == ‘__main__’:
real_signal()
# fake_signal()
Это наш файл «Hello, world».