1# Copyright 2012 by Wibowo Arindrarto. All rights reserved. 2# This file is part of the Biopython distribution and governed by your 3# choice of the "Biopython License Agreement" or the "BSD 3-Clause License". 4# Please see the LICENSE file that should have been included as part of this 5# package. 6"""Bio.SearchIO abstract base parser for Exonerate standard output format.""" 7 8import re 9from functools import reduce 10from abc import ABC, abstractmethod 11 12from Bio.SearchIO._index import SearchIndexer 13from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment 14from Bio.SeqUtils import seq1 15 16 17# strand char-value mapping 18_STRAND_MAP = {"+": 1, "-": -1, ".": 0} 19 20_RE_SHIFTS = re.compile(r"(#+)") 21# regex for checking whether a vulgar line has protein/translated components 22_RE_TRANS = re.compile(r"[53ISCF]") 23 24 25def _set_frame(frag): 26 """Set the HSPFragment frames (PRIVATE).""" 27 frag.hit_frame = (frag.hit_start % 3 + 1) * frag.hit_strand 28 frag.query_frame = (frag.query_start % 3 + 1) * frag.query_strand 29 30 31def _make_triplets(seq, phase=0): 32 """Select a valid amino acid sequence given a 3-letter code input (PRIVATE). 33 34 This function takes a single three-letter amino acid sequence and the phase 35 of the sequence to return the longest intact amino acid sequence possible. 36 Parts of the input sequence before and after the selected sequence are also 37 returned. 38 39 This is an internal private function and is meant for parsing Exonerate's 40 three-letter amino acid output. 41 42 >>> from Bio.SearchIO.ExonerateIO._base import _make_triplets 43 >>> _make_triplets('GlyThrSerAlaPro') 44 ('', ['Gly', 'Thr', 'Ser', 'Ala', 'Pro'], '') 45 >>> _make_triplets('yThrSerAla', phase=1) 46 ('y', ['Thr', 'Ser', 'Ala'], '') 47 >>> _make_triplets('yThrSerAlaPr', phase=1) 48 ('y', ['Thr', 'Ser', 'Ala'], 'Pr') 49 50 """ 51 pre = seq[:phase] 52 np_seq = seq[phase:] 53 non_triplets = len(np_seq) % 3 54 post = "" if not non_triplets else np_seq[-1 * non_triplets :] 55 intacts = [np_seq[3 * i : 3 * (i + 1)] for i in range(len(np_seq) // 3)] 56 return pre, intacts, post 57 58 59def _get_fragments_coord(frags): 60 """Return the letter coordinate of the given list of fragments (PRIVATE). 61 62 This function takes a list of three-letter amino acid sequences and 63 returns a list of coordinates for each fragment had all the input 64 sequences been flattened. 65 66 This is an internal private function and is meant for parsing Exonerate's 67 three-letter amino acid output. 68 69 >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_coord 70 >>> _get_fragments_coord(['Thr', 'Ser', 'Ala']) 71 [0, 3, 6] 72 >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu']) 73 [0, 3, 12] 74 >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu', 'Cys']) 75 [0, 3, 12, 18] 76 77 """ 78 if not frags: 79 return [] 80 # first fragment always starts from position 0 81 init = [0] 82 return reduce(lambda acc, frag: acc + [acc[-1] + len(frag)], frags[:-1], init) 83 84 85def _get_fragments_phase(frags): 86 """Return the phases of the given list of 3-letter amino acid fragments (PRIVATE). 87 88 This is an internal private function and is meant for parsing Exonerate's 89 three-letter amino acid output. 90 91 >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_phase 92 >>> _get_fragments_phase(['Thr', 'Ser', 'Ala']) 93 [0, 0, 0] 94 >>> _get_fragments_phase(['ThrSe', 'rAla']) 95 [0, 1] 96 >>> _get_fragments_phase(['ThrSe', 'rAlaLeu', 'ProCys']) 97 [0, 1, 0] 98 >>> _get_fragments_phase(['ThrSe', 'rAlaLeuP', 'roCys']) 99 [0, 1, 2] 100 >>> _get_fragments_phase(['ThrSe', 'rAlaLeuPr', 'oCys']) 101 [0, 1, 1] 102 103 """ 104 return [(3 - (x % 3)) % 3 for x in _get_fragments_coord(frags)] 105 106 107def _adjust_aa_seq(fraglist): 108 """Transform 3-letter AA codes of input fragments to one-letter codes (PRIVATE). 109 110 Argument fraglist should be a list of HSPFragments objects. 111 """ 112 custom_map = {"***": "*", "<->": "-"} 113 hsp_hstart = fraglist[0].hit_start 114 hsp_qstart = fraglist[0].query_start 115 frag_phases = _get_fragments_phase(fraglist) 116 for frag, phase in zip(fraglist, frag_phases): 117 assert frag.query_strand == 0 or frag.hit_strand == 0 118 # hit step may be -1 as we're aligning to DNA 119 hstep = 1 if frag.hit_strand >= 0 else -1 120 121 # set fragment phase 122 frag.phase = phase 123 124 # fragment should have a length that is a multiple of 3 125 # assert len(frag) % 3 == 0 126 qseq = str(frag.query.seq) 127 q_triplets_pre, q_triplets, q_triplets_post = _make_triplets(qseq, phase) 128 129 hseq = str(frag.hit.seq) 130 h_triplets_pre, h_triplets, h_triplets_post = _make_triplets(hseq, phase) 131 132 # get one letter codes 133 # and replace gap codon markers and termination characters 134 hseq1_pre = "X" if h_triplets_pre else "" 135 hseq1_post = "X" if h_triplets_post else "" 136 hseq1 = seq1("".join(h_triplets), custom_map=custom_map) 137 hstart = hsp_hstart + (len(hseq1_pre) * hstep) 138 hend = hstart + len(hseq1.replace("-", "")) * hstep 139 140 qseq1_pre = "X" if q_triplets_pre else "" 141 qseq1_post = "X" if q_triplets_post else "" 142 qseq1 = seq1("".join(q_triplets), custom_map=custom_map) 143 qstart = hsp_qstart + len(qseq1_pre) 144 qend = qstart + len(qseq1.replace("-", "")) 145 146 # replace the old frag sequences with the new ones 147 frag.hit = None 148 frag.query = None 149 frag.hit = hseq1_pre + hseq1 + hseq1_post 150 frag.query = qseq1_pre + qseq1 + qseq1_post 151 152 # set coordinates for the protein sequence 153 if frag.query_strand == 0: 154 frag.query_start, frag.query_end = qstart, qend 155 elif frag.hit_strand == 0: 156 frag.hit_start, frag.hit_end = hstart, hend 157 158 # update alignment annotation 159 # by turning them into list of triplets 160 for annot, annotseq in frag.aln_annotation.items(): 161 pre, intact, post = _make_triplets(annotseq, phase) 162 frag.aln_annotation[annot] = ( 163 list(filter(None, [pre])) + intact + list(filter(None, [post])) 164 ) 165 166 # update values for next iteration 167 hsp_hstart, hsp_qstart = hend, qend 168 169 return fraglist 170 171 172def _split_fragment(frag): 173 """Split one HSPFragment containing frame-shifted alignment into two (PRIVATE).""" 174 # given an HSPFragment object with frameshift(s), this method splits it 175 # into fragments without frameshifts by sequentially chopping it off 176 # starting from the beginning 177 simil = frag.aln_annotation["similarity"] 178 # we should have at least 1 frame shift for splitting 179 assert simil.count("#") > 0 180 181 split_frags = [] 182 qstep = 1 if frag.query_strand >= 0 else -1 183 hstep = 1 if frag.hit_strand >= 0 else -1 184 qpos = min(frag.query_range) if qstep >= 0 else max(frag.query_range) 185 hpos = min(frag.hit_range) if qstep >= 0 else max(frag.hit_range) 186 abs_pos = 0 187 # split according to hit, then query 188 while simil: 189 190 try: 191 shifts = re.search(_RE_SHIFTS, simil).group(1) 192 s_start = simil.find(shifts) 193 s_stop = s_start + len(shifts) 194 split = frag[abs_pos : abs_pos + s_start] 195 except AttributeError: # no '#' in simil, i.e. last frag 196 shifts = "" 197 s_start = 0 198 s_stop = len(simil) 199 split = frag[abs_pos:] 200 201 # coordinates for the split strand 202 qstart, hstart = qpos, hpos 203 qpos += ( 204 len(split) - sum(split.query.seq.count(x) for x in ("-", "<", ">")) 205 ) * qstep 206 hpos += ( 207 len(split) - sum(split.hit.seq.count(x) for x in ("-", "<", ">")) 208 ) * hstep 209 210 split.hit_start = min(hstart, hpos) 211 split.query_start = min(qstart, qpos) 212 split.hit_end = max(hstart, hpos) 213 split.query_end = max(qstart, qpos) 214 215 # account for frameshift length 216 abs_slice = slice(abs_pos + s_start, abs_pos + s_stop) 217 if len(frag.aln_annotation) == 2: 218 seqs = (frag[abs_slice].query.seq, frag[abs_slice].hit.seq) 219 elif len(frag.aln_annotation) == 3: 220 seqs = ( 221 frag[abs_slice].aln_annotation["query_annotation"], 222 frag[abs_slice].aln_annotation["hit_annotation"], 223 ) 224 if "#" in seqs[0]: 225 qpos += len(shifts) * qstep 226 elif "#" in seqs[1]: 227 hpos += len(shifts) * hstep 228 229 # set frame 230 _set_frame(split) 231 split_frags.append(split) 232 # set similarity string and absolute position for the next loop 233 simil = simil[s_stop:] 234 abs_pos += s_stop 235 236 return split_frags 237 238 239def _create_hsp(hid, qid, hspd): 240 """Return a list of HSP objects from the given parsed HSP values (PRIVATE).""" 241 frags = [] 242 # we are iterating over query_ranges, but hit_ranges works just as well 243 for idx, qcoords in enumerate(hspd["query_ranges"]): 244 # get sequences, create object 245 hseqlist = hspd.get("hit") 246 hseq = "" if hseqlist is None else hseqlist[idx] 247 qseqlist = hspd.get("query") 248 qseq = "" if qseqlist is None else qseqlist[idx] 249 frag = HSPFragment(hid, qid, hit=hseq, query=qseq) 250 # coordinates 251 frag.query_start = qcoords[0] 252 frag.query_end = qcoords[1] 253 frag.hit_start = hspd["hit_ranges"][idx][0] 254 frag.hit_end = hspd["hit_ranges"][idx][1] 255 # alignment annotation 256 try: 257 aln_annot = hspd.get("aln_annotation", {}) 258 for key, value in aln_annot.items(): 259 frag.aln_annotation[key] = value[idx] 260 except IndexError: 261 pass 262 # strands 263 frag.query_strand = hspd["query_strand"] 264 frag.hit_strand = hspd["hit_strand"] 265 # and append the hsp object to the list 266 if frag.aln_annotation.get("similarity") is not None: 267 if "#" in frag.aln_annotation["similarity"]: 268 frags.extend(_split_fragment(frag)) 269 continue 270 # try to set frame if there are translation in the alignment 271 if ( 272 len(frag.aln_annotation) > 1 273 or frag.query_strand == 0 274 or ("vulgar_comp" in hspd and re.search(_RE_TRANS, hspd["vulgar_comp"])) 275 ): 276 _set_frame(frag) 277 278 frags.append(frag) 279 280 # if the query is protein, we need to change the hit and query sequences 281 # from three-letter amino acid codes to one letter, and adjust their 282 # coordinates accordingly 283 if len(frags[0].aln_annotation) == 2: # 2 annotations == protein query 284 frags = _adjust_aa_seq(frags) 285 286 hsp = HSP(frags) 287 # set hsp-specific attributes 288 for attr in ( 289 "score", 290 "hit_split_codons", 291 "query_split_codons", 292 "model", 293 "vulgar_comp", 294 "cigar_comp", 295 "molecule_type", 296 ): 297 if attr in hspd: 298 setattr(hsp, attr, hspd[attr]) 299 300 return hsp 301 302 303def _parse_hit_or_query_line(line): 304 """Parse the 'Query:' line of exonerate alignment outputs (PRIVATE).""" 305 try: 306 mark, id, desc = line.split(" ", 2) 307 except ValueError: # no desc 308 mark, id = line.split(" ", 1) 309 desc = "" 310 311 return id, desc 312 313 314class _BaseExonerateParser(ABC): 315 """Abstract base class iterator for exonerate format.""" 316 317 _ALN_MARK = None 318 319 def __init__(self, handle): 320 self.handle = handle 321 self.has_c4_alignment = False 322 323 def __iter__(self): 324 # read line until the first alignment block or cigar/vulgar lines 325 while True: 326 self.line = self.handle.readline() 327 # flag for human-readable alignment block 328 if self.line.startswith("C4 Alignment:") and not self.has_c4_alignment: 329 self.has_c4_alignment = True 330 if ( 331 self.line.startswith("C4 Alignment:") 332 or self.line.startswith("vulgar:") 333 or self.line.startswith("cigar:") 334 ): 335 break 336 elif not self.line or self.line.startswith("-- completed "): 337 return 338 339 for qresult in self._parse_qresult(): 340 qresult.program = "exonerate" 341 # HACK: so that all descriptions are set 342 qresult.description = qresult.description 343 for hit in qresult: 344 hit.description = hit.description 345 yield qresult 346 347 def read_until(self, bool_func): 348 """Read the file handle until the given bool function returns True.""" 349 while True: 350 if not self.line or bool_func(self.line): 351 return 352 else: 353 self.line = self.handle.readline() 354 355 @abstractmethod 356 def parse_alignment_block(self, header): 357 raise NotImplementedError 358 359 def _parse_alignment_header(self): 360 # read all header lines and store them 361 aln_header = [] 362 # header is everything before the first empty line 363 while self.line.strip(): 364 aln_header.append(self.line.strip()) 365 self.line = self.handle.readline() 366 # then parse them 367 qresult, hit, hsp = {}, {}, {} 368 for line in aln_header: 369 # query line 370 if line.startswith("Query:"): 371 qresult["id"], qresult["description"] = _parse_hit_or_query_line(line) 372 # target line 373 elif line.startswith("Target:"): 374 hit["id"], hit["description"] = _parse_hit_or_query_line(line) 375 # model line 376 elif line.startswith("Model:"): 377 qresult["model"] = line.split(" ", 1)[1] 378 # score line 379 elif line.startswith("Raw score:"): 380 hsp["score"] = line.split(" ", 2)[2] 381 # query range line 382 elif line.startswith("Query range:"): 383 # line is always 'Query range: \d+ -> \d+', so we can pluck 384 # the numbers directly 385 hsp["query_start"], hsp["query_end"] = line.split(" ", 4)[2:5:2] 386 # hit range line 387 elif line.startswith("Target range:"): 388 # same logic with query range 389 hsp["hit_start"], hsp["hit_end"] = line.split(" ", 4)[2:5:2] 390 391 # determine strand 392 if qresult["description"].endswith(":[revcomp]"): 393 hsp["query_strand"] = "-" 394 qresult["description"] = qresult["description"].replace(":[revcomp]", "") 395 elif "protein2" in qresult["model"]: 396 hsp["query_strand"] = "." 397 else: 398 hsp["query_strand"] = "+" 399 if hit["description"].endswith(":[revcomp]"): 400 hsp["hit_strand"] = "-" 401 hit["description"] = hit["description"].replace(":[revcomp]", "") 402 elif "2protein" in qresult["model"]: 403 hsp["hit_strand"] = "." 404 else: 405 hsp["hit_strand"] = "+" 406 407 # NOTE: we haven't processed the coordinates types 408 # and the strands are not yet Biopython's standard (1 / -1 / 0) 409 # since it's easier if we do the conversion later 410 411 return {"qresult": qresult, "hit": hit, "hsp": hsp} 412 413 def _parse_qresult(self): 414 # state values 415 state_EOF = 0 416 state_QRES_NEW = 1 417 state_QRES_SAME = 3 418 state_HIT_NEW = 2 419 state_HIT_SAME = 4 420 # initial dummies 421 qres_state, hit_state = None, None 422 file_state = None 423 cur_qid, cur_hid = None, None 424 prev_qid, prev_hid = None, None 425 cur, prev = None, None 426 hit_list, hsp_list = [], [] 427 # if the file has c4 alignments, use that as the alignment mark 428 if self.has_c4_alignment: 429 self._ALN_MARK = "C4 Alignment:" 430 431 while True: 432 self.read_until(lambda line: line.startswith(self._ALN_MARK)) 433 if cur is not None: 434 prev = cur 435 prev_qid = cur_qid 436 prev_hid = cur_hid 437 # only parse the result row if it's not EOF 438 if self.line: 439 assert self.line.startswith(self._ALN_MARK), self.line 440 # create temp dicts for storing parsed values 441 header = {"qresult": {}, "hit": {}, "hsp": {}} 442 # if the file has c4 alignments, try to parse the header 443 if self.has_c4_alignment: 444 self.read_until(lambda line: line.strip().startswith("Query:")) 445 header = self._parse_alignment_header() 446 # parse the block contents 447 cur = self.parse_alignment_block(header) 448 cur_qid = cur["qresult"]["id"] 449 cur_hid = cur["hit"]["id"] 450 elif not self.line or self.line.startswith("-- completed "): 451 file_state = state_EOF 452 cur_qid, cur_hid = None, None 453 454 # get the state of hit and qresult 455 if prev_qid != cur_qid: 456 qres_state = state_QRES_NEW 457 else: 458 qres_state = state_QRES_SAME 459 # new hits are hits with different ids or hits in a new query 460 if prev_hid != cur_hid or qres_state == state_QRES_NEW: 461 hit_state = state_HIT_NEW 462 else: 463 hit_state = state_HIT_SAME 464 465 if prev is not None: 466 hsp = _create_hsp(prev_hid, prev_qid, prev["hsp"]) 467 hsp_list.append(hsp) 468 469 if hit_state == state_HIT_NEW: 470 hit = Hit(hsp_list) 471 for attr, value in prev["hit"].items(): 472 setattr(hit, attr, value) 473 hit_list.append(hit) 474 hsp_list = [] 475 476 if qres_state == state_QRES_NEW or file_state == state_EOF: 477 qresult = QueryResult(id=prev_qid) 478 for hit in hit_list: 479 # not using append since Exonerate may separate the 480 # same hit if it has different strands 481 qresult.absorb(hit) 482 for attr, value in prev["qresult"].items(): 483 setattr(qresult, attr, value) 484 yield qresult 485 if file_state == state_EOF: 486 break 487 hit_list = [] 488 489 # only readline() here if we're not parsing C4 alignments 490 # C4 alignments readline() is handled by its parse_alignment_block 491 # function 492 if not self.has_c4_alignment: 493 self.line = self.handle.readline() 494 495 496class _BaseExonerateIndexer(SearchIndexer): 497 """Indexer class for Exonerate plain text.""" 498 499 _parser = None # should be defined by subclass 500 _query_mark = None # this one too 501 502 def get_qresult_id(self, pos): 503 raise NotImplementedError("Should be defined by subclass") 504 505 def __iter__(self): 506 """Iterate over the file handle; yields key, start offset, and length.""" 507 handle = self._handle 508 handle.seek(0) 509 qresult_key = None 510 511 while True: 512 start_offset = handle.tell() 513 line = handle.readline() 514 if line.startswith(self._query_mark): 515 if qresult_key is None: 516 qresult_key = self.get_qresult_id(start_offset) 517 qresult_offset = start_offset 518 else: 519 curr_key = self.get_qresult_id(start_offset) 520 if curr_key != qresult_key: 521 yield qresult_key, qresult_offset, start_offset - qresult_offset 522 qresult_key = curr_key 523 qresult_offset = start_offset 524 handle.seek(qresult_offset) 525 elif not line: 526 yield qresult_key, qresult_offset, start_offset - qresult_offset 527 break 528 529 530# if not used as a module, run the doctest 531if __name__ == "__main__": 532 from Bio._utils import run_doctest 533 534 run_doctest() 535