Скрытая марковская модель для непрерывных наблюдений в коде

Часть 1.

Здравствуйте и вновь добро пожаловать на занятия по теме «Машинное обучение без учителя: скрытые марковские модели на языке Python».

В продолжении предыдущей части, здесь мы напишем код для скрытой марковской модели для непрерывных наблюдений.

Я уже написал часть кода с импортом необходимых библиотек и функций из файла generate_c, а также функцией random_normalized, использовавшейся в дискретной скрытой марковской модели.

import numpy as np

import matplotlib.pyplot as plt

from generate_c import get_signals, big_init, simple_init

from scipy.stats import multivariate_normal as mvn

def random_normalized(d1, d2):

    x = np.random.random((d1, d2))

    return x / x.sum(axis=1, keepdims=True)

Итак, определим новый класс HMM.

class HMM:

    def __init__(self, M, K):

        self.M = M

        self.K = K

Определим функцию fit.

    def fit(self, X, max_iter=30, eps=10e-1):

        N = len(X)

        D = X[0].shape[1]

        self.pi = np.ones(self.M) / self.M

        self.A = random_normalized(self.M, self.M)

        self.R = np.ones((self.M, self.K)) / self.K

        self.mu = np.zeros((self.M, self.K, D))

        for i in xrange(self.M):

            for k in xrange(self.K):

                random_idx = np.random.choice(N)

                x = X[random_idx]

                random_time_idx = np.random.choice(len(x))

                self.mu[i,k] = x[random_time_idx]

        self.sigma = np.zeros((self.M, self.K, D, D))

        for j in xrange(self.M):

            for k in xrange(self.K):

                self.sigma[j,k] = np.eye(D)

Переходим к основному циклу.

        costs = []

        for it in xrange(max_iter):

            if it % 1 == 0:

                print(“it:”, it)

            alphas = []

            betas = []

            gammas = []

            Bs = []

            P = np.zeros(N)

            for n in xrange(N):

                x = X[n]

                T = len(x)

                B = np.zeros((self.M, T))

                component = np.zeros((self.M, self.K, T))

                for j in xrange(self.M):

                    for t in xrange(T):

                        for k in xrange(self.K):

                            p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k])

                            component[j,k,t] = p

                            B[j,t] += p

                Bs.append(B)

Теперь можем перейти к вычислению α и β, используя только что вычисленное B. Сначала α. Тут всё очень похоже на случай с дискретными наблюдениями.

                alpha = np.zeros((T, self.M))

                alpha[0] = self.pi*B[:,0]

                for t in xrange(1, T):

                    alpha[t] = alpha[t-1].dot(self.A) * B[:,t]

                P[n] = alpha[-1].sum()

                assert(P[n] <= 1)

                alphas.append(alpha)

Теперь β.

                beta = np.zeros((T, self.M))

                beta[-1] = 1

                for t in xrange(T – 2, -1, -1):

                    beta[t] = self.A.dot(B[:,t+1] * beta[t+1])

                betas.append(beta)

Теперь нам нужна новая переменная γ.

                gamma = np.zeros((T, self.M, self.K))

                for t in xrange(T):

                    alphabeta = (alphas[n][t,:] * betas[n][t,:]).sum()

                    for j in xrange(self.M):

                        factor = alphas[n][t,j] * betas[n][t,j] / alphabeta

                        for k in xrange(self.K):

                            gamma[t,j,k] = factor * component[j,k,t] / B[j,t]

                gammas.append(gamma)

Справившись с этой частью, можем вычислить ошибку.

            cost = np.log(P).sum()

            costs.append(cost)

Теперь этап пересчёта всех переменных.

self.pi = np.sum((alphas[n][0] * betas[n][0])/P[n] for n in xrange(N)) / N

            a_den = np.zeros((self.M, 1))

            a_num = 0

            r_num = np.zeros((self.M, self.K))

            r_den = np.zeros(self.M)

            mu_num = np.zeros((self.M, self.K, D))

            sigma_num = np.zeros((self.M, self.K, D, D))

            for n in xrange(N):

                x = X[n]

                T = len(x)

                B = Bs[n]

Вычисляем знаменатель для A.

a_den += (alphas[n][:-1] * betas[n][:-1]).sum(axis=0, keepdims=True).T / P[n]

Теперь числитель для A.

                a_num_n = np.zeros((self.M, self.M))

                for i in xrange(self.M):

                    for j in xrange(self.M):

                        for t in xrange(T-1):

                            a_num_n[i,j] += alphas[n][t,i] * self.A[i,j] * B[j,t+1] * betas[n][t+1,j]

                a_num += a_num_n / P[n]

Теперь обновляем компоненты смеси распределений.

                r_num_n = np.zeros((self.M, self.K))

                r_den_n = np.zeros(self.M)

                for j in xrange(self.M):

                    for k in xrange(self.K):

                        for t in xrange(T):

                            r_num_n[j,k] += gamma[t,j,k]

                            r_den_n[j] += gamma[t,j,k]

                r_num += r_num_n / P[n]

                r_den += r_den_n / P[n]

Следующее – μ и Σ.

                mu_num_n = np.zeros((self.M, self.K, D))

                sigma_num_n = np.zeros((self.M, self.K, D, D))

                for j in xrange(self.M):

                    for k in xrange(self.K):

                        for t in xrange(T):

                            mu_num_n[j,k] += gamma[t,j,k] * x[t]

                            sigma_num_n[j,k] += gamma[t,j,k] * np.outer(x[t] – self.mu[j,k], x[t] – self.mu[j,k])

                mu_num += mu_num_n / P[n]

                sigma_num += sigma_num_n / P[n]

            self.A = a_num / a_den

Обновляем значения R, μ и Σ.

            for j in xrange(self.M):

                for k in xrange(self.K):

                    self.R[j,k] = r_num[j,k] / r_den[j]

                    self.mu[j,k] = mu_num[j,k] / r_num[j,k]

                    self.sigma[j,k] = sigma_num[j,k] / r_num[j,k]

        print(“A:”, self.A)

        print(“mu:”, self.mu)

        print(“sigma:”, self.sigma)

        print(“R:”, self.R)

        print(“pi:”, self.pi)

        plt.plot(costs)

        plt.show()

Часть 2.

Следующим рассчитываем правдоподобие. Это очень похоже на то, что мы делали ранее.

    def likelihood(self, x):

        T = len(x)

        alpha = np.zeros((T, self.M))

        B = np.zeros((self.M, T))

        for j in xrange(self.M):

            for t in xrange(T):

                for k in xrange(self.K):

                    p = self.R[j,k] * mvn.pdf(x[t], self.mu[j,k], self.sigma[j,k])

                    B[j,t] += p

        alpha[0] = self.pi*B[:,0]

        for t in xrange(1, T):

            alpha[t] = alpha[t-1].dot(self.A) * B[:,t]

        return alpha[-1].sum()

Функции likelihood_multi и log_likelihood_multi идентичны прежним, так что просто скопируем их:

    def likelihood_multi(self, X):

        return np.array([self.likelihood(x) for x in X])

    def log_likelihood_multi(self, X):

        return np.log(self.likelihood_multi(X))

Далее идёт функция set для проверки истинных параметров.

    def set(self, pi, A, R, mu, sigma):

        self.pi = pi

        self.A = A

        self.R = R

        self.mu = mu

        self.sigma = sigma

        M, K = R.shape

        self.M = M

        self.K = K

Теперь создадим функцию fake_signal с моделью простого сигнала.

def fake_signal(init=simple_init):

    signals = get_signals(N=10, T=10, init=init)

    hmm = HMM(2, 2)

    hmm.fit(signals)

    L = hmm.log_likelihood_multi(signals).sum()

    print(“LL for fitted params:”, L)

    _, _, _, pi, A, R, mu, sigma = init()

    hmm.set(pi, A, R, mu, sigma)

    L = hmm.log_likelihood_multi(signals).sum()

    print(“LL for actual params:”, L)

if __name__ == ‘__main__’:

    fake_signal(init=simple_init)

Запустим наш код. Всё выглядит неплохо.

Попробуем теперь с моделью большого сигнала.

if __name__ == ‘__main__’:

    fake_signal(init=big_init)

Не получается – у нас некорректное деление. Видимо, P[n] равно нулю. Да и вероятность A оказалась не меньшей или равной единице.

Скрытая марковская модель для непрерывных наблюдений в Theano

Я уже написал часть кода. Мы импортируем библиотеку wave для загрузки звука, а также, разумеется, Theano, Numpy и Matplotlib. Из файла generate_c импортированы функции get_signals и только big_init – функцией для простого сигнала мы пользоваться больше не будем.

import wave

import theano

import theano.tensor as T

import numpy as np

import matplotlib.pyplot as plt

# from theano.sandbox import solve # does not have gradient functionality

from generate_c import get_signals, big_init

def random_normalized(d1, d2):

    x = np.random.random((d1, d2))

    return x / x.sum(axis=1, keepdims=True)

Далее идёт функция для загрузки wav-файла. В нём просто кто-то сказал «Hello, world!». Сразу же проводится и нормализация путём вычитания среднего значения и деления на стандартное отклонение, поскольку, как обнаружилось, она не работает без нормализации.

def real_signal():

    spf = wave.open(‘helloworld.wav’, ‘r’)

    #Extract Raw Audio from Wav File

    # If you right-click on the file and go to “Get Info”, you can see:

    # sampling rate = 16000 Hz

    # bits per sample = 16

    # The first is quantization in time

    # The second is quantization in amplitude

    # We also do this for images!

    # 2^16 = 65536 is how many different sound levels we have

    signal = spf.readframes(-1)

    signal = np.fromstring(signal, ‘Int16’)

    T = len(signal)

    signal = (signal – signal.mean()) / signal.std()

У нас будет 5 состояний и 3 гауссианы. Кроме того, мы переформатируем сигнал, поскольку наш API ожидает множественные последовательности. Поэтому переведём его в формат 1xTx1. Коэффициент обучения равен 10-6, количество итераций равно 20.

    hmm = HMM(5, 3)

    # signal needs to be of shape N x T(n) x D

    hmm.fit(signal.reshape(1, T, 1), learning_rate=10e-6, max_iter=20)

Что касается функции fake_signal, то мы вызываем get_signals и пользуемся аналогичной моделью с 5 состояниями и 3 гауссианами, после чего, как и прежде, выводим значение правдоподобия на экран.

def fake_signal():

    signals = get_signals()

    hmm = HMM(5, 3)

    hmm.fit(signals)

    L = hmm.log_likelihood_multi(signals).sum()

    print(“LL for fitted params:”, L)

    # test in actual params

    _, _, _, pi, A, R, mu, sigma = big_init()

    hmm.set(pi, A, R, mu, sigma)

    L = hmm.log_likelihood_multi(signals).sum()

    print(“LL for actual params:”, L)

Итак, приступим к нашей скрытой марковской модели. Определяем класс HMM.

class HMM:

    def __init__(self, M, K):

        self.M = M

        self.K = K

    def fit(self, X, learning_rate=10e-3, max_iter=10):

        N = len(X)

        D = X[0].shape[1]

        pi0 = np.ones(self.M) / self.M

        A0 = random_normalized(self.M, self.M)

        R0 = np.ones((self.M, self.K)) / self.K

        mu0 = np.zeros((self.M, self.K, D))

        for i in xrange(self.M):

            for k in xrange(self.K):

                random_idx = np.random.choice(N)

                x = X[random_idx]

                random_time_idx = np.random.choice(len(x))

                mu0[i,k] = x[random_time_idx]

        sigma0 = np.zeros((self.M, self.K, D, D))

        for j in xrange(self.M):

            for k in xrange(self.K):

                sigma0[j,k] = np.eye(D)

        thx, cost = self.set(pi0, A0, R0, mu0, sigma0)

Теперь главная часть всего этого файла.

    def set(self, pi, A, R, mu, sigma):

        self.pi = theano.shared(pi)

        self.A = theano.shared(A)

        self.R = theano.shared(R)

        self.mu = theano.shared(mu)

        self.sigma = theano.shared(sigma)

        M, K = R.shape

        self.M = M

        self.K = K

        D = self.mu.shape[2]

        twopiD = (2*np.pi)**D

Определяем переменные и функции Theano.

        thx = T.matrix(‘X’)

        def mvn_pdf(x, mu, sigma):

            k = 1 / T.sqrt(twopiD * T.nlinalg.det(sigma))

            e = T.exp(-0.5*(x – mu).T.dot(T.nlinalg.matrix_inverse(sigma).dot(x – mu)))

            return k*e

Далее нам понадобится функция gmm_pdf, чтобы использовать написанный выше код. В ней будут определены ещё две функции state_pdfs и component_pdf (рассчитывает B(j, t)).

        def gmm_pdf(x):

            def state_pdfs(xt):

                def component_pdf(j, xt):

                    Bj_t = 0

                    for k in xrange(self.K):

                        Bj_t += self.R[j,k] * mvn_pdf(xt, self.mu[j,k], self.sigma[j,k])

                    return Bj_t

Используем функцию scan. У нас их тут будет несколько.

                Bt, _ = theano.scan(

                    fn=component_pdf,

                    sequences=T.arange(self.M),

                    n_steps=self.M,

                    outputs_info=None,

                    non_sequences=[xt],

                )

                return Bt

            B, _ = theano.scan(

                fn=state_pdfs,

                sequences=x,

                n_steps=x.shape[0],

                outputs_info=None,

            )

            return B.T

        B = gmm_pdf(thx)

После этого определяем функцию recurrence.

        def recurrence(t, old_a, B):

            a = old_a.dot(self.A) * B[:, t]

            s = a.sum()

            return (a / s), s

Ещё одна функция scan.

        [alpha, scale], _ = theano.scan(

            fn=recurrence,

            sequences=T.arange(1, thx.shape[0]),

            outputs_info=[self.pi*B[:,0], None],

            n_steps=thx.shape[0]-1,

            non_sequences=[B],

        )

Теперь вычисляем ошибку.

        cost = -T.log(scale).sum()

        self.cost_op = theano.function(

            inputs=[thx],

            outputs=cost,

        )

        return thx, cost

Теперь можно вернуться к функции fit и определить обновления.

        updates = [

            (self.pi, (self.pi – learning_rate*T.grad(cost, self.pi)) / self.pi.sum()),

            (self.A, (self.A – learning_rate*T.grad(cost, self.A)) / self.A.sum(axis=1).dimshuffle(0, ‘x’)),

            (self.R, (self.R – learning_rate*T.grad(cost, self.R)) / self.R.sum(axis=1).dimshuffle(0, ‘x’)),

            (self.mu, self.mu – learning_rate*T.grad(cost, self.mu)),

            (self.sigma, self.sigma – learning_rate*T.grad(cost, self.sigma)),

        ]

        train_op = theano.function(

            inputs=[thx],

            updates=updates,

        )

Переходим к основному циклу.

        costs = []

        for it in xrange(max_iter):

            print(“it:”, it)

            for n in xrange(N):

                c = self.log_likelihood_multi(X).sum()

                print(“c:”, c)

                costs.append(c)

                train_op(X[n])

И выводим на экран значения всех наших переменных.

        print(“A:”, self.A.get_value())

        print(“mu:”, self.mu.get_value())

        print(“sigma:”, self.sigma.get_value())

        print(“R:”, self.R.get_value())

        print(“pi:”, self.pi.get_value())

        plt.plot(costs)

        plt.show()

И последнее – функция логарифма правдоподобия.

    def log_likelihood_multi(self, X):

        return np.array([self.cost_op(x) for x in X])

Запустим программу и посмотрим, что получится. Сначала – с помощью функции fake_signal.

if __name__ == ‘__main__’:

    # real_signal()

fake_signal()

Теперь – с помощью real_signal.

if __name__ == ‘__main__’:

real_signal()

    # fake_signal()

Это наш файл «Hello, world».

Понравилась статья? Поделить с друзьями:
Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: