# ----------------------------------------------------------------------------
# Copyright (c) 2013--, scikit-bio development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file COPYING.txt, distributed with this software.
# ----------------------------------------------------------------------------

import re

from skbio.metadata import IntervalMetadata
from skbio.io.format._base import _line_generator
from skbio.io import FileFormatError


def _vocabulary_change(format='insdc', read_in=True):
    '''Return a dict that converts between memory and output vocabulary.

    Parameters
    ----------
    format : str
        The file format vocabulary to convert to/from ('insdc' or 'gff3').
    read_in : bool
        If True, the mapping is file-vocabulary -> in-memory key (for
        reading); otherwise in-memory key -> file vocabulary (for writing).

    Returns
    -------
    dict
        Mapping of vocabulary terms for the requested direction.
    '''
    convert = {'phase': {'insdc': 'codon_start'},
               'source': {'insdc': 'inference'},
               'db_xref': {'gff3': 'Dbxref'},
               'note': {'gff3': 'Note'}}
    if read_in:
        return {v[format]: k for k, v in convert.items() if format in v}
    else:
        return {k: v[format] for k, v in convert.items() if format in v}


def _vocabulary_skip(format='insdc'):
    '''Return a list of vocabularies that should be skipped when auto
    output to disk for the specified format.

    Parameters
    ----------
    format : str
        The output file format ('insdc' or 'gff3').

    Returns
    -------
    list of str
        In-memory metadata keys that must not be serialized for `format`.
    '''
    # NOTE: the single-element values must be real tuples -- a bare
    # ('insdc') is just the string 'insdc', and ``format in v`` would then
    # perform a substring test (e.g. format='ins' would falsely match).
    skip = {'type': ('insdc', 'gff3'),
            'ID': ('insdc',),
            'translation': ('gff3',),
            'strand': ('insdc',)}
    return [k for k, v in skip.items() if format in v]


def _yield_section(is_another_section, **kwargs):
    '''Returns function that returns successive sections from file.

    Parameters
    ----------
    is_another_section : callable
        It takes a string as input and return a boolean indicating
        a new section starts.
    kwargs : dict, optional
        Keyword arguments will be passed to `_line_generator`.

    Returns
    -------
    function
        A function accept a list of lines as input and return
        a generator to yield section one by one.
    '''
    def parser(lines):
        curr = []
        for line in _line_generator(lines, **kwargs):
            # if we find another, return the previous section
            if is_another_section(line):
                if curr:
                    yield curr
                    curr = []
            curr.append(line)
        # don't forget to return the last section in the file
        if curr:
            yield curr
    return parser


def _parse_section_default(
        lines, label_delimiter=None, join_delimiter=' ', return_label=False):
    '''Parse sections in default way.

    Do 2 things:
        1. split first line with label_delimiter for label
        2. join all the lines into one str with join_delimiter.

    Parameters
    ----------
    lines : list of str
        Lines of one section; must be non-empty.
    label_delimiter : str, optional
        Delimiter separating the label from the data on the first line
        (None splits on whitespace, as with ``str.split``).
    join_delimiter : str
        String used to join the (stripped) data lines together.
    return_label : bool
        If True, return ``(label, data)``; otherwise just ``data``.
    '''
    data = []
    label = None
    line = lines[0]

    # split only once: the remainder of the line belongs to the data
    items = line.split(label_delimiter, 1)

    if len(items) == 2:
        label, section = items
    else:
        label = items[0]
        section = ""
    data.append(section)

    data.extend(lines[1:])
    data = join_delimiter.join(i.strip() for i in data)
    if return_label:
        return label, data
    else:
        return data


def _serialize_section_default(header, obj, indent=12):
    '''Serialize a section as a left-justified header padded to `indent`
    columns followed by the object and a newline.'''
    return '{header:<{indent}}{obj}\n'.format(
        header=header, obj=obj, indent=indent)


def _parse_feature_table(lines, length):
    '''parse DDBJ/ENA/GenBank Feature Table.

    Parameters
    ----------
    lines : list of str
        The lines of the FEATURES section (header line included or not).
    length : int
        Length of the sequence the features annotate; used as the upper
        bound of the returned ``IntervalMetadata``.

    Returns
    -------
    IntervalMetadata
        All parsed features as intervals.
    '''
    imd = IntervalMetadata(length)
    # skip the 1st FEATURES line
    if lines[0].startswith('FEATURES'):
        lines = lines[1:]
    # magic number 21: the lines following header of each feature
    # are indented with 21 spaces.
    feature_indent = ' ' * 21
    section_splitter = _yield_section(
        lambda x: not x.startswith(feature_indent),
        skip_blanks=True, strip=False)

    for section in section_splitter(lines):
        _parse_single_feature(section, imd)
    return imd


def _parse_single_feature(lines, imd):
    '''Parse a feature.

    Parse a feature and add it to ``IntervalMetadata`` object.

    Parameters
    ----------
    lines : list of str
        The lines of a single feature (location line plus qualifiers).
    imd : IntervalMetadata
        The object the parsed feature is added to (modified in place).
    '''
    voca_change = _vocabulary_change('insdc')

    # each component of a feature starts with '/', except the 1st
    # component of location.
    section_splitter = _yield_section(
        lambda x: x.startswith('/'), strip=True)
    section_iter = section_splitter(lines)

    # 1st section is location
    section = next(section_iter)
    feature_type, feature_loc = _parse_section_default(
        section, join_delimiter='', return_label=True)

    # keep the raw location string so serialization can round-trip it
    metadata = {'type': feature_type, '__location': feature_loc}

    intvl = imd.add(*_parse_loc_str(feature_loc))

    for section in section_iter:
        # following sections are Qualifiers
        k, v = _parse_section_default(
            section, label_delimiter='=',
            join_delimiter=' ', return_label=True)
        # 1st char is '/'
        k = k[1:]
        if k in voca_change:
            k = voca_change[k]

        if k == 'phase':
            # convert 1-based codon_start to 0-based phase
            v = int(v) - 1

        # some Qualifiers can appear multiple times
        if k in metadata:
            if not isinstance(metadata[k], list):
                metadata[k] = [metadata[k]]
            metadata[k].append(v)
        else:
            metadata[k] = v

    intvl.metadata.update(metadata)


def _parse_loc_str(loc_str):
    '''Parse location string.

    .. warning: This converts coordinates to 0-based from 1-based
    GenBank coordinate system.

    The location descriptor can be one of the following [1]_:
    (a) a single base number. e.g. 467
    (b) a site between two indicated adjoining bases. e.g. 123^124
    (c) a single base chosen from within a specified range of bases (not
        allowed for new entries). e.g. 102.110
    (d) the base numbers delimiting a sequence span. e.g.340..565
    (e) a remote entry identifier followed by a local location
        descriptor (i.e., a-d). e.g. J00194.1:100..202

    Notes
    -----
    This does not fully handle (e) case: such remote descriptors are
    tokenized (so they do not raise an error) but are otherwise skipped --
    no interval is recorded for them. When it parses locations
    across strand (e.g. "complement(123..145),200..209"), it will
    record all the span parts but will record strand as negative.

    Returns
    -------
    tuple
        ``(bounds, fuzzy, metadata)`` suitable for unpacking into
        ``IntervalMetadata.add``.

    References
    ----------
    .. [1] http://www.insdc.org/files/feature_table.html#3.4

    '''
    # define the tokens
    operators = ['join', 'complement', 'order']
    LPAREN = r'(?P<LPAREN>\()'
    RPAREN = r'(?P<RPAREN>\))'
    COMMA = r'(?P<COMMA>,)'
    WS = r'(?P<WS>\s+)'
    a = r'(?P<A>\d+)'
    b = r'(?P<B>\d+\^\d+)'
    c = r'(?P<C>\d+\.\d+)'
    d = r'(?P<D><?\d+\.\.>?\d+)'
    e_left = r'(?P<EL><?[a-zA-Z_0-9\.]+:\d+\.\.>?\d+)'
    e_right = r'(?P<ER><?\d+\.\.>?[a-zA-Z_0-9\.]+:\d+)'
    illegal = r'(?P<ILLEGAL>.+)'
    # The order of tokens in the master regular expression also
    # matters. When matching, re tries to match pattens in the order
    # specified. Thus, if a pattern happens to be a substring of a
    # longer pattern, you need to make sure the longer pattern goes
    # first.
    master_pat = re.compile('|'.join(
        operators + [WS, LPAREN, RPAREN, COMMA,
                     b, c, d, e_left, e_right, a,
                     illegal]))

    scanner = master_pat.scanner(loc_str)

    bounds = []
    fuzzy = []

    metadata = {'strand': '+'}

    for m in iter(scanner.match, None):
        # operators have no named group, so lastgroup is None for them;
        # dispatch on the matched text instead.
        p, v = m.lastgroup, m.group()
        if v == 'complement':
            metadata['strand'] = '-'
        elif p == 'A':
            # single base: stored as a 1-base-long 0-based interval
            start = int(v)
            bounds.append((start-1, start))
            fuzzy.append((False, False))
        elif p == 'B':
            # site between two bases: only the left base is kept
            # (the '^' site itself cannot be represented as an interval)
            start, end = v.split('^')
            start = int(start)
            bounds.append((start-1, start))
            fuzzy.append((False, False))
        elif p == 'C' or p == 'D':
            if p == 'C':
                start, end = v.split('.')
            else:
                start, end = v.split('..')
            fuzzy_s = fuzzy_e = False
            # '<'/'>' mark fuzzy (partial) boundaries in GenBank syntax
            if start.startswith('<'):
                start = start[1:]
                fuzzy_s = True
            if end.startswith('>'):
                end = end[1:]
                fuzzy_e = True
            bounds.append((int(start)-1, int(end)))
            fuzzy.append((fuzzy_s, fuzzy_e))
        elif p == 'ILLEGAL':
            raise FileFormatError(
                'Could not parse location string: "%s"' % loc_str)
        # NOTE(review): 'EL'/'ER' (remote entries) fall through here and
        # contribute no bounds -- a location that is *only* remote yields
        # empty bounds; confirm IntervalMetadata.add accepts that.

    return bounds, fuzzy, metadata


def _serialize_feature_table(intervals, indent=21):
    '''Yield one serialized feature string per interval.

    Parameters
    ----------
    intervals : list of ``Interval``
    indent : int
        Column at which qualifiers and location start.
    '''
    for intvl in intervals:
        yield _serialize_single_feature(intvl, indent)


def _serialize_single_feature(intvl, indent=21):
    '''Serialize a single ``Interval`` as a GenBank feature entry.

    Parameters
    ----------
    intvl : Interval
    indent : int
        Column at which qualifiers and location start.
    '''
    # there are 5 spaces before Feature Key starts.
    padding = ' ' * 5
    qualifiers = []
    md = intvl.metadata
    voca_skip = _vocabulary_skip('insdc')
    voca_change = _vocabulary_change('insdc', read_in=False)
    # sort it so the output order is deterministic
    for k in sorted(md):
        # '__'-prefixed keys are internal bookkeeping; skipped keys are
        # represented elsewhere (e.g. 'type' is the feature key itself)
        if k.startswith('__') or k in voca_skip:
            continue
        v = md[k]
        if k == 'phase':
            # convert 0-based phase back to 1-based codon_start
            v = str(v + 1)
        if k in voca_change:
            k = voca_change[k]
        if isinstance(v, list):
            for vi in v:
                qualifiers.append(_serialize_qualifier(k, vi))
        else:
            qualifiers.append(_serialize_qualifier(k, v))

    # prefer the verbatim location captured at parse time (lossless
    # round-trip); otherwise rebuild it from the interval's bounds
    if '__location' in md:
        loc = md['__location']
    else:
        loc = _serialize_location(intvl)
    # the qualifiers start at column 22
    qualifiers = [' ' * indent + i for i in qualifiers]
    return '{header:<{indent}}{loc}\n{qualifiers}\n'.format(
        header=padding + md['type'],
        loc=loc,
        indent=indent,
        qualifiers='\n'.join(qualifiers))


def _serialize_location(intvl):
    '''Rebuild a GenBank location string from an ``Interval``.

    Converts 0-based bounds back to 1-based coordinates, restores
    '<'/'>' fuzzy markers, wraps multiple spans in join(...) and the
    whole string in complement(...) for the minus strand.
    '''
    loc = []
    for bound, fuzzy in zip(intvl.bounds, intvl.fuzzy):
        start, end = bound
        # back to 1-based, inclusive coordinates
        start += 1
        if start == end:
            # a 1-base-long interval serializes as a single base number
            s = str(start)
        elif fuzzy[0] and fuzzy[1]:
            s = '<%d..>%d' % (start, end)
        elif fuzzy[0] and not fuzzy[1]:
            s = '<%d..%d' % (start, end)
        elif not fuzzy[0] and fuzzy[1]:
            s = '%d..>%d' % (start, end)
        else:
            s = '%d..%d' % (start, end)
        loc.append(s)
    if len(loc) > 1:
        loc_str = 'join({})'.format(','.join(loc))
    else:
        loc_str = loc[0]
    if intvl.metadata.get('strand') == '-':
        loc_str = 'complement({})'.format(loc_str)
    return loc_str


def _serialize_qualifier(key, value):
    '''Serialize a Qualifier in a feature.

    Parameters
    ----------
    value : int, str
    '''
    # if value is empty
    if not value:
        return '/%s' % key

    return '/{k}={v}'.format(k=key, v=value)