Написание кода скрытой марковской модели для дискретных наблюдений

Дискретная скрытая марковская модель в коде

Здравствуйте и вновь добро пожаловать на занятия по теме «Машинное обучение без учителя: скрытые марковские модели на языке Python».

В этой статье мы уже напишем код скрытой марковской модели для дискретных наблюдений.

Начинаем с импорта библиотек Numpy и Matplotlib.

import numpy as np

import matplotlib.pyplot as plt

Далее определим функцию random_normalized для создания корректной случайной марковской матрицы с размерностью d1xd2, чтобы быть уверенными, что сумма всех строк будет равна 1.

def random_normalized(d1, d2):

    x = np.random.random((d1, d2))

    return x / x.sum(axis=1, keepdims=True)

Теперь определим класс HMM для скрытой марковской модели. Здесь М – количество скрытых состояний.

class HMM:

    def __init__(self, M):

        self.M = M

Теперь функция fit. Прежде всего определим размер нашего словаря. Пронумеруем наши входные наблюдения от 0 до V-1, подобно тому как в обучении с учителем мы помечали классы от 0 до K-1.

    def fit(self, X, max_iter=30):

        np.random.seed(123)

        V = max(max(x) for x in X) + 1

        N = len(X)

Инициируем π, A и B. Логичной инициацией для π будет равномерное распределение. Для A и B же используем функцию, определённую выше.

        self.pi = np.ones(self.M) / self.M

        self.A = random_normalized(self.M, self.M)

        self.B = random_normalized(self.M, V)

Теперь мы готовы к написанию главного цикла. Значения будем выводить на экран через каждые 10 проходов. Все значения α и β будем сохранять в массиве, поскольку матрицу для этого использовать нельзя ввиду того, что они могут быть разной длины.

В циклах для α и β по моментам времени использовано поэлементное умножение, но вы можете попробовать вложенный цикл for, чтобы проверить правильность операции.

        costs = []

        for it in xrange(max_iter):

            if it % 10 == 0:

                print(“it:”, it)

            alphas = []

            betas = []

            P = np.zeros(N)

            for n in xrange(N):

                x = X[n]

                T = len(x)

                alpha = np.zeros((T, self.M))

                alpha[0] = self.pi*self.B[:,x[0]]

                for t in xrange(1, T):

                    alpha[t] = alpha[t-1].dot(self.A) * self.B[:, x[t]]

                P[n] = alpha[-1].sum()

                alphas.append(alpha)

                beta = np.zeros((T, self.M))

                beta[-1] = 1

                for t in xrange(T – 2, -1, -1):

                    beta[t] = self.A.dot(self.B[:, x[t+1]] * beta[t+1])

                betas.append(beta)

Справившись с этим, можно рассчитать значение ошибки.

            cost = np.sum(np.log(P))

            costs.append(cost)

Рассчитав α и β, мы можем сделать повторную оценку π, A и B.

  self.pi = np.sum((alphas[n][0] * betas[n][0])/P[n] for n in xrange(N)) / N

Надеюсь, вы справились с домашним заданием и убедились, что сумма всех π равна 1.

В качестве следующего шага будем отслеживать все знаменатели и числители для обновлений A и B. Знаменатель для A обозначим через den1, это матрица размерностью Mx1. den2 – тоже матрица с размерностью Mx1. a_num и b_num – числители для A и B соответственно.

            den1 = np.zeros((self.M, 1))

            den2 = np.zeros((self.M, 1))

            a_num = 0

            b_num = 0

            for n in xrange(N):

                x = X[n]

                T = len(x)

     den1 += (alphas[n][:-1] * betas[n][:-1]).sum(axis=0, keepdims=True).T / P[n]

    den2 += (alphas[n] * betas[n]).sum(axis=0, keepdims=True).T / P[n]

Вновь-таки, если вы не совсем понимаете, почему именно так, можете переписать этот фрагмент с помощью цикла for.

Теперь числитель для A.

                a_num_n = np.zeros((self.M, self.M))

                for i in xrange(self.M):

                    for j in xrange(self.M):

                        for t in xrange(T-1):

a_num_n[i,j] += alphas[n][t,i] * self.A[i,j] * self.B[j, x[t+1]] * betas[n][t+1,j]

                a_num += a_num_n / P[n]

Следующим идёт числитель для B.

                b_num_n = np.zeros((self.M, V))

                for i in xrange(self.M):

                    for j in xrange(V):

                        for t in xrange(T):

                            if x[t] == j:

                                b_num_n[i,j] += alphas[n][t][i] * betas[n][t][i]

                b_num += b_num_n2 / P[n]

После этого устанавливаем новые значения для A и B и выводим их значения на экран, чтобы посмотреть, чему они окажутся равны. Заодно и выведем значение ошибки.

            self.A = a_num / den1

            self.B = b_num / den2

        print(“A:”, self.A)

        print(“B:”, self.B)

        print(“pi:”, self.pi)

        plt.plot(costs)

        plt.show()

Это всё, что касается функции fit. Переходим теперь к функции likelihood. Это – для одного наблюдения.

    def likelihood(self, x):

        T = len(x)

        alpha = np.zeros((T, self.M))

        alpha[0] = self.pi*self.B[:,x[0]]

        for t in xrange(1, T):

            alpha[t] = alpha[t-1].dot(self.A) * self.B[:, x[t]]

        return alpha[-1].sum()

Мы можем определить и другую функцию likelihood_multi для вычисления правдоподобия для всех наблюдений.

    def likelihood_multi(self, X):

        return np.array([self.likelihood(x) for x in X])

Можем создать функцию и для логарифма правдоподобия множественных наблюдений, возвращающую логарифм написанного выше:

    def log_likelihood_multi(self, X):

        return np.log(self.likelihood_multi(X))

И последнее, что мы реализуем в классе HMM, – это алгоритм Витерби.

Для этого нам понадобятся две новые переменные delta и psi, обе матрицы размерностью TxM, а также два цикла для всех моментов времени и состояний.

    def get_state_sequence(self, x):

        T = len(x)

        delta = np.zeros((T, self.M))

        psi = np.zeros((T, self.M))

        delta[0] = self.pi*self.B[:,x[0]]

        for t in xrange(1, T):

            for j in xrange(self.M):

                delta[t,j] = np.max(delta[t-1]*self.A[:,j]) * self.B[j, x[t]]

                psi[t,j] = np.argmax(delta[t-1]*self.A[:,j])

        # backtrack

        states = np.zeros(T, dtype=np.int32)

        states[T-1] = np.argmax(delta[T-1])

        for t in xrange(T-2, -1, -1):

            states[t] = psi[t+1, states[t+1]]

        return states

Вот и всё, что касается класса HMM.

Последняя функция – fit_coin, которая загружает данные и обучает скрытую марковскую модель. Не забывайте, что наша модель ожидает в качестве данных символы от 0 до V-1.

def fit_coin():

    X = []

    for line in open(‘coin_data.txt’):

        x = [1 if e == ‘H’ else 0 for e in line.rstrip()]

        X.append(x)

Теперь создадим саму скрытую марковскую модель. Количество скрытых состояний установим равным 2 – мы это знаем, поскольку сами же и сгенерировали данные.

    hmm = HMM(2)

    hmm.fit(X)

    L = hmm.log_likelihood_multi(X).sum()

    print(“LL with fitted params:”, L)

Любопытно будет попробовать установить действительные значения для скрытой марковской модели. Так и сделаем. Интересно, в каком случае – при подобранных или при истинных значениях – логарифм правдоподобия будет выше.

    hmm.pi = np.array([0.5, 0.5])

    hmm.A = np.array([[0.1, 0.9], [0.8, 0.2]])

    hmm.B = np.array([[0.6, 0.4], [0.3, 0.7]])

    L = hmm.log_likelihood_multi(X).sum()

    print(“LL with true params:”, L)

И оценим результаты работы алгоритма Витерби.

    print(“Best state sequence for:”, X[0])

    print(hmm.get_state_sequence(X[0]))

if __name__ == ‘__main__’:

fit_coin()

Запустим наш файл.

Как можем видеть, в случае алгоритма максимизации ожидания, как и при методе k-средних или гауссовых смесях распределений, схождение происходит весьма быстро. Мы также видим, что при подобранных значениях логарифм правдоподобия выше, чем при истинных.


Всегда проверяйте наличие более поздней версии кода по адресу https://github.com/lazyprogrammer/machine_learning_examples. Идут обновления и улучшения! — Прим. LazyProgrammer

Проблема недостаточного заполнения и способ её решения

Одна из часто возникающих при работе со скрытыми марковскими моделями проблем – это проблема недостаточного заполнения. Представьте, что у вас есть очень длинная последовательность. Алгоритм прямого хода указывает, что мы должны умножать значения α до конца последовательности. Но что, если мы будем перемножать малые числа? Они будут становиться ещё меньше. Попробуйте перемножить в Python 0,1500. До какой степени вы сможете умножать, не дойдя до нуля? Не более нескольких сотен раз. А теперь представьте себе аудиосигнал. Он дискретизируется с частотой 44 100 выборок в секунду. Вряд ли ваша скрытая марковская модель сможет принести пользу, если она будет в состоянии обработать лишь несколько миллисекунд данных. Как же решить эту проблему?

Для начала рассмотрим алгоритм Витерби. Это простой случай, поскольку включает в себя только умножение. К тому же можем использовать логарифм вероятности вместо самой вероятности, поскольку он монотонно возрастает. Логарифм произведения равен сумме логарифмов:

log AB = log A + log B.

Поэтому с алгоритмом Витерби вопрос решён – мы просто берём логарифмы.

Но что делать с алгоритмом прямого-обратного хода и алгоритмом обучения, которые включают суммы? Тут всё намного сложнее, и нам придётся ввести понятие масштабирования. Чтобы добиться правильного масштабирования, введём коэффициент масштабирования, обозначив его через c(T). Как и в случае α и β, он будет иметь продолжительность T. С помощью коэффициента масштабирования мы можем преобразовать три этапа алгоритма прямого хода.

На этапе инициации α’ остаётся равной прежнему значению:

\alpha^' (1,i) = \pi_1 B(j,x(1)) = \alpha = (1,i).

Разница в том, что α’ – лишь временная переменная. Мы используем переменные со штрихом, чтобы определить масштаб, являющийся суммой значений переменной со штрихом по всем состояниям в данный момент времени:

Наконец, мы определяем  как α’, делённое на c(t):

Таким образом, начальное значение будет равно

Далее, мы можем определить этап индукции как следующую переменную со штрихом, выраженную через предыдущую переменную:

Это то же самое выражение, что и прежде, с тем разве что отличием, что появились α’ и . Масштабирование же остаётся прежним:

И, наконец, мы можем вычислить вероятность наблюдения следующим образом:

Таким образом, это всё лишь произведение всех c(t). В качестве упражнения попытайтесь самостоятельно доказать истинность этого утверждения.

Заметьте, что эта вероятность наблюдения будет крайне мала, о чём мы уже говорили в самом начале. Но обратите внимание и на то, что это произведение, что значит, что мы можем вычислить логарифм вероятности, как и в нашей функции затрат. Вы увидите, что даже в случае множественных наблюдений мы можем полностью исключить вероятность последовательности из уравнений обновлений.

Теперь поговорим об обновлении β. Нам не нужно вычислять новый коэффициент масштабирования – мы можем воспользоваться прежним.  инициируется как и прежде и равно единице:

\widehat {\beta} (T,i) = 1.

Этап индукции определяется следующим образом:

По сути, это старый этап индукции, но масштабированный по c(t+1).

А ещё вот вам несколько тождеств, которые я предлагаю вам доказать:

Меня часто спрашивают, как убедиться в правильности своих действий и как достичь успехов в этой теме. Вот именно так. У нас тут три тождества, которые могут быть доказаны с помощью уже показанных здесь уравнений. Эти тождества важны, поскольку мы будем пользоваться ими, когда будем переписывать уравнения обновлений, выражая их через  и . Поэтому, надеюсь, вы сможете доказать их.

С их же помощью мы можем определить новые обновления. Так, π остаётся прежним, но вероятность исчезает:

A(i,j) также похоже на предыдущее, и вероятность также исчезает:

По сути, вероятность оказалась заложена в новых величинах, а в числителе добавился коэффициент масштабирования.

Похожее выражение имеем и для переменной B:

Обратите внимание, как мы устранили фактическую вероятность наблюдения, поскольку она неявно включена в новые переменные. Осталось только реализовать эти изменения.

Обновление дискретной скрытой марковской модели с масштабированием

Сейчас мы напишем код для масштабированной версии дискретной скрытой марковской модели.

Тут всё очень похоже на немасштабированную скрытую марковскую модель, поэтому часть кода мы просто перенесём оттуда. Совершенно идентичными остаются функция random_normalized, импорты библиотек, определение класса. И конечно, мы выводим на экран значения π, A и B. Изменения будут лишь в основном цикле. Изменению также подвергнется функция likelihood, но остальные зависящие от неё функции остаются без изменений. Функция fit_coin также остаётся совершенно неизменной. То есть, почти весь код остаётся прежним.

Итак, приступим к циклу. Кроме списка значений α и β у нас появляется новый список – коэффициентов масштабирования scales. Кроме того, у нас будет логарифм вероятности, вместо самой вероятности.

        costs = []

        for it in xrange(max_iter):

            if it % 10 == 0:

                print(“it:”, it)

            alphas = []

            betas = []

            scales = []

            logP = np.zeros(N)

            for n in xrange(N):

                x = X[n]

                T = len(x)

                scale = np.zeros(T)

                alpha = np.zeros((T, self.M))

                alpha[0] = self.pi*self.B[:,x[0]]

                scale[0] = alpha[0].sum()

                alpha[0] /= scale[0]

                for t in xrange(1, T):

                    alpha_t_prime = alpha[t-1].dot(self.A) * self.B[:, x[t]]

                    scale[t] = alpha_t_prime.sum()

                    alpha[t] = alpha_t_prime / scale[t]

Теперь логарифм вероятности.

                logP[n] = np.log(scale).sum()

                alphas.append(alpha)

                scales.append(scale)

Теперь разберёмся с β.

                beta = np.zeros((T, self.M))

                beta[-1] = 1

                for t in xrange(T – 2, -1, -1):

                    beta[t] = self.A.dot(self.B[:, x[t+1]] * beta[t+1]) / scale[t+1]

                betas.append(beta)

И вычисляем ошибку.

            cost = np.sum(logP)

            costs.append(cost)

С α и β разобрались, теперь вычисление значений π, A и B.

    self.pi = np.sum((alphas[n][0] * betas[n][0]) for n in xrange(N)) / N

            den1 = np.zeros((self.M, 1))

            den2 = np.zeros((self.M, 1))

            a_num = np.zeros((self.M, self.M))

            b_num = np.zeros((self.M, V))

            for n in xrange(N):

                x = X[n]

                T = len(x)

                den1 += (alphas[n][:-1] * betas[n][:-1]).sum(axis=0, keepdims=True).T

                den2 += (alphas[n] * betas[n]).sum(axis=0, keepdims=True).T

Числители для A и B:

                for i in xrange(self.M):

                    for j in xrange(self.M):

                        for t in xrange(T-1):

      a_num[i,j] += alphas[n][t,i] * betas[n][t+1,j] * self.A[i,j] * self.B[j, x[t+1]] / scales[n][t+1]

                for i in xrange(self.M):

                    for j in xrange(V):

                        for t in xrange(T):

                            if x[t] == j:

                                b_num[i,j] += alphas[n][t][i] * betas[n][t][i]

            self.A = a_num / den1

            self.B = b_num / den2

        print(“A:”, self.A)

        print(“B:”, self.B)

        print(“pi:”, self.pi)

        plt.plot(costs)

        plt.show()

Далее – вычисление правдоподобия. Вернее, в действительности мы будем вычислять его логарифм, поэтому изменим название функции likelihood на log_likelihood. Соответствующим образом поменяем и зависящие от неё функции.

    def log_likelihood(self, x):

        T = len(x)

        scale = np.zeros(T)

        alpha = np.zeros((T, self.M))

        alpha[0] = self.pi*self.B[:,x[0]]

        scale[0] = alpha[0].sum()

        alpha[0] /= scale[0]

        for t in xrange(1, T):

            alpha_t_prime = alpha[t-1].dot(self.A) * self.B[:, x[t]]

            scale[t] = alpha_t_prime.sum()

            alpha[t] = alpha_t_prime / scale[t]

        return np.log(scale).sum()

    def log_likelihood_multi(self, X):

        return np.array([self.log_likelihood(x) for x in X])


Данный код был улучшен и обновлён! Всегда проверяйте последние обновления кода по адресу https://github.com/lazyprogrammer/machine_learning_examples. Код для этой статьи: папка hmm_class, файл hmmd_scaled.pyПрим. LazyProgrammer

Масштабированный алгоритм Витерби

Мы продолжаем писать код для масштабированной скрытой марковской модели.

Итак, последний этап – закончить алгоритм Витерби. Как уже указывалось, это очень просто, поскольку единственное, что требуется, – брать логарифмы всех произведений и преобразовывать их в суммы логарифмов. Этим и займёмся.

    def get_state_sequence(self, x):

        T = len(x)

        delta = np.zeros((T, self.M))

        psi = np.zeros((T, self.M))

        delta[0] = np.log(self.pi) + np.log(self.B[:,x[0]])

        for t in xrange(1, T):

            for j in xrange(self.M):

                delta[t,j] = np.max(delta[t-1] + np.log(self.A[:,j])) + np.log(self.B[j, x[t]])

                psi[t,j] = np.argmax(delta[t-1] + np.log(self.A[:,j]))

Дальше всё абсолютно то же самое

        # backtrack

        states = np.zeros(T, dtype=np.int32)

        states[T-1] = np.argmax(delta[T-1])

        for t in xrange(T-2, -1, -1):

            states[t] = psi[t+1, states[t+1]]

        return states

Изменения готовы. Посмотрим, как всё оно будет работать.

Сначала функция затрат, которую мы уже видели.

Итак, имеем ту же последовательность состояний, что и ранее.

Понравилась статья? Поделить с друзьями:
Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: