1""" 2 pint.systems 3 ~~~~~~~~~~~~ 4 5 Functions and classes related to system definitions and conversions. 6 7 :copyright: 2016 by Pint Authors, see AUTHORS for more details. 8 :license: BSD, see LICENSE for more details. 9""" 10 11import re 12 13from .babel_names import _babel_systems 14from .compat import babel_parse 15from .definitions import Definition, UnitDefinition 16from .errors import DefinitionSyntaxError, RedefinitionError 17from .util import ( 18 SharedRegistryObject, 19 SourceIterator, 20 getattr_maybe_raise, 21 logger, 22 to_units_container, 23) 24 25 26class Group(SharedRegistryObject): 27 """A group is a set of units. 28 29 Units can be added directly or by including other groups. 30 31 Members are computed dynamically, that is if a unit is added to a group X 32 all groups that include X are affected. 33 34 The group belongs to one Registry. 35 36 It can be specified in the definition file as:: 37 38 @group <name> [using <group 1>, ..., <group N>] 39 <definition 1> 40 ... 41 <definition N> 42 @end 43 """ 44 45 #: Regex to match the header parts of a definition. 46 _header_re = re.compile(r"@group\s+(?P<name>\w+)\s*(using\s(?P<used_groups>.*))*") 47 48 def __init__(self, name): 49 """ 50 :param name: Name of the group. If not given, a root Group will be created. 51 :type name: str 52 :param groups: dictionary like object groups and system. 53 The newly created group will be added after creation. 54 :type groups: dict[str | Group] 55 """ 56 57 # The name of the group. 58 #: type: str 59 self.name = name 60 61 #: Names of the units in this group. 62 #: :type: set[str] 63 self._unit_names = set() 64 65 #: Names of the groups in this group. 66 #: :type: set[str] 67 self._used_groups = set() 68 69 #: Names of the groups in which this group is contained. 70 #: :type: set[str] 71 self._used_by = set() 72 73 # Add this group to the group dictionary 74 self._REGISTRY._groups[self.name] = self 75 76 if name != "root": 77 # All groups are added to root group 78 self._REGISTRY._groups["root"].add_groups(name) 79 80 #: A cache of the included units. 81 #: None indicates that the cache has been invalidated. 82 #: :type: frozenset[str] | None 83 self._computed_members = None 84 85 @property 86 def members(self): 87 """Names of the units that are members of the group. 88 89 Calculated to include to all units in all included _used_groups. 90 91 """ 92 if self._computed_members is None: 93 self._computed_members = set(self._unit_names) 94 95 for _, group in self.iter_used_groups(): 96 self._computed_members |= group.members 97 98 self._computed_members = frozenset(self._computed_members) 99 100 return self._computed_members 101 102 def invalidate_members(self): 103 """Invalidate computed members in this Group and all parent nodes.""" 104 self._computed_members = None 105 d = self._REGISTRY._groups 106 for name in self._used_by: 107 d[name].invalidate_members() 108 109 def iter_used_groups(self): 110 pending = set(self._used_groups) 111 d = self._REGISTRY._groups 112 while pending: 113 name = pending.pop() 114 group = d[name] 115 pending |= group._used_groups 116 yield name, d[name] 117 118 def is_used_group(self, group_name): 119 for name, _ in self.iter_used_groups(): 120 if name == group_name: 121 return True 122 return False 123 124 def add_units(self, *unit_names): 125 """Add units to group.""" 126 for unit_name in unit_names: 127 self._unit_names.add(unit_name) 128 129 self.invalidate_members() 130 131 @property 132 def non_inherited_unit_names(self): 133 return frozenset(self._unit_names) 134 135 def remove_units(self, *unit_names): 136 """Remove units from group.""" 137 for unit_name in unit_names: 138 self._unit_names.remove(unit_name) 139 140 self.invalidate_members() 141 142 def add_groups(self, *group_names): 143 """Add groups to group.""" 144 d = self._REGISTRY._groups 145 for group_name in group_names: 146 147 grp = d[group_name] 148 149 if grp.is_used_group(self.name): 150 raise ValueError( 151 "Cyclic relationship found between %s and %s" 152 % (self.name, group_name) 153 ) 154 155 self._used_groups.add(group_name) 156 grp._used_by.add(self.name) 157 158 self.invalidate_members() 159 160 def remove_groups(self, *group_names): 161 """Remove groups from group.""" 162 d = self._REGISTRY._groups 163 for group_name in group_names: 164 grp = d[group_name] 165 166 self._used_groups.remove(group_name) 167 grp._used_by.remove(self.name) 168 169 self.invalidate_members() 170 171 @classmethod 172 def from_lines(cls, lines, define_func, non_int_type=float): 173 """Return a Group object parsing an iterable of lines. 174 175 Parameters 176 ---------- 177 lines : list[str] 178 iterable 179 define_func : callable 180 Function to define a unit in the registry; it must accept a single string as 181 a parameter. 182 183 Returns 184 ------- 185 186 """ 187 lines = SourceIterator(lines) 188 lineno, header = next(lines) 189 190 r = cls._header_re.search(header) 191 192 if r is None: 193 raise ValueError("Invalid Group header syntax: '%s'" % header) 194 195 name = r.groupdict()["name"].strip() 196 groups = r.groupdict()["used_groups"] 197 if groups: 198 group_names = tuple(a.strip() for a in groups.split(",")) 199 else: 200 group_names = () 201 202 unit_names = [] 203 for lineno, line in lines: 204 if "=" in line: 205 # Is a definition 206 definition = Definition.from_string(line, non_int_type=non_int_type) 207 if not isinstance(definition, UnitDefinition): 208 raise DefinitionSyntaxError( 209 "Only UnitDefinition are valid inside _used_groups, not " 210 + str(definition), 211 lineno=lineno, 212 ) 213 214 try: 215 define_func(definition) 216 except (RedefinitionError, DefinitionSyntaxError) as ex: 217 if ex.lineno is None: 218 ex.lineno = lineno 219 raise ex 220 221 unit_names.append(definition.name) 222 else: 223 unit_names.append(line.strip()) 224 225 grp = cls(name) 226 227 grp.add_units(*unit_names) 228 229 if group_names: 230 grp.add_groups(*group_names) 231 232 return grp 233 234 def __getattr__(self, item): 235 getattr_maybe_raise(self, item) 236 return self._REGISTRY 237 238 239class System(SharedRegistryObject): 240 """A system is a Group plus a set of base units. 241 242 Members are computed dynamically, that is if a unit is added to a group X 243 all groups that include X are affected. 244 245 The System belongs to one Registry. 246 247 It can be specified in the definition file as:: 248 249 @system <name> [using <group 1>, ..., <group N>] 250 <rule 1> 251 ... 252 <rule N> 253 @end 254 255 The syntax for the rule is: 256 257 new_unit_name : old_unit_name 258 259 where: 260 - old_unit_name: a root unit part which is going to be removed from the system. 261 - new_unit_name: a non root unit which is going to replace the old_unit. 262 263 If the new_unit_name and the old_unit_name, the later and the colon can be omitted. 264 """ 265 266 #: Regex to match the header parts of a context. 267 _header_re = re.compile(r"@system\s+(?P<name>\w+)\s*(using\s(?P<used_groups>.*))*") 268 269 def __init__(self, name): 270 """ 271 :param name: Name of the group 272 :type name: str 273 """ 274 275 #: Name of the system 276 #: :type: str 277 self.name = name 278 279 #: Maps root unit names to a dict indicating the new unit and its exponent. 280 #: :type: dict[str, dict[str, number]]] 281 self.base_units = {} 282 283 #: Derived unit names. 284 #: :type: set(str) 285 self.derived_units = set() 286 287 #: Names of the _used_groups in used by this system. 288 #: :type: set(str) 289 self._used_groups = set() 290 291 #: :type: frozenset | None 292 self._computed_members = None 293 294 # Add this system to the system dictionary 295 self._REGISTRY._systems[self.name] = self 296 297 def __dir__(self): 298 return list(self.members) 299 300 def __getattr__(self, item): 301 getattr_maybe_raise(self, item) 302 u = getattr(self._REGISTRY, self.name + "_" + item, None) 303 if u is not None: 304 return u 305 return getattr(self._REGISTRY, item) 306 307 @property 308 def members(self): 309 d = self._REGISTRY._groups 310 if self._computed_members is None: 311 self._computed_members = set() 312 313 for group_name in self._used_groups: 314 try: 315 self._computed_members |= d[group_name].members 316 except KeyError: 317 logger.warning( 318 "Could not resolve {} in System {}".format( 319 group_name, self.name 320 ) 321 ) 322 323 self._computed_members = frozenset(self._computed_members) 324 325 return self._computed_members 326 327 def invalidate_members(self): 328 """Invalidate computed members in this Group and all parent nodes.""" 329 self._computed_members = None 330 331 def add_groups(self, *group_names): 332 """Add groups to group.""" 333 self._used_groups |= set(group_names) 334 335 self.invalidate_members() 336 337 def remove_groups(self, *group_names): 338 """Remove groups from group.""" 339 self._used_groups -= set(group_names) 340 341 self.invalidate_members() 342 343 def format_babel(self, locale): 344 """translate the name of the system.""" 345 if locale and self.name in _babel_systems: 346 name = _babel_systems[self.name] 347 locale = babel_parse(locale) 348 return locale.measurement_systems[name] 349 return self.name 350 351 @classmethod 352 def from_lines(cls, lines, get_root_func, non_int_type=float): 353 lines = SourceIterator(lines) 354 355 lineno, header = next(lines) 356 357 r = cls._header_re.search(header) 358 359 if r is None: 360 raise ValueError("Invalid System header syntax '%s'" % header) 361 362 name = r.groupdict()["name"].strip() 363 groups = r.groupdict()["used_groups"] 364 365 # If the systems has no group, it automatically uses the root group. 366 if groups: 367 group_names = tuple(a.strip() for a in groups.split(",")) 368 else: 369 group_names = ("root",) 370 371 base_unit_names = {} 372 derived_unit_names = [] 373 for lineno, line in lines: 374 line = line.strip() 375 376 # We would identify a 377 # - old_unit: a root unit part which is going to be removed from the system. 378 # - new_unit: a non root unit which is going to replace the old_unit. 379 380 if ":" in line: 381 # The syntax is new_unit:old_unit 382 383 new_unit, old_unit = line.split(":") 384 new_unit, old_unit = new_unit.strip(), old_unit.strip() 385 386 # The old unit MUST be a root unit, if not raise an error. 387 if old_unit != str(get_root_func(old_unit)[1]): 388 raise ValueError( 389 "In `%s`, the unit at the right of the `:` must be a root unit." 390 % line 391 ) 392 393 # Here we find new_unit expanded in terms of root_units 394 new_unit_expanded = to_units_container( 395 get_root_func(new_unit)[1], cls._REGISTRY 396 ) 397 398 # We require that the old unit is present in the new_unit expanded 399 if old_unit not in new_unit_expanded: 400 raise ValueError("Old unit must be a component of new unit") 401 402 # Here we invert the equation, in other words 403 # we write old units in terms new unit and expansion 404 new_unit_dict = { 405 new_unit: -1 / value 406 for new_unit, value in new_unit_expanded.items() 407 if new_unit != old_unit 408 } 409 new_unit_dict[new_unit] = 1 / new_unit_expanded[old_unit] 410 411 base_unit_names[old_unit] = new_unit_dict 412 413 else: 414 # The syntax is new_unit 415 # old_unit is inferred as the root unit with the same dimensionality. 416 417 new_unit = line 418 old_unit_dict = to_units_container(get_root_func(line)[1]) 419 420 if len(old_unit_dict) != 1: 421 raise ValueError( 422 "The new base must be a root dimension if not discarded unit is specified." 423 ) 424 425 old_unit, value = dict(old_unit_dict).popitem() 426 427 base_unit_names[old_unit] = {new_unit: 1 / value} 428 429 system = cls(name) 430 431 system.add_groups(*group_names) 432 433 system.base_units.update(**base_unit_names) 434 system.derived_units |= set(derived_unit_names) 435 436 return system 437 438 439class Lister: 440 def __init__(self, d): 441 self.d = d 442 443 def __dir__(self): 444 return list(self.d.keys()) 445 446 def __getattr__(self, item): 447 getattr_maybe_raise(self, item) 448 return self.d[item] 449 450 451_Group = Group 452_System = System 453 454 455def build_group_class(registry): 456 class Group(_Group): 457 _REGISTRY = registry 458 459 return Group 460 461 462def build_system_class(registry): 463 class System(_System): 464 _REGISTRY = registry 465 466 return System 467