1# sql/annotation.py
2# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8"""The :class:`.Annotated` class and related routines; creates hash-equivalent
9copies of SQL constructs which contain context-specific markers and
10associations.
11
12"""
13
14from .. import util
15from . import operators
16
17
class Annotated(object):
    """stand-in for an element which additionally carries an
    'annotations' dictionary.

    An instance starts out with a copy of the wrapped element's
    ``__dict__`` and reports the wrapped element's hash from
    ``__hash__()``; ``__eq__()`` is routed according to the wrapped
    element as well, so that the annotated copy can take the original's
    place in hashed collections.

    The wrapped element stays referenced for as long as the annotated
    copy lives.  This matters: were the original garbage collected, its
    hash value could be handed out again and collide with the one
    stored here.

    """

    def __new__(cls, *args):
        """allocate an instance of the appropriate Annotated subclass.

        With no arguments (the clone path taken by
        ``_with_annotations()``) a bare instance of ``cls`` is produced.
        With ``(element, values)``, the class registered in
        ``annotated_classes`` for ``element.__class__`` is used; if none
        is registered yet, one is generated via
        ``_new_annotation_type()``.

        """
        if args:
            # unpacking also enforces the two-argument form
            element, values = args
            element_cls = element.__class__
            if element_cls in annotated_classes:
                cls = annotated_classes[element_cls]
            else:
                cls = _new_annotation_type(element_cls, cls)
        return object.__new__(cls)

    def __init__(self, element, values):
        """take on ``element``'s state and record ``values`` as the
        annotations."""
        # begin with a snapshot of the element's attributes, then layer
        # the annotation bookkeeping on top of it.
        state = element.__dict__.copy()
        self.__dict__ = state
        self.__element = element
        self._annotations = values
        # cached so that __hash__() matches the wrapped element
        self._hash = hash(element)

    def _annotate(self, values):
        """return a copy whose annotations are the current ones updated
        with ``values``."""
        merged = self._annotations.copy()
        merged.update(values)
        return self._with_annotations(merged)

    def _with_annotations(self, values):
        """return a copy of this object with ``values`` installed as its
        complete annotations dictionary."""
        annotated_cls = self.__class__
        # no-argument __new__ is the "clone constructor"; __init__ is
        # not run, state is carried over by copying __dict__.
        duplicate = annotated_cls.__new__(annotated_cls)
        duplicate.__dict__ = self.__dict__.copy()
        duplicate._annotations = values
        return duplicate

    def _deannotate(self, values=None, clone=True):
        """remove annotations.

        When ``values`` is None the wrapped, un-annotated element itself
        is returned.  Otherwise ``values`` is iterated as a collection
        of annotation keys, and a copy lacking those keys is returned;
        keys which aren't present are ignored.

        ``clone`` is accepted but not consulted by this implementation.

        """
        if values is None:
            return self.__element

        remaining = self._annotations.copy()
        for key in values:
            remaining.pop(key, None)
        return self._with_annotations(remaining)

    def _compiler_dispatch(self, visitor, **kw):
        """dispatch using the wrapped element's class, passing this
        annotated object as the subject."""
        element_cls = self.__element.__class__
        return element_cls._compiler_dispatch(self, visitor, **kw)

    @property
    def _constructor(self):
        """the ``_constructor`` of the wrapped element."""
        return self.__element._constructor

    def _clone(self):
        """clone the wrapped element and re-annotate the result.

        If the wrapped element's ``_clone()`` hands back the element
        itself, this object is returned unchanged.

        """
        wrapped = self.__element
        copied = wrapped._clone()
        if copied is wrapped:
            # element reports itself as immutable; nothing to do
            return self

        # carry over whatever has changed in this object's __dict__
        # since it was created from the element.
        copied.__dict__.update(self.__dict__)
        return self.__class__(copied, self._annotations)

    def __hash__(self):
        return self._hash

    def __eq__(self, other):
        """compare as the wrapped element's class would when it is a
        ColumnOperators; otherwise compare hash values."""
        wrapped = self.__element
        if not isinstance(wrapped, operators.ColumnOperators):
            return hash(other) == hash(self)
        return wrapped.__class__.__eq__(self, other)
99
# hard-generate Annotated subclasses.  each generated class is created
# once, registered here and published in this module's globals() under
# its own name, rather than being built anonymously on the fly
# (i.e. type.__new__()), so that the resulting objects are pickleable.
#
# the registry is keyed by the un-annotated element class;
# Annotated.__new__() reads it and _new_annotation_type() fills it in.
annotated_classes = {}
104
105
106def _deep_annotate(element, annotations, exclude=None):
107    """Deep copy the given ClauseElement, annotating each element
108    with the given annotations dictionary.
109
110    Elements within the exclude collection will be cloned but not annotated.
111
112    """
113    def clone(elem):
114        if exclude and \
115                hasattr(elem, 'proxy_set') and \
116                elem.proxy_set.intersection(exclude):
117            newelem = elem._clone()
118        elif annotations != elem._annotations:
119            newelem = elem._annotate(annotations)
120        else:
121            newelem = elem
122        newelem._copy_internals(clone=clone)
123        return newelem
124
125    if element is not None:
126        element = clone(element)
127    return element
128
129
def _deep_deannotate(element, values=None):
    """Return a deep copy of ``element`` with annotations removed.

    ``values`` is handed to each element's ``_deannotate()``; when it is
    empty or None, all annotations are removed.  ``None`` given as the
    element is passed through unchanged.

    """

    seen = util.column_dict()

    def clone(elem):
        # results are only reused when every annotation is being
        # stripped; in that case a repeated element always produces the
        # same outcome.  when specific values are being removed, the
        # element is processed anew on each appearance, since the
        # annotations left behind may differ between source elements.
        if not values and elem in seen:
            return seen[elem]

        stripped = elem._deannotate(values=values, clone=True)
        stripped._copy_internals(clone=clone)
        if not values:
            seen[elem] = stripped
        return stripped

    if element is None:
        return None
    return clone(element)
154
155
156def _shallow_annotate(element, annotations):
157    """Annotate the given ClauseElement and copy its internals so that
158    internal objects refer to the new annotated object.
159
160    Basically used to apply a "dont traverse" annotation to a
161    selectable, without digging throughout the whole
162    structure wasting time.
163    """
164    element = element._annotate(annotations)
165    element._copy_internals()
166    return element
167
168
def _new_annotation_type(cls, base_cls):
    """Return the Annotated subclass that corresponds to ``cls``,
    generating and registering it if it does not exist yet.

    A ``cls`` that is already an Annotated subclass is returned as is,
    and a class already present in ``annotated_classes`` yields its
    registered counterpart.  Otherwise a new type named
    ``Annotated<cls name>`` is built from ``(base_cls, cls)``, stored in
    ``annotated_classes`` and also placed in this module's ``globals()``
    under that name.

    """
    if issubclass(cls, Annotated):
        return cls

    try:
        return annotated_classes[cls]
    except KeyError:
        pass

    # prefer an Annotated subclass more specific than the given
    # base_cls if one is already registered for something in the MRO,
    # such as AnnotatedColumnElement.
    registered_ancestor = next(
        (klass for klass in cls.__mro__ if klass in annotated_classes),
        None)
    if registered_ancestor is not None:
        base_cls = annotated_classes[registered_ancestor]

    type_name = "Annotated%s" % cls.__name__
    anno_cls = type(type_name, (base_cls, cls), {})
    annotated_classes[cls] = anno_cls
    globals()[type_name] = anno_cls
    return anno_cls
188
189
def _prepare_annotations(target_hierarchy, base_cls):
    """Generate annotated types for ``target_hierarchy`` and for every
    class reachable from it through ``__subclasses__()``, passing
    ``base_cls`` to ``_new_annotation_type()`` for each one.

    """
    pending = [target_hierarchy]
    while pending:
        current = pending.pop()
        # gather subclasses before the annotated type is generated for
        # the current class, as that type derives from the class too.
        pending += current.__subclasses__()

        _new_annotation_type(current, base_cls)
197