1import re 2import sys 3import time 4import errno 5import multiprocessing 6import logging 7from typing import Optional 8 9from xopen import xopen 10import dnaio 11 12try: 13 import resource 14except ImportError: 15 # Windows 16 resource = None # type: ignore 17 18 19logger = logging.getLogger(__name__) 20 21 22def available_cpu_count(): 23 """ 24 Return the number of available virtual or physical CPUs on this system. 25 The number of available CPUs can be smaller than the total number of CPUs 26 when the cpuset(7) mechanism is in use, as is the case on some cluster 27 systems. 28 29 Adapted from http://stackoverflow.com/a/1006301/715090 30 """ 31 try: 32 with open("/proc/self/status") as f: 33 status = f.read() 34 m = re.search(r"(?m)^Cpus_allowed:\s*(.*)$", status) 35 if m: 36 res = bin(int(m.group(1).replace(",", ""), 16)).count("1") 37 if res > 0: 38 return min(res, multiprocessing.cpu_count()) 39 except OSError: 40 pass 41 42 return multiprocessing.cpu_count() 43 44 45def raise_open_files_limit(n): 46 if resource is None: 47 return 48 soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) 49 soft = min(soft + n, hard) 50 resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard)) 51 52 53class Progress: 54 """ 55 Print an animated progress report to sys.stderr 56 """ 57 58 def __init__(self, every=1): 59 """ 60 every: minimum time to wait in seconds between progress updates 61 """ 62 self._every = every 63 self._n = 0 64 # Time at which the progress was last updated 65 self._time = time.time() 66 self._start_time = self._time 67 self._animation = self.scissors() 68 69 @staticmethod 70 def scissors(width=10): 71 while True: 72 for is_reverse, rang in [(False, range(width + 1)), (True, range(width + 1))]: 73 for position in rang: 74 for is_open in (True, False): 75 left = " " * position 76 right = "-" * (width - position) 77 if is_reverse: 78 sc = ">8" if is_open else "=8" 79 left, right = right, left 80 else: 81 sc = "8<" if is_open else "8=" 82 yield "[" + left + sc + right + "]" 83 84 def update(self, total, _final=False): 85 current_time = time.time() 86 if _final: 87 time_delta = current_time - self._start_time 88 delta = total 89 else: 90 time_delta = current_time - self._time 91 delta = total - self._n 92 if delta < 1: 93 return 94 if time_delta == 0: 95 return 96 if not _final: 97 if time_delta < self._every: 98 return 99 100 t = current_time - self._start_time 101 hours = int(t) // 3600 102 minutes = (int(t) - hours * 3600) // 60 103 seconds = int(t) % 60 104 per_second = delta / time_delta 105 per_item = time_delta / delta 106 107 print( 108 "\r" 109 "{animation} {hours:02d}:{minutes:02d}:{seconds:02d} " 110 "{total:13,d} reads @ {per_item:7.1F} µs/read; {per_minute:6.2F} M reads/minute" 111 "".format( 112 hours=hours, minutes=minutes, seconds=seconds, 113 total=total, per_item=per_item * 1E6, per_minute=per_second * 60 / 1E6, 114 animation=next(self._animation)), 115 end="", file=sys.stderr 116 ) 117 self._n = total 118 self._time = current_time 119 120 def stop(self, total): 121 """ 122 Print final progress reflecting the final total 123 """ 124 self.update(total, _final=True) 125 print(file=sys.stderr) 126 127 128class DummyProgress(Progress): 129 """ 130 Does not print anything 131 """ 132 def update(self, total, _final=False): 133 pass 134 135 def stop(self, total): 136 pass 137 138 139_REVCOMP_TRANS = str.maketrans( 140 "ACGTUMRWSYKVHDBNacgtumrwsykvhdbn", 141 "TGCAAKYWSRMBDHVNtgcaakywsrmbdhvn", 142) 143 144 145def reverse_complement(s: str): 146 return s.translate(_REVCOMP_TRANS)[::-1] 147 148 149def reverse_complemented_sequence(sequence: dnaio.Sequence): 150 if sequence.qualities is None: 151 qualities = None 152 else: 153 qualities = sequence.qualities[::-1] 154 return dnaio.Sequence(sequence.name, reverse_complement(sequence.sequence), qualities) 155 156 157class FileOpener: 158 def __init__(self, compression_level: int = 6, threads: int = None): 159 """ 160 threads -- no. of external compression threads. 161 0: write in-process 162 None: min(cpu_count(0, 4) 163 """ 164 self.compression_level = compression_level 165 self.threads = threads 166 167 def xopen(self, path, mode): 168 threads = self.threads if "w" in mode else 0 169 f = xopen(path, mode, compresslevel=self.compression_level, threads=threads) 170 logger.debug("Opening '%s', mode '%s' with xopen resulted in %s", path, mode, f) 171 return f 172 173 def xopen_or_none(self, path, mode): 174 """Return opened file or None if the path is None""" 175 if path is None: 176 return None 177 return self.xopen(path, mode) 178 179 def xopen_pair(self, path1: str, path2: Optional[str], mode): 180 if path1 is None and path2 is not None: 181 raise ValueError("When giving paths for paired-end files, only providing the second" 182 " file is not supported") 183 file1 = self.xopen_or_none(path1, mode) 184 file2 = self.xopen_or_none(path2, mode) 185 return file1, file2 186 187 def dnaio_open(self, *args, **kwargs): 188 kwargs["opener"] = self.xopen 189 f = dnaio.open(*args, **kwargs) 190 logger.debug("Opening %r, mode '%s' with dnaio resulted in %s", 191 args[0], kwargs['mode'], f) 192 return f 193 194 def dnaio_open_raise_limit(self, *args, **kwargs): 195 """ 196 Open a FASTA/FASTQ file for writing. If it fails because the number of open files 197 would be exceeded, try to raise the soft limit and re-try. 198 """ 199 try: 200 f = self.dnaio_open(*args, **kwargs) 201 except OSError as e: 202 if e.errno == errno.EMFILE: # Too many open files 203 logger.debug("Too many open files, attempting to raise soft limit") 204 raise_open_files_limit(8) 205 f = self.dnaio_open(*args, **kwargs) 206 else: 207 raise 208 return f 209