1""" 2pint.registry 3~~~~~~~~~~~~~ 4 5Defines the Registry, a class to contain units and their relations. 6 7The module actually defines 5 registries with different capabilities: 8 9- BaseRegistry: Basic unit definition and querying. 10 Conversion between multiplicative units. 11 12- NonMultiplicativeRegistry: Conversion between non multiplicative (offset) units. 13 (e.g. Temperature) 14 15 * Inherits from BaseRegistry 16 17- ContextRegisty: Conversion between units with different dimensions according 18 to previously established relations (contexts) - e.g. in spectroscopy, 19 conversion between frequency and energy is possible. May also override 20 conversions between units on the same dimension - e.g. different 21 rounding conventions. 22 23 * Inherits from BaseRegistry 24 25- SystemRegistry: Group unit and changing of base units. 26 (e.g. in MKS, meter, kilogram and second are base units.) 27 28 * Inherits from BaseRegistry 29 30- UnitRegistry: Combine all previous capabilities, it is exposed by Pint. 31 32:copyright: 2016 by Pint Authors, see AUTHORS for more details. 33:license: BSD, see LICENSE for more details. 34""" 35 36from __future__ import annotations 37 38import copy 39import functools 40import importlib.resources 41import itertools 42import locale 43import os 44import re 45from collections import ChainMap, defaultdict 46from contextlib import contextmanager 47from decimal import Decimal 48from fractions import Fraction 49from io import StringIO 50from numbers import Number 51from tokenize import NAME, NUMBER 52from typing import ( 53 TYPE_CHECKING, 54 Any, 55 Callable, 56 ContextManager, 57 Dict, 58 FrozenSet, 59 Iterable, 60 Iterator, 61 List, 62 Optional, 63 Set, 64 Tuple, 65 Type, 66 TypeVar, 67 Union, 68) 69 70from . import registry_helpers, systems 71from ._typing import F, QuantityOrUnitLike 72from .compat import HAS_BABEL, babel_parse, tokenizer 73from .context import Context, ContextChain 74from .converters import LogarithmicConverter, ScaleConverter 75from .definitions import ( 76 AliasDefinition, 77 Definition, 78 DimensionDefinition, 79 PrefixDefinition, 80 UnitDefinition, 81) 82from .errors import ( 83 DefinitionSyntaxError, 84 DimensionalityError, 85 RedefinitionError, 86 UndefinedUnitError, 87) 88from .pint_eval import build_eval_tree 89from .systems import Group, System 90from .util import ( 91 ParserHelper, 92 SourceIterator, 93 UnitsContainer, 94 _is_dim, 95 find_connected_nodes, 96 find_shortest_path, 97 getattr_maybe_raise, 98 logger, 99 pi_theorem, 100 solve_dependencies, 101 string_preprocessor, 102 to_units_container, 103) 104 105if TYPE_CHECKING: 106 from ._typing import UnitLike 107 from .quantity import Quantity 108 from .unit import Unit 109 from .unit import UnitsContainer as UnitsContainerT 110 111 if HAS_BABEL: 112 import babel 113 114 Locale = babel.Locale 115 else: 116 Locale = None 117 118T = TypeVar("T") 119 120_BLOCK_RE = re.compile(r"[ (]") 121 122 123@functools.lru_cache() 124def pattern_to_regex(pattern): 125 if hasattr(pattern, "finditer"): 126 pattern = pattern.pattern 127 128 # Replace "{unit_name}" match string with float regex with unit_name as group 129 pattern = re.sub( 130 r"{(\w+)}", r"(?P<\1>[+-]?[0-9]+(?:.[0-9]+)?(?:[Ee][+-]?[0-9]+)?)", pattern 131 ) 132 133 return re.compile(pattern) 134 135 136class RegistryMeta(type): 137 """This is just to call after_init at the right time 138 instead of asking the developer to do it when subclassing. 139 """ 140 141 def __call__(self, *args, **kwargs): 142 obj = super().__call__(*args, **kwargs) 143 obj._after_init() 144 return obj 145 146 147class RegistryCache: 148 """Cache to speed up unit registries""" 149 150 def __init__(self) -> None: 151 #: Maps dimensionality (UnitsContainer) to Units (str) 152 self.dimensional_equivalents: Dict[UnitsContainer, Set[str]] = {} 153 #: Maps dimensionality (UnitsContainer) to Dimensionality (UnitsContainer) 154 self.root_units = {} 155 #: Maps dimensionality (UnitsContainer) to Units (UnitsContainer) 156 self.dimensionality: Dict[UnitsContainer, UnitsContainer] = {} 157 #: Cache the unit name associated to user input. ('mV' -> 'millivolt') 158 self.parse_unit: Dict[str, UnitsContainer] = {} 159 160 161class ContextCacheOverlay: 162 """Layer on top of the base UnitRegistry cache, specific to a combination of 163 active contexts which contain unit redefinitions. 164 """ 165 166 def __init__(self, registry_cache: RegistryCache) -> None: 167 self.dimensional_equivalents = registry_cache.dimensional_equivalents 168 self.root_units = {} 169 self.dimensionality = registry_cache.dimensionality 170 self.parse_unit = registry_cache.parse_unit 171 172 173NON_INT_TYPE = Type[Union[float, Decimal, Fraction]] 174PreprocessorType = Callable[[str], str] 175 176 177class BaseRegistry(metaclass=RegistryMeta): 178 """Base class for all registries. 179 180 Capabilities: 181 182 - Register units, prefixes, and dimensions, and their relations. 183 - Convert between units. 184 - Find dimensionality of a unit. 185 - Parse units with prefix and/or suffix. 186 - Parse expressions. 187 - Parse a definition file. 188 - Allow extending the definition file parser by registering @ directives. 189 190 Parameters 191 ---------- 192 filename : str or None 193 path of the units definition file to load or line iterable object. Empty to load 194 the default definition file. None to leave the UnitRegistry empty. 195 force_ndarray : bool 196 convert any input, scalar or not to a numpy.ndarray. 197 force_ndarray_like : bool 198 convert all inputs other than duck arrays to a numpy.ndarray. 199 on_redefinition : str 200 action to take in case a unit is redefined: 'warn', 'raise', 'ignore' 201 auto_reduce_dimensions : 202 If True, reduce dimensionality on appropriate operations. 203 preprocessors : 204 list of callables which are iteratively ran on any input expression or unit 205 string 206 fmt_locale : 207 locale identifier string, used in `format_babel` 208 non_int_type : type 209 numerical type used for non integer values. (Default: float) 210 case_sensitive : bool, optional 211 Control default case sensitivity of unit parsing. (Default: True) 212 213 """ 214 215 #: Map context prefix to function 216 #: type: Dict[str, (SourceIterator -> None)] 217 _parsers: Dict[str, Callable[[SourceIterator], None]] = None 218 219 #: Babel.Locale instance or None 220 fmt_locale: Optional[Locale] = None 221 222 def __init__( 223 self, 224 filename="", 225 force_ndarray: bool = False, 226 force_ndarray_like: bool = False, 227 on_redefinition: str = "warn", 228 auto_reduce_dimensions: bool = False, 229 preprocessors: Optional[List[PreprocessorType]] = None, 230 fmt_locale: Optional[str] = None, 231 non_int_type: NON_INT_TYPE = float, 232 case_sensitive: bool = True, 233 ): 234 self._register_parsers() 235 self._init_dynamic_classes() 236 237 self._filename = filename 238 self.force_ndarray = force_ndarray 239 self.force_ndarray_like = force_ndarray_like 240 self.preprocessors = preprocessors or [] 241 242 #: Action to take in case a unit is redefined. 'warn', 'raise', 'ignore' 243 self._on_redefinition = on_redefinition 244 245 #: Determines if dimensionality should be reduced on appropriate operations. 246 self.auto_reduce_dimensions = auto_reduce_dimensions 247 248 #: Default locale identifier string, used when calling format_babel without explicit locale. 249 self.set_fmt_locale(fmt_locale) 250 251 #: Numerical type used for non integer values. 252 self.non_int_type = non_int_type 253 254 #: Default unit case sensitivity 255 self.case_sensitive = case_sensitive 256 257 #: Map between name (string) and value (string) of defaults stored in the 258 #: definitions file. 259 self._defaults: Dict[str, str] = {} 260 261 #: Map dimension name (string) to its definition (DimensionDefinition). 262 self._dimensions: Dict[str, DimensionDefinition] = {} 263 264 #: Map unit name (string) to its definition (UnitDefinition). 265 #: Might contain prefixed units. 266 self._units: Dict[str, UnitDefinition] = {} 267 268 #: Map unit name in lower case (string) to a set of unit names with the right 269 #: case. 270 #: Does not contain prefixed units. 271 #: e.g: 'hz' - > set('Hz', ) 272 self._units_casei: Dict[str, Set[str]] = defaultdict(set) 273 274 #: Map prefix name (string) to its definition (PrefixDefinition). 275 self._prefixes: Dict[str, PrefixDefinition] = { 276 "": PrefixDefinition("", "", (), 1) 277 } 278 279 #: Map suffix name (string) to canonical , and unit alias to canonical unit name 280 self._suffixes: Dict[str, str] = {"": "", "s": ""} 281 282 #: Map contexts to RegistryCache 283 self._cache = RegistryCache() 284 285 self._initialized = False 286 287 def _init_dynamic_classes(self) -> None: 288 """Generate subclasses on the fly and attach them to self""" 289 from .unit import build_unit_class 290 291 self.Unit = build_unit_class(self) 292 293 from .quantity import build_quantity_class 294 295 self.Quantity: Type["Quantity"] = build_quantity_class(self) 296 297 from .measurement import build_measurement_class 298 299 self.Measurement = build_measurement_class(self) 300 301 def _after_init(self) -> None: 302 """This should be called after all __init__""" 303 304 if self._filename == "": 305 self.load_definitions("default_en.txt", True) 306 elif self._filename is not None: 307 self.load_definitions(self._filename) 308 309 self._build_cache() 310 self._initialized = True 311 312 def _register_parsers(self) -> None: 313 self._register_parser("@defaults", self._parse_defaults) 314 315 def _parse_defaults(self, ifile) -> None: 316 """Loader for a @default section.""" 317 next(ifile) 318 for lineno, part in ifile.block_iter(): 319 k, v = part.split("=") 320 self._defaults[k.strip()] = v.strip() 321 322 def __deepcopy__(self, memo) -> "BaseRegistry": 323 new = object.__new__(type(self)) 324 new.__dict__ = copy.deepcopy(self.__dict__, memo) 325 new._init_dynamic_classes() 326 return new 327 328 def __getattr__(self, item): 329 getattr_maybe_raise(self, item) 330 return self.Unit(item) 331 332 def __getitem__(self, item): 333 logger.warning( 334 "Calling the getitem method from a UnitRegistry is deprecated. " 335 "use `parse_expression` method or use the registry as a callable." 336 ) 337 return self.parse_expression(item) 338 339 def __contains__(self, item) -> bool: 340 """Support checking prefixed units with the `in` operator""" 341 try: 342 self.__getattr__(item) 343 return True 344 except UndefinedUnitError: 345 return False 346 347 def __dir__(self) -> List[str]: 348 #: Calling dir(registry) gives all units, methods, and attributes. 349 #: Also used for autocompletion in IPython. 350 return list(self._units.keys()) + list(object.__dir__(self)) 351 352 def __iter__(self) -> Iterator[str]: 353 """Allows for listing all units in registry with `list(ureg)`. 354 355 Returns 356 ------- 357 Iterator over names of all units in registry, ordered alphabetically. 358 """ 359 return iter(sorted(self._units.keys())) 360 361 def set_fmt_locale(self, loc: Optional[str]) -> None: 362 """Change the locale used by default by `format_babel`. 363 364 Parameters 365 ---------- 366 loc : str or None 367 None` (do not translate), 'sys' (detect the system locale) or a locale id string. 368 """ 369 if isinstance(loc, str): 370 if loc == "sys": 371 loc = locale.getdefaultlocale()[0] 372 373 # We call babel parse to fail here and not in the formatting operation 374 babel_parse(loc) 375 376 self.fmt_locale = loc 377 378 def UnitsContainer(self, *args, **kwargs) -> UnitsContainerT: 379 return UnitsContainer(*args, non_int_type=self.non_int_type, **kwargs) 380 381 @property 382 def default_format(self) -> str: 383 """Default formatting string for quantities.""" 384 return self.Quantity.default_format 385 386 @default_format.setter 387 def default_format(self, value: str): 388 self.Unit.default_format = value 389 self.Quantity.default_format = value 390 self.Measurement.default_format = value 391 392 def define(self, definition: Union[str, Definition]) -> None: 393 """Add unit to the registry. 394 395 Parameters 396 ---------- 397 definition : str or Definition 398 a dimension, unit or prefix definition. 399 """ 400 401 if isinstance(definition, str): 402 for line in definition.split("\n"): 403 self._define(Definition.from_string(line, self.non_int_type)) 404 else: 405 self._define(definition) 406 407 def _define(self, definition: Definition) -> Tuple[Definition, dict, dict]: 408 """Add unit to the registry. 409 410 This method defines only multiplicative units, converting any other type 411 to `delta_` units. 412 413 Parameters 414 ---------- 415 definition : Definition 416 a dimension, unit or prefix definition. 417 418 Returns 419 ------- 420 Definition, dict, dict 421 Definition instance, case sensitive unit dict, case insensitive unit dict. 422 423 """ 424 425 if isinstance(definition, DimensionDefinition): 426 d, di = self._dimensions, None 427 428 elif isinstance(definition, UnitDefinition): 429 d, di = self._units, self._units_casei 430 431 # For a base units, we need to define the related dimension 432 # (making sure there is only one to define) 433 if definition.is_base: 434 for dimension in definition.reference.keys(): 435 if dimension in self._dimensions: 436 if dimension != "[]": 437 raise DefinitionSyntaxError( 438 "Only one unit per dimension can be a base unit" 439 ) 440 continue 441 442 self.define( 443 DimensionDefinition(dimension, "", (), None, is_base=True) 444 ) 445 446 elif isinstance(definition, PrefixDefinition): 447 d, di = self._prefixes, None 448 449 elif isinstance(definition, AliasDefinition): 450 d, di = self._units, self._units_casei 451 self._define_alias(definition, d, di) 452 return d[definition.name], d, di 453 454 else: 455 raise TypeError("{} is not a valid definition.".format(definition)) 456 457 # define "delta_" units for units with an offset 458 if getattr(definition.converter, "offset", 0) != 0: 459 460 if definition.name.startswith("["): 461 d_name = "[delta_" + definition.name[1:] 462 else: 463 d_name = "delta_" + definition.name 464 465 if definition.symbol: 466 d_symbol = "Δ" + definition.symbol 467 else: 468 d_symbol = None 469 470 d_aliases = tuple("Δ" + alias for alias in definition.aliases) + tuple( 471 "delta_" + alias for alias in definition.aliases 472 ) 473 474 d_reference = self.UnitsContainer( 475 {ref: value for ref, value in definition.reference.items()} 476 ) 477 478 d_def = UnitDefinition( 479 d_name, 480 d_symbol, 481 d_aliases, 482 ScaleConverter(definition.converter.scale), 483 d_reference, 484 definition.is_base, 485 ) 486 else: 487 d_def = definition 488 489 self._define_adder(d_def, d, di) 490 491 return definition, d, di 492 493 def _define_adder(self, definition, unit_dict, casei_unit_dict): 494 """Helper function to store a definition in the internal dictionaries. 495 It stores the definition under its name, symbol and aliases. 496 """ 497 self._define_single_adder( 498 definition.name, definition, unit_dict, casei_unit_dict 499 ) 500 501 if definition.has_symbol: 502 self._define_single_adder( 503 definition.symbol, definition, unit_dict, casei_unit_dict 504 ) 505 506 for alias in definition.aliases: 507 if " " in alias: 508 logger.warn("Alias cannot contain a space: " + alias) 509 510 self._define_single_adder(alias, definition, unit_dict, casei_unit_dict) 511 512 def _define_single_adder(self, key, value, unit_dict, casei_unit_dict): 513 """Helper function to store a definition in the internal dictionaries. 514 515 It warns or raise error on redefinition. 516 """ 517 if key in unit_dict: 518 if self._on_redefinition == "raise": 519 raise RedefinitionError(key, type(value)) 520 elif self._on_redefinition == "warn": 521 logger.warning("Redefining '%s' (%s)" % (key, type(value))) 522 523 unit_dict[key] = value 524 if casei_unit_dict is not None: 525 casei_unit_dict[key.lower()].add(key) 526 527 def _define_alias(self, definition, unit_dict, casei_unit_dict): 528 unit = unit_dict[definition.name] 529 unit.add_aliases(*definition.aliases) 530 for alias in unit.aliases: 531 unit_dict[alias] = unit 532 casei_unit_dict[alias.lower()].add(alias) 533 534 def _register_parser(self, prefix, parserfunc): 535 """Register a loader for a given @ directive.. 536 537 Parameters 538 ---------- 539 prefix : 540 string identifying the section (e.g. @context) 541 parserfunc : SourceIterator -> None 542 A function that is able to parse a Definition section. 543 544 Returns 545 ------- 546 547 """ 548 if self._parsers is None: 549 self._parsers = {} 550 551 if prefix and prefix[0] == "@": 552 self._parsers[prefix] = parserfunc 553 else: 554 raise ValueError("Prefix directives must start with '@'") 555 556 def load_definitions(self, file, is_resource: bool = False) -> None: 557 """Add units and prefixes defined in a definition text file. 558 559 Parameters 560 ---------- 561 file : 562 can be a filename or a line iterable. 563 is_resource : 564 used to indicate that the file is a resource file 565 and therefore should be loaded from the package. (Default value = False) 566 567 Returns 568 ------- 569 570 """ 571 # Permit both filenames and line-iterables 572 if isinstance(file, str): 573 try: 574 if is_resource: 575 rbytes = importlib.resources.read_binary(__package__, file) 576 return self.load_definitions( 577 StringIO(rbytes.decode("utf-8")), is_resource 578 ) 579 else: 580 with open(file, encoding="utf-8") as fp: 581 return self.load_definitions(fp, is_resource) 582 except (RedefinitionError, DefinitionSyntaxError) as e: 583 if e.filename is None: 584 e.filename = file 585 raise e 586 except Exception as e: 587 msg = getattr(e, "message", "") or str(e) 588 raise ValueError("While opening {}\n{}".format(file, msg)) 589 590 ifile = SourceIterator(file) 591 for no, line in ifile: 592 if line.startswith("@") and not line.startswith("@alias"): 593 if line.startswith("@import"): 594 if is_resource: 595 path = line[7:].strip() 596 else: 597 try: 598 path = os.path.dirname(file.name) 599 except AttributeError: 600 path = os.getcwd() 601 path = os.path.join(path, os.path.normpath(line[7:].strip())) 602 self.load_definitions(path, is_resource) 603 else: 604 parts = _BLOCK_RE.split(line) 605 606 loader = ( 607 self._parsers.get(parts[0], None) if self._parsers else None 608 ) 609 610 if loader is None: 611 raise DefinitionSyntaxError( 612 "Unknown directive %s" % line, lineno=no 613 ) 614 615 try: 616 loader(ifile) 617 except DefinitionSyntaxError as ex: 618 if ex.lineno is None: 619 ex.lineno = no 620 raise ex 621 else: 622 try: 623 self.define(Definition.from_string(line, self.non_int_type)) 624 except DefinitionSyntaxError as ex: 625 if ex.lineno is None: 626 ex.lineno = no 627 raise ex 628 except Exception as ex: 629 logger.error("In line {}, cannot add '{}' {}".format(no, line, ex)) 630 631 def _build_cache(self) -> None: 632 """Build a cache of dimensionality and base units.""" 633 self._cache = RegistryCache() 634 635 deps = { 636 name: definition.reference.keys() if definition.reference else set() 637 for name, definition in self._units.items() 638 } 639 640 for unit_names in solve_dependencies(deps): 641 for unit_name in unit_names: 642 if "[" in unit_name: 643 continue 644 parsed_names = self.parse_unit_name(unit_name) 645 if parsed_names: 646 prefix, base_name, _ = parsed_names[0] 647 else: 648 prefix, base_name = "", unit_name 649 650 try: 651 uc = ParserHelper.from_word(base_name, self.non_int_type) 652 653 bu = self._get_root_units(uc) 654 di = self._get_dimensionality(uc) 655 656 self._cache.root_units[uc] = bu 657 self._cache.dimensionality[uc] = di 658 659 if not prefix: 660 dimeq_set = self._cache.dimensional_equivalents.setdefault( 661 di, set() 662 ) 663 dimeq_set.add(self._units[base_name]._name) 664 665 except Exception as exc: 666 logger.warning(f"Could not resolve {unit_name}: {exc!r}") 667 668 def get_name( 669 self, name_or_alias: str, case_sensitive: Optional[bool] = None 670 ) -> str: 671 """Return the canonical name of a unit.""" 672 673 if name_or_alias == "dimensionless": 674 return "" 675 676 try: 677 return self._units[name_or_alias]._name 678 except KeyError: 679 pass 680 681 candidates = self.parse_unit_name(name_or_alias, case_sensitive) 682 if not candidates: 683 raise UndefinedUnitError(name_or_alias) 684 elif len(candidates) == 1: 685 prefix, unit_name, _ = candidates[0] 686 else: 687 logger.warning( 688 "Parsing {} yield multiple results. " 689 "Options are: {}".format(name_or_alias, candidates) 690 ) 691 prefix, unit_name, _ = candidates[0] 692 693 if prefix: 694 name = prefix + unit_name 695 symbol = self.get_symbol(name, case_sensitive) 696 prefix_def = self._prefixes[prefix] 697 self._units[name] = UnitDefinition( 698 name, 699 symbol, 700 (), 701 prefix_def.converter, 702 self.UnitsContainer({unit_name: 1}), 703 ) 704 return prefix + unit_name 705 706 return unit_name 707 708 def get_symbol( 709 self, name_or_alias: str, case_sensitive: Optional[bool] = None 710 ) -> str: 711 """Return the preferred alias for a unit.""" 712 candidates = self.parse_unit_name(name_or_alias, case_sensitive) 713 if not candidates: 714 raise UndefinedUnitError(name_or_alias) 715 elif len(candidates) == 1: 716 prefix, unit_name, _ = candidates[0] 717 else: 718 logger.warning( 719 "Parsing {0} yield multiple results. " 720 "Options are: {1!r}".format(name_or_alias, candidates) 721 ) 722 prefix, unit_name, _ = candidates[0] 723 724 return self._prefixes[prefix].symbol + self._units[unit_name].symbol 725 726 def _get_symbol(self, name: str) -> str: 727 return self._units[name].symbol 728 729 def get_dimensionality(self, input_units) -> UnitsContainerT: 730 """Convert unit or dict of units or dimensions to a dict of base dimensions 731 dimensions 732 """ 733 734 # TODO: This should be to_units_container(input_units, self) 735 # but this tries to reparse and fail for dimensions. 736 input_units = to_units_container(input_units) 737 738 return self._get_dimensionality(input_units) 739 740 def _get_dimensionality( 741 self, input_units: Optional[UnitsContainerT] 742 ) -> UnitsContainerT: 743 """Convert a UnitsContainer to base dimensions.""" 744 if not input_units: 745 return self.UnitsContainer() 746 747 cache = self._cache.dimensionality 748 749 try: 750 return cache[input_units] 751 except KeyError: 752 pass 753 754 accumulator = defaultdict(int) 755 self._get_dimensionality_recurse(input_units, 1, accumulator) 756 757 if "[]" in accumulator: 758 del accumulator["[]"] 759 760 dims = self.UnitsContainer({k: v for k, v in accumulator.items() if v != 0}) 761 762 cache[input_units] = dims 763 764 return dims 765 766 def _get_dimensionality_recurse(self, ref, exp, accumulator): 767 for key in ref: 768 exp2 = exp * ref[key] 769 if _is_dim(key): 770 reg = self._dimensions[key] 771 if reg.is_base: 772 accumulator[key] += exp2 773 elif reg.reference is not None: 774 self._get_dimensionality_recurse(reg.reference, exp2, accumulator) 775 else: 776 reg = self._units[self.get_name(key)] 777 if reg.reference is not None: 778 self._get_dimensionality_recurse(reg.reference, exp2, accumulator) 779 780 def _get_dimensionality_ratio(self, unit1, unit2): 781 """Get the exponential ratio between two units, i.e. solve unit2 = unit1**x for x. 782 783 Parameters 784 ---------- 785 unit1 : UnitsContainer compatible (str, Unit, UnitsContainer, dict) 786 first unit 787 unit2 : UnitsContainer compatible (str, Unit, UnitsContainer, dict) 788 second unit 789 790 Returns 791 ------- 792 number or None 793 exponential proportionality or None if the units cannot be converted 794 795 """ 796 # shortcut in case of equal units 797 if unit1 == unit2: 798 return 1 799 800 dim1, dim2 = (self.get_dimensionality(unit) for unit in (unit1, unit2)) 801 if not dim1 or not dim2 or dim1.keys() != dim2.keys(): # not comparable 802 return None 803 804 ratios = (dim2[key] / val for key, val in dim1.items()) 805 first = next(ratios) 806 if all(r == first for r in ratios): # all are same, we're good 807 return first 808 return None 809 810 def get_root_units( 811 self, input_units: UnitLike, check_nonmult: bool = True 812 ) -> Tuple[Number, Unit]: 813 """Convert unit or dict of units to the root units. 814 815 If any unit is non multiplicative and check_converter is True, 816 then None is returned as the multiplicative factor. 817 818 Parameters 819 ---------- 820 input_units : UnitsContainer or str 821 units 822 check_nonmult : bool 823 if True, None will be returned as the 824 multiplicative factor if a non-multiplicative 825 units is found in the final Units. (Default value = True) 826 827 Returns 828 ------- 829 Number, pint.Unit 830 multiplicative factor, base units 831 832 """ 833 input_units = to_units_container(input_units, self) 834 835 f, units = self._get_root_units(input_units, check_nonmult) 836 837 return f, self.Unit(units) 838 839 def _get_root_units(self, input_units, check_nonmult=True): 840 """Convert unit or dict of units to the root units. 841 842 If any unit is non multiplicative and check_converter is True, 843 then None is returned as the multiplicative factor. 844 845 Parameters 846 ---------- 847 input_units : UnitsContainer or dict 848 units 849 check_nonmult : bool 850 if True, None will be returned as the 851 multiplicative factor if a non-multiplicative 852 units is found in the final Units. (Default value = True) 853 854 Returns 855 ------- 856 number, Unit 857 multiplicative factor, base units 858 859 """ 860 if not input_units: 861 return 1, self.UnitsContainer() 862 863 cache = self._cache.root_units 864 try: 865 return cache[input_units] 866 except KeyError: 867 pass 868 869 accumulators = [1, defaultdict(int)] 870 self._get_root_units_recurse(input_units, 1, accumulators) 871 872 factor = accumulators[0] 873 units = self.UnitsContainer( 874 {k: v for k, v in accumulators[1].items() if v != 0} 875 ) 876 877 # Check if any of the final units is non multiplicative and return None instead. 878 if check_nonmult: 879 if any(not self._units[unit].converter.is_multiplicative for unit in units): 880 factor = None 881 882 cache[input_units] = factor, units 883 return factor, units 884 885 def get_base_units(self, input_units, check_nonmult=True, system=None): 886 """Convert unit or dict of units to the base units. 887 888 If any unit is non multiplicative and check_converter is True, 889 then None is returned as the multiplicative factor. 890 891 Parameters 892 ---------- 893 input_units : UnitsContainer or str 894 units 895 check_nonmult : bool 896 If True, None will be returned as the multiplicative factor if 897 non-multiplicative units are found in the final Units. 898 (Default value = True) 899 system : 900 (Default value = None) 901 902 Returns 903 ------- 904 Number, pint.Unit 905 multiplicative factor, base units 906 907 """ 908 909 return self.get_root_units(input_units, check_nonmult) 910 911 def _get_root_units_recurse(self, ref, exp, accumulators): 912 for key in ref: 913 exp2 = exp * ref[key] 914 key = self.get_name(key) 915 reg = self._units[key] 916 if reg.is_base: 917 accumulators[1][key] += exp2 918 else: 919 accumulators[0] *= reg._converter.scale ** exp2 920 if reg.reference is not None: 921 self._get_root_units_recurse(reg.reference, exp2, accumulators) 922 923 def get_compatible_units( 924 self, input_units, group_or_system=None 925 ) -> FrozenSet["Unit"]: 926 """ """ 927 input_units = to_units_container(input_units) 928 929 equiv = self._get_compatible_units(input_units, group_or_system) 930 931 return frozenset(self.Unit(eq) for eq in equiv) 932 933 def _get_compatible_units(self, input_units, group_or_system): 934 """ """ 935 if not input_units: 936 return frozenset() 937 938 src_dim = self._get_dimensionality(input_units) 939 return self._cache.dimensional_equivalents[src_dim] 940 941 def is_compatible_with( 942 self, obj1: Any, obj2: Any, *contexts: Union[str, Context], **ctx_kwargs 943 ) -> bool: 944 """check if the other object is compatible 945 946 Parameters 947 ---------- 948 obj1, obj2 949 The objects to check against each other. Treated as 950 dimensionless if not a Quantity, Unit or str. 951 *contexts : str or pint.Context 952 Contexts to use in the transformation. 953 **ctx_kwargs : 954 Values for the Context/s 955 956 Returns 957 ------- 958 bool 959 """ 960 if isinstance(obj1, (self.Quantity, self.Unit)): 961 return obj1.is_compatible_with(obj2, *contexts, **ctx_kwargs) 962 963 if isinstance(obj1, str): 964 return self.parse_expression(obj1).is_compatible_with( 965 obj2, *contexts, **ctx_kwargs 966 ) 967 968 return not isinstance(obj2, (self.Quantity, self.Unit)) 969 970 def convert( 971 self, 972 value: T, 973 src: QuantityOrUnitLike, 974 dst: QuantityOrUnitLike, 975 inplace: bool = False, 976 ) -> T: 977 """Convert value from some source to destination units. 978 979 Parameters 980 ---------- 981 value : 982 value 983 src : pint.Quantity or str 984 source units. 985 dst : pint.Quantity or str 986 destination units. 987 inplace : 988 (Default value = False) 989 990 Returns 991 ------- 992 type 993 converted value 994 995 """ 996 src = to_units_container(src, self) 997 998 dst = to_units_container(dst, self) 999 1000 if src == dst: 1001 return value 1002 1003 return self._convert(value, src, dst, inplace) 1004 1005 def _convert(self, value, src, dst, inplace=False, check_dimensionality=True): 1006 """Convert value from some source to destination units. 1007 1008 Parameters 1009 ---------- 1010 value : 1011 value 1012 src : UnitsContainer 1013 source units. 1014 dst : UnitsContainer 1015 destination units. 1016 inplace : 1017 (Default value = False) 1018 check_dimensionality : 1019 (Default value = True) 1020 1021 Returns 1022 ------- 1023 type 1024 converted value 1025 1026 """ 1027 1028 if check_dimensionality: 1029 1030 src_dim = self._get_dimensionality(src) 1031 dst_dim = self._get_dimensionality(dst) 1032 1033 # If the source and destination dimensionality are different, 1034 # then the conversion cannot be performed. 1035 if src_dim != dst_dim: 1036 raise DimensionalityError(src, dst, src_dim, dst_dim) 1037 1038 # Here src and dst have only multiplicative units left. Thus we can 1039 # convert with a factor. 1040 factor, _ = self._get_root_units(src / dst) 1041 1042 # factor is type float and if our magnitude is type Decimal then 1043 # must first convert to Decimal before we can '*' the values 1044 if isinstance(value, Decimal): 1045 factor = Decimal(str(factor)) 1046 elif isinstance(value, Fraction): 1047 factor = Fraction(str(factor)) 1048 1049 if inplace: 1050 value *= factor 1051 else: 1052 value = value * factor 1053 1054 return value 1055 1056 def parse_unit_name( 1057 self, unit_name: str, case_sensitive: Optional[bool] = None 1058 ) -> Tuple[Tuple[str, str, str], ...]: 1059 """Parse a unit to identify prefix, unit name and suffix 1060 by walking the list of prefix and suffix. 1061 In case of equivalent combinations (e.g. ('kilo', 'gram', '') and 1062 ('', 'kilogram', ''), prefer those with prefix. 1063 1064 Parameters 1065 ---------- 1066 unit_name : 1067 1068 case_sensitive : bool or None 1069 Control if unit lookup is case sensitive. Defaults to None, which uses the 1070 registry's case_sensitive setting 1071 1072 Returns 1073 ------- 1074 tuple of tuples (str, str, str) 1075 all non-equivalent combinations of (prefix, unit name, suffix) 1076 """ 1077 return self._dedup_candidates( 1078 self._parse_unit_name(unit_name, case_sensitive=case_sensitive) 1079 ) 1080 1081 def _parse_unit_name( 1082 self, unit_name: str, case_sensitive: Optional[bool] = None 1083 ) -> Iterator[Tuple[str, str, str]]: 1084 """Helper of parse_unit_name.""" 1085 case_sensitive = ( 1086 self.case_sensitive if case_sensitive is None else case_sensitive 1087 ) 1088 stw = unit_name.startswith 1089 edw = unit_name.endswith 1090 for suffix, prefix in itertools.product(self._suffixes, self._prefixes): 1091 if stw(prefix) and edw(suffix): 1092 name = unit_name[len(prefix) :] 1093 if suffix: 1094 name = name[: -len(suffix)] 1095 if len(name) == 1: 1096 continue 1097 if case_sensitive: 1098 if name in self._units: 1099 yield ( 1100 self._prefixes[prefix].name, 1101 self._units[name].name, 1102 self._suffixes[suffix], 1103 ) 1104 else: 1105 for real_name in self._units_casei.get(name.lower(), ()): 1106 yield ( 1107 self._prefixes[prefix].name, 1108 self._units[real_name].name, 1109 self._suffixes[suffix], 1110 ) 1111 1112 @staticmethod 1113 def _dedup_candidates( 1114 candidates: Iterable[Tuple[str, str, str]] 1115 ) -> Tuple[Tuple[str, str, str], ...]: 1116 """Helper of parse_unit_name. 1117 1118 Given an iterable of unit triplets (prefix, name, suffix), remove those with 1119 different names but equal value, preferring those with a prefix. 1120 1121 e.g. ('kilo', 'gram', '') and ('', 'kilogram', '') 1122 """ 1123 candidates = dict.fromkeys(candidates) # ordered set 1124 for cp, cu, cs in list(candidates): 1125 assert isinstance(cp, str) 1126 assert isinstance(cu, str) 1127 if cs != "": 1128 raise NotImplementedError("non-empty suffix") 1129 if cp: 1130 candidates.pop(("", cp + cu, ""), None) 1131 return tuple(candidates) 1132 1133 def parse_units( 1134 self, 1135 input_string: str, 1136 as_delta: Optional[bool] = None, 1137 case_sensitive: Optional[bool] = None, 1138 ) -> Unit: 1139 """Parse a units expression and returns a UnitContainer with 1140 the canonical names. 1141 1142 The expression can only contain products, ratios and powers of units. 1143 1144 Parameters 1145 ---------- 1146 input_string : str 1147 as_delta : bool or None 1148 if the expression has multiple units, the parser will 1149 interpret non multiplicative units as their `delta_` counterparts. (Default value = None) 1150 case_sensitive : bool or None 1151 Control if unit parsing is case sensitive. Defaults to None, which uses the 1152 registry's setting. 1153 1154 Returns 1155 ------- 1156 pint.Unit 1157 1158 """ 1159 for p in self.preprocessors: 1160 input_string = p(input_string) 1161 units = self._parse_units(input_string, as_delta, case_sensitive) 1162 return self.Unit(units) 1163 1164 def _parse_units(self, input_string, as_delta=True, case_sensitive=None): 1165 """Parse a units expression and returns a UnitContainer with 1166 the canonical names. 1167 """ 1168 1169 cache = self._cache.parse_unit 1170 # Issue #1097: it is possible, when a unit was defined while a different context 1171 # was active, that the unit is in self._cache.parse_unit but not in self._units. 1172 # If this is the case, force self._units to be repopulated. 1173 if as_delta and input_string in cache and input_string in self._units: 1174 return cache[input_string] 1175 1176 if not input_string: 1177 return self.UnitsContainer() 1178 1179 # Sanitize input_string with whitespaces. 1180 input_string = input_string.strip() 1181 1182 units = ParserHelper.from_string(input_string, self.non_int_type) 1183 if units.scale != 1: 1184 raise ValueError("Unit expression cannot have a scaling factor.") 1185 1186 ret = {} 1187 many = len(units) > 1 1188 for name in units: 1189 cname = self.get_name(name, case_sensitive=case_sensitive) 1190 value = units[name] 1191 if not cname: 1192 continue 1193 if as_delta and (many or (not many and value != 1)): 1194 definition = self._units[cname] 1195 if not definition.is_multiplicative: 1196 cname = "delta_" + cname 1197 ret[cname] = value 1198 1199 ret = self.UnitsContainer(ret) 1200 1201 if as_delta: 1202 cache[input_string] = ret 1203 1204 return ret 1205 1206 def _eval_token(self, token, case_sensitive=None, use_decimal=False, **values): 1207 1208 # TODO: remove this code when use_decimal is deprecated 1209 if use_decimal: 1210 raise DeprecationWarning( 1211 "`use_decimal` is deprecated, use `non_int_type` keyword argument when instantiating the registry.\n" 1212 ">>> from decimal import Decimal\n" 1213 ">>> ureg = UnitRegistry(non_int_type=Decimal)" 1214 ) 1215 1216 token_type = token[0] 1217 token_text = token[1] 1218 if token_type == NAME: 1219 if token_text == "dimensionless": 1220 return 1 * self.dimensionless 1221 elif token_text in values: 1222 return self.Quantity(values[token_text]) 1223 else: 1224 return self.Quantity( 1225 1, 1226 self.UnitsContainer( 1227 {self.get_name(token_text, case_sensitive=case_sensitive): 1} 1228 ), 1229 ) 1230 elif token_type == NUMBER: 1231 return ParserHelper.eval_token(token, non_int_type=self.non_int_type) 1232 else: 1233 raise Exception("unknown token type") 1234 1235 def parse_pattern( 1236 self, 1237 input_string: str, 1238 pattern: str, 1239 case_sensitive: Optional[bool] = None, 1240 use_decimal: bool = False, 1241 many: bool = False, 1242 ) -> Union[List[str], str, None]: 1243 """Parse a string with a given regex pattern and returns result. 1244 1245 Parameters 1246 ---------- 1247 input_string : 1248 1249 pattern_string: 1250 The regex parse string 1251 case_sensitive : 1252 (Default value = None, which uses registry setting) 1253 use_decimal : 1254 (Default value = False) 1255 many : 1256 Match many results 1257 (Default value = False) 1258 1259 1260 Returns 1261 ------- 1262 1263 """ 1264 1265 if not input_string: 1266 return [] if many else None 1267 1268 # Parse string 1269 pattern = pattern_to_regex(pattern) 1270 matched = re.finditer(pattern, input_string) 1271 1272 # Extract result(s) 1273 results = [] 1274 for match in matched: 1275 # Extract units from result 1276 match = match.groupdict() 1277 1278 # Parse units 1279 units = [] 1280 for unit, value in match.items(): 1281 # Construct measure by multiplying value by unit 1282 units.append( 1283 float(value) 1284 * self.parse_expression(unit, case_sensitive, use_decimal) 1285 ) 1286 1287 # Add to results 1288 results.append(units) 1289 1290 # Return first match only 1291 if not many: 1292 return results[0] 1293 1294 return results 1295 1296 def parse_expression( 1297 self, 1298 input_string: str, 1299 case_sensitive: Optional[bool] = None, 1300 use_decimal: bool = False, 1301 **values, 1302 ) -> Quantity: 1303 """Parse a mathematical expression including units and return a quantity object. 1304 1305 Numerical constants can be specified as keyword arguments and will take precedence 1306 over the names defined in the registry. 1307 1308 Parameters 1309 ---------- 1310 input_string : 1311 1312 case_sensitive : 1313 (Default value = None, which uses registry setting) 1314 use_decimal : 1315 (Default value = False) 1316 **values : 1317 1318 1319 Returns 1320 ------- 1321 1322 """ 1323 1324 # TODO: remove this code when use_decimal is deprecated 1325 if use_decimal: 1326 raise DeprecationWarning( 1327 "`use_decimal` is deprecated, use `non_int_type` keyword argument when instantiating the registry.\n" 1328 ">>> from decimal import Decimal\n" 1329 ">>> ureg = UnitRegistry(non_int_type=Decimal)" 1330 ) 1331 1332 if not input_string: 1333 return self.Quantity(1) 1334 1335 for p in self.preprocessors: 1336 input_string = p(input_string) 1337 input_string = string_preprocessor(input_string) 1338 gen = tokenizer(input_string) 1339 1340 return build_eval_tree(gen).evaluate( 1341 lambda x: self._eval_token(x, case_sensitive=case_sensitive, **values) 1342 ) 1343 1344 __call__ = parse_expression 1345 1346 1347class NonMultiplicativeRegistry(BaseRegistry): 1348 """Handle of non multiplicative units (e.g. Temperature). 1349 1350 Capabilities: 1351 - Register non-multiplicative units and their relations. 1352 - Convert between non-multiplicative units. 1353 1354 Parameters 1355 ---------- 1356 default_as_delta : bool 1357 If True, non-multiplicative units are interpreted as 1358 their *delta* counterparts in multiplications. 1359 autoconvert_offset_to_baseunit : bool 1360 If True, non-multiplicative units are 1361 converted to base units in multiplications. 1362 1363 """ 1364 1365 def __init__( 1366 self, 1367 default_as_delta: bool = True, 1368 autoconvert_offset_to_baseunit: bool = False, 1369 **kwargs: Any, 1370 ) -> None: 1371 super().__init__(**kwargs) 1372 1373 #: When performing a multiplication of units, interpret 1374 #: non-multiplicative units as their *delta* counterparts. 1375 self.default_as_delta = default_as_delta 1376 1377 # Determines if quantities with offset units are converted to their 1378 # base units on multiplication and division. 1379 self.autoconvert_offset_to_baseunit = autoconvert_offset_to_baseunit 1380 1381 def _parse_units( 1382 self, 1383 input_string: str, 1384 as_delta: Optional[bool] = None, 1385 case_sensitive: Optional[bool] = None, 1386 ): 1387 """ """ 1388 if as_delta is None: 1389 as_delta = self.default_as_delta 1390 1391 return super()._parse_units(input_string, as_delta, case_sensitive) 1392 1393 def _define(self, definition: Union[str, Definition]): 1394 """Add unit to the registry. 1395 1396 In addition to what is done by the BaseRegistry, 1397 registers also non-multiplicative units. 1398 1399 Parameters 1400 ---------- 1401 definition : str or Definition 1402 A dimension, unit or prefix definition. 1403 1404 Returns 1405 ------- 1406 Definition, dict, dict 1407 Definition instance, case sensitive unit dict, case insensitive unit dict. 1408 1409 """ 1410 1411 definition, d, di = super()._define(definition) 1412 1413 # define additional units for units with an offset 1414 if getattr(definition.converter, "offset", 0) != 0: 1415 self._define_adder(definition, d, di) 1416 1417 return definition, d, di 1418 1419 def _is_multiplicative(self, u) -> bool: 1420 if u in self._units: 1421 return self._units[u].is_multiplicative 1422 1423 # If the unit is not in the registry might be because it is not 1424 # registered with its prefixed version. 1425 # TODO: Might be better to register them. 1426 names = self.parse_unit_name(u) 1427 assert len(names) == 1 1428 _, base_name, _ = names[0] 1429 try: 1430 return self._units[base_name].is_multiplicative 1431 except KeyError: 1432 raise UndefinedUnitError(u) 1433 1434 def _validate_and_extract(self, units): 1435 # u is for unit, e is for exponent 1436 nonmult_units = [ 1437 (u, e) for u, e in units.items() if not self._is_multiplicative(u) 1438 ] 1439 1440 # Let's validate source offset units 1441 if len(nonmult_units) > 1: 1442 # More than one src offset unit is not allowed 1443 raise ValueError("more than one offset unit.") 1444 1445 elif len(nonmult_units) == 1: 1446 # A single src offset unit is present. Extract it 1447 # But check that: 1448 # - the exponent is 1 1449 # - is not used in multiplicative context 1450 nonmult_unit, exponent = nonmult_units.pop() 1451 1452 if exponent != 1: 1453 raise ValueError("offset units in higher order.") 1454 1455 if len(units) > 1 and not self.autoconvert_offset_to_baseunit: 1456 raise ValueError("offset unit used in multiplicative context.") 1457 1458 return nonmult_unit 1459 1460 return None 1461 1462 def _add_ref_of_log_unit(self, offset_unit, all_units): 1463 1464 slct_unit = self._units[offset_unit] 1465 if isinstance(slct_unit.converter, LogarithmicConverter): 1466 # Extract reference unit 1467 slct_ref = slct_unit.reference 1468 # If reference unit is not dimensionless 1469 if slct_ref != UnitsContainer(): 1470 # Extract reference unit 1471 (u, e) = [(u, e) for u, e in slct_ref.items()].pop() 1472 # Add it back to the unit list 1473 return all_units.add(u, e) 1474 # Otherwise, return the units unmodified 1475 return all_units 1476 1477 def _convert(self, value, src, dst, inplace=False): 1478 """Convert value from some source to destination units. 1479 1480 In addition to what is done by the BaseRegistry, 1481 converts between non-multiplicative units. 1482 1483 Parameters 1484 ---------- 1485 value : 1486 value 1487 src : UnitsContainer 1488 source units. 1489 dst : UnitsContainer 1490 destination units. 1491 inplace : 1492 (Default value = False) 1493 1494 Returns 1495 ------- 1496 type 1497 converted value 1498 1499 """ 1500 1501 # Conversion needs to consider if non-multiplicative (AKA offset 1502 # units) are involved. Conversion is only possible if src and dst 1503 # have at most one offset unit per dimension. Other rules are applied 1504 # by validate and extract. 1505 try: 1506 src_offset_unit = self._validate_and_extract(src) 1507 except ValueError as ex: 1508 raise DimensionalityError(src, dst, extra_msg=f" - In source units, {ex}") 1509 1510 try: 1511 dst_offset_unit = self._validate_and_extract(dst) 1512 except ValueError as ex: 1513 raise DimensionalityError( 1514 src, dst, extra_msg=f" - In destination units, {ex}" 1515 ) 1516 1517 if not (src_offset_unit or dst_offset_unit): 1518 return super()._convert(value, src, dst, inplace) 1519 1520 src_dim = self._get_dimensionality(src) 1521 dst_dim = self._get_dimensionality(dst) 1522 1523 # If the source and destination dimensionality are different, 1524 # then the conversion cannot be performed. 1525 if src_dim != dst_dim: 1526 raise DimensionalityError(src, dst, src_dim, dst_dim) 1527 1528 # clean src from offset units by converting to reference 1529 if src_offset_unit: 1530 value = self._units[src_offset_unit].converter.to_reference(value, inplace) 1531 src = src.remove([src_offset_unit]) 1532 # Add reference unit for multiplicative section 1533 src = self._add_ref_of_log_unit(src_offset_unit, src) 1534 1535 # clean dst units from offset units 1536 if dst_offset_unit: 1537 dst = dst.remove([dst_offset_unit]) 1538 # Add reference unit for multiplicative section 1539 dst = self._add_ref_of_log_unit(dst_offset_unit, dst) 1540 1541 # Convert non multiplicative units to the dst. 1542 value = super()._convert(value, src, dst, inplace, False) 1543 1544 # Finally convert to offset units specified in destination 1545 if dst_offset_unit: 1546 value = self._units[dst_offset_unit].converter.from_reference( 1547 value, inplace 1548 ) 1549 1550 return value 1551 1552 1553class ContextRegistry(BaseRegistry): 1554 """Handle of Contexts. 1555 1556 Conversion between units with different dimensions according 1557 to previously established relations (contexts). 1558 (e.g. in the spectroscopy, conversion between frequency and energy is possible) 1559 1560 Capabilities: 1561 1562 - Register contexts. 1563 - Enable and disable contexts. 1564 - Parse @context directive. 1565 """ 1566 1567 def __init__(self, **kwargs: Any) -> None: 1568 # Map context name (string) or abbreviation to context. 1569 self._contexts: Dict[str, Context] = {} 1570 # Stores active contexts. 1571 self._active_ctx = ContextChain() 1572 # Map context chain to cache 1573 self._caches = {} 1574 # Map context chain to units override 1575 self._context_units = {} 1576 1577 super().__init__(**kwargs) 1578 1579 # Allow contexts to add override layers to the units 1580 self._units = ChainMap(self._units) 1581 1582 def _register_parsers(self) -> None: 1583 super()._register_parsers() 1584 self._register_parser("@context", self._parse_context) 1585 1586 def _parse_context(self, ifile) -> None: 1587 try: 1588 self.add_context( 1589 Context.from_lines( 1590 ifile.block_iter(), 1591 self.get_dimensionality, 1592 non_int_type=self.non_int_type, 1593 ) 1594 ) 1595 except KeyError as e: 1596 raise DefinitionSyntaxError(f"unknown dimension {e} in context") 1597 1598 def add_context(self, context: Context) -> None: 1599 """Add a context object to the registry. 1600 1601 The context will be accessible by its name and aliases. 1602 1603 Notice that this method will NOT enable the context; 1604 see :meth:`enable_contexts`. 1605 """ 1606 if not context.name: 1607 raise ValueError("Can't add unnamed context to registry") 1608 if context.name in self._contexts: 1609 logger.warning( 1610 "The name %s was already registered for another context.", context.name 1611 ) 1612 self._contexts[context.name] = context 1613 for alias in context.aliases: 1614 if alias in self._contexts: 1615 logger.warning( 1616 "The name %s was already registered for another context", 1617 context.name, 1618 ) 1619 self._contexts[alias] = context 1620 1621 def remove_context(self, name_or_alias: str) -> Context: 1622 """Remove a context from the registry and return it. 1623 1624 Notice that this methods will not disable the context; 1625 see :meth:`disable_contexts`. 1626 """ 1627 context = self._contexts[name_or_alias] 1628 1629 del self._contexts[context.name] 1630 for alias in context.aliases: 1631 del self._contexts[alias] 1632 1633 return context 1634 1635 def _build_cache(self) -> None: 1636 super()._build_cache() 1637 self._caches[()] = self._cache 1638 1639 def _switch_context_cache_and_units(self) -> None: 1640 """If any of the active contexts redefine units, create variant self._cache 1641 and self._units specific to the combination of active contexts. 1642 The next time this method is invoked with the same combination of contexts, 1643 reuse the same variant self._cache and self._units as in the previous time. 1644 """ 1645 del self._units.maps[:-1] 1646 units_overlay = any(ctx.redefinitions for ctx in self._active_ctx.contexts) 1647 if not units_overlay: 1648 # Use the default _cache and _units 1649 self._cache = self._caches[()] 1650 return 1651 1652 key = self._active_ctx.hashable() 1653 try: 1654 self._cache = self._caches[key] 1655 self._units.maps.insert(0, self._context_units[key]) 1656 except KeyError: 1657 pass 1658 1659 # First time using this specific combination of contexts and it contains 1660 # unit redefinitions 1661 base_cache = self._caches[()] 1662 self._caches[key] = self._cache = ContextCacheOverlay(base_cache) 1663 1664 self._context_units[key] = units_overlay = {} 1665 self._units.maps.insert(0, units_overlay) 1666 1667 on_redefinition_backup = self._on_redefinition 1668 self._on_redefinition = "ignore" 1669 try: 1670 for ctx in reversed(self._active_ctx.contexts): 1671 for definition in ctx.redefinitions: 1672 self._redefine(definition) 1673 finally: 1674 self._on_redefinition = on_redefinition_backup 1675 1676 def _redefine(self, definition: UnitDefinition) -> None: 1677 """Redefine a unit from a context""" 1678 # Find original definition in the UnitRegistry 1679 candidates = self.parse_unit_name(definition.name) 1680 if not candidates: 1681 raise UndefinedUnitError(definition.name) 1682 candidates_no_prefix = [c for c in candidates if not c[0]] 1683 if not candidates_no_prefix: 1684 raise ValueError(f"Can't redefine a unit with a prefix: {definition.name}") 1685 assert len(candidates_no_prefix) == 1 1686 _, name, _ = candidates_no_prefix[0] 1687 try: 1688 basedef = self._units[name] 1689 except KeyError: 1690 raise UndefinedUnitError(name) 1691 1692 # Rebuild definition as a variant of the base 1693 if basedef.is_base: 1694 raise ValueError("Can't redefine a base unit to a derived one") 1695 1696 dims_old = self._get_dimensionality(basedef.reference) 1697 dims_new = self._get_dimensionality(definition.reference) 1698 if dims_old != dims_new: 1699 raise ValueError( 1700 f"Can't change dimensionality of {basedef.name} " 1701 f"from {dims_old} to {dims_new} in a context" 1702 ) 1703 1704 # Do not modify in place the original definition, as (1) the context may 1705 # be shared by other registries, and (2) it would alter the cache key 1706 definition = UnitDefinition( 1707 name=basedef.name, 1708 symbol=basedef.symbol, 1709 aliases=basedef.aliases, 1710 is_base=False, 1711 reference=definition.reference, 1712 converter=definition.converter, 1713 ) 1714 1715 # Write into the context-specific self._units.maps[0] and self._cache.root_units 1716 self.define(definition) 1717 1718 def enable_contexts( 1719 self, *names_or_contexts: Union[str, Context], **kwargs 1720 ) -> None: 1721 """Enable contexts provided by name or by object. 1722 1723 Parameters 1724 ---------- 1725 *names_or_contexts : 1726 one or more contexts or context names/aliases 1727 **kwargs : 1728 keyword arguments for the context(s) 1729 1730 Examples 1731 -------- 1732 See :meth:`context` 1733 """ 1734 1735 # If present, copy the defaults from the containing contexts 1736 if self._active_ctx.defaults: 1737 kwargs = dict(self._active_ctx.defaults, **kwargs) 1738 1739 # For each name, we first find the corresponding context 1740 ctxs = [ 1741 self._contexts[name] if isinstance(name, str) else name 1742 for name in names_or_contexts 1743 ] 1744 1745 # Check if the contexts have been checked first, if not we make sure 1746 # that dimensions are expressed in terms of base dimensions. 1747 for ctx in ctxs: 1748 if ctx.checked: 1749 continue 1750 funcs_copy = dict(ctx.funcs) 1751 for (src, dst), func in funcs_copy.items(): 1752 src_ = self._get_dimensionality(src) 1753 dst_ = self._get_dimensionality(dst) 1754 if src != src_ or dst != dst_: 1755 ctx.remove_transformation(src, dst) 1756 ctx.add_transformation(src_, dst_, func) 1757 ctx.checked = True 1758 1759 # and create a new one with the new defaults. 1760 contexts = tuple(Context.from_context(ctx, **kwargs) for ctx in ctxs) 1761 1762 # Finally we add them to the active context. 1763 self._active_ctx.insert_contexts(*contexts) 1764 self._switch_context_cache_and_units() 1765 1766 def disable_contexts(self, n: int = None) -> None: 1767 """Disable the last n enabled contexts. 1768 1769 Parameters 1770 ---------- 1771 n : int 1772 Number of contexts to disable. Default: disable all contexts. 1773 """ 1774 self._active_ctx.remove_contexts(n) 1775 self._switch_context_cache_and_units() 1776 1777 @contextmanager 1778 def context(self, *names, **kwargs) -> ContextManager[Context]: 1779 """Used as a context manager, this function enables to activate a context 1780 which is removed after usage. 1781 1782 Parameters 1783 ---------- 1784 *names : 1785 name(s) of the context(s). 1786 **kwargs : 1787 keyword arguments for the contexts. 1788 1789 Examples 1790 -------- 1791 Context can be called by their name: 1792 1793 >>> import pint 1794 >>> ureg = pint.UnitRegistry() 1795 >>> ureg.add_context(pint.Context('one')) 1796 >>> ureg.add_context(pint.Context('two')) 1797 >>> with ureg.context('one'): 1798 ... pass 1799 1800 If a context has an argument, you can specify its value as a keyword argument: 1801 1802 >>> with ureg.context('one', n=1): 1803 ... pass 1804 1805 Multiple contexts can be entered in single call: 1806 1807 >>> with ureg.context('one', 'two', n=1): 1808 ... pass 1809 1810 Or nested allowing you to give different values to the same keyword argument: 1811 1812 >>> with ureg.context('one', n=1): 1813 ... with ureg.context('two', n=2): 1814 ... pass 1815 1816 A nested context inherits the defaults from the containing context: 1817 1818 >>> with ureg.context('one', n=1): 1819 ... # Here n takes the value of the outer context 1820 ... with ureg.context('two'): 1821 ... pass 1822 """ 1823 # Enable the contexts. 1824 self.enable_contexts(*names, **kwargs) 1825 1826 try: 1827 # After adding the context and rebuilding the graph, the registry 1828 # is ready to use. 1829 yield self 1830 finally: 1831 # Upon leaving the with statement, 1832 # the added contexts are removed from the active one. 1833 self.disable_contexts(len(names)) 1834 1835 def with_context(self, name, **kwargs) -> Callable[[F], F]: 1836 """Decorator to wrap a function call in a Pint context. 1837 1838 Use it to ensure that a certain context is active when 1839 calling a function:: 1840 1841 Parameters 1842 ---------- 1843 name : 1844 name of the context. 1845 **kwargs : 1846 keyword arguments for the context 1847 1848 1849 Returns 1850 ------- 1851 callable 1852 the wrapped function. 1853 1854 Example 1855 ------- 1856 >>> @ureg.with_context('sp') 1857 ... def my_cool_fun(wavelength): 1858 ... print('This wavelength is equivalent to: %s', wavelength.to('terahertz')) 1859 """ 1860 1861 def decorator(func): 1862 assigned = tuple( 1863 attr for attr in functools.WRAPPER_ASSIGNMENTS if hasattr(func, attr) 1864 ) 1865 updated = tuple( 1866 attr for attr in functools.WRAPPER_UPDATES if hasattr(func, attr) 1867 ) 1868 1869 @functools.wraps(func, assigned=assigned, updated=updated) 1870 def wrapper(*values, **wrapper_kwargs): 1871 with self.context(name, **kwargs): 1872 return func(*values, **wrapper_kwargs) 1873 1874 return wrapper 1875 1876 return decorator 1877 1878 def _convert(self, value, src, dst, inplace=False): 1879 """Convert value from some source to destination units. 1880 1881 In addition to what is done by the BaseRegistry, 1882 converts between units with different dimensions by following 1883 transformation rules defined in the context. 1884 1885 Parameters 1886 ---------- 1887 value : 1888 value 1889 src : UnitsContainer 1890 source units. 1891 dst : UnitsContainer 1892 destination units. 1893 inplace : 1894 (Default value = False) 1895 1896 Returns 1897 ------- 1898 callable 1899 converted value 1900 """ 1901 # If there is an active context, we look for a path connecting source and 1902 # destination dimensionality. If it exists, we transform the source value 1903 # by applying sequentially each transformation of the path. 1904 if self._active_ctx: 1905 1906 src_dim = self._get_dimensionality(src) 1907 dst_dim = self._get_dimensionality(dst) 1908 1909 path = find_shortest_path(self._active_ctx.graph, src_dim, dst_dim) 1910 if path: 1911 src = self.Quantity(value, src) 1912 for a, b in zip(path[:-1], path[1:]): 1913 src = self._active_ctx.transform(a, b, self, src) 1914 1915 value, src = src._magnitude, src._units 1916 1917 return super()._convert(value, src, dst, inplace) 1918 1919 def _get_compatible_units(self, input_units, group_or_system): 1920 src_dim = self._get_dimensionality(input_units) 1921 1922 ret = super()._get_compatible_units(input_units, group_or_system) 1923 1924 if self._active_ctx: 1925 ret = ret.copy() # Do not alter self._cache 1926 nodes = find_connected_nodes(self._active_ctx.graph, src_dim) 1927 if nodes: 1928 for node in nodes: 1929 ret |= self._cache.dimensional_equivalents[node] 1930 1931 return ret 1932 1933 1934class SystemRegistry(BaseRegistry): 1935 """Handle of Systems and Groups. 1936 1937 Conversion between units with different dimensions according 1938 to previously established relations (contexts). 1939 (e.g. in the spectroscopy, conversion between frequency and energy is possible) 1940 1941 Capabilities: 1942 1943 - Register systems and groups. 1944 - List systems 1945 - Get or get the default system. 1946 - Parse @system and @group directive. 1947 """ 1948 1949 def __init__(self, system=None, **kwargs): 1950 super().__init__(**kwargs) 1951 1952 #: Map system name to system. 1953 #: :type: dict[ str | System] 1954 self._systems: Dict[str, System] = {} 1955 1956 #: Maps dimensionality (UnitsContainer) to Dimensionality (UnitsContainer) 1957 self._base_units_cache = dict() 1958 1959 #: Map group name to group. 1960 #: :type: dict[ str | Group] 1961 self._groups: Dict[str, Group] = {} 1962 self._groups["root"] = self.Group("root") 1963 self._default_system = system 1964 1965 def _init_dynamic_classes(self) -> None: 1966 super()._init_dynamic_classes() 1967 self.Group = systems.build_group_class(self) 1968 self.System = systems.build_system_class(self) 1969 1970 def _after_init(self) -> None: 1971 """Invoked at the end of ``__init__``. 1972 1973 - Create default group and add all orphan units to it 1974 - Set default system 1975 """ 1976 super()._after_init() 1977 1978 #: Copy units not defined in any group to the default group 1979 if "group" in self._defaults: 1980 grp = self.get_group(self._defaults["group"], True) 1981 group_units = frozenset( 1982 [ 1983 member 1984 for group in self._groups.values() 1985 if group.name != "root" 1986 for member in group.members 1987 ] 1988 ) 1989 all_units = self.get_group("root", False).members 1990 grp.add_units(*(all_units - group_units)) 1991 1992 #: System name to be used by default. 1993 self._default_system = self._default_system or self._defaults.get( 1994 "system", None 1995 ) 1996 1997 def _register_parsers(self) -> None: 1998 super()._register_parsers() 1999 self._register_parser("@group", self._parse_group) 2000 self._register_parser("@system", self._parse_system) 2001 2002 def _parse_group(self, ifile) -> None: 2003 self.Group.from_lines(ifile.block_iter(), self.define, self.non_int_type) 2004 2005 def _parse_system(self, ifile) -> None: 2006 self.System.from_lines( 2007 ifile.block_iter(), self.get_root_units, self.non_int_type 2008 ) 2009 2010 def get_group(self, name: str, create_if_needed: bool = True) -> Group: 2011 """Return a Group. 2012 2013 Parameters 2014 ---------- 2015 name : str 2016 Name of the group to be 2017 create_if_needed : bool 2018 If True, create a group if not found. If False, raise an Exception. 2019 (Default value = True) 2020 2021 Returns 2022 ------- 2023 type 2024 Group 2025 """ 2026 if name in self._groups: 2027 return self._groups[name] 2028 2029 if not create_if_needed: 2030 raise ValueError("Unknown group %s" % name) 2031 2032 return self.Group(name) 2033 2034 @property 2035 def sys(self): 2036 return systems.Lister(self._systems) 2037 2038 @property 2039 def default_system(self) -> System: 2040 return self._default_system 2041 2042 @default_system.setter 2043 def default_system(self, name): 2044 if name: 2045 if name not in self._systems: 2046 raise ValueError("Unknown system %s" % name) 2047 2048 self._base_units_cache = {} 2049 2050 self._default_system = name 2051 2052 def get_system(self, name: str, create_if_needed: bool = True) -> System: 2053 """Return a Group. 2054 2055 Parameters 2056 ---------- 2057 name : str 2058 Name of the group to be 2059 create_if_needed : bool 2060 If True, create a group if not found. If False, raise an Exception. 2061 (Default value = True) 2062 2063 Returns 2064 ------- 2065 type 2066 System 2067 2068 """ 2069 if name in self._systems: 2070 return self._systems[name] 2071 2072 if not create_if_needed: 2073 raise ValueError("Unknown system %s" % name) 2074 2075 return self.System(name) 2076 2077 def _define(self, definition): 2078 2079 # In addition to the what is done by the BaseRegistry, 2080 # this adds all units to the `root` group. 2081 2082 definition, d, di = super()._define(definition) 2083 2084 if isinstance(definition, UnitDefinition): 2085 # We add all units to the root group 2086 self.get_group("root").add_units(definition.name) 2087 2088 return definition, d, di 2089 2090 def get_base_units( 2091 self, 2092 input_units: Union[UnitLike, Quantity], 2093 check_nonmult: bool = True, 2094 system: Union[str, System, None] = None, 2095 ) -> Tuple[Number, Unit]: 2096 """Convert unit or dict of units to the base units. 2097 2098 If any unit is non multiplicative and check_converter is True, 2099 then None is returned as the multiplicative factor. 2100 2101 Unlike BaseRegistry, in this registry root_units might be different 2102 from base_units 2103 2104 Parameters 2105 ---------- 2106 input_units : UnitsContainer or str 2107 units 2108 check_nonmult : bool 2109 if True, None will be returned as the 2110 multiplicative factor if a non-multiplicative 2111 units is found in the final Units. (Default value = True) 2112 system : 2113 (Default value = None) 2114 2115 Returns 2116 ------- 2117 type 2118 multiplicative factor, base units 2119 2120 """ 2121 2122 input_units = to_units_container(input_units) 2123 2124 f, units = self._get_base_units(input_units, check_nonmult, system) 2125 2126 return f, self.Unit(units) 2127 2128 def _get_base_units( 2129 self, 2130 input_units: UnitsContainerT, 2131 check_nonmult: bool = True, 2132 system: Union[str, System, None] = None, 2133 ): 2134 2135 if system is None: 2136 system = self._default_system 2137 2138 # The cache is only done for check_nonmult=True and the current system. 2139 if ( 2140 check_nonmult 2141 and system == self._default_system 2142 and input_units in self._base_units_cache 2143 ): 2144 return self._base_units_cache[input_units] 2145 2146 factor, units = self.get_root_units(input_units, check_nonmult) 2147 2148 if not system: 2149 return factor, units 2150 2151 # This will not be necessary after integration with the registry 2152 # as it has a UnitsContainer intermediate 2153 units = to_units_container(units, self) 2154 2155 destination_units = self.UnitsContainer() 2156 2157 bu = self.get_system(system, False).base_units 2158 2159 for unit, value in units.items(): 2160 if unit in bu: 2161 new_unit = bu[unit] 2162 new_unit = to_units_container(new_unit, self) 2163 destination_units *= new_unit ** value 2164 else: 2165 destination_units *= self.UnitsContainer({unit: value}) 2166 2167 base_factor = self.convert(factor, units, destination_units) 2168 2169 if check_nonmult: 2170 self._base_units_cache[input_units] = base_factor, destination_units 2171 2172 return base_factor, destination_units 2173 2174 def _get_compatible_units(self, input_units, group_or_system) -> FrozenSet[Unit]: 2175 2176 if group_or_system is None: 2177 group_or_system = self._default_system 2178 2179 ret = super()._get_compatible_units(input_units, group_or_system) 2180 2181 if group_or_system: 2182 if group_or_system in self._systems: 2183 members = self._systems[group_or_system].members 2184 elif group_or_system in self._groups: 2185 members = self._groups[group_or_system].members 2186 else: 2187 raise ValueError( 2188 "Unknown Group o System with name '%s'" % group_or_system 2189 ) 2190 return frozenset(ret & members) 2191 2192 return ret 2193 2194 2195class UnitRegistry(SystemRegistry, ContextRegistry, NonMultiplicativeRegistry): 2196 """The unit registry stores the definitions and relationships between units. 2197 2198 Parameters 2199 ---------- 2200 filename : 2201 path of the units definition file to load or line-iterable object. 2202 Empty to load the default definition file. 2203 None to leave the UnitRegistry empty. 2204 force_ndarray : bool 2205 convert any input, scalar or not to a numpy.ndarray. 2206 force_ndarray_like : bool 2207 convert all inputs other than duck arrays to a numpy.ndarray. 2208 default_as_delta : 2209 In the context of a multiplication of units, interpret 2210 non-multiplicative units as their *delta* counterparts. 2211 autoconvert_offset_to_baseunit : 2212 If True converts offset units in quantities are 2213 converted to their base units in multiplicative 2214 context. If False no conversion happens. 2215 on_redefinition : str 2216 action to take in case a unit is redefined. 2217 'warn', 'raise', 'ignore' 2218 auto_reduce_dimensions : 2219 If True, reduce dimensionality on appropriate operations. 2220 preprocessors : 2221 list of callables which are iteratively ran on any input expression 2222 or unit string 2223 fmt_locale : 2224 locale identifier string, used in `format_babel`. Default to None 2225 case_sensitive : bool, optional 2226 Control default case sensitivity of unit parsing. (Default: True) 2227 """ 2228 2229 def __init__( 2230 self, 2231 filename="", 2232 force_ndarray: bool = False, 2233 force_ndarray_like: bool = False, 2234 default_as_delta: bool = True, 2235 autoconvert_offset_to_baseunit: bool = False, 2236 on_redefinition: str = "warn", 2237 system=None, 2238 auto_reduce_dimensions=False, 2239 preprocessors=None, 2240 fmt_locale=None, 2241 non_int_type=float, 2242 case_sensitive: bool = True, 2243 ): 2244 2245 super().__init__( 2246 filename=filename, 2247 force_ndarray=force_ndarray, 2248 force_ndarray_like=force_ndarray_like, 2249 on_redefinition=on_redefinition, 2250 default_as_delta=default_as_delta, 2251 autoconvert_offset_to_baseunit=autoconvert_offset_to_baseunit, 2252 system=system, 2253 auto_reduce_dimensions=auto_reduce_dimensions, 2254 preprocessors=preprocessors, 2255 fmt_locale=fmt_locale, 2256 non_int_type=non_int_type, 2257 case_sensitive=case_sensitive, 2258 ) 2259 2260 def pi_theorem(self, quantities): 2261 """Builds dimensionless quantities using the Buckingham π theorem 2262 2263 Parameters 2264 ---------- 2265 quantities : dict 2266 mapping between variable name and units 2267 2268 Returns 2269 ------- 2270 list 2271 a list of dimensionless quantities expressed as dicts 2272 2273 """ 2274 return pi_theorem(quantities, self) 2275 2276 def setup_matplotlib(self, enable: bool = True) -> None: 2277 """Set up handlers for matplotlib's unit support. 2278 2279 Parameters 2280 ---------- 2281 enable : bool 2282 whether support should be enabled or disabled (Default value = True) 2283 2284 """ 2285 # Delays importing matplotlib until it's actually requested 2286 from .matplotlib import setup_matplotlib_handlers 2287 2288 setup_matplotlib_handlers(self, enable) 2289 2290 wraps = registry_helpers.wraps 2291 2292 check = registry_helpers.check 2293 2294 2295class LazyRegistry: 2296 def __init__(self, args=None, kwargs=None): 2297 self.__dict__["params"] = args or (), kwargs or {} 2298 2299 def __init(self): 2300 args, kwargs = self.__dict__["params"] 2301 kwargs["on_redefinition"] = "raise" 2302 self.__class__ = UnitRegistry 2303 self.__init__(*args, **kwargs) 2304 self._after_init() 2305 2306 def __getattr__(self, item): 2307 if item == "_on_redefinition": 2308 return "raise" 2309 self.__init() 2310 return getattr(self, item) 2311 2312 def __setattr__(self, key, value): 2313 if key == "__class__": 2314 super().__setattr__(key, value) 2315 else: 2316 self.__init() 2317 setattr(self, key, value) 2318 2319 def __getitem__(self, item): 2320 self.__init() 2321 return self[item] 2322 2323 def __call__(self, *args, **kwargs): 2324 self.__init() 2325 return self(*args, **kwargs) 2326 2327 2328class ApplicationRegistry: 2329 """A wrapper class used to distribute changes to the application registry.""" 2330 2331 def __init__(self, registry): 2332 self._registry = registry 2333 2334 def get(self): 2335 """Get the wrapped registry""" 2336 return self._registry 2337 2338 def set(self, new_registry): 2339 """Set the new registry 2340 2341 Parameters 2342 ---------- 2343 new_registry : ApplicationRegistry or LazyRegistry or UnitRegistry 2344 The new registry. 2345 2346 See Also 2347 -------- 2348 set_application_registry 2349 """ 2350 if isinstance(new_registry, type(self)): 2351 new_registry = new_registry.get() 2352 2353 if not isinstance(new_registry, (LazyRegistry, UnitRegistry)): 2354 raise TypeError("Expected UnitRegistry; got %s" % type(new_registry)) 2355 logger.debug( 2356 "Changing app registry from %r to %r.", self._registry, new_registry 2357 ) 2358 self._registry = new_registry 2359 2360 def __getattr__(self, name): 2361 return getattr(self._registry, name) 2362 2363 def __dir__(self): 2364 return dir(self._registry) 2365 2366 def __getitem__(self, item): 2367 return self._registry[item] 2368 2369 def __call__(self, *args, **kwargs): 2370 return self._registry(*args, **kwargs) 2371 2372 def __contains__(self, item): 2373 return self._registry.__contains__(item) 2374 2375 def __iter__(self): 2376 return iter(self._registry) 2377