import re
from collections import defaultdict, Counter
import warnings

# Python 2/3 compatibility shim: on Python 3 ``basestring`` does not exist,
# so it is rebound to a tuple usable with ``isinstance``.
try:
    basestring
    PY2 = True
except NameError:
    basestring = (str, bytes)
    PY2 = False


# Maps each interned unit name / CV accession to itself so equal values
# can share a single object.
_UNIT_CV_INTERN_TABLE = dict()


def clear_unit_cv_table():
    """Clear the module-level unit name and
    controlled vocabulary accession table.
    """
    _UNIT_CV_INTERN_TABLE.clear()


def _intern_unit_or_cv(unit_or_cv):
    """Intern `unit_or_cv` in :const:`~._UNIT_CV_INTERN_TABLE`, potentially
    keeping a reference to the object stored for the duration of the program.

    Parameters
    ----------
    unit_or_cv : object
        The value to intern. :const:`None` is passed through without being
        stored.

    Returns
    -------
    object:
        The object which `unit_or_cv` hash-equals in
        :const:`~._UNIT_CV_INTERN_TABLE`.
    """
    if unit_or_cv is None:
        return None
    try:
        return _UNIT_CV_INTERN_TABLE[unit_or_cv]
    except KeyError:
        _UNIT_CV_INTERN_TABLE[unit_or_cv] = unit_or_cv
        return _UNIT_CV_INTERN_TABLE[unit_or_cv]


class PyteomicsError(Exception):
    """Exception raised for errors in Pyteomics library.

    Attributes
    ----------
    message : str
        Error message.
    values : tuple
        Any extra positional arguments given after the message.
    """

    def __init__(self, msg, *values):
        self.message = msg
        self.values = values

    def __str__(self):
        if not self.values:
            return "Pyteomics error, message: %s" % (repr(self.message),)
        else:
            return "Pyteomics error, message: %s %r" % (repr(self.message), self.values)


class Charge(int):
    """A subclass of :py:class:`int`. Can be constructed from strings in "N+"
    or "N-" format, and the string representation of a :py:class:`Charge` is
    also in that format.
    """
    def __new__(cls, *args, **kwargs):
        """Build a charge from anything :py:class:`int` accepts, or from a
        string such as ``"2+"`` / ``"3-"``.

        Raises
        ------
        PyteomicsError
            If plain :py:class:`int` construction fails with
            :py:exc:`ValueError` and the first argument is not a string in
            "N+" / "N-" format.
        """
        try:
            # Plain int construction first; note that keyword arguments are
            # only forwarded on the string-fallback path below.
            return super(Charge, cls).__new__(cls, *args)
        except ValueError as e:
            if isinstance(args[0], basestring):
                try:
                    num, sign = re.match(r'^(\d+)(\+|-)$', args[0]).groups()
                    # Move the trailing sign to the front so int() can parse it.
                    return super(Charge, cls).__new__(cls, sign + num, *args[1:], **kwargs)
                except Exception:
                    # Any failure here falls through to the PyteomicsError below.
                    pass
            raise PyteomicsError(*e.args)

    def __str__(self):
        # Index '+-' with the boolean: False -> '+', True -> '-'.
        return str(abs(self)) + '+-'[self < 0]


class Ion(str):
    """Represents an Ion, right now just a subclass of String.

    When constructed from a string, the attributes ``ion_type``,
    ``neutral_loss`` and ``charge`` are set from the three groups of
    :attr:`_pattern`.
    """
    # Character class is `[+-]`: a `|` inside brackets would be matched
    # literally and let malformed charges such as "|1" through.
    _pattern = r'([abcxyz]\d+(\-H2O|\-NH3)?)([+-]\d+)'  # "y2-H2O+1"

    def __init__(self, *args, **kwargs):
        """Parse the ion string, if one was given.

        Raises
        ------
        PyteomicsError
            If the first argument is a string that does not match
            :attr:`_pattern`.
        """
        if args and isinstance(args[0], basestring):
            try:
                self.ion_type, self.neutral_loss, self.charge = re.match(self._pattern, args[0]).groups()
            except Exception:
                raise PyteomicsError("Malformed ion string, must match the regex {!r}".format(self._pattern))


class ChargeList(list):
    """Just a list of :py:class:`Charge`s. When printed, looks like an
    enumeration of the list contents. Can also be constructed from such
    strings (e.g. "2+, 3+ and 4+").
    """

    def __init__(self, *args, **kwargs):
        """Build the list from an enumeration string or from an iterable.

        A string is split on commas and the word "and". Any other first
        argument is de-duplicated and sorted when possible; if that fails
        the arguments are handed to :py:class:`list` unchanged. Every
        element is finally converted to :py:class:`Charge`.
        """
        if args and isinstance(args[0], basestring):
            delim = r'(?:,\s*)|(?:\s*and\s*)'
            self.extend(map(Charge, re.split(delim, args[0])))
        else:
            try:
                super(ChargeList, self).__init__(
                    sorted(set(args[0])), *args[1:], **kwargs)
            except Exception:
                # Covers a missing argument as well as unhashable or
                # unsortable elements.
                super(ChargeList, self).__init__(*args, **kwargs)
            self[:] = map(Charge, self)

    def __str__(self):
        if len(self) > 1:
            return ', '.join(map(str, self[:-1])) + ' and {}'.format(self[-1])
        elif self:
            return str(self[0])
        return super(ChargeList, self).__str__()


def _parse_charge(s, list_only=False):
    """Parse `s` as a :py:class:`Charge`, falling back to
    :py:class:`ChargeList`.

    Parameters
    ----------
    s : object
        The value to parse.
    list_only : bool, optional
        If :py:const:`True`, skip the single-charge attempt and always
        build a :py:class:`ChargeList`.

    Returns
    -------
    Charge or ChargeList
    """
    if not list_only:
        try:
            return Charge(s)
        except PyteomicsError:
            pass
    return ChargeList(s)


def _parse_ion(ion_text):
    """Parse `ion_text` as an :py:class:`Ion`.

    On failure a warning is issued and :py:const:`None` is returned
    implicitly.
    """
    try:
        return Ion(ion_text)
    except Exception as e:
        warnings.warn('Could not parse ion string: {} ({})'.format(ion_text, e.args[0]))


class BasicComposition(defaultdict, Counter):
    """A generic dictionary for compositions.
    Keys should be strings, values should be integers.
    Allows simple arithmetics."""

    def __init__(self, *args, **kwargs):
        defaultdict.__init__(self, int)
        Counter.__init__(self, *args, **kwargs)
        # Drop entries with falsy counts; iterate over a snapshot because
        # the dict is modified in the loop.
        for k, v in list(self.items()):
            if not v:
                del self[k]

    def __str__(self):
        return '{}({})'.format(type(self).__name__, dict.__repr__(self))

    def __repr__(self):
        return str(self)

    def _repr_pretty_(self, p, cycle):
        """Pretty-printing hook: write the representation to printer `p`."""
        if cycle:  # should never happen
            p.text('{} object with a cyclic reference'.format(type(self).__name__))
            # Do not additionally emit the regular representation.
            return
        p.text(str(self))

    def __add__(self, other):
        result = self.copy()
        for elem, cnt in other.items():
            result[elem] += cnt
        return result

    def __iadd__(self, other):
        for elem, cnt in other.items():
            self[elem] += cnt
        return self

    def __radd__(self, other):
        return self + other

    def __sub__(self, other):
        result = self.copy()
        for elem, cnt in other.items():
            result[elem] -= cnt
        return result

    def __isub__(self, other):
        for elem, cnt in other.items():
            self[elem] -= cnt
        return self

    def __rsub__(self, other):
        return (self - other) * (-1)

    def __mul__(self, other):
        if not isinstance(other, int):
            raise PyteomicsError('Cannot multiply Composition by non-integer',
                                 other)
        return type(self)({k: v * other for k, v in self.items()})

    def __imul__(self, other):
        if not isinstance(other, int):
            raise PyteomicsError('Cannot multiply Composition by non-integer',
                                 other)
        # Iterate over a snapshot of the keys: multiplying by 0 makes
        # __setitem__ delete entries, which must not happen while the dict
        # itself is being iterated.
        for elem in list(self):
            self[elem] *= other
        return self

    def __rmul__(self, other):
        return self * other

    def __eq__(self, other):
        if not isinstance(other, dict):
            return False
        # Compare only entries with non-zero counts.
        self_items = {i for i in self.items() if i[1]}
        other_items = {i for i in other.items() if i[1]}
        return self_items == other_items

    # override default behavior:
    # we don't want to add 0's to the dictionary
    def __missing__(self, key):
        return 0

    def __setitem__(self, key, value):
        """Store an integer count; floats are rounded, zeros remove the key.

        Raises
        ------
        PyteomicsError
            If `value` is neither an :py:class:`int` nor a
            :py:class:`float`.
        """
        if isinstance(value, float):
            value = int(round(value))
        elif not isinstance(value, int):
            raise PyteomicsError('Only integers allowed as values in '
                                 'Composition, got {}.'.format(type(value).__name__))
        if value:  # reject 0's
            super(BasicComposition, self).__setitem__(key, value)
        elif key in self:
            del self[key]

    def copy(self):
        return type(self)(self)

    def __reduce__(self):
        class_, args, state, list_iterator, dict_iterator = super(
            BasicComposition, self).__reduce__()
        # Override the reduce of defaultdict so we do not provide the
        # `int` type as the first argument
        # which prevents from correctly unpickling the object
        args = ()
        return class_, args, state, list_iterator, dict_iterator


class _MappingOverAttributeProxy(object):
    '''A replacement for __dict__ for unpickling an object which once
    has __slots__ now but did not before.

    Item access on the proxy is translated into attribute access on the
    wrapped object.
    '''

    def __init__(self, obj):
        self.obj = obj

    def __getitem__(self, key):
        return getattr(self.obj, key)

    def __setitem__(self, key, value):
        setattr(self.obj, key, value)

    def __contains__(self, key):
        return hasattr(self.obj, key)

    def __repr__(self):
        return "{self.__class__.__name__}({self.obj})".format(self=self)


class unitint(int):
    '''Represents an integer value with a unit name.

    Behaves identically to a built-in :class:`int` type.

    Attributes
    ----------
    unit_info : :class:`str`
        The name of the unit this value possesses.
    '''
    def __new__(cls, value, unit_info=None):
        inst = int.__new__(cls, value)
        inst.unit_info = unit_info
        return inst

    def __reduce__(self):
        # Rebuild from the plain value plus the unit so pickling keeps both.
        return self.__class__, (int(self), self.unit_info)

    def _repr_pretty_(self, p, cycle):
        base = super(unitint, self).__repr__()
        if self.unit_info:
            string = "%s %s" % (base, self.unit_info)
        else:
            string = base
        p.text(string)


class unitfloat(float):
    '''Represents a float value with a unit name.

    Behaves identically to a built-in :class:`float` type.

    Attributes
    ----------
    unit_info : :class:`str`
        The name of the unit this value possesses.
    '''
    __slots__ = ('unit_info', )

    def __new__(cls, value, unit_info=None):
        inst = float.__new__(cls, value)
        inst.unit_info = unit_info
        return inst

    @property
    def __dict__(self):
        # Stand-in mapping for a slotted class, see _MappingOverAttributeProxy.
        return _MappingOverAttributeProxy(self)

    def __reduce__(self):
        # Rebuild from the plain value plus the unit so pickling keeps both.
        return self.__class__, (float(self), self.unit_info)

    def _repr_pretty_(self, p, cycle):
        base = super(unitfloat, self).__repr__()
        if self.unit_info:
            string = "%s %s" % (base, self.unit_info)
        else:
            string = base
        p.text(string)


class unitstr(str):
    '''Represents a string value with a unit name.

    Behaves identically to a built-in :class:`str` type.

    Attributes
    ----------
    unit_info : :class:`str`
        The name of the unit this value possesses.
    '''
    if not PY2:
        __slots__ = ("unit_info", )

    def __new__(cls, value, unit_info=None):
        # ``unicode`` only exists on Python 2; the PY2 guard keeps the name
        # from being evaluated on Python 3.
        if PY2 and isinstance(value, unicode):
            value = value.encode('utf-8')
        inst = str.__new__(cls, value)
        inst.unit_info = unit_info
        return inst

    @property
    def __dict__(self):
        # Stand-in mapping for a slotted class, see _MappingOverAttributeProxy.
        return _MappingOverAttributeProxy(self)

    def __reduce__(self):
        # Rebuild from the plain value plus the unit so pickling keeps both.
        return self.__class__, (str(self), self.unit_info)

    def _repr_pretty_(self, p, cycle):
        base = super(unitstr, self).__repr__()
        if self.unit_info:
            string = "%s %s" % (base, self.unit_info)
        else:
            string = base
        p.text(string)


class cvstr(str):
    '''A helper class to associate a controlled vocabulary accession
    number with an otherwise plain :class:`str` object

    Attributes
    ----------
    accession : str
        The accession number for this parameter, e.g. MS:1000040
    unit_accession : str
        The accession number for the unit of the value, if any
    '''

    if not PY2:
        __slots__ = ('accession', 'unit_accession')

    # Most recently created instance for each string value.
    _cache = {}

    def __new__(cls, value, accession=None, unit_accession=None):
        # Reuse the cached instance only when both accessions match;
        # otherwise build a new one, which then replaces the cache entry.
        try:
            inst = cls._cache[value]
            if inst.accession == accession and inst.unit_accession == unit_accession:
                return inst
        except KeyError:
            pass

        # ``unicode`` only exists on Python 2; the PY2 guard keeps the name
        # from being evaluated on Python 3.
        if PY2 and isinstance(value, unicode):
            value = value.encode('utf-8')
        inst = str.__new__(cls, value)
        inst.accession = _intern_unit_or_cv(accession)
        inst.unit_accession = _intern_unit_or_cv(unit_accession)
        cls._cache[value] = inst
        return inst

    @property
    def __dict__(self):
        # Stand-in mapping for a slotted class, see _MappingOverAttributeProxy.
        return _MappingOverAttributeProxy(self)

    def __reduce__(self):
        # Rebuild from the plain value plus both accessions.
        return self.__class__, (str(self), self.accession, self.unit_accession)


class CVQueryEngine(object):
    '''Traverse an arbitrarily nested dictionary looking
    for keys which are :class:`cvstr` instances, or objects
    with an attribute called ``accession``.
    '''

    def _accession(self, key):
        '''Return the ``accession`` attribute of ``key``, or
        :const:`None` if it has none.
        '''
        return getattr(key, 'accession', None)

    def _query_dict(self, data, accession):
        '''Search the mapping ``data`` for ``accession``, descending into
        nested dicts, lists and tuples. Returns :const:`None` implicitly
        when nothing matches.
        '''
        for key, value in data.items():
            if self._accession(key) == accession:
                # An empty string value carries no information, so the
                # key itself is returned instead.
                if not isinstance(value, str) or value != '':
                    return value
                else:
                    return key
            elif isinstance(value, dict):
                inner = self._query_dict(value, accession)
                if inner is not None:
                    return inner
            elif isinstance(value, (list, tuple)):
                inner = self._query_sequence(value, accession)
                if inner is not None:
                    return inner
            elif self._accession(value) == accession:
                return value

    def _query_sequence(self, data, accession):
        '''Search the sequence ``data`` for ``accession``, descending into
        nested dicts, lists and tuples. Returns :const:`None` implicitly
        when nothing matches.
        '''
        for value in data:
            if isinstance(value, dict):
                inner = self._query_dict(value, accession)
                if inner is not None:
                    return inner
            elif isinstance(value, (list, tuple)):
                inner = self._query_sequence(value, accession)
                if inner is not None:
                    return inner
            elif self._accession(value) == accession:
                return value

    def query(self, data, accession):
        '''Search ``data`` for a key with the accession
        number ``accession``. Returns :const:`None` if
        not found.

        Raises :exc:`TypeError` if ``accession`` is :const:`None`.
        '''
        if accession is None:
            raise TypeError("`accession` cannot be None")
        return self._query_dict(data, accession)

    def _is_empty(self, value):
        '''Return :const:`True` only for a string equal to ``''``.'''
        if isinstance(value, basestring):
            return value == ''
        return False

    def _walk_dict(self, data, index):
        '''Record every accession found in the mapping ``data`` into
        ``index`` and return ``index``.
        '''
        for key, value in data.items():
            accession = self._accession(key)
            if accession:
                if not self._is_empty(value):
                    index[accession] = value
                else:
                    index[accession] = key
            elif isinstance(value, dict):
                self._walk_dict(value, index)
            elif isinstance(value, (list, tuple)):
                self._walk_sequence(value, index)
            # Checked for every entry, whichever branch ran above: the
            # value may carry an accession of its own.
            accession = self._accession(value)
            if accession:
                index[accession] = value
        return index

    def _walk_sequence(self, data, index):
        '''Record every accession found in the sequence ``data`` into
        ``index``.
        '''
        for value in data:
            if isinstance(value, dict):
                self._walk_dict(value, index)
            elif isinstance(value, (list, tuple)):
                self._walk_sequence(value, index)
            else:
                accession = self._accession(value)
                if accession:
                    index[accession] = value

    def index(self, data):
        '''Construct a flat :class:`dict` whose keys are the
        accession numbers for all qualified keys in ``data``
        and whose values are the mapped values from ``data``.
        '''
        index = self._walk_dict(data, {})
        return index

    def __call__(self, data, accession=None):
        '''If ``accession`` is :const:`None`, calls
        :meth:`index` on ``data``, otherwise calls
        :meth:`query` with ``data`` and ``accession``.
        '''
        if accession is None:
            return self.index(data)
        else:
            return self.query(data, accession)


#: A ready-to-use instance of :class:`~.CVQueryEngine`
cvquery = CVQueryEngine()