"""
This module provides different kinds of iterators, all wrapped by the
DataIterator class which should generally be the only one used in practice.

The BaseIterator class allows "peeking" `checklines` lines into the data --
even if it's a consumable iterator -- in order to figure out what the dialect
is and therefore decide whether the data is GFF or GTF format, which is
important for figuring out how to construct the database.

"""
import os
import tempfile
import itertools
from gffutils.feature import feature_from_line
from gffutils.interface import FeatureDB
from gffutils import helpers
from textwrap import dedent
import six
from six.moves.urllib.request import urlopen
if six.PY3:
    from urllib import parse as urlparse
else:
    import urlparse


def peek(it, n):
    """
    Inspect the first `n` items of an iterator without losing them.

    Parameters
    ----------
    it : iterator
        Any iterator, including a consumable one.

    n : int
        Maximum number of items to peek at.

    Returns
    -------
    Tuple of (list of up-to-`n` peeked items, a new iterator that yields the
    full original stream, peeked items included).
    """
    _peek = []
    for _ in range(n):
        try:
            _peek.append(six.next(it))
        except StopIteration:
            # Fewer than `n` items available; return what we collected.
            break
    # Re-attach the consumed items to the front of the remaining stream so
    # the caller can iterate as if nothing had been consumed.
    return _peek, itertools.chain(_peek, it)


class Directive(object):
    """
    Lightweight wrapper for a directive line (e.g. "##gff-version 3").
    """
    def __init__(self, line):
        # Store the raw directive text as-is.
        self.info = line


class _BaseIterator(object):
    def __init__(self, data, checklines=10, transform=None,
                 force_dialect_check=False, dialect=None):
        """
        Base class for iterating over features.  In general, you should use
        DataIterator -- so see the docstring of class for argument
        descriptions.


        All subclasses -- _FileIterator, _URLIterator, _FeatureIterator,
        _StringIterator -- gain the following behavior:

            - self.current_item and self.current_item_number are set on every
              iteration.  This is very useful for debugging, or reporting to
              the user exactly what item or line number caused the issue.

            - transform a Feature before it gets yielded, filter out a Feature

            - auto-detect dialect by peeking `checklines` items into the
              iterator, and then re-reading those, applying the detected
              dialect.  If multiple dialects are found, use
              helpers._choose_dialect to figure out the best one.

            - keep track of directives
        """
        self.data = data
        self.checklines = checklines
        self.current_item = None
        self.current_item_number = None
        self.dialect = None
        self._observed_dialects = []
        self.directives = []
        self.transform = transform
        self.warnings = []

        if force_dialect_check and dialect is not None:
            raise ValueError("force_dialect_check is True, but a dialect "
                             "is provided")
        if force_dialect_check:
            # In this case, self.dialect remains None.  When
            # parser._split_keyvals gets None as a dialect, it tries to infer
            # a dialect.
            self._iter = self._custom_iter()
        elif dialect is not None:
            # A dialect was supplied explicitly; trust it (after normalizing
            # through _choose_dialect) and skip peeking.
            self._observed_dialects = [dialect]
            self.dialect = helpers._choose_dialect(self._observed_dialects)
            self._iter = self._custom_iter()
        else:
            # Otherwise, check some lines to determine what the dialect should
            # be
            self.peek, self._iter = peek(self._custom_iter(), checklines)
            self._observed_dialects = [i.dialect for i in self.peek]
            self.dialect = helpers._choose_dialect(self._observed_dialects)

    def _custom_iter(self):
        # Subclasses yield Feature objects from their particular data source.
        raise NotImplementedError("Must define in subclasses")

    def __iter__(self):
        for i in self._iter:
            # Stamp every feature with the chosen dialect so downstream
            # parsing/serialization is consistent.
            i.dialect = self.dialect
            if self.transform:
                # A falsy return value from the transform filters the
                # feature out entirely.
                i = self.transform(i)
                if i:
                    yield i
            else:
                yield i

    def _directive_handler(self, directive):
        # Strip the leading "##" before recording the directive text.
        self.directives.append(directive[2:])


class _FileIterator(_BaseIterator):
    """
    Subclass for iterating over features provided as a filename
    """
    def open_function(self, data):
        # Transparently handle gzipped files, detected by extension.
        data = os.path.expanduser(data)
        if data.endswith('.gz'):
            import gzip
            return gzip.open(data)
        return open(data)

    def _custom_iter(self):
        valid_lines = 0
        fh = self.open_function(self.data)
        try:
            for i, line in enumerate(fh):
                # gzip (and PY3 binary handles) yield bytes; normalize to str.
                if isinstance(line, six.binary_type):
                    line = line.decode('utf-8')
                line = line.rstrip('\n\r')
                self.current_item = line
                self.current_item_number = i

                # Stop at an embedded FASTA section -- everything after it is
                # sequence data, not features.
                if line == '##FASTA' or line.startswith('>'):
                    return

                if line.startswith('##'):
                    self._directive_handler(line)
                    continue

                # Skip plain comments and blank lines.
                if line.startswith('#') or len(line) == 0:
                    continue

                # (If we got here it should be a valid line)
                valid_lines += 1
                yield feature_from_line(line, dialect=self.dialect)
        finally:
            # Fix: the handle was previously never closed, leaking a file
            # descriptor (or HTTP response / generator for subclasses) per
            # iteration.  Generators, file objects, and urlopen responses all
            # support close().
            fh.close()


class _UrlIterator(_FileIterator):
    """
    Subclass for iterating over features provided as a URL
    """
    def open_function(self, data):
        response = urlopen(data)

        # ideas from
        # http://stackoverflow.com/a/17537107
        # https://rationalpie.wordpress.com/2010/06/02/\
        #               python-streaming-gzip-decompression/
        if data.endswith('.gz'):
            import zlib
            d = zlib.decompressobj(16 + zlib.MAX_WBITS)
            READ_BLOCK_SIZE = 1024

            def _iter():
                last_line = ""
                while True:
                    block = response.read(READ_BLOCK_SIZE)
                    if not block:
                        break
                    # Prepend the partial line left over from the previous
                    # block before splitting into complete lines.
                    text = "".join((last_line, d.decompress(block).decode()))
                    lines = text.split('\n')
                    last_line = lines.pop()
                    for line in lines:
                        yield line + '\n'

                # Fix: flush any data still buffered inside the decompressor
                # so the trailing line(s) are not silently dropped, and split
                # the remainder in case it contains embedded newlines.
                last_line += d.flush().decode()
                for line in last_line.split('\n'):
                    if line:
                        yield line

            return _iter()

        else:
            return response


class _FeatureIterator(_BaseIterator):
    """
    Subclass for iterating over features that are already in an iterator
    """
    def _custom_iter(self):
        for i, feature in enumerate(self.data):
            self.current_item = feature
            self.current_item_number = i
            yield feature


class _StringIterator(_FileIterator):
    """
    Subclass for iterating over features provided as a string (e.g., from
    file.read())
    """
    def _custom_iter(self):
        # Write the string out to a temp file and delegate to _FileIterator.
        self.tmp = tempfile.NamedTemporaryFile(delete=False)
        data = dedent(self.data)
        if isinstance(data, six.text_type):
            data = data.encode('utf-8')
        self.tmp.write(data)
        self.tmp.close()
        self.data = self.tmp.name
        try:
            for feature in super(_StringIterator, self)._custom_iter():
                yield feature
        finally:
            # Fix: previously the temp file was only removed when the
            # iterator was fully exhausted; clean up on early exit too.
            os.unlink(self.tmp.name)
def is_url(url):
    """
    Check to see if a URL has a valid protocol.

    Parameters
    ----------
    url : str or unicode

    Returns
    -------
    True if `url` has a valid protocol False otherwise.
    """
    try:
        scheme = urlparse.urlparse(url).scheme
    except Exception:
        # Anything urlparse can't handle (None, non-strings, ...) is not
        # a URL.
        return False
    # Fix: urlparse.uses_netloc contains the empty string, so the previous
    # check (`scheme in uses_netloc`) reported plain filenames and local
    # paths (scheme == '') as valid URLs.  Require a non-empty scheme.
    return bool(scheme) and scheme in urlparse.uses_netloc


def DataIterator(data, checklines=10, transform=None,
                 force_dialect_check=False, from_string=False, **kwargs):
    """
    Iterate over features, no matter how they are provided.

    Parameters
    ----------
    data : str, iterable of Feature objs, FeatureDB
        `data` can be a string (filename, URL, or contents of a file, if
        from_string=True), any arbitrary iterable of features, or a FeatureDB
        (in which case its all_features() method will be called).

    checklines : int
        Number of lines to check in order to infer a dialect.

    transform : None or callable
        If not None, `transform` should accept a Feature object as its only
        argument and return either a (possibly modified) Feature object or
        a value that evaluates to False.  If the return value is False, the
        feature will be skipped.

    force_dialect_check : bool
        If True, check the dialect of every feature.  Thorough, but can be
        slow.

    from_string : bool
        If True, `data` should be interpreted as the contents of a file rather
        than the filename itself.

    dialect : None or dict
        Provide the dialect, which will override auto-detected dialects.  If
        provided, you should probably also use `force_dialect_check=False` and
        `checklines=0` but this is not enforced.

    Raises
    ------
    ValueError
        If `data` is a string but is neither an existing file, a URL, nor
        (with from_string=True) file contents.
    """
    _kwargs = dict(data=data, checklines=checklines, transform=transform,
                   force_dialect_check=force_dialect_check, **kwargs)
    if isinstance(data, six.string_types):
        if from_string:
            return _StringIterator(**_kwargs)
        if os.path.exists(data):
            return _FileIterator(**_kwargs)
        if is_url(data):
            return _UrlIterator(**_kwargs)
        # Fix: previously a string that was neither an existing file nor
        # a URL silently fell off the end of the function and returned
        # None, deferring the failure to a confusing downstream error.
        raise ValueError(
            "Unable to determine how to handle the provided data: it is "
            "not an existing file, not a recognized URL, and "
            "from_string=False: %r" % data)
    elif isinstance(data, FeatureDB):
        _kwargs['data'] = data.all_features()
        return _FeatureIterator(**_kwargs)
    else:
        return _FeatureIterator(**_kwargs)