1# Copyright 2001 Brad Chapman.  All rights reserved.
2#
3# This file is part of the Biopython distribution and governed by your
4# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
5# Please see the LICENSE file that should have been included as part of this
6# package.
7
8"""Dynamic Programming algorithms for general usage.
9
10This module contains classes which implement Dynamic Programming
11algorithms that can be used generally.
12"""
13
14
class AbstractDPAlgorithms:
    """An abstract class to calculate forward and backward probabilities.

    This class should not be instantiated directly, but should be used
    through a derived class which implements proper scaling of variables.

    This class is just meant to encapsulate the basic forward and backward
    algorithms, and allow derived classes to deal with the problems of
    multiplying probabilities.

    Derived classes of this must implement:

    - _forward_recursion -- Calculate the forward values in the recursion
      using some kind of technique for preventing underflow errors.
    - _backward_recursion -- Calculate the backward values in the recursion
      step using some technique to prevent underflow errors.

    """

    def __init__(self, markov_model, sequence):
        """Initialize to calculate forward and backward probabilities.

        Arguments:
         - markov_model -- The current Markov model we are working with.
         - sequence -- A training sequence containing a set of emissions.

        """
        self._mm = markov_model
        self._seq = sequence

    def _forward_recursion(self, cur_state, sequence_pos, forward_vars):
        """Calculate the forward recursion value (PRIVATE).

        Derived classes must override this with an implementation that
        guards against numerical underflow.
        """
        raise NotImplementedError("Subclasses must implement")

    def forward_algorithm(self):
        """Calculate sequence probability using the forward algorithm.

        This implements the forward algorithm, as described on p57-58 of
        Durbin et al.

        Returns:
         - A dictionary containing the forward variables. This has keys of the
           form (state letter, position in the training sequence), and values
           containing the calculated forward variable.
         - The calculated probability of the sequence.

        """
        # all of the different letters that the state path can be in
        state_letters = self._mm.state_alphabet

        # -- initialize the algorithm
        #
        # NOTE: My index numbers are one less than what is given in Durbin
        # et al, since we are indexing the sequence going from 0 to
        # (Length - 1) not 1 to Length, like in Durbin et al.
        #
        forward_var = {}
        # f_{0}(0) = 1
        forward_var[(state_letters[0], -1)] = 1
        # f_{k}(0) = 0, for k > 0
        for k in range(1, len(state_letters)):
            forward_var[(state_letters[k], -1)] = 0

        # -- now do the recursion step
        # loop over the training sequence
        # Recursion step: (i = 1 .. L)
        for i in range(len(self._seq.emissions)):
            # now loop over the letters in the state path
            for main_state in state_letters:
                # calculate the forward value using the appropriate
                # method to prevent underflow errors
                forward_value = self._forward_recursion(main_state, i, forward_var)

                # None signals that no transition into main_state exists,
                # so there is no forward variable to record at this position
                if forward_value is not None:
                    forward_var[(main_state, i)] = forward_value

        # -- termination step - calculate the probability of the sequence
        first_state = state_letters[0]
        last_pos = len(self._seq.emissions) - 1
        seq_prob = 0

        for state_item in state_letters:
            # f_{k}(L)
            forward_value = forward_var[(state_item, last_pos)]
            # a_{k0}
            transition_value = self._mm.transition_prob[(state_item, first_state)]

            seq_prob += forward_value * transition_value

        return forward_var, seq_prob

    def _backward_recursion(self, cur_state, sequence_pos, backward_vars):
        """Calculate the backward recursion value (PRIVATE).

        Derived classes must override this with an implementation that
        guards against numerical underflow.
        """
        raise NotImplementedError("Subclasses must implement")

    def backward_algorithm(self):
        """Calculate sequence probability using the backward algorithm.

        This implements the backward algorithm, as described on p58-59 of
        Durbin et al.

        Returns:
         - A dictionary containing the backwards variables. This has keys
           of the form (state letter, position in the training sequence),
           and values containing the calculated backward variable.

        """
        # all of the different letters that the state path can be in
        state_letters = self._mm.state_alphabet

        # -- initialize the algorithm
        #
        # NOTE: My index numbers are one less than what is given in Durbin
        # et al, since we are indexing the sequence going from 0 to
        # (Length - 1) not 1 to Length, like in Durbin et al.
        #
        backward_var = {}

        first_letter = state_letters[0]
        last_pos = len(self._seq.emissions) - 1
        # b_{k}(L) = a_{k0} for all k
        for state in state_letters:
            backward_var[(state, last_pos)] = self._mm.transition_prob[
                (state, first_letter)
            ]

        # -- recursion
        # loop over the training sequence backwards
        # Recursion step: (i = L - 1 ... 1)
        for i in reversed(range(last_pos)):
            # now loop over the letters in the state path
            for main_state in state_letters:
                # calculate the backward value using the appropriate
                # method to prevent underflow errors
                backward_value = self._backward_recursion(main_state, i, backward_var)

                # None signals that no transition out of main_state exists,
                # so there is no backward variable to record at this position
                if backward_value is not None:
                    backward_var[(main_state, i)] = backward_value

        # skip the termination step to avoid recalculations -- you should
        # get sequence probabilities using the forward algorithm

        return backward_var
158
159
class ScaledDPAlgorithms(AbstractDPAlgorithms):
    """Implement forward and backward algorithms using a rescaling approach.

    This scales the f and b variables, so that they remain within a
    manageable numerical interval during calculations. This approach is
    described in Durbin et al. on p 78.

    This approach is a little more straightforward than log transformation
    but may still give underflow errors for some types of models. In these
    cases, the LogDPAlgorithms class should be used.
    """

    def __init__(self, markov_model, sequence):
        """Initialize the scaled approach to calculating probabilities.

        Arguments:
         - markov_model -- The current Markov model we are working with.
         - sequence -- A TrainingSequence object that must have a
           set of emissions to work with.

        """
        AbstractDPAlgorithms.__init__(self, markov_model, sequence)

        # Cache of scaling values keyed by sequence position, shared by the
        # forward and backward recursions so each s value is computed once.
        self._s_values = {}

    def _calculate_s_value(self, seq_pos, previous_vars):
        """Calculate the next scaling variable for a sequence position (PRIVATE).

        This utilizes the approach of choosing s values such that the
        sum of all of the scaled f values is equal to 1.

        Arguments:
         - seq_pos -- The current position we are at in the sequence.
         - previous_vars -- All of the forward or backward variables
           calculated so far.

        Returns:
         - The calculated scaling variable for the sequence item.

        """
        # all of the different letters the state can have
        state_letters = self._mm.state_alphabet

        # loop over all of the possible states
        s_value = 0
        for main_state in state_letters:
            emission = self._mm.emission_prob[
                (main_state, self._seq.emissions[seq_pos])
            ]

            # now sum over all of the previous vars and transitions
            trans_and_var_sum = 0
            for second_state in self._mm.transitions_from(main_state):
                # the value of the previous f or b value
                var_value = previous_vars[(second_state, seq_pos - 1)]

                # the transition probability
                trans_value = self._mm.transition_prob[(second_state, main_state)]

                trans_and_var_sum += var_value * trans_value

            s_value += emission * trans_and_var_sum

        return s_value

    def _forward_recursion(self, cur_state, sequence_pos, forward_vars):
        """Calculate the value of the forward recursion (PRIVATE).

        Arguments:
         - cur_state -- The letter of the state we are calculating the
           forward variable for.
         - sequence_pos -- The position we are at in the training seq.
         - forward_vars -- The current set of forward variables

        Returns the scaled forward value, or None if there are no
        transitions into cur_state.
        """
        # calculate the s value, if we haven't done so already (ie. during
        # a previous forward or backward recursion)
        if sequence_pos not in self._s_values:
            self._s_values[sequence_pos] = self._calculate_s_value(
                sequence_pos, forward_vars
            )

        # e_{l}(x_{i})
        seq_letter = self._seq.emissions[sequence_pos]
        cur_emission_prob = self._mm.emission_prob[(cur_state, seq_letter)]
        # divide by the scaling value
        scale_emission_prob = float(cur_emission_prob) / float(
            self._s_values[sequence_pos]
        )

        # loop over all of the possible states at the position
        state_pos_sum = 0
        have_transition = False
        for second_state in self._mm.transitions_from(cur_state):
            have_transition = True

            # get the previous forward_var values
            # f_{k}(i - 1)
            prev_forward = forward_vars[(second_state, sequence_pos - 1)]

            # a_{kl}
            cur_trans_prob = self._mm.transition_prob[(second_state, cur_state)]
            state_pos_sum += prev_forward * cur_trans_prob

        # if we have the possibility of having a transition
        # return the recursion value
        if have_transition:
            return scale_emission_prob * state_pos_sum
        else:
            return None

    def _backward_recursion(self, cur_state, sequence_pos, backward_vars):
        """Calculate the value of the backward recursion (PRIVATE).

        Arguments:
         - cur_state -- The letter of the state we are calculating the
           backward variable for.
         - sequence_pos -- The position we are at in the training seq.
         - backward_vars -- The current set of backward variables

        Returns the scaled backward value, or None if there are no
        transitions out of cur_state.
        """
        # calculate the s value, if we haven't done so already (ie. during
        # a previous forward or backward recursion)
        if sequence_pos not in self._s_values:
            self._s_values[sequence_pos] = self._calculate_s_value(
                sequence_pos, backward_vars
            )

        # e(x_{i + 1}) -- hoisted out of the loop below since it depends
        # only on cur_state and sequence_pos, which do not change inside it.
        # NOTE(review): this looks up the emission for cur_state, whereas
        # Durbin et al p59 uses the emission of the successor state l
        # (second_state below) -- confirm whether this deviation is intended.
        seq_letter = self._seq.emissions[sequence_pos + 1]
        cur_emission_prob = self._mm.emission_prob[(cur_state, seq_letter)]

        # loop over all of the possible states at the position
        state_pos_sum = 0
        have_transition = False
        for second_state in self._mm.transitions_from(cur_state):
            have_transition = True

            # get the previous backward_var value
            # b_{l}(i + 1)
            prev_backward = backward_vars[(second_state, sequence_pos + 1)]

            # the transition probability -- a_{kl}
            cur_transition_prob = self._mm.transition_prob[(cur_state, second_state)]

            state_pos_sum += cur_emission_prob * prev_backward * cur_transition_prob

        # if we have a probability for a transition, return it
        if have_transition:
            return state_pos_sum / float(self._s_values[sequence_pos])
        # otherwise we have no probability (ie. we can't do this transition)
        # and return None
        else:
            return None
313
314
class LogDPAlgorithms(AbstractDPAlgorithms):
    """Forward/backward implementation based on log-space arithmetic.

    The intended approach is to sum log probabilities, speeding up the
    additions with a lookup table of common values.

    XXX Not implemented yet!
    """

    def __init__(self, markov_model, sequence):
        """Refuse construction: this class is still a placeholder."""
        raise NotImplementedError("Haven't coded this yet...")
327