1# Copyright 2001 Brad Chapman.  All rights reserved.
2#
3# This file is part of the Biopython distribution and governed by your
4# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
5# Please see the LICENSE file that should have been included as part of this
6# package.
7
8"""Deal with representations of Markov Models."""
9# standard modules
10import copy
11import math
12import random
13from collections import defaultdict
14
15from Bio.Seq import Seq
16
17
18def _gen_random_array(n):
19    """Return an array of n random numbers summing to 1.0 (PRIVATE)."""
20    randArray = [random.random() for _ in range(n)]
21    total = sum(randArray)
22    return [x / total for x in randArray]
23
24
25def _calculate_emissions(emission_probs):
26    """Calculate which symbols can be emitted in each state (PRIVATE)."""
27    # loop over all of the state-symbol duples, mapping states to
28    # lists of emitted symbols
29    emissions = defaultdict(list)
30    for state, symbol in emission_probs:
31        emissions[state].append(symbol)
32
33    return emissions
34
35
36def _calculate_from_transitions(trans_probs):
37    """Calculate which 'from transitions' are allowed for each state (PRIVATE).
38
39    This looks through all of the trans_probs, and uses this dictionary
40    to determine allowed transitions. It converts this information into
41    a dictionary, whose keys are source states and whose values are
42    lists of destination states reachable from the source state via a
43    transition.
44    """
45    transitions = defaultdict(list)
46    for from_state, to_state in trans_probs:
47        transitions[from_state].append(to_state)
48
49    return transitions
50
51
52def _calculate_to_transitions(trans_probs):
53    """Calculate which 'to transitions' are allowed for each state (PRIVATE).
54
55    This looks through all of the trans_probs, and uses this dictionary
56    to determine allowed transitions. It converts this information into
57    a dictionary, whose keys are destination states and whose values are
58    lists of source states from which the destination is reachable via a
59    transition.
60    """
61    transitions = defaultdict(list)
62    for from_state, to_state in trans_probs:
63        transitions[to_state].append(from_state)
64
65    return transitions
66
67
68class MarkovModelBuilder:
69    """Interface to build up a Markov Model.
70
71    This class is designed to try to separate the task of specifying the
72    Markov Model from the actual model itself. This is in hopes of making
73    the actual Markov Model classes smaller.
74
75    So, this builder class should be used to create Markov models instead
76    of trying to initiate a Markov Model directly.
77    """
78
79    # the default pseudo counts to use
80    DEFAULT_PSEUDO = 1
81
82    def __init__(self, state_alphabet, emission_alphabet):
83        """Initialize a builder to create Markov Models.
84
85        Arguments:
86         - state_alphabet -- An iterable (e.g., tuple or list) containing
87           all of the letters that can appear in the states
88         - emission_alphabet -- An iterable (e.g., tuple or list) containing
89           all of the letters for states that can be emitted by the HMM.
90
91        """
92        self._state_alphabet = tuple(state_alphabet)
93        self._emission_alphabet = tuple(emission_alphabet)
94
95        # probabilities for the initial state, initialized by calling
96        # set_initial_probabilities (required)
97        self.initial_prob = {}
98
99        # the probabilities for transitions and emissions
100        # by default we have no transitions and all possible emissions
101        self.transition_prob = {}
102        self.emission_prob = self._all_blank(state_alphabet, emission_alphabet)
103
104        # the default pseudocounts for transition and emission counting
105        self.transition_pseudo = {}
106        self.emission_pseudo = self._all_pseudo(state_alphabet, emission_alphabet)
107
108    def _all_blank(self, first_alphabet, second_alphabet):
109        """Return a dictionary with all counts set to zero (PRIVATE).
110
111        This uses the letters in the first and second alphabet to create
112        a dictionary with keys of two tuples organized as
113        (letter of first alphabet, letter of second alphabet). The values
114        are all set to 0.
115        """
116        all_blank = {}
117        for first_state in first_alphabet:
118            for second_state in second_alphabet:
119                all_blank[(first_state, second_state)] = 0
120
121        return all_blank
122
123    def _all_pseudo(self, first_alphabet, second_alphabet):
124        """Return a dictionary with all counts set to a default value (PRIVATE).
125
126        This takes the letters in first alphabet and second alphabet and
127        creates a dictionary with keys of two tuples organized as:
128        (letter of first alphabet, letter of second alphabet). The values
129        are all set to the value of the class attribute DEFAULT_PSEUDO.
130        """
131        all_counts = {}
132        for first_state in first_alphabet:
133            for second_state in second_alphabet:
134                all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO
135
136        return all_counts
137
138    def get_markov_model(self):
139        """Return the markov model corresponding with the current parameters.
140
141        Each markov model returned by a call to this function is unique
142        (ie. they don't influence each other).
143        """
144        # user must set initial probabilities
145        if not self.initial_prob:
146            raise Exception(
147                "set_initial_probabilities must be called to "
148                "fully initialize the Markov model"
149            )
150
151        initial_prob = copy.deepcopy(self.initial_prob)
152        transition_prob = copy.deepcopy(self.transition_prob)
153        emission_prob = copy.deepcopy(self.emission_prob)
154        transition_pseudo = copy.deepcopy(self.transition_pseudo)
155        emission_pseudo = copy.deepcopy(self.emission_pseudo)
156
157        return HiddenMarkovModel(
158            self._state_alphabet,
159            self._emission_alphabet,
160            initial_prob,
161            transition_prob,
162            emission_prob,
163            transition_pseudo,
164            emission_pseudo,
165        )
166
167    def set_initial_probabilities(self, initial_prob):
168        """Set initial state probabilities.
169
170        initial_prob is a dictionary mapping states to probabilities.
171        Suppose, for example, that the state alphabet is ('A', 'B'). Call
172        set_initial_prob({'A': 1}) to guarantee that the initial
173        state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5})
174        to make each initial state equally probable.
175
176        This method must now be called in order to use the Markov model
177        because the calculation of initial probabilities has changed
178        incompatibly; the previous calculation was incorrect.
179
180        If initial probabilities are set for all states, then they should add up
181        to 1. Otherwise the sum should be <= 1. The residual probability is
182        divided up evenly between all the states for which the initial
183        probability has not been set. For example, calling
184        set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5,
185        for the above example.
186        """
187        self.initial_prob = copy.copy(initial_prob)
188
189        # ensure that all referenced states are valid
190        for state in initial_prob:
191            if state not in self._state_alphabet:
192                raise ValueError(
193                    "State %s was not found in the sequence alphabet" % state
194                )
195
196        # distribute the residual probability, if any
197        num_states_not_set = len(self._state_alphabet) - len(self.initial_prob)
198        if num_states_not_set < 0:
199            raise Exception("Initial probabilities can't exceed # of states")
200        prob_sum = sum(self.initial_prob.values())
201        if prob_sum > 1.0:
202            raise Exception("Total initial probability cannot exceed 1.0")
203        if num_states_not_set > 0:
204            prob = (1.0 - prob_sum) / num_states_not_set
205            for state in self._state_alphabet:
206                if state not in self.initial_prob:
207                    self.initial_prob[state] = prob
208
209    def set_equal_probabilities(self):
210        """Reset all probabilities to be an average value.
211
212        Resets the values of all initial probabilities and all allowed
213        transitions and all allowed emissions to be equal to 1 divided by the
214        number of possible elements.
215
216        This is useful if you just want to initialize a Markov Model to
217        starting values (ie. if you have no prior notions of what the
218        probabilities should be -- or if you are just feeling too lazy
219        to calculate them :-).
220
221        Warning 1 -- this will reset all currently set probabilities.
222
223        Warning 2 -- This just sets all probabilities for transitions and
224        emissions to total up to 1, so it doesn't ensure that the sum of
225        each set of transitions adds up to 1.
226        """
227        # set initial state probabilities
228        new_initial_prob = float(1) / float(len(self.transition_prob))
229        for state in self._state_alphabet:
230            self.initial_prob[state] = new_initial_prob
231
232        # set the transitions
233        new_trans_prob = float(1) / float(len(self.transition_prob))
234        for key in self.transition_prob:
235            self.transition_prob[key] = new_trans_prob
236
237        # set the emissions
238        new_emission_prob = float(1) / float(len(self.emission_prob))
239        for key in self.emission_prob:
240            self.emission_prob[key] = new_emission_prob
241
242    def set_random_initial_probabilities(self):
243        """Set all initial state probabilities to a randomly generated distribution.
244
245        Returns the dictionary containing the initial probabilities.
246        """
247        initial_freqs = _gen_random_array(len(self._state_alphabet))
248        for state in self._state_alphabet:
249            self.initial_prob[state] = initial_freqs.pop()
250
251        return self.initial_prob
252
253    def set_random_transition_probabilities(self):
254        """Set all allowed transition probabilities to a randomly generated distribution.
255
256        Returns the dictionary containing the transition probabilities.
257        """
258        if not self.transition_prob:
259            raise Exception(
260                "No transitions have been allowed yet. "
261                "Allow some or all transitions by calling "
262                "allow_transition or allow_all_transitions first."
263            )
264
265        transitions_from = _calculate_from_transitions(self.transition_prob)
266        for from_state in transitions_from:
267            freqs = _gen_random_array(len(transitions_from[from_state]))
268            for to_state in transitions_from[from_state]:
269                self.transition_prob[(from_state, to_state)] = freqs.pop()
270
271        return self.transition_prob
272
273    def set_random_emission_probabilities(self):
274        """Set all allowed emission probabilities to a randomly generated distribution.
275
276        Returns the dictionary containing the emission probabilities.
277        """
278        if not self.emission_prob:
279            raise Exception(
280                "No emissions have been allowed yet. Allow some or all emissions."
281            )
282
283        emissions = _calculate_emissions(self.emission_prob)
284        for state in emissions:
285            freqs = _gen_random_array(len(emissions[state]))
286            for symbol in emissions[state]:
287                self.emission_prob[(state, symbol)] = freqs.pop()
288
289        return self.emission_prob
290
291    def set_random_probabilities(self):
292        """Set all probabilities to randomly generated numbers.
293
294        Resets probabilities of all initial states, transitions, and
295        emissions to random values.
296        """
297        self.set_random_initial_probabilities()
298        self.set_random_transition_probabilities()
299        self.set_random_emission_probabilities()
300
301    # --- functions to deal with the transitions in the sequence
302
303    def allow_all_transitions(self):
304        """Create transitions between all states.
305
306        By default all transitions within the alphabet are disallowed;
307        this is a convenience function to change this to allow all
308        possible transitions.
309        """
310        # first get all probabilities and pseudo counts set
311        # to the default values
312        all_probs = self._all_blank(self._state_alphabet, self._state_alphabet)
313
314        all_pseudo = self._all_pseudo(self._state_alphabet, self._state_alphabet)
315
316        # now set any probabilities and pseudo counts that
317        # were previously set
318        for set_key in self.transition_prob:
319            all_probs[set_key] = self.transition_prob[set_key]
320
321        for set_key in self.transition_pseudo:
322            all_pseudo[set_key] = self.transition_pseudo[set_key]
323
324        # finally reinitialize the transition probs and pseudo counts
325        self.transition_prob = all_probs
326        self.transition_pseudo = all_pseudo
327
328    def allow_transition(
329        self, from_state, to_state, probability=None, pseudocount=None
330    ):
331        """Set a transition as being possible between the two states.
332
333        probability and pseudocount are optional arguments
334        specifying the probabilities and pseudo counts for the transition.
335        If these are not supplied, then the values are set to the
336        default values.
337
338        Raises:
339        KeyError -- if the two states already have an allowed transition.
340
341        """
342        # check the sanity of adding these states
343        for state in [from_state, to_state]:
344            if state not in self._state_alphabet:
345                raise ValueError(
346                    "State %s was not found in the sequence alphabet" % state
347                )
348
349        # ensure that the states are not already set
350        if (from_state, to_state) not in self.transition_prob and (
351            from_state,
352            to_state,
353        ) not in self.transition_pseudo:
354            # set the initial probability
355            if probability is None:
356                probability = 0
357            self.transition_prob[(from_state, to_state)] = probability
358
359            # set the initial pseudocounts
360            if pseudocount is None:
361                pseudocount = self.DEFAULT_PSEUDO
362            self.transition_pseudo[(from_state, to_state)] = pseudocount
363        else:
364            raise KeyError(
365                "Transition from %s to %s is already allowed." % (from_state, to_state)
366            )
367
368    def destroy_transition(self, from_state, to_state):
369        """Restrict transitions between the two states.
370
371        Raises:
372        KeyError if the transition is not currently allowed.
373
374        """
375        try:
376            del self.transition_prob[(from_state, to_state)]
377            del self.transition_pseudo[(from_state, to_state)]
378        except KeyError:
379            raise KeyError(
380                "Transition from %s to %s is already disallowed."
381                % (from_state, to_state)
382            )
383
384    def set_transition_score(self, from_state, to_state, probability):
385        """Set the probability of a transition between two states.
386
387        Raises:
388        KeyError if the transition is not allowed.
389
390        """
391        if (from_state, to_state) in self.transition_prob:
392            self.transition_prob[(from_state, to_state)] = probability
393        else:
394            raise KeyError(
395                "Transition from %s to %s is not allowed." % (from_state, to_state)
396            )
397
398    def set_transition_pseudocount(self, from_state, to_state, count):
399        """Set the default pseudocount for a transition.
400
401        To avoid computational problems, it is helpful to be able to
402        set a 'default' pseudocount to start with for estimating
403        transition and emission probabilities (see p62 in Durbin et al
404        for more discussion on this. By default, all transitions have
405        a pseudocount of 1.
406
407        Raises:
408        KeyError if the transition is not allowed.
409
410        """
411        if (from_state, to_state) in self.transition_pseudo:
412            self.transition_pseudo[(from_state, to_state)] = count
413        else:
414            raise KeyError(
415                "Transition from %s to %s is not allowed." % (from_state, to_state)
416            )
417
418    # --- functions to deal with emissions from the sequence
419
420    def set_emission_score(self, seq_state, emission_state, probability):
421        """Set the probability of a emission from a particular state.
422
423        Raises:
424        KeyError if the emission from the given state is not allowed.
425
426        """
427        if (seq_state, emission_state) in self.emission_prob:
428            self.emission_prob[(seq_state, emission_state)] = probability
429        else:
430            raise KeyError(
431                "Emission of %s from %s is not allowed." % (emission_state, seq_state)
432            )
433
434    def set_emission_pseudocount(self, seq_state, emission_state, count):
435        """Set the default pseudocount for an emission.
436
437        To avoid computational problems, it is helpful to be able to
438        set a 'default' pseudocount to start with for estimating
439        transition and emission probabilities (see p62 in Durbin et al
440        for more discussion on this. By default, all emissions have
441        a pseudocount of 1.
442
443        Raises:
444        KeyError if the emission from the given state is not allowed.
445
446        """
447        if (seq_state, emission_state) in self.emission_pseudo:
448            self.emission_pseudo[(seq_state, emission_state)] = count
449        else:
450            raise KeyError(
451                "Emission of %s from %s is not allowed." % (emission_state, seq_state)
452            )
453
454
455class HiddenMarkovModel:
456    """Represent a hidden markov model that can be used for state estimation."""
457
458    def __init__(
459        self,
460        state_alphabet,
461        emission_alphabet,
462        initial_prob,
463        transition_prob,
464        emission_prob,
465        transition_pseudo,
466        emission_pseudo,
467    ):
468        """Initialize a Markov Model.
469
470        Note: You should use the MarkovModelBuilder class instead of
471        initiating this class directly.
472
473        Arguments:
474         - state_alphabet -- A tuple containing all of the letters that can
475           appear in the states.
476         - emission_alphabet -- A tuple containing all of the letters for
477           states that can be emitted by the HMM.
478         - initial_prob - A dictionary of initial probabilities for all states.
479         - transition_prob -- A dictionary of transition probabilities for all
480           possible transitions in the sequence.
481         - emission_prob -- A dictionary of emission probabilities for all
482           possible emissions from the sequence states.
483         - transition_pseudo -- Pseudo-counts to be used for the transitions,
484           when counting for purposes of estimating transition probabilities.
485         - emission_pseudo -- Pseudo-counts to be used for the emissions,
486           when counting for purposes of estimating emission probabilities.
487
488        """
489        self.state_alphabet = state_alphabet
490        self.emission_alphabet = emission_alphabet
491
492        self.initial_prob = initial_prob
493
494        self._transition_pseudo = transition_pseudo
495        self._emission_pseudo = emission_pseudo
496
497        self.transition_prob = transition_prob
498        self.emission_prob = emission_prob
499
500        # a dictionary of the possible transitions from each state
501        # each key is a source state, mapped to a list of the destination states
502        # that are reachable from the source state via a transition
503        self._transitions_from = _calculate_from_transitions(self.transition_prob)
504
505        # a dictionary of the possible transitions to each state
506        # each key is a destination state, mapped to a list of source states
507        # from which the destination is reachable via a transition
508        self._transitions_to = _calculate_to_transitions(self.transition_prob)
509
510    def get_blank_transitions(self):
511        """Get the default transitions for the model.
512
513        Returns a dictionary of all of the default transitions between any
514        two letters in the sequence alphabet. The dictionary is structured
515        with keys as (letter1, letter2) and values as the starting number
516        of transitions.
517        """
518        return self._transition_pseudo
519
520    def get_blank_emissions(self):
521        """Get the starting default emmissions for each sequence.
522
523        This returns a dictionary of the default emmissions for each
524        letter. The dictionary is structured with keys as
525        (seq_letter, emmission_letter) and values as the starting number
526        of emmissions.
527        """
528        return self._emission_pseudo
529
530    def transitions_from(self, state_letter):
531        """Get all destination states which can transition from source state_letter.
532
533        This returns all letters which the given state_letter can transition
534        to, i.e. all the destination states reachable from state_letter.
535
536        An empty list is returned if state_letter has no outgoing transitions.
537        """
538        if state_letter in self._transitions_from:
539            return self._transitions_from[state_letter]
540        else:
541            return []
542
543    def transitions_to(self, state_letter):
544        """Get all source states which can transition to destination state_letter.
545
546        This returns all letters which the given state_letter is reachable
547        from, i.e. all the source states which can reach state_later
548
549        An empty list is returned if state_letter is unreachable.
550        """
551        if state_letter in self._transitions_to:
552            return self._transitions_to[state_letter]
553        else:
554            return []
555
556    def viterbi(self, sequence, state_alphabet):
557        """Calculate the most probable state path using the Viterbi algorithm.
558
559        This implements the Viterbi algorithm (see pgs 55-57 in Durbin et
560        al for a full explanation -- this is where I took my implementation
561        ideas from), to allow decoding of the state path, given a sequence
562        of emissions.
563
564        Arguments:
565         - sequence -- A Seq object with the emission sequence that we
566           want to decode.
567         - state_alphabet -- An iterable (e.g., tuple or list) containing
568           all of the letters that can appear in the states
569
570        """
571        # calculate logarithms of the initial, transition, and emission probs
572        log_initial = self._log_transform(self.initial_prob)
573        log_trans = self._log_transform(self.transition_prob)
574        log_emission = self._log_transform(self.emission_prob)
575
576        viterbi_probs = {}
577        pred_state_seq = {}
578
579        # --- recursion
580        # loop over the training squence (i = 1 .. L)
581        # NOTE: My index numbers are one less than what is given in Durbin
582        # et al, since we are indexing the sequence going from 0 to
583        # (Length - 1) not 1 to Length, like in Durbin et al.
584        for i in range(0, len(sequence)):
585            # loop over all of the possible i-th states in the state path
586            for cur_state in state_alphabet:
587                # e_{l}(x_{i})
588                emission_part = log_emission[(cur_state, sequence[i])]
589
590                max_prob = 0
591                if i == 0:
592                    # for the first state, use the initial probability rather
593                    # than looking back to previous states
594                    max_prob = log_initial[cur_state]
595                else:
596                    # loop over all possible (i-1)-th previous states
597                    possible_state_probs = {}
598                    for prev_state in self.transitions_to(cur_state):
599                        # a_{kl}
600                        trans_part = log_trans[(prev_state, cur_state)]
601
602                        # v_{k}(i - 1)
603                        viterbi_part = viterbi_probs[(prev_state, i - 1)]
604                        cur_prob = viterbi_part + trans_part
605
606                        possible_state_probs[prev_state] = cur_prob
607
608                    # calculate the viterbi probability using the max
609                    max_prob = max(possible_state_probs.values())
610
611                # v_{k}(i)
612                viterbi_probs[(cur_state, i)] = emission_part + max_prob
613
614                if i > 0:
615                    # get the most likely prev_state leading to cur_state
616                    for state in possible_state_probs:
617                        if possible_state_probs[state] == max_prob:
618                            pred_state_seq[(i - 1, cur_state)] = state
619                            break
620
621        # --- termination
622        # calculate the probability of the state path
623        # loop over all states
624        all_probs = {}
625        for state in state_alphabet:
626            # v_{k}(L)
627            all_probs[state] = viterbi_probs[(state, len(sequence) - 1)]
628
629        state_path_prob = max(all_probs.values())
630
631        # find the last pointer we need to trace back from
632        last_state = ""
633        for state in all_probs:
634            if all_probs[state] == state_path_prob:
635                last_state = state
636
637        assert last_state != "", "Didn't find the last state to trace from!"
638
639        # --- traceback
640        traceback_seq = []
641
642        loop_seq = list(range(1, len(sequence)))
643        loop_seq.reverse()
644
645        # last_state is the last state in the most probable state sequence.
646        # Compute that sequence by walking backwards in time. From the i-th
647        # state in the sequence, find the (i-1)-th state as the most
648        # probable state preceding the i-th state.
649        state = last_state
650        traceback_seq.append(state)
651        for i in loop_seq:
652            state = pred_state_seq[(i - 1, state)]
653            traceback_seq.append(state)
654
655        # put the traceback sequence in the proper orientation
656        traceback_seq.reverse()
657        traceback_seq = "".join(traceback_seq)
658
659        return Seq(traceback_seq), state_path_prob
660
661    def _log_transform(self, probability):
662        """Return log transform of the given probability dictionary (PRIVATE).
663
664        When calculating the Viterbi equation, add logs of probabilities rather
665        than multiplying probabilities, to avoid underflow errors. This method
666        returns a new dictionary with the same keys as the given dictionary
667        and log-transformed values.
668        """
669        log_prob = copy.copy(probability)
670        for key in log_prob:
671            prob = log_prob[key]
672            if prob > 0:
673                log_prob[key] = math.log(log_prob[key])
674            else:
675                log_prob[key] = -math.inf
676
677        return log_prob
678