1# Copyright 2001 Brad Chapman. All rights reserved. 2# 3# This file is part of the Biopython distribution and governed by your 4# choice of the "Biopython License Agreement" or the "BSD 3-Clause License". 5# Please see the LICENSE file that should have been included as part of this 6# package. 7 8"""Deal with representations of Markov Models.""" 9# standard modules 10import copy 11import math 12import random 13from collections import defaultdict 14 15from Bio.Seq import Seq 16 17 18def _gen_random_array(n): 19 """Return an array of n random numbers summing to 1.0 (PRIVATE).""" 20 randArray = [random.random() for _ in range(n)] 21 total = sum(randArray) 22 return [x / total for x in randArray] 23 24 25def _calculate_emissions(emission_probs): 26 """Calculate which symbols can be emitted in each state (PRIVATE).""" 27 # loop over all of the state-symbol duples, mapping states to 28 # lists of emitted symbols 29 emissions = defaultdict(list) 30 for state, symbol in emission_probs: 31 emissions[state].append(symbol) 32 33 return emissions 34 35 36def _calculate_from_transitions(trans_probs): 37 """Calculate which 'from transitions' are allowed for each state (PRIVATE). 38 39 This looks through all of the trans_probs, and uses this dictionary 40 to determine allowed transitions. It converts this information into 41 a dictionary, whose keys are source states and whose values are 42 lists of destination states reachable from the source state via a 43 transition. 44 """ 45 transitions = defaultdict(list) 46 for from_state, to_state in trans_probs: 47 transitions[from_state].append(to_state) 48 49 return transitions 50 51 52def _calculate_to_transitions(trans_probs): 53 """Calculate which 'to transitions' are allowed for each state (PRIVATE). 54 55 This looks through all of the trans_probs, and uses this dictionary 56 to determine allowed transitions. It converts this information into 57 a dictionary, whose keys are destination states and whose values are 58 lists of source states from which the destination is reachable via a 59 transition. 60 """ 61 transitions = defaultdict(list) 62 for from_state, to_state in trans_probs: 63 transitions[to_state].append(from_state) 64 65 return transitions 66 67 68class MarkovModelBuilder: 69 """Interface to build up a Markov Model. 70 71 This class is designed to try to separate the task of specifying the 72 Markov Model from the actual model itself. This is in hopes of making 73 the actual Markov Model classes smaller. 74 75 So, this builder class should be used to create Markov models instead 76 of trying to initiate a Markov Model directly. 77 """ 78 79 # the default pseudo counts to use 80 DEFAULT_PSEUDO = 1 81 82 def __init__(self, state_alphabet, emission_alphabet): 83 """Initialize a builder to create Markov Models. 84 85 Arguments: 86 - state_alphabet -- An iterable (e.g., tuple or list) containing 87 all of the letters that can appear in the states 88 - emission_alphabet -- An iterable (e.g., tuple or list) containing 89 all of the letters for states that can be emitted by the HMM. 90 91 """ 92 self._state_alphabet = tuple(state_alphabet) 93 self._emission_alphabet = tuple(emission_alphabet) 94 95 # probabilities for the initial state, initialized by calling 96 # set_initial_probabilities (required) 97 self.initial_prob = {} 98 99 # the probabilities for transitions and emissions 100 # by default we have no transitions and all possible emissions 101 self.transition_prob = {} 102 self.emission_prob = self._all_blank(state_alphabet, emission_alphabet) 103 104 # the default pseudocounts for transition and emission counting 105 self.transition_pseudo = {} 106 self.emission_pseudo = self._all_pseudo(state_alphabet, emission_alphabet) 107 108 def _all_blank(self, first_alphabet, second_alphabet): 109 """Return a dictionary with all counts set to zero (PRIVATE). 110 111 This uses the letters in the first and second alphabet to create 112 a dictionary with keys of two tuples organized as 113 (letter of first alphabet, letter of second alphabet). The values 114 are all set to 0. 115 """ 116 all_blank = {} 117 for first_state in first_alphabet: 118 for second_state in second_alphabet: 119 all_blank[(first_state, second_state)] = 0 120 121 return all_blank 122 123 def _all_pseudo(self, first_alphabet, second_alphabet): 124 """Return a dictionary with all counts set to a default value (PRIVATE). 125 126 This takes the letters in first alphabet and second alphabet and 127 creates a dictionary with keys of two tuples organized as: 128 (letter of first alphabet, letter of second alphabet). The values 129 are all set to the value of the class attribute DEFAULT_PSEUDO. 130 """ 131 all_counts = {} 132 for first_state in first_alphabet: 133 for second_state in second_alphabet: 134 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO 135 136 return all_counts 137 138 def get_markov_model(self): 139 """Return the markov model corresponding with the current parameters. 140 141 Each markov model returned by a call to this function is unique 142 (ie. they don't influence each other). 143 """ 144 # user must set initial probabilities 145 if not self.initial_prob: 146 raise Exception( 147 "set_initial_probabilities must be called to " 148 "fully initialize the Markov model" 149 ) 150 151 initial_prob = copy.deepcopy(self.initial_prob) 152 transition_prob = copy.deepcopy(self.transition_prob) 153 emission_prob = copy.deepcopy(self.emission_prob) 154 transition_pseudo = copy.deepcopy(self.transition_pseudo) 155 emission_pseudo = copy.deepcopy(self.emission_pseudo) 156 157 return HiddenMarkovModel( 158 self._state_alphabet, 159 self._emission_alphabet, 160 initial_prob, 161 transition_prob, 162 emission_prob, 163 transition_pseudo, 164 emission_pseudo, 165 ) 166 167 def set_initial_probabilities(self, initial_prob): 168 """Set initial state probabilities. 169 170 initial_prob is a dictionary mapping states to probabilities. 171 Suppose, for example, that the state alphabet is ('A', 'B'). Call 172 set_initial_prob({'A': 1}) to guarantee that the initial 173 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5}) 174 to make each initial state equally probable. 175 176 This method must now be called in order to use the Markov model 177 because the calculation of initial probabilities has changed 178 incompatibly; the previous calculation was incorrect. 179 180 If initial probabilities are set for all states, then they should add up 181 to 1. Otherwise the sum should be <= 1. The residual probability is 182 divided up evenly between all the states for which the initial 183 probability has not been set. For example, calling 184 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5, 185 for the above example. 186 """ 187 self.initial_prob = copy.copy(initial_prob) 188 189 # ensure that all referenced states are valid 190 for state in initial_prob: 191 if state not in self._state_alphabet: 192 raise ValueError( 193 "State %s was not found in the sequence alphabet" % state 194 ) 195 196 # distribute the residual probability, if any 197 num_states_not_set = len(self._state_alphabet) - len(self.initial_prob) 198 if num_states_not_set < 0: 199 raise Exception("Initial probabilities can't exceed # of states") 200 prob_sum = sum(self.initial_prob.values()) 201 if prob_sum > 1.0: 202 raise Exception("Total initial probability cannot exceed 1.0") 203 if num_states_not_set > 0: 204 prob = (1.0 - prob_sum) / num_states_not_set 205 for state in self._state_alphabet: 206 if state not in self.initial_prob: 207 self.initial_prob[state] = prob 208 209 def set_equal_probabilities(self): 210 """Reset all probabilities to be an average value. 211 212 Resets the values of all initial probabilities and all allowed 213 transitions and all allowed emissions to be equal to 1 divided by the 214 number of possible elements. 215 216 This is useful if you just want to initialize a Markov Model to 217 starting values (ie. if you have no prior notions of what the 218 probabilities should be -- or if you are just feeling too lazy 219 to calculate them :-). 220 221 Warning 1 -- this will reset all currently set probabilities. 222 223 Warning 2 -- This just sets all probabilities for transitions and 224 emissions to total up to 1, so it doesn't ensure that the sum of 225 each set of transitions adds up to 1. 226 """ 227 # set initial state probabilities 228 new_initial_prob = float(1) / float(len(self.transition_prob)) 229 for state in self._state_alphabet: 230 self.initial_prob[state] = new_initial_prob 231 232 # set the transitions 233 new_trans_prob = float(1) / float(len(self.transition_prob)) 234 for key in self.transition_prob: 235 self.transition_prob[key] = new_trans_prob 236 237 # set the emissions 238 new_emission_prob = float(1) / float(len(self.emission_prob)) 239 for key in self.emission_prob: 240 self.emission_prob[key] = new_emission_prob 241 242 def set_random_initial_probabilities(self): 243 """Set all initial state probabilities to a randomly generated distribution. 244 245 Returns the dictionary containing the initial probabilities. 246 """ 247 initial_freqs = _gen_random_array(len(self._state_alphabet)) 248 for state in self._state_alphabet: 249 self.initial_prob[state] = initial_freqs.pop() 250 251 return self.initial_prob 252 253 def set_random_transition_probabilities(self): 254 """Set all allowed transition probabilities to a randomly generated distribution. 255 256 Returns the dictionary containing the transition probabilities. 257 """ 258 if not self.transition_prob: 259 raise Exception( 260 "No transitions have been allowed yet. " 261 "Allow some or all transitions by calling " 262 "allow_transition or allow_all_transitions first." 263 ) 264 265 transitions_from = _calculate_from_transitions(self.transition_prob) 266 for from_state in transitions_from: 267 freqs = _gen_random_array(len(transitions_from[from_state])) 268 for to_state in transitions_from[from_state]: 269 self.transition_prob[(from_state, to_state)] = freqs.pop() 270 271 return self.transition_prob 272 273 def set_random_emission_probabilities(self): 274 """Set all allowed emission probabilities to a randomly generated distribution. 275 276 Returns the dictionary containing the emission probabilities. 277 """ 278 if not self.emission_prob: 279 raise Exception( 280 "No emissions have been allowed yet. Allow some or all emissions." 281 ) 282 283 emissions = _calculate_emissions(self.emission_prob) 284 for state in emissions: 285 freqs = _gen_random_array(len(emissions[state])) 286 for symbol in emissions[state]: 287 self.emission_prob[(state, symbol)] = freqs.pop() 288 289 return self.emission_prob 290 291 def set_random_probabilities(self): 292 """Set all probabilities to randomly generated numbers. 293 294 Resets probabilities of all initial states, transitions, and 295 emissions to random values. 296 """ 297 self.set_random_initial_probabilities() 298 self.set_random_transition_probabilities() 299 self.set_random_emission_probabilities() 300 301 # --- functions to deal with the transitions in the sequence 302 303 def allow_all_transitions(self): 304 """Create transitions between all states. 305 306 By default all transitions within the alphabet are disallowed; 307 this is a convenience function to change this to allow all 308 possible transitions. 309 """ 310 # first get all probabilities and pseudo counts set 311 # to the default values 312 all_probs = self._all_blank(self._state_alphabet, self._state_alphabet) 313 314 all_pseudo = self._all_pseudo(self._state_alphabet, self._state_alphabet) 315 316 # now set any probabilities and pseudo counts that 317 # were previously set 318 for set_key in self.transition_prob: 319 all_probs[set_key] = self.transition_prob[set_key] 320 321 for set_key in self.transition_pseudo: 322 all_pseudo[set_key] = self.transition_pseudo[set_key] 323 324 # finally reinitialize the transition probs and pseudo counts 325 self.transition_prob = all_probs 326 self.transition_pseudo = all_pseudo 327 328 def allow_transition( 329 self, from_state, to_state, probability=None, pseudocount=None 330 ): 331 """Set a transition as being possible between the two states. 332 333 probability and pseudocount are optional arguments 334 specifying the probabilities and pseudo counts for the transition. 335 If these are not supplied, then the values are set to the 336 default values. 337 338 Raises: 339 KeyError -- if the two states already have an allowed transition. 340 341 """ 342 # check the sanity of adding these states 343 for state in [from_state, to_state]: 344 if state not in self._state_alphabet: 345 raise ValueError( 346 "State %s was not found in the sequence alphabet" % state 347 ) 348 349 # ensure that the states are not already set 350 if (from_state, to_state) not in self.transition_prob and ( 351 from_state, 352 to_state, 353 ) not in self.transition_pseudo: 354 # set the initial probability 355 if probability is None: 356 probability = 0 357 self.transition_prob[(from_state, to_state)] = probability 358 359 # set the initial pseudocounts 360 if pseudocount is None: 361 pseudocount = self.DEFAULT_PSEUDO 362 self.transition_pseudo[(from_state, to_state)] = pseudocount 363 else: 364 raise KeyError( 365 "Transition from %s to %s is already allowed." % (from_state, to_state) 366 ) 367 368 def destroy_transition(self, from_state, to_state): 369 """Restrict transitions between the two states. 370 371 Raises: 372 KeyError if the transition is not currently allowed. 373 374 """ 375 try: 376 del self.transition_prob[(from_state, to_state)] 377 del self.transition_pseudo[(from_state, to_state)] 378 except KeyError: 379 raise KeyError( 380 "Transition from %s to %s is already disallowed." 381 % (from_state, to_state) 382 ) 383 384 def set_transition_score(self, from_state, to_state, probability): 385 """Set the probability of a transition between two states. 386 387 Raises: 388 KeyError if the transition is not allowed. 389 390 """ 391 if (from_state, to_state) in self.transition_prob: 392 self.transition_prob[(from_state, to_state)] = probability 393 else: 394 raise KeyError( 395 "Transition from %s to %s is not allowed." % (from_state, to_state) 396 ) 397 398 def set_transition_pseudocount(self, from_state, to_state, count): 399 """Set the default pseudocount for a transition. 400 401 To avoid computational problems, it is helpful to be able to 402 set a 'default' pseudocount to start with for estimating 403 transition and emission probabilities (see p62 in Durbin et al 404 for more discussion on this. By default, all transitions have 405 a pseudocount of 1. 406 407 Raises: 408 KeyError if the transition is not allowed. 409 410 """ 411 if (from_state, to_state) in self.transition_pseudo: 412 self.transition_pseudo[(from_state, to_state)] = count 413 else: 414 raise KeyError( 415 "Transition from %s to %s is not allowed." % (from_state, to_state) 416 ) 417 418 # --- functions to deal with emissions from the sequence 419 420 def set_emission_score(self, seq_state, emission_state, probability): 421 """Set the probability of a emission from a particular state. 422 423 Raises: 424 KeyError if the emission from the given state is not allowed. 425 426 """ 427 if (seq_state, emission_state) in self.emission_prob: 428 self.emission_prob[(seq_state, emission_state)] = probability 429 else: 430 raise KeyError( 431 "Emission of %s from %s is not allowed." % (emission_state, seq_state) 432 ) 433 434 def set_emission_pseudocount(self, seq_state, emission_state, count): 435 """Set the default pseudocount for an emission. 436 437 To avoid computational problems, it is helpful to be able to 438 set a 'default' pseudocount to start with for estimating 439 transition and emission probabilities (see p62 in Durbin et al 440 for more discussion on this. By default, all emissions have 441 a pseudocount of 1. 442 443 Raises: 444 KeyError if the emission from the given state is not allowed. 445 446 """ 447 if (seq_state, emission_state) in self.emission_pseudo: 448 self.emission_pseudo[(seq_state, emission_state)] = count 449 else: 450 raise KeyError( 451 "Emission of %s from %s is not allowed." % (emission_state, seq_state) 452 ) 453 454 455class HiddenMarkovModel: 456 """Represent a hidden markov model that can be used for state estimation.""" 457 458 def __init__( 459 self, 460 state_alphabet, 461 emission_alphabet, 462 initial_prob, 463 transition_prob, 464 emission_prob, 465 transition_pseudo, 466 emission_pseudo, 467 ): 468 """Initialize a Markov Model. 469 470 Note: You should use the MarkovModelBuilder class instead of 471 initiating this class directly. 472 473 Arguments: 474 - state_alphabet -- A tuple containing all of the letters that can 475 appear in the states. 476 - emission_alphabet -- A tuple containing all of the letters for 477 states that can be emitted by the HMM. 478 - initial_prob - A dictionary of initial probabilities for all states. 479 - transition_prob -- A dictionary of transition probabilities for all 480 possible transitions in the sequence. 481 - emission_prob -- A dictionary of emission probabilities for all 482 possible emissions from the sequence states. 483 - transition_pseudo -- Pseudo-counts to be used for the transitions, 484 when counting for purposes of estimating transition probabilities. 485 - emission_pseudo -- Pseudo-counts to be used for the emissions, 486 when counting for purposes of estimating emission probabilities. 487 488 """ 489 self.state_alphabet = state_alphabet 490 self.emission_alphabet = emission_alphabet 491 492 self.initial_prob = initial_prob 493 494 self._transition_pseudo = transition_pseudo 495 self._emission_pseudo = emission_pseudo 496 497 self.transition_prob = transition_prob 498 self.emission_prob = emission_prob 499 500 # a dictionary of the possible transitions from each state 501 # each key is a source state, mapped to a list of the destination states 502 # that are reachable from the source state via a transition 503 self._transitions_from = _calculate_from_transitions(self.transition_prob) 504 505 # a dictionary of the possible transitions to each state 506 # each key is a destination state, mapped to a list of source states 507 # from which the destination is reachable via a transition 508 self._transitions_to = _calculate_to_transitions(self.transition_prob) 509 510 def get_blank_transitions(self): 511 """Get the default transitions for the model. 512 513 Returns a dictionary of all of the default transitions between any 514 two letters in the sequence alphabet. The dictionary is structured 515 with keys as (letter1, letter2) and values as the starting number 516 of transitions. 517 """ 518 return self._transition_pseudo 519 520 def get_blank_emissions(self): 521 """Get the starting default emmissions for each sequence. 522 523 This returns a dictionary of the default emmissions for each 524 letter. The dictionary is structured with keys as 525 (seq_letter, emmission_letter) and values as the starting number 526 of emmissions. 527 """ 528 return self._emission_pseudo 529 530 def transitions_from(self, state_letter): 531 """Get all destination states which can transition from source state_letter. 532 533 This returns all letters which the given state_letter can transition 534 to, i.e. all the destination states reachable from state_letter. 535 536 An empty list is returned if state_letter has no outgoing transitions. 537 """ 538 if state_letter in self._transitions_from: 539 return self._transitions_from[state_letter] 540 else: 541 return [] 542 543 def transitions_to(self, state_letter): 544 """Get all source states which can transition to destination state_letter. 545 546 This returns all letters which the given state_letter is reachable 547 from, i.e. all the source states which can reach state_later 548 549 An empty list is returned if state_letter is unreachable. 550 """ 551 if state_letter in self._transitions_to: 552 return self._transitions_to[state_letter] 553 else: 554 return [] 555 556 def viterbi(self, sequence, state_alphabet): 557 """Calculate the most probable state path using the Viterbi algorithm. 558 559 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et 560 al for a full explanation -- this is where I took my implementation 561 ideas from), to allow decoding of the state path, given a sequence 562 of emissions. 563 564 Arguments: 565 - sequence -- A Seq object with the emission sequence that we 566 want to decode. 567 - state_alphabet -- An iterable (e.g., tuple or list) containing 568 all of the letters that can appear in the states 569 570 """ 571 # calculate logarithms of the initial, transition, and emission probs 572 log_initial = self._log_transform(self.initial_prob) 573 log_trans = self._log_transform(self.transition_prob) 574 log_emission = self._log_transform(self.emission_prob) 575 576 viterbi_probs = {} 577 pred_state_seq = {} 578 579 # --- recursion 580 # loop over the training squence (i = 1 .. L) 581 # NOTE: My index numbers are one less than what is given in Durbin 582 # et al, since we are indexing the sequence going from 0 to 583 # (Length - 1) not 1 to Length, like in Durbin et al. 584 for i in range(0, len(sequence)): 585 # loop over all of the possible i-th states in the state path 586 for cur_state in state_alphabet: 587 # e_{l}(x_{i}) 588 emission_part = log_emission[(cur_state, sequence[i])] 589 590 max_prob = 0 591 if i == 0: 592 # for the first state, use the initial probability rather 593 # than looking back to previous states 594 max_prob = log_initial[cur_state] 595 else: 596 # loop over all possible (i-1)-th previous states 597 possible_state_probs = {} 598 for prev_state in self.transitions_to(cur_state): 599 # a_{kl} 600 trans_part = log_trans[(prev_state, cur_state)] 601 602 # v_{k}(i - 1) 603 viterbi_part = viterbi_probs[(prev_state, i - 1)] 604 cur_prob = viterbi_part + trans_part 605 606 possible_state_probs[prev_state] = cur_prob 607 608 # calculate the viterbi probability using the max 609 max_prob = max(possible_state_probs.values()) 610 611 # v_{k}(i) 612 viterbi_probs[(cur_state, i)] = emission_part + max_prob 613 614 if i > 0: 615 # get the most likely prev_state leading to cur_state 616 for state in possible_state_probs: 617 if possible_state_probs[state] == max_prob: 618 pred_state_seq[(i - 1, cur_state)] = state 619 break 620 621 # --- termination 622 # calculate the probability of the state path 623 # loop over all states 624 all_probs = {} 625 for state in state_alphabet: 626 # v_{k}(L) 627 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)] 628 629 state_path_prob = max(all_probs.values()) 630 631 # find the last pointer we need to trace back from 632 last_state = "" 633 for state in all_probs: 634 if all_probs[state] == state_path_prob: 635 last_state = state 636 637 assert last_state != "", "Didn't find the last state to trace from!" 638 639 # --- traceback 640 traceback_seq = [] 641 642 loop_seq = list(range(1, len(sequence))) 643 loop_seq.reverse() 644 645 # last_state is the last state in the most probable state sequence. 646 # Compute that sequence by walking backwards in time. From the i-th 647 # state in the sequence, find the (i-1)-th state as the most 648 # probable state preceding the i-th state. 649 state = last_state 650 traceback_seq.append(state) 651 for i in loop_seq: 652 state = pred_state_seq[(i - 1, state)] 653 traceback_seq.append(state) 654 655 # put the traceback sequence in the proper orientation 656 traceback_seq.reverse() 657 traceback_seq = "".join(traceback_seq) 658 659 return Seq(traceback_seq), state_path_prob 660 661 def _log_transform(self, probability): 662 """Return log transform of the given probability dictionary (PRIVATE). 663 664 When calculating the Viterbi equation, add logs of probabilities rather 665 than multiplying probabilities, to avoid underflow errors. This method 666 returns a new dictionary with the same keys as the given dictionary 667 and log-transformed values. 668 """ 669 log_prob = copy.copy(probability) 670 for key in log_prob: 671 prob = log_prob[key] 672 if prob > 0: 673 log_prob[key] = math.log(log_prob[key]) 674 else: 675 log_prob[key] = -math.inf 676 677 return log_prob 678