1# Copyright 2012 by Wibowo Arindrarto.  All rights reserved.
2# This file is part of the Biopython distribution and governed by your
3# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
4# Please see the LICENSE file that should have been included as part of this
5# package.
6"""Bio.SearchIO abstract base parser for Exonerate standard output format."""
7
8import re
9from functools import reduce
10from abc import ABC, abstractmethod
11
12from Bio.SearchIO._index import SearchIndexer
13from Bio.SearchIO._model import QueryResult, Hit, HSP, HSPFragment
14from Bio.SeqUtils import seq1
15
16
17# strand char-value mapping
18_STRAND_MAP = {"+": 1, "-": -1, ".": 0}
19
20_RE_SHIFTS = re.compile(r"(#+)")
21# regex for checking whether a vulgar line has protein/translated components
22_RE_TRANS = re.compile(r"[53ISCF]")
23
24
25def _set_frame(frag):
26    """Set the HSPFragment frames (PRIVATE)."""
27    frag.hit_frame = (frag.hit_start % 3 + 1) * frag.hit_strand
28    frag.query_frame = (frag.query_start % 3 + 1) * frag.query_strand
29
30
31def _make_triplets(seq, phase=0):
32    """Select a valid amino acid sequence given a 3-letter code input (PRIVATE).
33
34    This function takes a single three-letter amino acid sequence and the phase
35    of the sequence to return the longest intact amino acid sequence possible.
36    Parts of the input sequence before and after the selected sequence are also
37    returned.
38
39    This is an internal private function and is meant for parsing Exonerate's
40    three-letter amino acid output.
41
42    >>> from Bio.SearchIO.ExonerateIO._base import _make_triplets
43    >>> _make_triplets('GlyThrSerAlaPro')
44    ('', ['Gly', 'Thr', 'Ser', 'Ala', 'Pro'], '')
45    >>> _make_triplets('yThrSerAla', phase=1)
46    ('y', ['Thr', 'Ser', 'Ala'], '')
47    >>> _make_triplets('yThrSerAlaPr', phase=1)
48    ('y', ['Thr', 'Ser', 'Ala'], 'Pr')
49
50    """
51    pre = seq[:phase]
52    np_seq = seq[phase:]
53    non_triplets = len(np_seq) % 3
54    post = "" if not non_triplets else np_seq[-1 * non_triplets :]
55    intacts = [np_seq[3 * i : 3 * (i + 1)] for i in range(len(np_seq) // 3)]
56    return pre, intacts, post
57
58
59def _get_fragments_coord(frags):
60    """Return the letter coordinate of the given list of fragments (PRIVATE).
61
62    This function takes a list of three-letter amino acid sequences and
63    returns a list of coordinates for each fragment had all the input
64    sequences been flattened.
65
66    This is an internal private function and is meant for parsing Exonerate's
67    three-letter amino acid output.
68
69    >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_coord
70    >>> _get_fragments_coord(['Thr', 'Ser', 'Ala'])
71    [0, 3, 6]
72    >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu'])
73    [0, 3, 12]
74    >>> _get_fragments_coord(['Thr', 'SerAlaPro', 'GlyLeu', 'Cys'])
75    [0, 3, 12, 18]
76
77    """
78    if not frags:
79        return []
80    # first fragment always starts from position 0
81    init = [0]
82    return reduce(lambda acc, frag: acc + [acc[-1] + len(frag)], frags[:-1], init)
83
84
85def _get_fragments_phase(frags):
86    """Return the phases of the given list of 3-letter amino acid fragments (PRIVATE).
87
88    This is an internal private function and is meant for parsing Exonerate's
89    three-letter amino acid output.
90
91    >>> from Bio.SearchIO.ExonerateIO._base import _get_fragments_phase
92    >>> _get_fragments_phase(['Thr', 'Ser', 'Ala'])
93    [0, 0, 0]
94    >>> _get_fragments_phase(['ThrSe', 'rAla'])
95    [0, 1]
96    >>> _get_fragments_phase(['ThrSe', 'rAlaLeu', 'ProCys'])
97    [0, 1, 0]
98    >>> _get_fragments_phase(['ThrSe', 'rAlaLeuP', 'roCys'])
99    [0, 1, 2]
100    >>> _get_fragments_phase(['ThrSe', 'rAlaLeuPr', 'oCys'])
101    [0, 1, 1]
102
103    """
104    return [(3 - (x % 3)) % 3 for x in _get_fragments_coord(frags)]
105
106
107def _adjust_aa_seq(fraglist):
108    """Transform 3-letter AA codes of input fragments to one-letter codes (PRIVATE).
109
110    Argument fraglist should be a list of HSPFragments objects.
111    """
112    custom_map = {"***": "*", "<->": "-"}
113    hsp_hstart = fraglist[0].hit_start
114    hsp_qstart = fraglist[0].query_start
115    frag_phases = _get_fragments_phase(fraglist)
116    for frag, phase in zip(fraglist, frag_phases):
117        assert frag.query_strand == 0 or frag.hit_strand == 0
118        # hit step may be -1 as we're aligning to DNA
119        hstep = 1 if frag.hit_strand >= 0 else -1
120
121        # set fragment phase
122        frag.phase = phase
123
124        # fragment should have a length that is a multiple of 3
125        # assert len(frag) % 3 == 0
126        qseq = str(frag.query.seq)
127        q_triplets_pre, q_triplets, q_triplets_post = _make_triplets(qseq, phase)
128
129        hseq = str(frag.hit.seq)
130        h_triplets_pre, h_triplets, h_triplets_post = _make_triplets(hseq, phase)
131
132        # get one letter codes
133        # and replace gap codon markers and termination characters
134        hseq1_pre = "X" if h_triplets_pre else ""
135        hseq1_post = "X" if h_triplets_post else ""
136        hseq1 = seq1("".join(h_triplets), custom_map=custom_map)
137        hstart = hsp_hstart + (len(hseq1_pre) * hstep)
138        hend = hstart + len(hseq1.replace("-", "")) * hstep
139
140        qseq1_pre = "X" if q_triplets_pre else ""
141        qseq1_post = "X" if q_triplets_post else ""
142        qseq1 = seq1("".join(q_triplets), custom_map=custom_map)
143        qstart = hsp_qstart + len(qseq1_pre)
144        qend = qstart + len(qseq1.replace("-", ""))
145
146        # replace the old frag sequences with the new ones
147        frag.hit = None
148        frag.query = None
149        frag.hit = hseq1_pre + hseq1 + hseq1_post
150        frag.query = qseq1_pre + qseq1 + qseq1_post
151
152        # set coordinates for the protein sequence
153        if frag.query_strand == 0:
154            frag.query_start, frag.query_end = qstart, qend
155        elif frag.hit_strand == 0:
156            frag.hit_start, frag.hit_end = hstart, hend
157
158        # update alignment annotation
159        # by turning them into list of triplets
160        for annot, annotseq in frag.aln_annotation.items():
161            pre, intact, post = _make_triplets(annotseq, phase)
162            frag.aln_annotation[annot] = (
163                list(filter(None, [pre])) + intact + list(filter(None, [post]))
164            )
165
166        # update values for next iteration
167        hsp_hstart, hsp_qstart = hend, qend
168
169    return fraglist
170
171
172def _split_fragment(frag):
173    """Split one HSPFragment containing frame-shifted alignment into two (PRIVATE)."""
174    # given an HSPFragment object with frameshift(s), this method splits it
175    # into fragments without frameshifts by sequentially chopping it off
176    # starting from the beginning
177    simil = frag.aln_annotation["similarity"]
178    # we should have at least 1 frame shift for splitting
179    assert simil.count("#") > 0
180
181    split_frags = []
182    qstep = 1 if frag.query_strand >= 0 else -1
183    hstep = 1 if frag.hit_strand >= 0 else -1
184    qpos = min(frag.query_range) if qstep >= 0 else max(frag.query_range)
185    hpos = min(frag.hit_range) if qstep >= 0 else max(frag.hit_range)
186    abs_pos = 0
187    # split according to hit, then query
188    while simil:
189
190        try:
191            shifts = re.search(_RE_SHIFTS, simil).group(1)
192            s_start = simil.find(shifts)
193            s_stop = s_start + len(shifts)
194            split = frag[abs_pos : abs_pos + s_start]
195        except AttributeError:  # no '#' in simil, i.e. last frag
196            shifts = ""
197            s_start = 0
198            s_stop = len(simil)
199            split = frag[abs_pos:]
200
201        # coordinates for the split strand
202        qstart, hstart = qpos, hpos
203        qpos += (
204            len(split) - sum(split.query.seq.count(x) for x in ("-", "<", ">"))
205        ) * qstep
206        hpos += (
207            len(split) - sum(split.hit.seq.count(x) for x in ("-", "<", ">"))
208        ) * hstep
209
210        split.hit_start = min(hstart, hpos)
211        split.query_start = min(qstart, qpos)
212        split.hit_end = max(hstart, hpos)
213        split.query_end = max(qstart, qpos)
214
215        # account for frameshift length
216        abs_slice = slice(abs_pos + s_start, abs_pos + s_stop)
217        if len(frag.aln_annotation) == 2:
218            seqs = (frag[abs_slice].query.seq, frag[abs_slice].hit.seq)
219        elif len(frag.aln_annotation) == 3:
220            seqs = (
221                frag[abs_slice].aln_annotation["query_annotation"],
222                frag[abs_slice].aln_annotation["hit_annotation"],
223            )
224        if "#" in seqs[0]:
225            qpos += len(shifts) * qstep
226        elif "#" in seqs[1]:
227            hpos += len(shifts) * hstep
228
229        # set frame
230        _set_frame(split)
231        split_frags.append(split)
232        # set similarity string and absolute position for the next loop
233        simil = simil[s_stop:]
234        abs_pos += s_stop
235
236    return split_frags
237
238
239def _create_hsp(hid, qid, hspd):
240    """Return a list of HSP objects from the given parsed HSP values (PRIVATE)."""
241    frags = []
242    # we are iterating over query_ranges, but hit_ranges works just as well
243    for idx, qcoords in enumerate(hspd["query_ranges"]):
244        # get sequences, create object
245        hseqlist = hspd.get("hit")
246        hseq = "" if hseqlist is None else hseqlist[idx]
247        qseqlist = hspd.get("query")
248        qseq = "" if qseqlist is None else qseqlist[idx]
249        frag = HSPFragment(hid, qid, hit=hseq, query=qseq)
250        # coordinates
251        frag.query_start = qcoords[0]
252        frag.query_end = qcoords[1]
253        frag.hit_start = hspd["hit_ranges"][idx][0]
254        frag.hit_end = hspd["hit_ranges"][idx][1]
255        # alignment annotation
256        try:
257            aln_annot = hspd.get("aln_annotation", {})
258            for key, value in aln_annot.items():
259                frag.aln_annotation[key] = value[idx]
260        except IndexError:
261            pass
262        # strands
263        frag.query_strand = hspd["query_strand"]
264        frag.hit_strand = hspd["hit_strand"]
265        # and append the hsp object to the list
266        if frag.aln_annotation.get("similarity") is not None:
267            if "#" in frag.aln_annotation["similarity"]:
268                frags.extend(_split_fragment(frag))
269                continue
270        # try to set frame if there are translation in the alignment
271        if (
272            len(frag.aln_annotation) > 1
273            or frag.query_strand == 0
274            or ("vulgar_comp" in hspd and re.search(_RE_TRANS, hspd["vulgar_comp"]))
275        ):
276            _set_frame(frag)
277
278        frags.append(frag)
279
280    # if the query is protein, we need to change the hit and query sequences
281    # from three-letter amino acid codes to one letter, and adjust their
282    # coordinates accordingly
283    if len(frags[0].aln_annotation) == 2:  # 2 annotations == protein query
284        frags = _adjust_aa_seq(frags)
285
286    hsp = HSP(frags)
287    # set hsp-specific attributes
288    for attr in (
289        "score",
290        "hit_split_codons",
291        "query_split_codons",
292        "model",
293        "vulgar_comp",
294        "cigar_comp",
295        "molecule_type",
296    ):
297        if attr in hspd:
298            setattr(hsp, attr, hspd[attr])
299
300    return hsp
301
302
303def _parse_hit_or_query_line(line):
304    """Parse the 'Query:' line of exonerate alignment outputs (PRIVATE)."""
305    try:
306        mark, id, desc = line.split(" ", 2)
307    except ValueError:  # no desc
308        mark, id = line.split(" ", 1)
309        desc = ""
310
311    return id, desc
312
313
314class _BaseExonerateParser(ABC):
315    """Abstract base class iterator for exonerate format."""
316
317    _ALN_MARK = None
318
319    def __init__(self, handle):
320        self.handle = handle
321        self.has_c4_alignment = False
322
323    def __iter__(self):
324        # read line until the first alignment block or cigar/vulgar lines
325        while True:
326            self.line = self.handle.readline()
327            # flag for human-readable alignment block
328            if self.line.startswith("C4 Alignment:") and not self.has_c4_alignment:
329                self.has_c4_alignment = True
330            if (
331                self.line.startswith("C4 Alignment:")
332                or self.line.startswith("vulgar:")
333                or self.line.startswith("cigar:")
334            ):
335                break
336            elif not self.line or self.line.startswith("-- completed "):
337                return
338
339        for qresult in self._parse_qresult():
340            qresult.program = "exonerate"
341            # HACK: so that all descriptions are set
342            qresult.description = qresult.description
343            for hit in qresult:
344                hit.description = hit.description
345            yield qresult
346
347    def read_until(self, bool_func):
348        """Read the file handle until the given bool function returns True."""
349        while True:
350            if not self.line or bool_func(self.line):
351                return
352            else:
353                self.line = self.handle.readline()
354
355    @abstractmethod
356    def parse_alignment_block(self, header):
357        raise NotImplementedError
358
359    def _parse_alignment_header(self):
360        # read all header lines and store them
361        aln_header = []
362        # header is everything before the first empty line
363        while self.line.strip():
364            aln_header.append(self.line.strip())
365            self.line = self.handle.readline()
366        # then parse them
367        qresult, hit, hsp = {}, {}, {}
368        for line in aln_header:
369            # query line
370            if line.startswith("Query:"):
371                qresult["id"], qresult["description"] = _parse_hit_or_query_line(line)
372            # target line
373            elif line.startswith("Target:"):
374                hit["id"], hit["description"] = _parse_hit_or_query_line(line)
375            # model line
376            elif line.startswith("Model:"):
377                qresult["model"] = line.split(" ", 1)[1]
378            # score line
379            elif line.startswith("Raw score:"):
380                hsp["score"] = line.split(" ", 2)[2]
381            # query range line
382            elif line.startswith("Query range:"):
383                # line is always 'Query range: \d+ -> \d+', so we can pluck
384                # the numbers directly
385                hsp["query_start"], hsp["query_end"] = line.split(" ", 4)[2:5:2]
386            # hit range line
387            elif line.startswith("Target range:"):
388                # same logic with query range
389                hsp["hit_start"], hsp["hit_end"] = line.split(" ", 4)[2:5:2]
390
391        # determine strand
392        if qresult["description"].endswith(":[revcomp]"):
393            hsp["query_strand"] = "-"
394            qresult["description"] = qresult["description"].replace(":[revcomp]", "")
395        elif "protein2" in qresult["model"]:
396            hsp["query_strand"] = "."
397        else:
398            hsp["query_strand"] = "+"
399        if hit["description"].endswith(":[revcomp]"):
400            hsp["hit_strand"] = "-"
401            hit["description"] = hit["description"].replace(":[revcomp]", "")
402        elif "2protein" in qresult["model"]:
403            hsp["hit_strand"] = "."
404        else:
405            hsp["hit_strand"] = "+"
406
407        # NOTE: we haven't processed the coordinates types
408        # and the strands are not yet Biopython's standard (1 / -1 / 0)
409        # since it's easier if we do the conversion later
410
411        return {"qresult": qresult, "hit": hit, "hsp": hsp}
412
413    def _parse_qresult(self):
414        # state values
415        state_EOF = 0
416        state_QRES_NEW = 1
417        state_QRES_SAME = 3
418        state_HIT_NEW = 2
419        state_HIT_SAME = 4
420        # initial dummies
421        qres_state, hit_state = None, None
422        file_state = None
423        cur_qid, cur_hid = None, None
424        prev_qid, prev_hid = None, None
425        cur, prev = None, None
426        hit_list, hsp_list = [], []
427        # if the file has c4 alignments, use that as the alignment mark
428        if self.has_c4_alignment:
429            self._ALN_MARK = "C4 Alignment:"
430
431        while True:
432            self.read_until(lambda line: line.startswith(self._ALN_MARK))
433            if cur is not None:
434                prev = cur
435                prev_qid = cur_qid
436                prev_hid = cur_hid
437            # only parse the result row if it's not EOF
438            if self.line:
439                assert self.line.startswith(self._ALN_MARK), self.line
440                # create temp dicts for storing parsed values
441                header = {"qresult": {}, "hit": {}, "hsp": {}}
442                # if the file has c4 alignments, try to parse the header
443                if self.has_c4_alignment:
444                    self.read_until(lambda line: line.strip().startswith("Query:"))
445                    header = self._parse_alignment_header()
446                # parse the block contents
447                cur = self.parse_alignment_block(header)
448                cur_qid = cur["qresult"]["id"]
449                cur_hid = cur["hit"]["id"]
450            elif not self.line or self.line.startswith("-- completed "):
451                file_state = state_EOF
452                cur_qid, cur_hid = None, None
453
454            # get the state of hit and qresult
455            if prev_qid != cur_qid:
456                qres_state = state_QRES_NEW
457            else:
458                qres_state = state_QRES_SAME
459            # new hits are hits with different ids or hits in a new query
460            if prev_hid != cur_hid or qres_state == state_QRES_NEW:
461                hit_state = state_HIT_NEW
462            else:
463                hit_state = state_HIT_SAME
464
465            if prev is not None:
466                hsp = _create_hsp(prev_hid, prev_qid, prev["hsp"])
467                hsp_list.append(hsp)
468
469                if hit_state == state_HIT_NEW:
470                    hit = Hit(hsp_list)
471                    for attr, value in prev["hit"].items():
472                        setattr(hit, attr, value)
473                    hit_list.append(hit)
474                    hsp_list = []
475
476                if qres_state == state_QRES_NEW or file_state == state_EOF:
477                    qresult = QueryResult(id=prev_qid)
478                    for hit in hit_list:
479                        # not using append since Exonerate may separate the
480                        # same hit if it has different strands
481                        qresult.absorb(hit)
482                    for attr, value in prev["qresult"].items():
483                        setattr(qresult, attr, value)
484                    yield qresult
485                    if file_state == state_EOF:
486                        break
487                    hit_list = []
488
489            # only readline() here if we're not parsing C4 alignments
490            # C4 alignments readline() is handled by its parse_alignment_block
491            # function
492            if not self.has_c4_alignment:
493                self.line = self.handle.readline()
494
495
496class _BaseExonerateIndexer(SearchIndexer):
497    """Indexer class for Exonerate plain text."""
498
499    _parser = None  # should be defined by subclass
500    _query_mark = None  # this one too
501
502    def get_qresult_id(self, pos):
503        raise NotImplementedError("Should be defined by subclass")
504
505    def __iter__(self):
506        """Iterate over the file handle; yields key, start offset, and length."""
507        handle = self._handle
508        handle.seek(0)
509        qresult_key = None
510
511        while True:
512            start_offset = handle.tell()
513            line = handle.readline()
514            if line.startswith(self._query_mark):
515                if qresult_key is None:
516                    qresult_key = self.get_qresult_id(start_offset)
517                    qresult_offset = start_offset
518                else:
519                    curr_key = self.get_qresult_id(start_offset)
520                    if curr_key != qresult_key:
521                        yield qresult_key, qresult_offset, start_offset - qresult_offset
522                        qresult_key = curr_key
523                        qresult_offset = start_offset
524                        handle.seek(qresult_offset)
525            elif not line:
526                yield qresult_key, qresult_offset, start_offset - qresult_offset
527                break
528
529
530# if not used as a module, run the doctest
531if __name__ == "__main__":
532    from Bio._utils import run_doctest
533
534    run_doctest()
535