Source code for hmmkay.hmm

from numba import njit
import numpy as np

from .utils import (
    _choice,
    _logsumexp,
    _check_array_sums_to_1,
    _check_random_state,
    _argmax,
    check_sequences,
)


__all__ = ["HMM"]


class HMM:
    """Discrete Hidden Markov Model.

    The numbers of hidden and observable states are determined by the shapes
    of the probability matrices passed as parameters.

    Parameters
    ----------
    init_probas : array-like of shape (n_hidden_states,)
        The initial probabilities.
    transitions : array-like of shape (n_hidden_states, n_hidden_states)
        The transition probabilities. ``transitions[i, j] = P(st+1 = j / st = i)``.
    emissions : array-like of shape (n_hidden_states, n_observable_states)
        The probabilities of symbol emission. ``emissions[i, o] = P(Ot = o / st = i)``.
    n_iter : int, default=10
        Number of iterations to run for the EM algorithm (in ``fit()``).
    """

    def __init__(self, init_probas, transitions, emissions, n_iter=10):
        self.init_probas = np.array(init_probas, dtype=np.float64)
        self.transitions = np.array(transitions, dtype=np.float64)
        self.emissions = np.array(emissions, dtype=np.float64)
        self.n_iter = n_iter

        self.n_hidden_states = self.A.shape[0]
        self.n_observable_states = self.B.shape[1]

        if not (
            self.A.shape[0] == self.A.shape[1] == self.pi.shape[0] == self.B.shape[0]
        ):
            raise ValueError("inconsistent number of hidden states.")

        self._check_matrices_conditioning()
    def log_likelihood(self, sequences):
        """Compute log-likelihood of sequences.

        Parameters
        ----------
        sequences : array-like of shape (n_seq, n_obs) or list (or numba typed list) \
                of iterables of variable length
            The sequences of observable states

        Returns
        -------
        log_likelihood : float
            The sum of the log-likelihoods of each sequence.
        """
        total_log_likelihood = 0
        sequences, n_obs_max = check_sequences(sequences, return_longest_length=True)
        log_alpha = np.empty(shape=(self.n_hidden_states, n_obs_max), dtype=np.float32)
        for seq in sequences:
            total_log_likelihood += self._forward(seq, log_alpha)
        return total_log_likelihood
    def decode(self, sequences, return_log_probas=False):
        """Decode sequences with the Viterbi algorithm.

        Given a sequence of observable states, return the sequence of hidden
        states that most likely generated the input.

        Parameters
        ----------
        sequences : array-like of shape (n_seq, n_obs) or list (or numba typed list) \
                of iterables of variable length
            The sequences of observable states
        return_log_probas : bool, default=False
            If True, log-probabilities of the joint sequences of observable
            and hidden states are returned

        Returns
        -------
        best_paths : ndarray of shape (n_seq, n_obs) or list of ndarray of \
                variable length
            The most likely sequences of hidden states.
        log_probabilities : ndarray of shape (n_seq,)
            log-probabilities of the joint sequences of observable and hidden
            states. Only present if ``return_log_probas`` is True.
        """
        sequences, n_obs_max = check_sequences(sequences, return_longest_length=True)

        hidden_states_sequences = []
        log_probas = []
        log_V = np.empty(shape=(self.n_hidden_states, n_obs_max), dtype=np.float32)
        back_path = np.empty(shape=(self.n_hidden_states, n_obs_max), dtype=np.int32)

        for seq in sequences:
            n_obs = seq.shape[0]
            self._viterbi(seq, log_V, back_path)
            best_path = np.empty(n_obs, dtype=np.int32)
            log_proba = _get_best_path(log_V, back_path, best_path)
            hidden_states_sequences.append(best_path)
            if return_log_probas:
                log_probas.append(log_proba)

        if isinstance(sequences, np.ndarray):
            # All sequences have the same length
            hidden_states_sequences = np.array(hidden_states_sequences)

        if return_log_probas:
            return hidden_states_sequences, np.array(log_probas)
        else:
            return hidden_states_sequences
    def sample(self, n_seq=10, n_obs=10, random_state=None):
        """Sample sequences of hidden and observable states.

        Parameters
        ----------
        n_seq : int, default=10
            Number of sequences to sample
        n_obs : int, default=10
            Number of observations per sequence
        random_state : int or np.random.RandomState instance, default=None
            Controls the RNG, see the `scikit-learn glossary
            <https://scikit-learn.org/stable/glossary.html#term-random-state>`_
            for details.

        Returns
        -------
        hidden_states_sequences : ndarray of shape (n_seq, n_obs)
        observable_states_sequences : ndarray of shape (n_seq, n_obs)
        """
        # TODO: allow n_obs_max
        rng = _check_random_state(random_state)
        sequences = np.array(
            [
                _sample_one(n_obs, self.pi, self.A, self.B, seed=rng.tomaxint())
                for _ in range(n_seq)
            ]
        )
        # Unzip array of (hidden_states, observations) pairs into a tuple of arrays
        sequences = sequences.swapaxes(0, 1)
        return sequences[0], sequences[1]
    def fit(self, sequences):
        """Fit the model to sequences.

        The probability matrices ``init_probas``, ``transitions`` and
        ``emissions`` are estimated with the EM algorithm.

        Parameters
        ----------
        sequences : array-like of shape (n_seq, n_obs) or list (or numba typed list) \
                of iterables of variable length
            The sequences of observable states

        Returns
        -------
        self : HMM instance
        """
        sequences, n_obs_max = check_sequences(sequences, return_longest_length=True)

        log_alpha = np.empty(shape=(self.n_hidden_states, n_obs_max))
        log_beta = np.empty(shape=(self.n_hidden_states, n_obs_max))
        # E[i, j, t] = P(st = i, st+1 = j / O, lambda)
        log_E = np.empty(
            shape=(self.n_hidden_states, self.n_hidden_states, n_obs_max - 1)
        )
        # g[i, t] = P(st = i / O, lambda)
        log_gamma = np.empty(shape=(self.n_hidden_states, n_obs_max))

        for _ in range(self.n_iter):
            self.pi, self.A, self.B = _do_EM_step(
                sequences,
                self._log_pi,
                self._log_A,
                self._log_B,
                log_alpha,
                log_beta,
                log_E,
                log_gamma,
            )
            self._check_matrices_conditioning()
        return self
    def _viterbi(self, seq, log_V, back_path):
        # dummy wrapper for convenience
        _viterbi(seq, self._log_pi, self._log_A, self._log_B, log_V, back_path)

    def _forward(self, seq, log_alpha):
        # dummy wrapper for convenience
        return _forward(seq, self._log_pi, self._log_A, self._log_B, log_alpha)

    def _backward(self, seq, log_beta):
        # dummy wrapper for convenience
        return _backward(seq, self._log_pi, self._log_A, self._log_B, log_beta)

    def _check_matrices_conditioning(self):
        _check_array_sums_to_1(self.pi, "init_probas")
        for s in range(self.n_hidden_states):
            _check_array_sums_to_1(self.A[s], f"Row {s} of A")
            _check_array_sums_to_1(self.B[s], f"Row {s} of B")

    # pi, A and B are respectively the init_probas, transitions and emissions
    # matrices. _log_pi, _log_A and _log_B are recomputed each time pi, A or B
    # are set, respectively. Consider these private (and bug-prone :)):
    # updating ``transitions`` directly would not update _log_A.
    @property
    def pi(self):
        return self.init_probas

    @pi.setter
    def pi(self, value):
        self.init_probas = value
        self._recompute_log_pi = True

    @property
    def _log_pi(self):
        if getattr(self, "_recompute_log_pi", True):
            self.__log_pi = np.log(self.pi)
            self._recompute_log_pi = False
        return self.__log_pi

    @property
    def A(self):
        return self.transitions

    @A.setter
    def A(self, value):
        self.transitions = value
        self._recompute_log_A = True

    @property
    def _log_A(self):
        if getattr(self, "_recompute_log_A", True):
            self.__log_A = np.log(self.A)
            self._recompute_log_A = False
        return self.__log_A

    @property
    def B(self):
        return self.emissions

    @B.setter
    def B(self, value):
        self.emissions = value
        self._recompute_log_B = True

    @property
    def _log_B(self):
        if getattr(self, "_recompute_log_B", True):
            self.__log_B = np.log(self.B)
            self._recompute_log_B = False
        return self.__log_B
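
A minimal usage sketch (not part of the module) ties the public API above together.
The matrices below are made-up toy values, and the example assumes the package is
installed so that this module is importable:

    from hmmkay.hmm import HMM

    # Toy model: 2 hidden states, 3 observable symbols (illustrative values only).
    init_probas = [0.6, 0.4]
    transitions = [[0.7, 0.3], [0.4, 0.6]]
    emissions = [[0.5, 0.4, 0.1], [0.1, 0.3, 0.6]]
    hmm = HMM(init_probas, transitions, emissions, n_iter=5)

    hidden, observed = hmm.sample(n_seq=4, n_obs=20, random_state=0)
    print(hmm.log_likelihood(observed))  # sum of per-sequence log-likelihoods
    paths, log_probas = hmm.decode(observed, return_log_probas=True)
    hmm.fit(observed)  # EM re-estimation of init_probas, transitions and emissions
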
@njit(cache=True)
def _sample_one(n_obs, pi, A, B, seed):
    """Return (hidden_states, observations) sample"""
    np.random.seed(seed)  # local to this numba function, not global numpy
    observations = []
    hidden_states = []
    s = _choice(pi)
    for _ in range(n_obs):
        hidden_states.append(s)
        obs = _choice(B[s])
        observations.append(obs)
        s = _choice(A[s])
    return hidden_states, observations


@njit(cache=True)
def _forward(seq, log_pi, log_A, log_B, log_alpha):
    """Fill log_alpha array with log probabilities, return log-likelihood"""
    # alpha[i, t] = P(O1, ... Ot, st = i / lambda)
    # recursion is alpha[i, t] = B[i, Ot] * sum_j(alpha[j, t - 1] * A[j, i])
    # which becomes (when applying log)
    # log_alpha[i, t] = log(B[i, Ot]) +
    #                   logsumexp_j(log_alpha[j, t - 1] + log_A[j, i])
    # since log(sum(ai * bi)) = log(sum(exp(log_ai + log_bi)))
    n_obs = len(seq)
    n_hidden_states = log_pi.shape[0]
    log_alpha[:, 0] = log_pi + log_B[:, seq[0]]
    buffer = np.empty(shape=n_hidden_states)
    for t in range(1, n_obs):
        for s in range(n_hidden_states):
            for ss in range(n_hidden_states):
                buffer[ss] = log_alpha[ss, t - 1] + log_A[ss, s]
            log_alpha[s, t] = _logsumexp(buffer) + log_B[s, seq[t]]

    return _logsumexp(log_alpha[:, n_obs - 1])


@njit(cache=True)
def _backward(seq, log_pi, log_A, log_B, log_beta):
    """Fill log_beta array with log probabilities"""
    # beta[i, t] = P(Ot+1, ... OT / st = i, lambda)
    n_obs = seq.shape[0]
    n_hidden_states = log_pi.shape[0]
    log_beta[:, n_obs - 1] = np.log(1)  # i.e. 0
    buffer = np.empty(shape=n_hidden_states)
    for t in range(n_obs - 2, -1, -1):
        for s in range(n_hidden_states):
            for ss in range(n_hidden_states):
                buffer[ss] = (
                    log_A[s, ss] + log_B[ss, seq[t + 1]] + log_beta[ss, t + 1]
                )
            log_beta[s, t] = _logsumexp(buffer)


@njit(cache=True)
def _viterbi(seq, log_pi, log_A, log_B, log_V, back_path):
    """Fill log_V array with log probabilities and back_path with back links"""
    # V[i, t] = max_{s1...st-1} P(O1, ... Ot, s1, ... st-1, st=i / lambda)
    n_obs = seq.shape[0]
    n_hidden_states = log_pi.shape[0]
    log_V[:, 0] = log_pi + log_B[:, seq[0]]
    buffer = np.empty(shape=n_hidden_states)
    for t in range(1, n_obs):
        for s in range(n_hidden_states):
            for ss in range(n_hidden_states):
                buffer[ss] = log_V[ss, t - 1] + log_A[ss, s]
            best_prev = _argmax(buffer)
            back_path[s, t] = best_prev
            log_V[s, t] = buffer[best_prev] + log_B[s, seq[t]]


@njit(cache=True)
def _get_best_path(log_V, back_path, best_path):
    """Fill out best_path array, return log-probability of the best path"""
    n_obs = best_path.shape[0]
    s = _argmax(log_V[:, n_obs - 1])
    out = log_V[s, n_obs - 1]
    for t in range(n_obs - 1, -1, -1):
        best_path[t] = s
        s = back_path[s, t]
    return out
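
The Viterbi recursion above can be sanity-checked by brute force on a tiny model:
the decoded path is the one maximizing the joint probability
P(O, path) = pi[s0] * B[s0, O0] * prod_t A[s_{t-1}, s_t] * B[s_t, O_t].
A standalone sketch (not part of the module; toy values only) that enumerates all
paths and should agree with ``decode()``:

    import itertools
    import numpy as np

    pi = np.array([0.6, 0.4])
    A = np.array([[0.7, 0.3], [0.4, 0.6]])  # A[i, j] = P(st+1 = j / st = i)
    B = np.array([[0.5, 0.4, 0.1], [0.1, 0.3, 0.6]])
    seq = [0, 2, 1]

    def joint_proba(path):
        # P(O1, ..., OT, s1, ..., sT) for a given hidden path
        p = pi[path[0]] * B[path[0], seq[0]]
        for t in range(1, len(seq)):
            p *= A[path[t - 1], path[t]] * B[path[t], seq[t]]
        return p

    best = max(itertools.product(range(2), repeat=len(seq)), key=joint_proba)
    print(best, np.log(joint_proba(best)))  # best hidden path and its log-probability
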
@njit(cache=True)
def _do_EM_step(
    sequences, log_pi, log_A, log_B, log_alpha, log_beta, log_E, log_gamma
):
    """Return pi, A and B after one EM step."""
    # E STEP (over all sequences)
    # Accumulators for the parameters of the hmm. They are summed over for
    # each sequence, then normalized in the M-step.
    # These are homogeneous to probabilities, not log-probabilities.
    pi_acc = np.zeros_like(log_pi)
    A_acc = np.zeros_like(log_A)
    B_acc = np.zeros_like(log_B)

    n_hidden_states = log_pi.shape[0]

    for seq_idx in range(len(sequences)):  # numba can't iterate over 2D arrays
        seq = sequences[seq_idx]
        n_obs = seq.shape[0]
        log_likelihood = _forward(seq, log_pi, log_A, log_B, log_alpha)
        _backward(seq, log_pi, log_A, log_B, log_beta)

        # Compute E
        for t in range(n_obs - 1):
            for i in range(n_hidden_states):
                for j in range(n_hidden_states):
                    log_E[i, j, t] = (
                        log_alpha[i, t]
                        + log_A[i, j]
                        + log_B[j, seq[t + 1]]
                        + log_beta[j, t + 1]
                        - log_likelihood
                    )

        # Compute gamma
        log_gamma = log_alpha + log_beta - log_likelihood

        # M STEP accumulators
        pi_acc += np.exp(log_gamma[:, 0])
        A_acc += np.sum(np.exp(log_E[:, :, : n_obs - 1]), axis=-1)
        for t in range(n_obs):
            B_acc[:, seq[t]] += np.exp(log_gamma[:, t])

    # M STEP (mostly done in the accumulators already)
    pi = pi_acc / pi_acc.sum()
    # equivalent to X / X.sum(axis=1, keepdims=True), but keepdims is not
    # supported by numba
    A = A_acc / A_acc.sum(axis=1).reshape(-1, 1)
    B = B_acc / B_acc.sum(axis=1).reshape(-1, 1)

    return pi, A, B
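
Finally, the log-space trick used by ``_forward`` (and by the E step above) can be
illustrated in isolation. In the sketch below (not part of the module), scipy's
``logsumexp`` stands in for the ``_logsumexp`` helper and the matrices are toy
values; the same forward recursion is run once in probability space and once in
log space, and the two log-likelihoods agree:

    import numpy as np
    from scipy.special import logsumexp  # stand-in for the module's _logsumexp

    pi = np.array([0.6, 0.4])
    A = np.array([[0.7, 0.3], [0.4, 0.6]])  # A[i, j] = P(st+1 = j / st = i)
    B = np.array([[0.5, 0.4, 0.1], [0.1, 0.3, 0.6]])
    seq = np.array([0, 2, 1, 1])

    # Probability space: alpha[i, t] = B[i, Ot] * sum_j(alpha[j, t - 1] * A[j, i])
    alpha = pi * B[:, seq[0]]
    for obs in seq[1:]:
        alpha = B[:, obs] * (alpha @ A)
    direct = np.log(alpha.sum())

    # Log space: log_alpha[i, t] = log(B[i, Ot])
    #                              + logsumexp_j(log_alpha[j, t - 1] + log_A[j, i])
    log_pi, log_A, log_B = np.log(pi), np.log(A), np.log(B)
    log_alpha = log_pi + log_B[:, seq[0]]
    for obs in seq[1:]:
        log_alpha = log_B[:, obs] + logsumexp(log_alpha[:, None] + log_A, axis=0)
    stable = logsumexp(log_alpha)

    assert np.isclose(direct, stable)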