1import re
2import sys
3import time
4import errno
5import multiprocessing
6import logging
7from typing import Optional
8
9from xopen import xopen
10import dnaio
11
12try:
13    import resource
14except ImportError:
15    # Windows
16    resource = None  # type: ignore
17
18
19logger = logging.getLogger(__name__)
20
21
22def available_cpu_count():
23    """
24    Return the number of available virtual or physical CPUs on this system.
25    The number of available CPUs can be smaller than the total number of CPUs
26    when the cpuset(7) mechanism is in use, as is the case on some cluster
27    systems.
28
29    Adapted from http://stackoverflow.com/a/1006301/715090
30    """
31    try:
32        with open("/proc/self/status") as f:
33            status = f.read()
34        m = re.search(r"(?m)^Cpus_allowed:\s*(.*)$", status)
35        if m:
36            res = bin(int(m.group(1).replace(",", ""), 16)).count("1")
37            if res > 0:
38                return min(res, multiprocessing.cpu_count())
39    except OSError:
40        pass
41
42    return multiprocessing.cpu_count()
43
44
45def raise_open_files_limit(n):
46    if resource is None:
47        return
48    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
49    soft = min(soft + n, hard)
50    resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))
51
52
53class Progress:
54    """
55    Print an animated progress report to sys.stderr
56    """
57
58    def __init__(self, every=1):
59        """
60        every: minimum time to wait in seconds between progress updates
61        """
62        self._every = every
63        self._n = 0
64        # Time at which the progress was last updated
65        self._time = time.time()
66        self._start_time = self._time
67        self._animation = self.scissors()
68
69    @staticmethod
70    def scissors(width=10):
71        while True:
72            for is_reverse, rang in [(False, range(width + 1)), (True, range(width + 1))]:
73                for position in rang:
74                    for is_open in (True, False):
75                        left = " " * position
76                        right = "-" * (width - position)
77                        if is_reverse:
78                            sc = ">8" if is_open else "=8"
79                            left, right = right, left
80                        else:
81                            sc = "8<" if is_open else "8="
82                        yield "[" + left + sc + right + "]"
83
84    def update(self, total, _final=False):
85        current_time = time.time()
86        if _final:
87            time_delta = current_time - self._start_time
88            delta = total
89        else:
90            time_delta = current_time - self._time
91            delta = total - self._n
92        if delta < 1:
93            return
94        if time_delta == 0:
95            return
96        if not _final:
97            if time_delta < self._every:
98                return
99
100        t = current_time - self._start_time
101        hours = int(t) // 3600
102        minutes = (int(t) - hours * 3600) // 60
103        seconds = int(t) % 60
104        per_second = delta / time_delta
105        per_item = time_delta / delta
106
107        print(
108            "\r"
109            "{animation} {hours:02d}:{minutes:02d}:{seconds:02d} "
110            "{total:13,d} reads  @  {per_item:7.1F} µs/read; {per_minute:6.2F} M reads/minute"
111            "".format(
112                hours=hours, minutes=minutes, seconds=seconds,
113                total=total, per_item=per_item * 1E6, per_minute=per_second * 60 / 1E6,
114                animation=next(self._animation)),
115            end="", file=sys.stderr
116        )
117        self._n = total
118        self._time = current_time
119
120    def stop(self, total):
121        """
122        Print final progress reflecting the final total
123        """
124        self.update(total, _final=True)
125        print(file=sys.stderr)
126
127
128class DummyProgress(Progress):
129    """
130    Does not print anything
131    """
132    def update(self, total, _final=False):
133        pass
134
135    def stop(self, total):
136        pass
137
138
139_REVCOMP_TRANS = str.maketrans(
140    "ACGTUMRWSYKVHDBNacgtumrwsykvhdbn",
141    "TGCAAKYWSRMBDHVNtgcaakywsrmbdhvn",
142)
143
144
145def reverse_complement(s: str):
146    return s.translate(_REVCOMP_TRANS)[::-1]
147
148
149def reverse_complemented_sequence(sequence: dnaio.Sequence):
150    if sequence.qualities is None:
151        qualities = None
152    else:
153        qualities = sequence.qualities[::-1]
154    return dnaio.Sequence(sequence.name, reverse_complement(sequence.sequence), qualities)
155
156
157class FileOpener:
158    def __init__(self, compression_level: int = 6, threads: int = None):
159        """
160        threads -- no. of external compression threads.
161            0: write in-process
162            None: min(cpu_count(0, 4)
163        """
164        self.compression_level = compression_level
165        self.threads = threads
166
167    def xopen(self, path, mode):
168        threads = self.threads if "w" in mode else 0
169        f = xopen(path, mode, compresslevel=self.compression_level, threads=threads)
170        logger.debug("Opening '%s', mode '%s' with xopen resulted in %s", path, mode, f)
171        return f
172
173    def xopen_or_none(self, path, mode):
174        """Return opened file or None if the path is None"""
175        if path is None:
176            return None
177        return self.xopen(path, mode)
178
179    def xopen_pair(self, path1: str, path2: Optional[str], mode):
180        if path1 is None and path2 is not None:
181            raise ValueError("When giving paths for paired-end files, only providing the second"
182                " file is not supported")
183        file1 = self.xopen_or_none(path1, mode)
184        file2 = self.xopen_or_none(path2, mode)
185        return file1, file2
186
187    def dnaio_open(self, *args, **kwargs):
188        kwargs["opener"] = self.xopen
189        f = dnaio.open(*args, **kwargs)
190        logger.debug("Opening %r, mode '%s' with dnaio resulted in %s",
191            args[0], kwargs['mode'], f)
192        return f
193
194    def dnaio_open_raise_limit(self, *args, **kwargs):
195        """
196        Open a FASTA/FASTQ file for writing. If it fails because the number of open files
197        would be exceeded, try to raise the soft limit and re-try.
198        """
199        try:
200            f = self.dnaio_open(*args, **kwargs)
201        except OSError as e:
202            if e.errno == errno.EMFILE:  # Too many open files
203                logger.debug("Too many open files, attempting to raise soft limit")
204                raise_open_files_limit(8)
205                f = self.dnaio_open(*args, **kwargs)
206            else:
207                raise
208        return f
209