1"""
2    pint.systems
3    ~~~~~~~~~~~~
4
5    Functions and classes related to system definitions and conversions.
6
7    :copyright: 2016 by Pint Authors, see AUTHORS for more details.
8    :license: BSD, see LICENSE for more details.
9"""
10
11import re
12
13from .babel_names import _babel_systems
14from .compat import babel_parse
15from .definitions import Definition, UnitDefinition
16from .errors import DefinitionSyntaxError, RedefinitionError
17from .util import (
18    SharedRegistryObject,
19    SourceIterator,
20    getattr_maybe_raise,
21    logger,
22    to_units_container,
23)
24
25
class Group(SharedRegistryObject):
    """A group is a set of units.

    Units can be added directly or by including other groups.

    Members are computed dynamically, that is if a unit is added to a group X
    all groups that include X are affected.

    The group belongs to one Registry.

    It can be specified in the definition file as::

        @group <name> [using <group 1>, ..., <group N>]
            <definition 1>
            ...
            <definition N>
        @end
    """

    #: Regex to match the header parts of a definition.
    _header_re = re.compile(r"@group\s+(?P<name>\w+)\s*(using\s(?P<used_groups>.*))*")

    def __init__(self, name):
        """Create the group and register it in the registry.

        The new group is stored in ``self._REGISTRY._groups`` under ``name``.
        Unless ``name`` is ``"root"``, it is also added to the ``"root"`` group,
        which therefore must already exist in the registry.

        :param name: Name of the group.
        :type name: str
        """

        # The name of the group.
        #: type: str
        self.name = name

        #: Names of the units in this group.
        #: :type: set[str]
        self._unit_names = set()

        #: Names of the groups in this group.
        #: :type: set[str]
        self._used_groups = set()

        #: Names of the groups in which this group is contained.
        #: :type: set[str]
        self._used_by = set()

        #: A cache of the included units.
        #: None indicates that the cache has been invalidated.
        #: :type: frozenset[str] | None
        # Initialised before registration so the object is fully formed by the
        # time other groups can reach it through the registry.
        self._computed_members = None

        # Add this group to the group dictionary
        self._REGISTRY._groups[self.name] = self

        if name != "root":
            # All groups are added to root group
            self._REGISTRY._groups["root"].add_groups(name)

    @property
    def members(self):
        """Names of the units that are members of the group.

        Calculated to include all units in all included _used_groups.
        The result is cached as a frozenset until ``invalidate_members`` is
        called.
        """
        if self._computed_members is None:
            collected = set(self._unit_names)

            for _, group in self.iter_used_groups():
                collected |= group.members

            self._computed_members = frozenset(collected)

        return self._computed_members

    def invalidate_members(self):
        """Invalidate computed members in this Group and all parent nodes."""
        self._computed_members = None
        groups = self._REGISTRY._groups
        for name in self._used_by:
            groups[name].invalidate_members()

    def iter_used_groups(self):
        """Yield ``(name, group)`` for every group used directly or transitively.

        Each group is yielded once, in no particular order. Raises ``KeyError``
        if a used group name is not present in the registry.
        """
        groups = self._REGISTRY._groups
        pending = set(self._used_groups)
        # Track visited names so shared sub-groups are not yielded repeatedly
        # and an accidental cycle cannot loop forever.
        seen = set()
        while pending:
            name = pending.pop()
            seen.add(name)
            group = groups[name]
            pending |= group._used_groups - seen
            yield name, group

    def is_used_group(self, group_name):
        """Return True if ``group_name`` is used directly or transitively."""
        return any(name == group_name for name, _ in self.iter_used_groups())

    def add_units(self, *unit_names):
        """Add units to group."""
        self._unit_names.update(unit_names)

        self.invalidate_members()

    @property
    def non_inherited_unit_names(self):
        """Names of the units added directly to this group (frozenset)."""
        return frozenset(self._unit_names)

    def remove_units(self, *unit_names):
        """Remove units from group.

        Raises ``KeyError`` if a unit name is not directly in this group.
        """
        try:
            for unit_name in unit_names:
                self._unit_names.remove(unit_name)
        finally:
            # Earlier names may already be removed when a later one fails.
            self.invalidate_members()

    def add_groups(self, *group_names):
        """Add groups to group.

        Raises ``KeyError`` if a group name is not in the registry and
        ``ValueError`` if adding the group would create a cycle (including a
        group using itself).
        """
        groups = self._REGISTRY._groups
        try:
            for group_name in group_names:
                grp = groups[group_name]

                # A group including itself is the shortest possible cycle and
                # is not detected by looking at the groups ``grp`` uses.
                if group_name == self.name or grp.is_used_group(self.name):
                    raise ValueError(
                        "Cyclic relationship found between %s and %s"
                        % (self.name, group_name)
                    )

                self._used_groups.add(group_name)
                grp._used_by.add(self.name)
        finally:
            # Groups added before a failure still change the membership.
            self.invalidate_members()

    def remove_groups(self, *group_names):
        """Remove groups from group.

        Raises ``KeyError`` if a group name is not in the registry or is not
        used by this group.
        """
        groups = self._REGISTRY._groups
        try:
            for group_name in group_names:
                grp = groups[group_name]

                self._used_groups.remove(group_name)
                grp._used_by.remove(self.name)
        finally:
            self.invalidate_members()

    @classmethod
    def from_lines(cls, lines, define_func, non_int_type=float):
        """Return a Group object parsing an iterable of lines.

        The first line is the ``@group`` header. Every following line is
        either a unit definition (it contains ``=``), which is passed to
        ``define_func`` and whose name is added to the group, or the bare name
        of a unit to add.

        Parameters
        ----------
        lines : list[str]
            iterable
        define_func : callable
            Function to define a unit in the registry; it is called with the
            parsed definition object.
        non_int_type : type
            Forwarded to ``Definition.from_string``.

        Returns
        -------
        Group
            The newly created (and registered) group.

        Raises
        ------
        ValueError
            If the header does not match the expected syntax.
        DefinitionSyntaxError
            If a definition line is not a unit definition.
        """
        lines = SourceIterator(lines)
        _, header = next(lines)

        r = cls._header_re.search(header)

        if r is None:
            raise ValueError("Invalid Group header syntax: '%s'" % header)

        header_parts = r.groupdict()
        name = header_parts["name"].strip()
        groups = header_parts["used_groups"]
        if groups:
            group_names = tuple(a.strip() for a in groups.split(","))
        else:
            group_names = ()

        unit_names = []
        for lineno, line in lines:
            if "=" in line:
                # Is a definition
                definition = Definition.from_string(line, non_int_type=non_int_type)
                if not isinstance(definition, UnitDefinition):
                    raise DefinitionSyntaxError(
                        "Only UnitDefinition are valid inside _used_groups, not "
                        + str(definition),
                        lineno=lineno,
                    )

                try:
                    define_func(definition)
                except (RedefinitionError, DefinitionSyntaxError) as ex:
                    # Attach the line number when the error does not carry one.
                    if ex.lineno is None:
                        ex.lineno = lineno
                    raise

                unit_names.append(definition.name)
            else:
                unit_names.append(line.strip())

        grp = cls(name)

        grp.add_units(*unit_names)

        if group_names:
            grp.add_groups(*group_names)

        return grp

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails. The helper is
        # invoked first (presumably to reject private/dunder names - confirm
        # in pint.util); any other name resolves to the registry object itself.
        getattr_maybe_raise(self, item)
        return self._REGISTRY
237
238
class System(SharedRegistryObject):
    """A system is a Group plus a set of base units.

    Members are computed dynamically, that is if a unit is added to a group X
    all groups that include X are affected.

    The System belongs to one Registry.

    It can be specified in the definition file as::

        @system <name> [using <group 1>, ..., <group N>]
            <rule 1>
            ...
            <rule N>
        @end

    The syntax for the rule is:

        new_unit_name : old_unit_name

    where:
        - old_unit_name: a root unit part which is going to be removed from the system.
        - new_unit_name: a non root unit which is going to replace the old_unit.

    If the colon and old_unit_name are omitted, the old unit is inferred as the
    single root unit that new_unit_name expands to.
    """

    #: Regex to match the header parts of a context.
    _header_re = re.compile(r"@system\s+(?P<name>\w+)\s*(using\s(?P<used_groups>.*))*")

    def __init__(self, name):
        """Create the system and register it in ``self._REGISTRY._systems``.

        :param name: Name of the system
        :type name: str
        """

        #: Name of the system
        #: :type: str
        self.name = name

        #: Maps root unit names to a dict indicating the new unit and its exponent.
        #: :type: dict[str, dict[str, number]]]
        self.base_units = {}

        #: Derived unit names.
        #: :type: set(str)
        self.derived_units = set()

        #: Names of the groups used by this system.
        #: :type: set(str)
        self._used_groups = set()

        #: Cache of the member names; None means it must be recomputed.
        #: :type: frozenset | None
        self._computed_members = None

        # Add this system to the system dictionary
        self._REGISTRY._systems[self.name] = self

    def __dir__(self):
        return list(self.members)

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails.
        getattr_maybe_raise(self, item)
        # Prefer a registry attribute prefixed with the system name
        # ("<system>_<item>"); otherwise fall back to the plain name.
        u = getattr(self._REGISTRY, self.name + "_" + item, None)
        if u is not None:
            return u
        return getattr(self._REGISTRY, item)

    @property
    def members(self):
        """Names of the units in all groups used by this system.

        Group names that are not found in the registry are skipped with a
        logged warning. The result is cached as a frozenset until
        ``invalidate_members`` is called.
        """
        if self._computed_members is None:
            groups = self._REGISTRY._groups
            collected = set()

            for group_name in self._used_groups:
                try:
                    collected |= groups[group_name].members
                except KeyError:
                    logger.warning(
                        "Could not resolve %s in System %s", group_name, self.name
                    )

            self._computed_members = frozenset(collected)

        return self._computed_members

    def invalidate_members(self):
        """Invalidate the cached members of this System."""
        self._computed_members = None

    def add_groups(self, *group_names):
        """Add groups to the system."""
        self._used_groups |= set(group_names)

        self.invalidate_members()

    def remove_groups(self, *group_names):
        """Remove groups from the system; unknown names are ignored."""
        self._used_groups -= set(group_names)

        self.invalidate_members()

    def format_babel(self, locale):
        """translate the name of the system."""
        if locale and self.name in _babel_systems:
            name = _babel_systems[self.name]
            locale = babel_parse(locale)
            return locale.measurement_systems[name]
        return self.name

    @classmethod
    def from_lines(cls, lines, get_root_func, non_int_type=float):
        """Return a System object parsing an iterable of lines.

        The first line is the ``@system`` header; each following line is a
        rule, either ``new_unit : old_unit`` or just ``new_unit``. When the
        header names no groups, the system uses the ``"root"`` group.

        Parameters
        ----------
        lines : list[str]
            iterable
        get_root_func : callable
            Called with a unit name; element ``[1]`` of its result is used as
            the unit's expansion in root units (presumably the result is a
            ``(factor, units)`` pair - confirm against the registry).
        non_int_type : type
            Accepted for signature parity with ``Group.from_lines``; not used
            here.

        Returns
        -------
        System
            The newly created (and registered) system.

        Raises
        ------
        ValueError
            If the header or a rule is invalid.
        """
        lines = SourceIterator(lines)

        _, header = next(lines)

        r = cls._header_re.search(header)

        if r is None:
            raise ValueError("Invalid System header syntax '%s'" % header)

        header_parts = r.groupdict()
        name = header_parts["name"].strip()
        groups = header_parts["used_groups"]

        # If the systems has no group, it automatically uses the root group.
        if groups:
            group_names = tuple(a.strip() for a in groups.split(","))
        else:
            group_names = ("root",)

        base_unit_names = {}
        for _, line in lines:
            line = line.strip()

            # We would identify a
            #  - old_unit: a root unit part which is going to be removed from the system.
            #  - new_unit: a non root unit which is going to replace the old_unit.

            if ":" in line:
                # The syntax is new_unit:old_unit
                new_unit, _, old_unit = line.partition(":")
                if ":" in old_unit:
                    raise ValueError(
                        "In `%s`, expected a single `:` separating the new unit "
                        "from the old unit." % line
                    )
                new_unit, old_unit = new_unit.strip(), old_unit.strip()

                # The old unit MUST be a root unit, if not raise an error.
                if old_unit != str(get_root_func(old_unit)[1]):
                    raise ValueError(
                        "In `%s`, the unit at the right of the `:` must be a root unit."
                        % line
                    )

                # Here we find new_unit expanded in terms of root_units
                new_unit_expanded = to_units_container(
                    get_root_func(new_unit)[1], cls._REGISTRY
                )

                # We require that the old unit is present in the new_unit expanded
                if old_unit not in new_unit_expanded:
                    raise ValueError("Old unit must be a component of new unit")

                # Here we invert the equation, in other words
                # we write old units in terms new unit and expansion
                # NOTE(review): with old_unit exponent a and component exponent
                # b, algebraic inversion would give -b / a for the component;
                # -1 / b coincides with that only when a == b**2 (e.g. a == 1,
                # b == +/-1). Confirm this is intended.
                new_unit_dict = {
                    component: -1 / exponent
                    for component, exponent in new_unit_expanded.items()
                    if component != old_unit
                }
                new_unit_dict[new_unit] = 1 / new_unit_expanded[old_unit]

                base_unit_names[old_unit] = new_unit_dict

            else:
                # The syntax is new_unit
                # old_unit is inferred as the root unit with the same dimensionality.

                new_unit = line
                old_unit_dict = to_units_container(get_root_func(line)[1])

                if len(old_unit_dict) != 1:
                    raise ValueError(
                        "The new base must be a root dimension if not discarded unit is specified."
                    )

                old_unit, value = dict(old_unit_dict).popitem()

                base_unit_names[old_unit] = {new_unit: 1 / value}

        system = cls(name)

        system.add_groups(*group_names)

        system.base_units.update(base_unit_names)

        return system
437
438
class Lister:
    """Expose the entries of a mapping as attributes.

    ``dir()`` on an instance lists the mapping's keys, and attribute access
    that is not resolved normally is looked up in the mapping.
    """

    def __init__(self, d):
        # The wrapped mapping is stored as given, not copied.
        self.d = d

    def __dir__(self):
        return [key for key in self.d.keys()]

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails.
        getattr_maybe_raise(self, item)
        mapping = self.d
        return mapping[item]
449
450
# The builder functions below define local classes that are themselves named
# ``Group`` and ``System``; these aliases let those classes name the
# module-level classes as their bases without the local name shadowing them.
_Group, _System = Group, System
453
454
def build_group_class(registry):
    """Return a new ``Group`` subclass whose ``_REGISTRY`` is ``registry``.

    A distinct class object is created on every call.
    """

    class Group(_Group):
        # Class-level binding; Group's methods read it as ``self._REGISTRY``.
        _REGISTRY = registry

    return Group
460
461
def build_system_class(registry):
    """Return a new ``System`` subclass whose ``_REGISTRY`` is ``registry``.

    A distinct class object is created on every call.
    """

    class System(_System):
        # Class-level binding; System's methods read it as ``self._REGISTRY``
        # (and ``from_lines`` as ``cls._REGISTRY``).
        _REGISTRY = registry

    return System
467