# Copyright 2001 Brad Chapman. All rights reserved.
#
# This file is part of the Biopython distribution and governed by your
# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
# Please see the LICENSE file that should have been included as part of this
# package.

"""Dynamic Programming algorithms for general usage.

This module contains classes which implement Dynamic Programming
algorithms that can be used generally.
"""


class AbstractDPAlgorithms:
    """An abstract class to calculate forward and backward probabilities.

    This class should not be instantiated directly, but should be used
    through a derived class which implements proper scaling of variables.

    This class is just meant to encapsulate the basic forward and backward
    algorithms, and allow derived classes to deal with the problem of
    multiplying many small probabilities.

    Derived classes must implement:

    - _forward_recursion -- Calculate the forward values in the recursion
      step using some technique to prevent underflow errors.
    - _backward_recursion -- Calculate the backward values in the recursion
      step using some technique to prevent underflow errors.

    """

    def __init__(self, markov_model, sequence):
        """Initialize to calculate forward and backward probabilities.

        Arguments:
        - markov_model -- The current Markov model we are working with.
        - sequence -- A training sequence containing a set of emissions.

        """
        self._mm = markov_model
        self._seq = sequence

    def _forward_recursion(self, cur_state, sequence_pos, forward_vars):
        """Calculate the forward recursion value (PRIVATE)."""
        raise NotImplementedError("Subclasses must implement")

    def forward_algorithm(self):
        """Calculate sequence probability using the forward algorithm.

        This implements the forward algorithm, as described on pp. 57-58
        of Durbin et al.

        Returns:
        - A dictionary containing the forward variables. This has keys of the
          form (state letter, position in the training sequence), and values
          containing the calculated forward variable.
        - The calculated probability of the sequence.

        """
        # all of the different letters that the state path can be in
        state_letters = self._mm.state_alphabet

        # -- initialize the algorithm
        #
        # NOTE: My index numbers are one less than what is given in Durbin
        # et al., since we are indexing the sequence going from 0 to
        # (Length - 1), not 1 to Length as in Durbin et al.
        #
        forward_var = {}
        # f_{0}(0) = 1
        forward_var[(state_letters[0], -1)] = 1
        # f_{k}(0) = 0, for k > 0
        for k in range(1, len(state_letters)):
            forward_var[(state_letters[k], -1)] = 0

        # -- now do the recursion step
        # loop over the training sequence
        # Recursion step: (i = 1 .. L)
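        # Each step of the recursion computes, following Durbin et al.:
        #     f_{l}(i) = e_{l}(x_{i}) * sum_{k} (f_{k}(i - 1) * a_{kl})
        # The arithmetic itself is delegated to _forward_recursion so that
        # derived classes can apply scaling or log transforms to prevent
        # underflow.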
        for i in range(len(self._seq.emissions)):
            # now loop over the letters in the state path
            for main_state in state_letters:
                # calculate the forward value using the appropriate
                # method to prevent underflow errors
                forward_value = self._forward_recursion(main_state, i, forward_var)

                if forward_value is not None:
                    forward_var[(main_state, i)] = forward_value

        # -- termination step - calculate the probability of the sequence
        first_state = state_letters[0]
        seq_prob = 0

        for state_item in state_letters:
            # f_{k}(L)
            forward_value = forward_var[(state_item, len(self._seq.emissions) - 1)]
            # a_{k0}
            transition_value = self._mm.transition_prob[(state_item, first_state)]

            seq_prob += forward_value * transition_value

        return forward_var, seq_prob

    def _backward_recursion(self, cur_state, sequence_pos, backward_vars):
        """Calculate the backward recursion value (PRIVATE)."""
        raise NotImplementedError("Subclasses must implement")

    def backward_algorithm(self):
        """Calculate sequence probability using the backward algorithm.

        This implements the backward algorithm, as described on pp. 58-59
        of Durbin et al.

        Returns:
        - A dictionary containing the backward variables. This has keys
          of the form (state letter, position in the training sequence),
          and values containing the calculated backward variable.

        """
        # all of the different letters that the state path can be in
        state_letters = self._mm.state_alphabet

        # -- initialize the algorithm
        #
        # NOTE: My index numbers are one less than what is given in Durbin
        # et al., since we are indexing the sequence going from 0 to
        # (Length - 1), not 1 to Length as in Durbin et al.
        #
        backward_var = {}

        first_letter = state_letters[0]
        # b_{k}(L) = a_{k0} for all k
        for state in state_letters:
            backward_var[
                (state, len(self._seq.emissions) - 1)
            ] = self._mm.transition_prob[(state, first_letter)]

        # -- recursion
        # loop over the training sequence backwards
        # Recursion step: (i = L - 1 ... 1)
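        # Each step computes, following Durbin et al.:
        #     b_{k}(i) = sum_{l} (a_{kl} * e_{l}(x_{i + 1}) * b_{l}(i + 1))
        # with the arithmetic again delegated to the derived class via
        # _backward_recursion.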
        all_indexes = list(range(len(self._seq.emissions) - 1))
        all_indexes.reverse()
        for i in all_indexes:
            # now loop over the letters in the state path
            for main_state in state_letters:
                # calculate the backward value using the appropriate
                # method to prevent underflow errors
                backward_value = self._backward_recursion(main_state, i, backward_var)

                if backward_value is not None:
                    backward_var[(main_state, i)] = backward_value

        # skip the termination step to avoid recalculations -- you should
        # get sequence probabilities using the forward algorithm

        return backward_var


class ScaledDPAlgorithms(AbstractDPAlgorithms):
    """Implement forward and backward algorithms using a rescaling approach.

    This scales the f and b variables, so that they remain within a
    manageable numerical interval during calculations. This approach is
    described in Durbin et al. on p. 78.

    This approach is a little more straightforward than a log
    transformation, but may still give underflow errors for some types
    of models. In these cases, the LogDPAlgorithms class should be used.
    """

    def __init__(self, markov_model, sequence):
        """Initialize the scaled approach to calculating probabilities.

        Arguments:
        - markov_model -- The current Markov model we are working with.
        - sequence -- A TrainingSequence object that must have a
          set of emissions to work with.

        """
        AbstractDPAlgorithms.__init__(self, markov_model, sequence)

        self._s_values = {}

    def _calculate_s_value(self, seq_pos, previous_vars):
        """Calculate the next scaling variable for a sequence position (PRIVATE).

        This utilizes the approach of choosing s values such that the
        sum of all of the scaled f values is equal to 1.

        Arguments:
        - seq_pos -- The current position we are at in the sequence.
        - previous_vars -- All of the forward or backward variables
          calculated so far.

        Returns:
        - The calculated scaling variable for the sequence item.

        """
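        # Durbin et al. (p. 78) choose s_{i} so that the scaled forward
        # variables at position i sum to one, which gives:
        #     s_{i} = sum_{l} e_{l}(x_{i}) * sum_{k} (f_{k}(i - 1) * a_{kl})
        # The nested loops below compute exactly this double sum.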
        # all of the different letters the state can have
        state_letters = self._mm.state_alphabet

        # loop over all of the possible states
        s_value = 0
        for main_state in state_letters:
            emission = self._mm.emission_prob[
                (main_state, self._seq.emissions[seq_pos])
            ]

            # now sum over all of the previous vars and transitions
            trans_and_var_sum = 0
            for second_state in self._mm.transitions_from(main_state):
                # the value of the previous f or b value
                var_value = previous_vars[(second_state, seq_pos - 1)]

                # the transition probability
                trans_value = self._mm.transition_prob[(second_state, main_state)]

                trans_and_var_sum += var_value * trans_value

            s_value += emission * trans_and_var_sum

        return s_value

    def _forward_recursion(self, cur_state, sequence_pos, forward_vars):
        """Calculate the value of the forward recursion (PRIVATE).

        Arguments:
        - cur_state -- The letter of the state we are calculating the
          forward variable for.
        - sequence_pos -- The position we are at in the training seq.
        - forward_vars -- The current set of forward variables.

        """
        # calculate the s value, if we haven't done so already (i.e. during
        # a previous forward or backward recursion)
        if sequence_pos not in self._s_values:
            self._s_values[sequence_pos] = self._calculate_s_value(
                sequence_pos, forward_vars
            )

        # e_{l}(x_{i})
        seq_letter = self._seq.emissions[sequence_pos]
        cur_emission_prob = self._mm.emission_prob[(cur_state, seq_letter)]
        # divide by the scaling value
        scale_emission_prob = float(cur_emission_prob) / float(
            self._s_values[sequence_pos]
        )

        # loop over all of the possible states at the position
        state_pos_sum = 0
        have_transition = False
        for second_state in self._mm.transitions_from(cur_state):
            have_transition = True

            # get the previous forward_var values
            # f_{k}(i - 1)
            prev_forward = forward_vars[(second_state, sequence_pos - 1)]

            # a_{kl}
            cur_trans_prob = self._mm.transition_prob[(second_state, cur_state)]
            state_pos_sum += prev_forward * cur_trans_prob

        # if we have the possibility of having a transition
        # return the recursion value
        if have_transition:
            return scale_emission_prob * state_pos_sum
        else:
            return None

    def _backward_recursion(self, cur_state, sequence_pos, backward_vars):
        """Calculate the value of the backward recursion (PRIVATE).

        Arguments:
        - cur_state -- The letter of the state we are calculating the
          backward variable for.
        - sequence_pos -- The position we are at in the training seq.
        - backward_vars -- The current set of backward variables.

        """
        # calculate the s value, if we haven't done so already (i.e. during
        # a previous forward or backward recursion)
        if sequence_pos not in self._s_values:
            self._s_values[sequence_pos] = self._calculate_s_value(
                sequence_pos, backward_vars
            )

        # loop over all of the possible states at the position
        state_pos_sum = 0
        have_transition = False
        for second_state in self._mm.transitions_from(cur_state):
            have_transition = True
            # e_{l}(x_{i + 1})
            seq_letter = self._seq.emissions[sequence_pos + 1]
            cur_emission_prob = self._mm.emission_prob[(second_state, seq_letter)]

            # get the previous backward_var value
            # b_{l}(i + 1)
            prev_backward = backward_vars[(second_state, sequence_pos + 1)]

            # the transition probability -- a_{kl}
            cur_transition_prob = self._mm.transition_prob[(cur_state, second_state)]

            state_pos_sum += cur_emission_prob * prev_backward * cur_transition_prob

        # if we have a probability for a transition, return it
        if have_transition:
            return state_pos_sum / float(self._s_values[sequence_pos])
        # otherwise we have no probability (i.e. we can't do this transition)
        # and return None
        else:
            return None


class LogDPAlgorithms(AbstractDPAlgorithms):
    """Implement forward and backward algorithms using a log approach.

    This uses the approach of calculating the sum of log probabilities
    using a lookup table for common values.

    XXX This is not implemented yet!
    """

    def __init__(self, markov_model, sequence):
        """Initialize the class."""
        raise NotImplementedError("Haven't coded this yet...")
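
# -- Usage sketch (illustrative only, not part of the module) --
#
# A minimal sketch of driving these classes, assuming the MarkovModelBuilder
# and TrainingSequence helpers from Bio.HMM.MarkovModel and Bio.HMM.Trainer
# (the exact model setup is up to the caller):
#
#     from Bio.HMM.MarkovModel import MarkovModelBuilder
#     from Bio.HMM.Trainer import TrainingSequence
#     from Bio.Seq import Seq
#
#     builder = MarkovModelBuilder(("R", "F"), ("1", "2", "3", "4", "5", "6"))
#     builder.allow_all_transitions()
#     builder.set_random_probabilities()
#     mm = builder.get_markov_model()
#
#     # a training sequence of observed rolls with an empty state path
#     training_seq = TrainingSequence(Seq("123456"), Seq(""))
#
#     dp = ScaledDPAlgorithms(mm, training_seq)
#     forward_vars, seq_prob = dp.forward_algorithm()
#     backward_vars = dp.backward_algorithm()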