# sql/annotation.py
# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""The :class:`.Annotated` class and related routines; creates hash-equivalent
copies of SQL constructs which contain context-specific markers and
associations.

"""

from .. import util
from . import operators


class Annotated(object):
    """clones a ClauseElement and applies an 'annotations' dictionary.

    Unlike regular clones, this clone also mimics __hash__() and
    __eq__() of the original element so that it takes its place
    in hashed collections.

    A reference to the original element is maintained, for the important
    reason of keeping its hash value current.  When GC'ed, the
    hash value may be reused, causing conflicts.

    """

    def __new__(cls, *args):
        """Allocate an instance of the Annotated subclass for the element.

        Called with no arguments, this is the "clone constructor" and
        simply allocates an instance of ``cls``.  Called with
        ``(element, values)``, the Annotated subclass registered for
        ``element.__class__`` is looked up in ``annotated_classes``, and
        generated on demand if it is not present yet.
        """
        if not args:
            # clone constructor
            return object.__new__(cls)
        else:
            element, values = args
            # pull appropriate subclass from registry of annotated
            # classes
            try:
                cls = annotated_classes[element.__class__]
            except KeyError:
                cls = _new_annotation_type(element.__class__, cls)
            return object.__new__(cls)

    def __init__(self, element, values):
        """Copy the state of ``element`` and attach ``values`` to it.

        :param element: the object being annotated; its ``__dict__`` is
         shallow-copied and a reference to it is retained.
        :param values: the annotations dictionary, stored as
         ``_annotations`` (not copied).
        """
        self.__dict__ = element.__dict__.copy()
        # holding the original keeps it alive, so the cached hash below
        # stays unique for the lifetime of this object (see class docstring)
        self.__element = element
        self._annotations = values
        self._hash = hash(element)

    def _annotate(self, values):
        """Return a copy of this object with ``values`` merged into the
        existing annotations; keys in ``values`` take precedence."""
        _values = self._annotations.copy()
        _values.update(values)
        return self._with_annotations(_values)

    def _with_annotations(self, values):
        """Return a copy of this object whose annotations are exactly
        ``values``, replacing rather than merging the existing ones."""
        # __new__ with no further arguments takes the "clone constructor"
        # path, which skips the registry lookup; __init__ is not run.
        clone = self.__class__.__new__(self.__class__)
        clone.__dict__ = self.__dict__.copy()
        clone._annotations = values
        return clone

    def _deannotate(self, values=None, clone=True):
        """Remove annotations.

        :param values: optional iterable of annotation keys to remove.
         When ``None``, the original un-annotated element is returned.
         Otherwise a copy of this object without those keys is returned;
         keys that are not present are ignored.
        :param clone: accepted but not used by this implementation.
        """
        if values is None:
            return self.__element
        else:
            _values = self._annotations.copy()
            for v in values:
                _values.pop(v, None)
            return self._with_annotations(_values)

    def _compiler_dispatch(self, visitor, **kw):
        """Dispatch to the visitor through the wrapped element's class,
        passing this annotated object as the subject."""
        return self.__element.__class__._compiler_dispatch(
            self, visitor, **kw)

    @property
    def _constructor(self):
        """The ``_constructor`` of the wrapped element."""
        return self.__element._constructor

    def _clone(self):
        """Clone the wrapped element and re-apply the current annotations.

        Returns ``self`` unchanged when the wrapped element's ``_clone()``
        returns the element itself.
        """
        clone = self.__element._clone()
        if clone is self.__element:
            # detect immutable, don't change anything
            return self
        else:
            # update the clone with any changes that have occurred
            # to this object's __dict__.
            clone.__dict__.update(self.__dict__)
            return self.__class__(clone, self._annotations)

    def __hash__(self):
        """Return the hash of the original element, cached in
        ``__init__``."""
        return self._hash

    def __eq__(self, other):
        """Compare against ``other``.

        When the wrapped element is a ``ColumnOperators`` instance the
        comparison is delegated to its class's ``__eq__``; otherwise the
        two objects are considered equal when their hashes match.
        """
        if isinstance(self.__element, operators.ColumnOperators):
            return self.__element.__class__.__eq__(self, other)
        else:
            return hash(other) == hash(self)


# hard-generate Annotated subclasses.  this technique
# is used instead of on-the-fly types (i.e. type.__new__())
# so that the resulting objects are pickleable.
annotated_classes = {}


def _deep_annotate(element, annotations, exclude=None):
    """Deep copy the given ClauseElement, annotating each element
    with the given annotations dictionary.

    Elements within the exclude collection will be cloned but not annotated.

    Returns ``None`` when ``element`` is ``None``.

    """
    def clone(elem):
        if exclude and \
                hasattr(elem, 'proxy_set') and \
                elem.proxy_set.intersection(exclude):
            # excluded: plain clone, no annotations applied
            newelem = elem._clone()
        elif annotations != elem._annotations:
            newelem = elem._annotate(annotations)
        else:
            # already carries exactly these annotations; reuse as is
            newelem = elem
        newelem._copy_internals(clone=clone)
        return newelem

    if element is not None:
        element = clone(element)
    return element


def _deep_deannotate(element, values=None):
    """Deep copy the given element, removing annotations.

    :param element: the element to process; ``None`` is passed through.
    :param values: optional collection of annotation keys to remove.
     When empty or ``None``, each element is de-annotated via
     ``_deannotate(values=values)`` and the result is memoized per
     source element.
    """

    cloned = util.column_dict()

    def clone(elem):
        # if a values dict is given,
        # the elem must be cloned each time it appears,
        # as there may be different annotations in source
        # elements that are remaining.  if totally
        # removing all annotations, can assume the same
        # slate...
        if values or elem not in cloned:
            newelem = elem._deannotate(values=values, clone=True)
            newelem._copy_internals(clone=clone)
            if not values:
                cloned[elem] = newelem
            return newelem
        else:
            return cloned[elem]

    if element is not None:
        element = clone(element)
    return element


def _shallow_annotate(element, annotations):
    """Annotate the given ClauseElement and copy its internals so that
    internal objects refer to the new annotated object.

    Basically used to apply a "dont traverse" annotation to a
    selectable, without digging throughout the whole
    structure wasting time.
    """
    element = element._annotate(annotations)
    element._copy_internals()
    return element


def _new_annotation_type(cls, base_cls):
    """Return the Annotated subclass for ``cls``, creating and registering
    it if needed.

    :param cls: the class to produce an annotated variant of.  If it is
     already an ``Annotated`` subclass it is returned unchanged.
    :param base_cls: the Annotated base to derive from, unless a
     registered Annotated class is found for a class in the MRO of
     ``cls``, in which case that one is used instead.
    """
    if issubclass(cls, Annotated):
        return cls
    elif cls in annotated_classes:
        return annotated_classes[cls]

    for super_ in cls.__mro__:
        # check if an Annotated subclass more specific than
        # the given base_cls is already registered, such
        # as AnnotatedColumnElement.
        if super_ in annotated_classes:
            base_cls = annotated_classes[super_]
            break

    annotated_classes[cls] = anno_cls = type(
        "Annotated%s" % cls.__name__,
        (base_cls, cls), {})
    # publish the generated class under a module-level name so that it
    # can be located by name (see the pickling note above the registry)
    globals()["Annotated%s" % cls.__name__] = anno_cls
    return anno_cls


def _prepare_annotations(target_hierarchy, base_cls):
    """Generate Annotated subclasses for ``target_hierarchy`` and for
    every class reachable from it through ``__subclasses__()``."""
    stack = [target_hierarchy]
    while stack:
        cls = stack.pop()
        stack.extend(cls.__subclasses__())

        _new_annotation_type(cls, base_cls)