# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
This module contains helper functions and classes for handling metadata.
"""

from functools import wraps

import warnings

from collections import OrderedDict
from collections.abc import Mapping
from copy import deepcopy

import numpy as np
from astropy.utils.exceptions import AstropyWarning
from astropy.utils.misc import dtype_bytes_or_chars


__all__ = ['MergeConflictError', 'MergeConflictWarning', 'MERGE_STRATEGIES',
           'common_dtype', 'MergePlus', 'MergeNpConcatenate', 'MergeStrategy',
           'MergeStrategyMeta', 'enable_merge_strategies', 'merge', 'MetaData',
           'MetaAttribute']


class MergeConflictError(TypeError):
    """Raised when two metadata values cannot be merged."""


class MergeConflictWarning(AstropyWarning):
    """Issued when a metadata conflict is resolved by picking one side."""


# Registry of ``(left_types, right_types, merge_cls)`` tuples.  Entries are
# inserted at the front by ``MergeStrategyMeta``, so the most recently
# defined strategies are tried first by ``merge``.
MERGE_STRATEGIES = []


def common_dtype(arrs):
    """
    Use numpy to find the common dtype for a list of ndarrays.

    Only allow arrays within the following fundamental numpy data types:
    ``np.bool_``, ``np.object_``, ``np.number``, ``np.character``, ``np.void``

    Parameters
    ----------
    arrs : list of ndarray
        Arrays for which to find the common dtype.  Items without a ``dtype``
        attribute are treated as having object dtype.

    Returns
    -------
    dtype_str : str
        String representation of dtype (dtype ``str`` attribute)

    Raises
    ------
    MergeConflictError
        If the inputs do not all fall in the same fundamental type category.
        The exception carries the list of dtype names in its
        ``_incompat_types`` attribute.
    """
    def dtype(arr):
        return getattr(arr, 'dtype', np.dtype('O'))

    np_types = (np.bool_, np.object_, np.number, np.character, np.void)
    # Each array is reduced to a tuple of booleans saying which fundamental
    # categories its dtype belongs to; more than one distinct tuple means
    # the arrays are of incompatible kinds.
    uniq_types = {tuple(issubclass(dtype(arr).type, np_type) for np_type in np_types)
                  for arr in arrs}
    if len(uniq_types) > 1:
        # Embed into the exception the actual list of incompatible types.
        incompat_types = [dtype(arr).name for arr in arrs]
        tme = MergeConflictError(f'Arrays have incompatible types {incompat_types}')
        tme._incompat_types = incompat_types
        raise tme

    arrs = [np.empty(1, dtype=dtype(arr)) for arr in arrs]

    # For string-type arrays need to explicitly fill in non-zero
    # values or the final arr_common = .. step is unpredictable.
    for i, arr in enumerate(arrs):
        if arr.dtype.kind in ('S', 'U'):
            arrs[i] = [('0' if arr.dtype.kind == 'U' else b'0') *
                       dtype_bytes_or_chars(arr.dtype)]

    arr_common = np.array([arr[0] for arr in arrs])
    return arr_common.dtype.str


class MergeStrategyMeta(type):
    """
    Metaclass that registers MergeStrategy subclasses into the
    MERGE_STRATEGIES registry.
    """

    def __new__(mcls, name, bases, members):
        cls = super().__new__(mcls, name, bases, members)

        # Wrap ``merge`` classmethod to catch any exception and re-raise as
        # MergeConflictError.
        if 'merge' in members and isinstance(members['merge'], classmethod):
            orig_merge = members['merge'].__func__

            @wraps(orig_merge)
            def merge(cls, left, right):
                try:
                    return orig_merge(cls, left, right)
                except Exception as err:
                    # Chain explicitly so the original failure is recorded as
                    # the direct cause of the conflict error.
                    raise MergeConflictError(err) from err

            cls.merge = classmethod(merge)

        # Register merging class (except for base MergeStrategy class)
        if 'types' in members:
            types = members['types']
            if isinstance(types, tuple):
                types = [types]
            # Insert in reverse at the front so that the declared order of
            # ``types`` is preserved within the registry.
            for left, right in reversed(types):
                MERGE_STRATEGIES.insert(0, (left, right, cls))

        return cls


class MergeStrategy(metaclass=MergeStrategyMeta):
    """
    Base class for defining a strategy for merging metadata from two
    sources, left and right, into a single output.

    The primary functionality for the class is the ``merge(cls, left, right)``
    class method.  This takes ``left`` and ``right`` side arguments and
    returns a single merged output.

    The first class attribute is ``types``.  This is defined as a list of
    (left_types, right_types) tuples that indicate for which input types the
    merge strategy applies.  In determining whether to apply this merge
    strategy to a pair of (left, right) objects, a test is done:
    ``isinstance(left, left_types) and isinstance(right, right_types)``.  For
    example::

      types = [(np.ndarray, np.ndarray),  # Two ndarrays
               (np.ndarray, (list, tuple)),  # ndarray and (list or tuple)
               ((list, tuple), np.ndarray)]  # (list or tuple) and ndarray

    As a convenience, ``types`` can be defined as a single two-tuple instead of
    a list of two-tuples, e.g. ``types = (np.ndarray, np.ndarray)``.

    The other class attribute is ``enabled``, which defaults to ``False`` in
    the base class.  By defining a subclass of ``MergeStrategy`` the new merge
    strategy is automatically registered to be available for use in
    merging. However, by default the new merge strategy is *not enabled*.  This
    prevents inadvertently changing the behavior of unrelated code that is
    performing metadata merge operations.

    In most cases (particularly in library code that others might use) it is
    recommended to leave custom strategies disabled and use the
    `~astropy.utils.metadata.enable_merge_strategies` context manager to locally
    enable the desired strategies.  However, if one is confident that the
    new strategy will not produce unexpected behavior, then one can globally
    enable it by setting the ``enabled`` class attribute to ``True``.

    Examples
    --------
    Here we define a custom merge strategy that takes an int or float on
    the left and right sides and returns a list with the two values.

      >>> from astropy.utils.metadata import MergeStrategy
      >>> class MergeNumbersAsList(MergeStrategy):
      ...     types = ((int, float), (int, float))  # (left_types, right_types)
      ...
      ...     @classmethod
      ...     def merge(cls, left, right):
      ...         return [left, right]

    """
    # Set ``enabled = True`` to globally enable applying this merge strategy.
    # This is not generally recommended.
    enabled = False

    # types = [(left_types, right_types), ...]


class MergePlus(MergeStrategy):
    """
    Merge ``left`` and ``right`` objects using the plus operator.  This
    merge strategy is globally enabled by default.
    """
    types = [(list, list), (tuple, tuple)]
    enabled = True

    @classmethod
    def merge(cls, left, right):
        return left + right


class MergeNpConcatenate(MergeStrategy):
    """
    Merge ``left`` and ``right`` objects using np.concatenate.  This
    merge strategy is globally enabled by default.

    This will upcast a list or tuple to np.ndarray and the output is
    always ndarray.
    """
    types = [(np.ndarray, np.ndarray),
             (np.ndarray, (list, tuple)),
             ((list, tuple), np.ndarray)]
    enabled = True

    @classmethod
    def merge(cls, left, right):
        left, right = np.asanyarray(left), np.asanyarray(right)
        common_dtype([left, right])  # Ensure left and right have compatible dtype
        return np.concatenate([left, right])


def _both_isinstance(left, right, cls):
    """Return True if both ``left`` and ``right`` are instances of ``cls``."""
    return isinstance(left, cls) and isinstance(right, cls)


def _not_equal(left, right):
    """
    Return True if ``left != right``.

    If the comparison itself, or converting its result to ``bool``, raises
    an exception then the objects are treated as not equal.
    """
    try:
        return bool(left != right)
    except Exception:
        return True


class _EnableMergeStrategies:
    """
    Context manager object returned by ``enable_merge_strategies``.

    The matching strategies are enabled when the object is constructed and
    their previous ``enabled`` values are restored on exit.
    """

    def __init__(self, *merge_strategies):
        self.merge_strategies = merge_strategies
        # Previous ``enabled`` value for each strategy that gets switched on,
        # keyed by strategy class, so that ``__exit__`` can restore it.
        self.orig_enabled = {}
        for left_type, right_type, merge_strategy in MERGE_STRATEGIES:
            if issubclass(merge_strategy, merge_strategies):
                self.orig_enabled[merge_strategy] = merge_strategy.enabled
                merge_strategy.enabled = True

    def __enter__(self):
        pass

    def __exit__(self, type, value, tb):
        for merge_strategy, enabled in self.orig_enabled.items():
            merge_strategy.enabled = enabled


def enable_merge_strategies(*merge_strategies):
    """
    Context manager to temporarily enable one or more custom metadata merge
    strategies.

    Examples
    --------
    Here we define a custom merge strategy that takes an int or float on
    the left and right sides and returns a list with the two values.

      >>> from astropy.utils.metadata import MergeStrategy
      >>> class MergeNumbersAsList(MergeStrategy):
      ...     types = ((int, float),  # left side types
      ...              (int, float))  # right side types
      ...     @classmethod
      ...     def merge(cls, left, right):
      ...         return [left, right]

    By defining this class the merge strategy is automatically registered to be
    available for use in merging. However, by default new merge strategies are
    *not enabled*.  This prevents inadvertently changing the behavior of
    unrelated code that is performing metadata merge operations.

    In order to use the new merge strategy, use this context manager as in the
    following example::

      >>> from astropy.table import Table, vstack
      >>> from astropy.utils.metadata import enable_merge_strategies
      >>> t1 = Table([[1]], names=['a'])
      >>> t2 = Table([[2]], names=['a'])
      >>> t1.meta = {'m': 1}
      >>> t2.meta = {'m': 2}
      >>> with enable_merge_strategies(MergeNumbersAsList):
      ...    t12 = vstack([t1, t2])
      >>> t12.meta['m']
      [1, 2]

    One can supply further merge strategies as additional arguments to the
    context manager.

    As a convenience, the enabling operation is actually done by checking
    whether the registered strategies are subclasses of the context manager
    arguments.  This means one can define a related set of merge strategies and
    then enable them all at once by enabling the base class.  As a trivial
    example, *all* registered merge strategies can be enabled with::

      >>> with enable_merge_strategies(MergeStrategy):
      ...    t12 = vstack([t1, t2])

    Parameters
    ----------
    *merge_strategies : `~astropy.utils.metadata.MergeStrategy`
        Merge strategies that will be enabled.

    """

    return _EnableMergeStrategies(*merge_strategies)


def _warn_str_func(key, left, right):
    """Default warning message for a conflict resolved by choosing ``right``."""
    out = ('Cannot merge meta key {0!r} types {1!r}'
           ' and {2!r}, choosing {0}={3!r}'
           .format(key, type(left), type(right), right))
    return out


def _error_str_func(key, left, right):
    """Default error message for an unresolvable conflict on ``key``."""
    out = f'Cannot merge meta key {key!r} types {type(left)!r} and {type(right)!r}'
    return out


def merge(left, right, merge_func=None, metadata_conflicts='warn',
          warn_str_func=_warn_str_func,
          error_str_func=_error_str_func):
    """
    Merge the ``left`` and ``right`` metadata objects.

    This is a simplistic and limited implementation at this point.

    Parameters
    ----------
    left, right : dict
        Metadata objects to merge.  Both must be ``dict`` instances.
    merge_func : callable, optional
        If given, called as ``merge_func(left_val, right_val)`` for each
        conflicting pair of non-dict values instead of searching the enabled
        entries of ``MERGE_STRATEGIES``.  It may raise `MergeConflictError`
        to fall back to the conflict handling selected by
        ``metadata_conflicts``.
    metadata_conflicts : {'warn', 'silent', 'error'}, optional
        What to do when two differing, non-None values cannot be merged:
        issue a `MergeConflictWarning` and keep the right value (``'warn'``),
        keep the right value without a warning (``'silent'``), or raise
        `MergeConflictError` (``'error'``).
    warn_str_func, error_str_func : callable, optional
        Functions called as ``func(key, left_val, right_val)`` that return
        the warning or error message.  They are also used for conflicts
        found in nested dicts.

    Returns
    -------
    out : dict
        Deep copy of ``left`` updated with the merged content of ``right``.

    Raises
    ------
    MergeConflictError
        If ``left`` and ``right`` are not both dicts, or if an unresolvable
        conflict is found and ``metadata_conflicts`` is ``'error'``.
    ValueError
        If an unresolvable conflict is found and ``metadata_conflicts`` is
        not one of the accepted values.
    """
    if not _both_isinstance(left, right, dict):
        raise MergeConflictError('Can only merge two dict-based objects')

    out = deepcopy(left)

    for key, val in right.items():
        # If no conflict then insert val into out dict and continue
        if key not in out:
            out[key] = deepcopy(val)
            continue

        # There is a conflict that must be resolved
        if _both_isinstance(left[key], right[key], dict):
            # Forward the message formatters too, otherwise conflicts inside
            # nested dicts would silently fall back to the default messages.
            out[key] = merge(left[key], right[key], merge_func,
                             metadata_conflicts=metadata_conflicts,
                             warn_str_func=warn_str_func,
                             error_str_func=error_str_func)

        else:
            try:
                if merge_func is None:
                    for left_type, right_type, merge_cls in MERGE_STRATEGIES:
                        if not merge_cls.enabled:
                            continue
                        if (isinstance(left[key], left_type) and
                                isinstance(right[key], right_type)):
                            out[key] = merge_cls.merge(left[key], right[key])
                            break
                    else:
                        # No enabled strategy matched this pair of types.
                        raise MergeConflictError
                else:
                    out[key] = merge_func(left[key], right[key])
            except MergeConflictError:

                # Pick the metadata item that is not None, or they are both not
                # None, then if they are equal, there is no conflict, and if
                # they are different, there is a conflict and we pick the one
                # on the right (or raise an error).

                if left[key] is None:
                    # This may not seem necessary since out[key] gets set to
                    # right[key], but not all objects support != which is
                    # needed for one of the if clauses.
                    out[key] = right[key]
                elif right[key] is None:
                    out[key] = left[key]
                elif _not_equal(left[key], right[key]):
                    if metadata_conflicts == 'warn':
                        warnings.warn(warn_str_func(key, left[key], right[key]),
                                      MergeConflictWarning)
                    elif metadata_conflicts == 'error':
                        raise MergeConflictError(error_str_func(key, left[key], right[key]))
                    elif metadata_conflicts != 'silent':
                        raise ValueError('metadata_conflicts argument must be one '
                                         'of "silent", "warn", or "error"')
                    out[key] = right[key]
                else:
                    out[key] = right[key]

    return out


class MetaData:
    """
    A descriptor for classes that have a ``meta`` property.

    This can be set to any valid `~collections.abc.Mapping`.

    Parameters
    ----------
    doc : `str`, optional
        Documentation for the attribute of the class.
        Default is ``""``.

        .. versionadded:: 1.2

    copy : `bool`, optional
        If ``True`` the value is deepcopied before setting, otherwise it
        is saved as reference.
        Default is ``True``.

        .. versionadded:: 1.2
    """

    def __init__(self, doc="", copy=True):
        self.__doc__ = doc
        self.copy = copy

    def __get__(self, instance, owner):
        # Class-level access returns the descriptor itself.
        if instance is None:
            return self
        # Lazily create the backing ``_meta`` storage on first access.
        if not hasattr(instance, '_meta'):
            instance._meta = OrderedDict()
        return instance._meta

    def __set__(self, instance, value):
        # ``None`` resets the metadata to a fresh, empty OrderedDict.
        if value is None:
            instance._meta = OrderedDict()
        else:
            if isinstance(value, Mapping):
                if self.copy:
                    instance._meta = deepcopy(value)
                else:
                    instance._meta = value
            else:
                raise TypeError("meta attribute must be dict-like")


class MetaAttribute:
    """
    Descriptor to define custom attribute which gets stored in the object
    ``meta`` dict and can have a defined default.

    This descriptor is intended to provide a convenient way to add attributes
    to a subclass of a complex class such as ``Table`` or ``NDData``.

    This requires that the object has an attribute ``meta`` which is a
    dict-like object.  The value of the MetaAttribute will be stored in a
    new dict meta['__attributes__'] that is created when required.

    Classes that define MetaAttributes are encouraged to support initializing
    the attributes via the class ``__init__``.  For example::

        for attr in list(kwargs):
            descr = getattr(self.__class__, attr, None)
            if isinstance(descr, MetaAttribute):
                setattr(self, attr, kwargs.pop(attr))

    The name of a ``MetaAttribute`` cannot be the same as any of the following:

    - Keyword argument in the owner class ``__init__``
    - Method or attribute of the "parent class", where the parent class is
      taken to be ``owner.__mro__[1]``.

    :param default: default value

    """
    def __init__(self, default=None):
        self.default = default

    def __get__(self, instance, owner):
        # When called without an instance, return self to allow access
        # to descriptor attributes.
        if instance is None:
            return self

        # If default is None and value has not been set already then return None
        # without touching meta['__attributes__'] at all. This helps e.g.
        # with the Table._hidden_columns attribute so it doesn't auto-create
        # meta['__attributes__'] always.
        if (self.default is None
                and self.name not in instance.meta.get('__attributes__', {})):
            return None

        # Get the __attributes__ dict and create if not there already.
        attributes = instance.meta.setdefault('__attributes__', {})
        try:
            value = attributes[self.name]
        except KeyError:
            if self.default is not None:
                attributes[self.name] = deepcopy(self.default)
            # Return either specified default or None
            value = attributes.get(self.name)
        return value

    def __set__(self, instance, value):
        # Get the __attributes__ dict and create if not there already.
        attributes = instance.meta.setdefault('__attributes__', {})
        attributes[self.name] = value

    def __delete__(self, instance):
        # Remove this attribute from meta['__attributes__'] if it exists.
        if '__attributes__' in instance.meta:
            attrs = instance.meta['__attributes__']
            if self.name in attrs:
                del attrs[self.name]
            # If this was the last attribute then remove the meta key as well
            if not attrs:
                del instance.meta['__attributes__']

    def __set_name__(self, owner, name):
        import inspect
        params = [param.name for param in inspect.signature(owner).parameters.values()
                  if param.kind not in (inspect.Parameter.VAR_KEYWORD,
                                        inspect.Parameter.VAR_POSITIONAL)]

        # Reject names from existing params or best guess at parent class
        if name in params or hasattr(owner.__mro__[1], name):
            raise ValueError(f'{name} not allowed as {self.__class__.__name__}')

        self.name = name

    def __repr__(self):
        # ``name`` is only assigned by ``__set_name__``; fall back to None so
        # that a descriptor not yet bound to a class can still be displayed.
        name = getattr(self, 'name', None)
        return f'<{self.__class__.__name__} name={name} default={self.default}>'