Source code for hmmkay.utils

"""
The utils module contains helpers for input checking, parameter generation and
sequence generation.
"""
import numbers

from numba import njit, types
from numba.typed import List
import numpy as np


__all__ = ["make_observation_sequences", "make_proba_matrices", "check_sequences"]


def _check_array_sums_to_1(a, name="array"):
    a_sum = a.sum()
    if not (1 - 1e-5 < a_sum < 1 + 1e-5):
        err_msg = f"{name} must sum to 1. Got \n{a}.sum() = {a_sum}"
        raise ValueError(err_msg)


@njit(cache=True)
def _choice(p):
    """return i with probability p[i]"""
    # inspired from https://github.com/numba/numba/issues/2539
    # p must sum to 1
    return np.searchsorted(np.cumsum(p), np.random.random(), side="right")


@njit(cache=True)
def _logsumexp(a):
    # stolen from pygbm \o/

    a_max = np.amax(a)
    if not np.isfinite(a_max):
        a_max = 0

    s = np.sum(np.exp(a - a_max))
    return np.log(s) + a_max


@njit(cache=True)
def _argmax(a):
    # Apparently much faster than np.argmax in our context
    curr_max = a[0]
    curr_max_idx = 0
    for i in range(1, len(a)):
        if a[i] > curr_max:
            curr_max = a[i]
            curr_max_idx = i
    return curr_max_idx


def _get_hmm_learn_model(hmm):
    """Return equivalent hmm_learn model"""
    try:
        import hmmlearn.hmm  # noqa
    except ImportError as ie:
        raise RuntimeError("Please install hmmlearn to run tests/benchmarks.") from ie

    hmm_learn_model = hmmlearn.hmm.MultinomialHMM(
        n_components=hmm.A.shape[0], init_params="", tol=0, n_iter=hmm.n_iter
    )
    hmm_learn_model.startprob_ = hmm.pi
    hmm_learn_model.transmat_ = hmm.A
    hmm_learn_model.emissionprob_ = hmm.B

    return hmm_learn_model


def _to_weird_format(sequences):
    # Please don't ask
    if isinstance(sequences, (list, List)):
        # list of lists, potentially different lenghts
        X = np.concatenate(sequences).reshape(-1, 1)
        lenghts = [len(seq) for seq in sequences]
    else:  # 2d array, sequences are of same length
        X = np.array(sequences).ravel().reshape(-1, 1)
        lenghts = [sequences.shape[1]] * sequences.shape[0]

    return {"X": X, "lengths": lenghts}


[docs]def make_proba_matrices(n_hidden_states=4, n_observable_states=3, random_state=None): """Generate random probability matrices. Parameters ---------- n_hidden_states : int, default=4 Number of hidden states n_observable_states : int, default=3 Number of observable states random_state: int or np.random.RandomState instance, default=None Controls the RNG, see `scikt-learn glossary <https://scikit-learn.org/stable/glossary.html#term-random-state>`_ for details. Returns ------- init_probas : array-like of shape (n_hidden_states,) The initial probabilities. transitions : array-like of shape (n_hidden_states, n_hidden_states) The transition probabilities. ``transitions[i, j] = P(st+1 = j / st = i)``. emissions : array-like of shape (n_hidden_states, n_observable_states) The probabilities of symbol emission. ``emissions[i, o] = P(Ot = o / st = i)``. """ rng = _check_random_state(random_state) pi = rng.rand(n_hidden_states) pi /= pi.sum() A = rng.rand(n_hidden_states, n_hidden_states) A /= A.sum(axis=1, keepdims=True) B = rng.rand(n_hidden_states, n_observable_states) B /= B.sum(axis=1, keepdims=True) return pi, A, B
[docs]def make_observation_sequences( n_seq=10, n_observable_states=3, n_obs_min=10, n_obs_max=None, random_state=None ): """Generate random observation sequences. Parameters ---------- n_seq : int, default=10 Number of sequences to generate n_observable_states : int, default=3 Number of observable states. n_obs_min : int, default=10 Minimum length of each sequence. n_obs_max : int or None, default=None If None (default), all sequences are of length ``n_obs_min`` and a 2d ndarray is returned. If an int, the length of each sequence is chosen randomly with ``n_obs_min <= length < n_obs_max``. A numba typed list of arrays is returned in this case. random_state: int or np.random.RandomState instance, default=None Controls the RNG, see `scikt-learn glossary <https://scikit-learn.org/stable/glossary.html#term-random-state>`_ for details. Returns ------- sequences : ndarray of shape (n_seq, n_obs_min,) or numba typed list of \ ndarray of variable length The generated sequences of observable states """ # TODO: generate a typed list instead of a list. rng = _check_random_state(random_state) if n_obs_max is None: # return 2d numpy array, all observations have same length return rng.randint(n_observable_states, size=(n_seq, n_obs_min), dtype=np.int32) else: sequences = List.empty_list(types.int32[:]) for _ in range(n_seq): sequences.append( rng.randint( n_observable_states, size=rng.randint(n_obs_min, n_obs_max), dtype=np.int32, ) ) return sequences
def _check_random_state(seed): # Stolen from scikit-learn if seed is None or seed is np.random: return np.random.mtrand._rand if isinstance(seed, numbers.Integral): return np.random.RandomState(seed) if isinstance(seed, np.random.RandomState): return seed raise ValueError( "%r cannot be used to seed a numpy.random.RandomState" " instance" % seed )
[docs]def check_sequences(sequences, return_longest_length=False): """Convert sequences into appropriate format. This helper is called before any method that uses sequences. It is recommended to convert your sequences once and for all before using the ``HMM`` class, to avoid repeated convertions. Parameters ---------- sequences : array-like of shape (n_seq, n_obs) or list/typed list of iterables of \ variable length. Lists of iterables are converted to typed lists of numpy arrays, which can have different lengths. 2D arrays are untouched (all sequences have the same length). return_longest_length : bool, default=False If True, also return the length of the longest sequence. Returns ------- sequences : ndarray of shape (n_seq, n_obs) or typed list of ndarray of \ variable length The sequences converted either to ndarray or numba typed list of ndarrays. """ if isinstance(sequences, List): # typed list if return_longest_length: longest_seq_length = max(len(seq) for seq in sequences) elif isinstance(sequences, list): # regular list, convert to numba typed list if return_longest_length: longest_seq_length = max(len(seq) for seq in sequences) new_sequences = List.empty_list(types.int32[:]) for seq in sequences: new_sequences.append(np.asarray(seq, dtype=np.int32)) sequences = new_sequences elif isinstance(sequences, np.ndarray): if return_longest_length: longest_seq_length = sequences.shape[1] else: raise ValueError( "Accepted sequences types are 2d numpy arrays, " "lists of iterables, or numba typed lists of iterables." ) if return_longest_length: return sequences, longest_seq_length else: return sequences