1# Licensed under a 3-clause BSD style license - see LICENSE.rst
2"""
3This module contains helper functions and classes for handling metadata.
4"""
5
6from functools import wraps
7
8import warnings
9
10from collections import OrderedDict
11from collections.abc import Mapping
12from copy import deepcopy
13
14import numpy as np
15from astropy.utils.exceptions import AstropyWarning
16from astropy.utils.misc import dtype_bytes_or_chars
17
18
# Public names exported by this module.
__all__ = [
    'MergeConflictError',
    'MergeConflictWarning',
    'MERGE_STRATEGIES',
    'common_dtype',
    'MergePlus',
    'MergeNpConcatenate',
    'MergeStrategy',
    'MergeStrategyMeta',
    'enable_merge_strategies',
    'merge',
    'MetaData',
    'MetaAttribute',
]
23
24
class MergeConflictError(TypeError):
    """Raised when two metadata values (or arrays) cannot be merged."""
27
28
class MergeConflictWarning(AstropyWarning):
    """Warning issued when conflicting metadata is resolved by picking one side."""
31
32
# Registry of ``(left_types, right_types, strategy_class)`` tuples.  Entries are
# inserted at the front by ``MergeStrategyMeta`` whenever a class defining
# ``types`` is created, and ``merge`` scans the list in order, so the most
# recently defined strategies are tried first.
MERGE_STRATEGIES = []
34
35
def common_dtype(arrs):
    """
    Use numpy to find the common dtype for a list of ndarrays.

    Every input must belong to the same fundamental numpy data type family,
    one of ``np.bool_``, ``np.object_``, ``np.number``, ``np.character`` or
    ``np.void``.  Inputs without a ``dtype`` attribute are treated as having
    object dtype.

    Parameters
    ----------
    arrs : list of ndarray
        Arrays for which to find the common dtype

    Returns
    -------
    dtype_str : str
        String representation of dtype (dtype ``str`` attribute)

    Raises
    ------
    MergeConflictError
        If the inputs do not all fall into the same fundamental type family.
        The list of offending dtype names is attached to the exception as
        ``_incompat_types``.
    """
    def _dtype_of(obj):
        # Fall back to object dtype for anything that is not array-like.
        return getattr(obj, 'dtype', np.dtype('O'))

    families = (np.bool_, np.object_, np.number, np.character, np.void)

    # One membership "signature" per input; all inputs must share the same one.
    signatures = set()
    for arr in arrs:
        scalar_type = _dtype_of(arr).type
        signatures.add(tuple(issubclass(scalar_type, family) for family in families))

    if len(signatures) > 1:
        # Embed into the exception the actual list of incompatible types.
        incompat_types = [_dtype_of(arr).name for arr in arrs]
        tme = MergeConflictError(f'Arrays have incompatible types {incompat_types}')
        tme._incompat_types = incompat_types
        raise tme

    # Build one representative scalar per input and let numpy work out the
    # dtype that can hold all of them.
    samples = []
    for arr in arrs:
        blank = np.empty(1, dtype=_dtype_of(arr))
        kind = blank.dtype.kind
        if kind == 'U':
            # String-type samples must be explicitly filled with non-zero
            # values at full width, or the resulting dtype is unpredictable.
            samples.append('0' * dtype_bytes_or_chars(blank.dtype))
        elif kind == 'S':
            samples.append(b'0' * dtype_bytes_or_chars(blank.dtype))
        else:
            samples.append(blank[0])

    return np.array(samples).dtype.str
77
78
class MergeStrategyMeta(type):
    """
    Metaclass that registers MergeStrategy subclasses into the
    MERGE_STRATEGIES registry.

    Two things happen when a class using this metaclass is created:

    - If the class body defines ``merge`` as a classmethod, it is replaced by
      a wrapper that converts any exception raised by the original into a
      `MergeConflictError` (chained to the original exception).
    - If the class body defines ``types``, one ``(left, right, cls)`` entry
      per type pair is inserted at the front of ``MERGE_STRATEGIES``.
    """

    def __new__(mcls, name, bases, members):
        """
        Create the class, wrap its ``merge`` classmethod and register it.

        Parameters
        ----------
        name : str
            Name of the class being created.
        bases : tuple
            Base classes of the class being created.
        members : dict
            Namespace of the class body.

        Returns
        -------
        cls : type
            The newly created class.
        """
        cls = super().__new__(mcls, name, bases, members)

        # Wrap ``merge`` classmethod to catch any exception and re-raise as
        # MergeConflictError.
        if 'merge' in members and isinstance(members['merge'], classmethod):
            orig_merge = members['merge'].__func__

            @wraps(orig_merge)
            def merge(cls, left, right):
                try:
                    return orig_merge(cls, left, right)
                except Exception as err:
                    # Chain explicitly so the traceback shows the original
                    # failure as the direct cause of the merge conflict.
                    raise MergeConflictError(err) from err

            cls.merge = classmethod(merge)

        # Register merging class (except for base MergeStrategy class, which
        # does not define ``types``).
        if 'types' in members:
            types = members['types']
            # A bare two-tuple is shorthand for a one-element list of pairs.
            if isinstance(types, tuple):
                types = [types]
            # Insert in reverse so the pairs end up at the front of the
            # registry in their declared order.
            for left, right in reversed(types):
                MERGE_STRATEGIES.insert(0, (left, right, cls))

        return cls
111
112
class MergeStrategy(metaclass=MergeStrategyMeta):
    """
    Base class for strategies that merge metadata from two sources, left and
    right, into a single output.

    A strategy does its work in the ``merge(cls, left, right)`` class method,
    which receives the ``left`` and ``right`` side values and returns one
    merged value.

    Which inputs a strategy handles is declared through the ``types`` class
    attribute, a list of (left_types, right_types) tuples.  A strategy is
    applied to a pair of (left, right) objects when
    ``isinstance(left, left_types) and isinstance(right, right_types)``
    holds for one of those tuples.  For example::

      types = [(np.ndarray, np.ndarray),  # Two ndarrays
               (np.ndarray, (list, tuple)),  # ndarray and (list or tuple)
               ((list, tuple), np.ndarray)]  # (list or tuple) and ndarray

    For convenience a single two-tuple may be given in place of a list of
    two-tuples, e.g. ``types = (np.ndarray, np.ndarray)``.

    The ``enabled`` class attribute, ``False`` in this base class, controls
    whether a strategy is actually used.  Merely defining a subclass of
    ``MergeStrategy`` registers the new strategy so that it is available for
    merging, but it stays *not enabled* by default.  This avoids
    inadvertently changing the behavior of unrelated code that performs
    metadata merge operations.

    Usually (and especially in library code that others might use) custom
    strategies should be left disabled and switched on locally with the
    `~astropy.utils.metadata.enable_merge_strategies` context manager.  If
    one is confident that the new strategy will not produce unexpected
    behavior, it can instead be enabled globally by setting the ``enabled``
    class attribute to ``True``.

    Examples
    --------
    Here we define a custom merge strategy that takes an int or float on
    the left and right sides and returns a list with the two values.

      >>> from astropy.utils.metadata import MergeStrategy
      >>> class MergeNumbersAsList(MergeStrategy):
      ...     types = ((int, float), (int, float))  # (left_types, right_types)
      ...
      ...     @classmethod
      ...     def merge(cls, left, right):
      ...         return [left, right]

    """
    # Subclasses declare: types = [(left_types, right_types), ...]

    # A subclass may set ``enabled = True`` to apply its strategy globally,
    # although that is not generally recommended.
    enabled = False
169
170
class MergePlus(MergeStrategy):
    """
    Combine ``left`` and ``right`` with the plus operator.

    Applies to list/list and tuple/tuple pairs.  This merge strategy is
    globally enabled by default.
    """
    enabled = True
    types = [(list, list), (tuple, tuple)]

    @classmethod
    def merge(cls, left, right):
        combined = left + right
        return combined
182
183
class MergeNpConcatenate(MergeStrategy):
    """
    Join ``left`` and ``right`` with np.concatenate.  This merge strategy is
    globally enabled by default.

    A list or tuple on either side is first converted to np.ndarray, so the
    output is always an ndarray.
    """
    enabled = True
    types = [(np.ndarray, np.ndarray),
             (np.ndarray, (list, tuple)),
             ((list, tuple), np.ndarray)]

    @classmethod
    def merge(cls, left, right):
        operands = [np.asanyarray(side) for side in (left, right)]
        # Called only for its check: raises if the dtypes are incompatible.
        common_dtype(operands)
        return np.concatenate(operands)
202
203
def _both_isinstance(left, right, cls):
    """Return True only if ``left`` and ``right`` are both instances of ``cls``."""
    return all(isinstance(obj, cls) for obj in (left, right))
206
207
def _not_equal(left, right):
    """
    Return whether ``left != right``, as a plain bool.

    If the comparison, or converting its result to bool, raises any
    exception, the two values are treated as not equal.
    """
    try:
        differs = left != right
        verdict = bool(differs)
    except Exception:
        verdict = True
    return verdict
213
214
class _EnableMergeStrategies:
    """
    Context manager that enables registered merge strategies and restores
    their previous ``enabled`` state on exit.

    Every class in ``MERGE_STRATEGIES`` that is a subclass of one of the
    given ``merge_strategies`` is affected.  Note that the strategies are
    switched on when the object is constructed, not when the ``with`` block
    is entered; leaving the ``with`` block restores the saved state.

    Parameters
    ----------
    *merge_strategies : type
        Strategy classes (or common base classes) to enable.
    """

    def __init__(self, *merge_strategies):
        self.merge_strategies = merge_strategies
        # Maps each affected strategy class to its ``enabled`` value from
        # before this context manager touched it.
        self.orig_enabled = {}
        for _left_type, _right_type, merge_strategy in MERGE_STRATEGIES:
            if issubclass(merge_strategy, merge_strategies):
                # A strategy declaring several ``types`` pairs is listed once
                # per pair.  Record only the first-seen state; otherwise a
                # later visit would save the ``True`` just set below and the
                # strategy would stay enabled after exit.
                self.orig_enabled.setdefault(merge_strategy, merge_strategy.enabled)
                merge_strategy.enabled = True

    def __enter__(self):
        # Nothing to do: the strategies were already enabled in __init__.
        pass

    def __exit__(self, type, value, tb):
        # Restore the saved state; exceptions from the block propagate.
        for merge_strategy, enabled in self.orig_enabled.items():
            merge_strategy.enabled = enabled
230
231
def enable_merge_strategies(*merge_strategies):
    """
    Context manager to temporarily enable one or more custom metadata merge
    strategies.

    The strategies are switched on when this function is called and their
    previous state is restored when the ``with`` block is left.

    Examples
    --------
    Here we define a custom merge strategy that takes an int or float on
    the left and right sides and returns a list with the two values.

      >>> from astropy.utils.metadata import MergeStrategy
      >>> class MergeNumbersAsList(MergeStrategy):
      ...     types = ((int, float),  # left side types
      ...              (int, float))  # right side types
      ...     @classmethod
      ...     def merge(cls, left, right):
      ...         return [left, right]

    Defining this class automatically registers the merge strategy so that it
    is available for use in merging.  New merge strategies are nevertheless
    *not enabled* by default, which prevents inadvertently changing the
    behavior of unrelated code that is performing metadata merge operations.

    To actually use the new merge strategy, wrap the merging code in this
    context manager as in the following example::

      >>> from astropy.table import Table, vstack
      >>> from astropy.utils.metadata import enable_merge_strategies
      >>> t1 = Table([[1]], names=['a'])
      >>> t2 = Table([[2]], names=['a'])
      >>> t1.meta = {'m': 1}
      >>> t2.meta = {'m': 2}
      >>> with enable_merge_strategies(MergeNumbersAsList):
      ...    t12 = vstack([t1, t2])
      >>> t12.meta['m']
      [1, 2]

    Further merge strategies can be supplied as additional arguments to the
    context manager.

    The enabling operation works by checking whether each registered strategy
    is a subclass of one of the context manager arguments.  A related set of
    merge strategies can therefore be enabled all at once by enabling their
    common base class.  As a trivial example, *all* registered merge
    strategies can be enabled with::

      >>> with enable_merge_strategies(MergeStrategy):
      ...    t12 = vstack([t1, t2])

    Parameters
    ----------
    *merge_strategies : `~astropy.utils.metadata.MergeStrategy`
        Merge strategies that will be enabled.

    """
    context = _EnableMergeStrategies(*merge_strategies)
    return context
289
290
def _warn_str_func(key, left, right):
    """Build the default warning text for an unmergeable ``key``; the right value wins."""
    return (f'Cannot merge meta key {key!r} types {type(left)!r}'
            f' and {type(right)!r}, choosing {key}={right!r}')
296
297
def _error_str_func(key, left, right):
    """Build the default error text for an unmergeable ``key``."""
    template = 'Cannot merge meta key {!r} types {!r} and {!r}'
    return template.format(key, type(left), type(right))
301
302
def merge(left, right, merge_func=None, metadata_conflicts='warn',
          warn_str_func=_warn_str_func,
          error_str_func=_error_str_func):
    """
    Merge the ``left`` and ``right`` metadata objects.

    This is a simplistic and limited implementation at this point.

    The result starts as a deep copy of ``left``.  Keys found only in
    ``right`` are deep-copied in.  For keys present on both sides, nested
    dicts are merged recursively (with the same options); other values are
    combined with ``merge_func`` or, if that is None, with the first enabled
    entry of ``MERGE_STRATEGIES`` whose types match.  If that raises
    `MergeConflictError` (or no strategy matches), the fallback is: a None
    value yields to the other side, equal values are not a conflict, and
    differing values are a conflict in which the right value is chosen,
    subject to ``metadata_conflicts``.

    Parameters
    ----------
    left, right : dict
        Metadata objects to merge.  Neither input is modified, but values
        chosen by the conflict fallback are stored by reference, not copied.
    merge_func : callable, optional
        Called as ``merge_func(left_value, right_value)`` to combine two
        conflicting values.  May raise `MergeConflictError` to trigger the
        fallback described above.
    metadata_conflicts : {'warn', 'error', 'silent'}
        What to do when two differing values cannot be merged: emit a
        `MergeConflictWarning`, raise `MergeConflictError`, or do nothing.
        The value is only checked when such a conflict actually occurs.
    warn_str_func : callable
        Called as ``warn_str_func(key, left_value, right_value)`` to produce
        the warning message.
    error_str_func : callable
        Called as ``error_str_func(key, left_value, right_value)`` to produce
        the error message.

    Returns
    -------
    out : dict
        The merged metadata.

    Raises
    ------
    MergeConflictError
        If ``left`` and ``right`` are not both dicts, or on an unresolvable
        conflict when ``metadata_conflicts`` is ``'error'``.
    ValueError
        If a conflict occurs and ``metadata_conflicts`` is not one of the
        allowed values.
    """
    if not _both_isinstance(left, right, dict):
        raise MergeConflictError('Can only merge two dict-based objects')

    out = deepcopy(left)

    for key, val in right.items():
        # If no conflict then insert val into out dict and continue
        if key not in out:
            out[key] = deepcopy(val)
            continue

        # There is a conflict that must be resolved
        if _both_isinstance(left[key], right[key], dict):
            # Forward *all* options so conflicts inside nested dicts are
            # reported with the caller's message functions as well.
            out[key] = merge(left[key], right[key], merge_func,
                             metadata_conflicts=metadata_conflicts,
                             warn_str_func=warn_str_func,
                             error_str_func=error_str_func)
            continue

        try:
            if merge_func is None:
                for left_type, right_type, merge_cls in MERGE_STRATEGIES:
                    if not merge_cls.enabled:
                        continue
                    if (isinstance(left[key], left_type) and
                            isinstance(right[key], right_type)):
                        out[key] = merge_cls.merge(left[key], right[key])
                        break
                else:
                    # No enabled strategy matched these value types.
                    raise MergeConflictError
            else:
                out[key] = merge_func(left[key], right[key])
        except MergeConflictError:

            # Pick the metadata item that is not None, or they are both not
            # None, then if they are equal, there is no conflict, and if
            # they are different, there is a conflict and we pick the one
            # on the right (or raise an error).

            if left[key] is None:
                # This may not seem necessary since out[key] gets set to
                # right[key], but not all objects support != which is
                # needed for one of the if clauses.
                out[key] = right[key]
            elif right[key] is None:
                out[key] = left[key]
            else:
                if _not_equal(left[key], right[key]):
                    if metadata_conflicts == 'warn':
                        warnings.warn(warn_str_func(key, left[key], right[key]),
                                      MergeConflictWarning)
                    elif metadata_conflicts == 'error':
                        raise MergeConflictError(error_str_func(key, left[key], right[key]))
                    elif metadata_conflicts != 'silent':
                        raise ValueError('metadata_conflicts argument must be one '
                                         'of "silent", "warn", or "error"')
                # Equal values or a tolerated conflict: the right side wins.
                out[key] = right[key]

    return out
369
370
class MetaData:
    """
    A descriptor for classes that have a ``meta`` property.

    The value is stored on the instance as ``_meta`` and can be set to any
    valid `~collections.abc.Mapping`.  Reading it before anything was set,
    or setting it to None, gives a new empty `~collections.OrderedDict`.

    Parameters
    ----------
    doc : `str`, optional
        Documentation for the attribute of the class.
        Default is ``""``.

        .. versionadded:: 1.2

    copy : `bool`, optional
        If ``True`` the value is deepcopied before setting, otherwise it
        is saved as reference.
        Default is ``True``.

        .. versionadded:: 1.2
    """

    def __init__(self, doc="", copy=True):
        self.copy = copy
        self.__doc__ = doc

    def __get__(self, instance, owner):
        # Class-level access returns the descriptor itself.
        if instance is not None:
            # Lazily create the storage the first time it is read.
            if not hasattr(instance, '_meta'):
                instance._meta = OrderedDict()
            return instance._meta
        return self

    def __set__(self, instance, value):
        # None resets the metadata to a fresh empty mapping.
        if value is None:
            instance._meta = OrderedDict()
            return

        if not isinstance(value, Mapping):
            raise TypeError("meta attribute must be dict-like")

        instance._meta = deepcopy(value) if self.copy else value
415
416
class MetaAttribute:
    """
    Descriptor to define custom attribute which gets stored in the object
    ``meta`` dict and can have a defined default.

    The intent is to offer a convenient way of adding attributes to a
    subclass of a complex class such as ``Table`` or ``NDData``.

    The owning object must have an attribute ``meta`` which is a dict-like
    object.  Values are kept in a dict ``meta['__attributes__']`` which is
    created when required and removed again once it becomes empty through
    deletion.

    Classes that define MetaAttributes are encouraged to support initializing
    the attributes via the class ``__init__``.  For example::

        for attr in list(kwargs):
            descr = getattr(self.__class__, attr, None)
            if isinstance(descr, MetaAttribute):
                setattr(self, attr, kwargs.pop(attr))

    The name of a ``MetaAttribute`` cannot be the same as any of the following:

    - Keyword argument in the owner class ``__init__``
    - Method or attribute of the "parent class", where the parent class is
      taken to be ``owner.__mro__[1]``.

    Parameters
    ----------
    default : object, optional
        Default value.  A deep copy of it is stored in the object the first
        time the attribute is read without having been set.

    """
    def __init__(self, default=None):
        self.default = default

    def __get__(self, instance, owner):
        # When called without an instance, return self to allow access
        # to descriptor attributes.
        if instance is None:
            return self

        # With no default and nothing stored, answer None without touching
        # meta['__attributes__'] at all.  This helps e.g. with the
        # Table._hidden_columns attribute so that merely reading it does not
        # auto-create meta['__attributes__'].
        stored = instance.meta.get('__attributes__', {})
        if self.default is None and self.name not in stored:
            return None

        # Get the __attributes__ dict and create if not there already.
        attributes = instance.meta.setdefault('__attributes__', {})
        try:
            return attributes[self.name]
        except KeyError:
            pass

        # Nothing stored yet: materialize a private copy of the default.
        if self.default is not None:
            attributes[self.name] = deepcopy(self.default)
        # Return either specified default or None
        return attributes.get(self.name)

    def __set__(self, instance, value):
        # Get the __attributes__ dict and create if not there already.
        instance.meta.setdefault('__attributes__', {})[self.name] = value

    def __delete__(self, instance):
        # Nothing to remove if no attribute has ever been stored.
        if '__attributes__' not in instance.meta:
            return

        attrs = instance.meta['__attributes__']
        if self.name in attrs:
            del attrs[self.name]
        # If this was the last attribute then remove the meta key as well
        if not attrs:
            del instance.meta['__attributes__']

    def __set_name__(self, owner, name):
        import inspect

        variadic_kinds = (inspect.Parameter.VAR_KEYWORD,
                          inspect.Parameter.VAR_POSITIONAL)
        init_params = []
        for param in inspect.signature(owner).parameters.values():
            if param.kind not in variadic_kinds:
                init_params.append(param.name)

        # Reject names from existing params or best guess at parent class
        if name in init_params or hasattr(owner.__mro__[1], name):
            raise ValueError(f'{name} not allowed as {self.__class__.__name__}')

        self.name = name

    def __repr__(self):
        cls_name = self.__class__.__name__
        return '<{} name={} default={}>'.format(cls_name, self.name, self.default)
503