1import weakref
2
3from sqlalchemy import exc
4from sqlalchemy import MetaData
5from sqlalchemy.ext.declarative import clsregistry
6from sqlalchemy.testing import assert_raises_message
7from sqlalchemy.testing import eq_
8from sqlalchemy.testing import fixtures
9from sqlalchemy.testing import is_
10from sqlalchemy.testing.util import gc_collect
11
12
class MockClass(object):
    """Lightweight stand-in for a mapped class, used by the registry tests.

    :param base: mapping stored on the instance as ``_decl_class_registry``;
     the tests in this module pass a ``weakref.WeakValueDictionary``.
    :param name: dotted path such as ``"foo.bar.Foo"``.  Everything before
     the last dot becomes ``__module__``; the final token becomes both
     ``name`` and ``__name__``.
    """

    def __init__(self, base, name):
        # rpartition yields ("", "", name) when there is no dot, which gives
        # an empty module path and the full string as the class name.
        module_path, _, class_name = name.rpartition(".")

        self._decl_class_registry = base
        self.__module__ = module_path
        self.__name__ = class_name
        self.name = class_name
        # every mock gets its own, independent MetaData collection
        self.metadata = MetaData()
20
21
class MockProp(object):
    """Placeholder property object handed to ``clsregistry._resolver``."""

    # Fixed label; the error messages asserted in the tests of this module
    # contain it ("When initializing mapper some_parent, ...").
    parent = "some_parent"
24
25
class ClsRegistryTest(fixtures.TestBase):
    """Tests for the declarative string-lookup class registry.

    Each test builds ``MockClass`` objects that share one
    ``weakref.WeakValueDictionary`` as their registry, adds them with
    ``clsregistry.add_class`` and then inspects the registry, usually
    through the two callables returned by ``clsregistry._resolver``.
    """

    # several tests assert on registry contents right after gc_collect()
    __requires__ = ("predictable_gc",)

    def test_same_module_same_name(self):
        """Adding a second class with the same module and name raises
        ``SAWarning``."""
        registry = weakref.WeakValueDictionary()
        first = MockClass(registry, "foo.bar.Foo")
        second = MockClass(registry, "foo.bar.Foo")
        clsregistry.add_class("Foo", first)
        gc_collect()

        expected = (
            "This declarative base already contains a class with the "
            "same class name and module name as foo.bar.Foo, and "
            "will be replaced in the string-lookup table."
        )
        assert_raises_message(
            exc.SAWarning, expected, clsregistry.add_class, "Foo", second
        )

    def test_resolve(self):
        """Fully module-qualified paths resolve to the matching class."""
        registry = weakref.WeakValueDictionary()
        bar_foo = MockClass(registry, "foo.bar.Foo")
        alt_foo = MockClass(registry, "foo.alt.Foo")
        clsregistry.add_class("Foo", bar_foo)
        clsregistry.add_class("Foo", alt_foo)
        name_resolver, resolver = clsregistry._resolver(bar_foo, MockProp())

        gc_collect()

        # both resolver flavors must agree
        for lookup in (resolver, name_resolver):
            is_(lookup("foo.bar.Foo")(), bar_foo)
            is_(lookup("foo.alt.Foo")(), alt_foo)

    def test_fragment_resolve(self):
        """A trailing fragment of the module path is enough when it is
        unambiguous."""
        registry = weakref.WeakValueDictionary()
        bar_foo = MockClass(registry, "foo.bar.Foo")
        alt_foo = MockClass(registry, "foo.alt.Foo")
        hoho = MockClass(registry, "bat.alt.Hoho")
        clsregistry.add_class("Foo", bar_foo)
        clsregistry.add_class("Foo", alt_foo)
        # NOTE: registered as "HoHo" although the mock is named "Hoho";
        # this class is never looked up below.
        clsregistry.add_class("HoHo", hoho)
        name_resolver, resolver = clsregistry._resolver(bar_foo, MockProp())

        gc_collect()

        for lookup in (resolver, name_resolver):
            is_(lookup("bar.Foo")(), bar_foo)
            is_(lookup("alt.Foo")(), alt_foo)

    def test_fragment_ambiguous(self):
        """A path fragment matching more than one class raises
        ``InvalidRequestError``."""
        registry = weakref.WeakValueDictionary()
        bar_foo = MockClass(registry, "foo.bar.Foo")
        alt_foo = MockClass(registry, "foo.alt.Foo")
        bat_alt_foo = MockClass(registry, "bat.alt.Foo")
        clsregistry.add_class("Foo", bar_foo)
        clsregistry.add_class("Foo", alt_foo)
        clsregistry.add_class("Foo", bat_alt_foo)
        name_resolver, resolver = clsregistry._resolver(bar_foo, MockProp())

        gc_collect()

        expected = (
            'Multiple classes found for path "alt.Foo" in the registry '
            "of this declarative base. Please use a fully "
            "module-qualified path."
        )
        for lookup in (resolver, name_resolver):
            assert_raises_message(
                exc.InvalidRequestError, expected, lookup("alt.Foo")
            )

    def test_no_fns_in_name_resolve(self):
        """The general resolver evaluates an ``__import__`` expression,
        while the name resolver raises ``InvalidRequestError`` for it."""
        registry = weakref.WeakValueDictionary()
        bar_foo = MockClass(registry, "foo.bar.Foo")
        alt_foo = MockClass(registry, "foo.alt.Foo")
        clsregistry.add_class("Foo", bar_foo)
        clsregistry.add_class("Foo", alt_foo)
        name_resolver, resolver = clsregistry._resolver(bar_foo, MockProp())

        gc_collect()

        import sqlalchemy

        expression = "__import__('sqlalchemy.util').util.EMPTY_SET"

        is_(resolver(expression)(), sqlalchemy.util.EMPTY_SET)

        assert_raises_message(
            exc.InvalidRequestError,
            r"When initializing mapper some_parent, expression "
            r"\"__import__\('sqlalchemy.util'\).util.EMPTY_SET\" "
            "failed to locate a name",
            name_resolver(expression),
        )

    def test_resolve_dupe_by_name(self):
        """A bare class name shared by two live classes cannot be
        resolved."""
        registry = weakref.WeakValueDictionary()
        bar_foo = MockClass(registry, "foo.bar.Foo")
        alt_foo = MockClass(registry, "foo.alt.Foo")
        clsregistry.add_class("Foo", bar_foo)
        clsregistry.add_class("Foo", alt_foo)

        gc_collect()

        expected = (
            'Multiple classes found for path "Foo" in the '
            "registry of this declarative base. Please use a "
            "fully module-qualified path."
        )
        name_resolver, resolver = clsregistry._resolver(bar_foo, MockProp())
        for lookup in (resolver, name_resolver):
            assert_raises_message(
                exc.InvalidRequestError, expected, lookup("Foo")
            )

    def test_dupe_classes_back_to_one(self):
        """Once one of two same-named classes is collected, the bare name
        resolves to the survivor."""
        registry = weakref.WeakValueDictionary()
        survivor = MockClass(registry, "foo.bar.Foo")
        doomed = MockClass(registry, "foo.alt.Foo")
        clsregistry.add_class("Foo", survivor)
        clsregistry.add_class("Foo", doomed)

        del doomed
        gc_collect()

        # registry restores itself to just the one class
        name_resolver, resolver = clsregistry._resolver(survivor, MockProp())
        for lookup in (resolver, name_resolver):
            is_(lookup("Foo")(), survivor)

    def test_dupe_classes_cleanout(self):
        """``clsregistry._registries`` shrinks back after the registered
        classes are collected, on every one of three rounds."""
        # force this to maintain isolation between tests
        clsregistry._registries.clear()

        registry = weakref.WeakValueDictionary()

        for _ in range(3):
            bar_foo = MockClass(registry, "foo.bar.Foo")
            alt_foo = MockClass(registry, "foo.alt.Foo")
            clsregistry.add_class("Foo", bar_foo)
            clsregistry.add_class("Foo", alt_foo)

            # count expected while both classes are still referenced
            eq_(len(clsregistry._registries), 11)

            del bar_foo, alt_foo
            gc_collect()

            # count expected once both classes are gone
            eq_(len(clsregistry._registries), 1)

    def test_dupe_classes_name_race(self):
        """test the race condition that the class was garbage
        collected while being resolved from a dupe class."""
        registry = weakref.WeakValueDictionary()
        bar_foo = MockClass(registry, "foo.bar.Foo")
        alt_foo = MockClass(registry, "foo.alt.Foo")
        clsregistry.add_class("Foo", bar_foo)
        clsregistry.add_class("Foo", alt_foo)

        # swap the entry's contents for a single callable returning None;
        # presumably this mimics a reference whose target was collected
        marker = registry["Foo"]
        marker.contents = [lambda: None]

        expected = (
            r"When initializing mapper some_parent, expression "
            r"'Foo' failed to locate a name \('Foo'\)."
        )
        name_resolver, resolver = clsregistry._resolver(bar_foo, MockProp())
        for lookup in (resolver, name_resolver):
            assert_raises_message(
                exc.InvalidRequestError, expected, lookup("Foo")
            )

    def test_module_reg_cleanout_race(self):
        """test the race condition that a class was gc'ed as we tried
        to look it up by module name."""

        registry = weakref.WeakValueDictionary()
        bar_foo = MockClass(registry, "foo.bar.Foo")
        clsregistry.add_class("Foo", bar_foo)
        module_registry = registry["_sa_module_registry"]

        bar_module = module_registry["foo"]["bar"]
        name_resolver, resolver = clsregistry._resolver(bar_foo, MockProp())
        expected = (
            "Module 'bar' has no mapped classes registered "
            "under the name 'Foo'"
        )

        # the first resolver is created before the entry is removed
        f_resolver = resolver("foo")
        del bar_module.contents["Foo"]
        assert_raises_message(
            AttributeError, expected, lambda: f_resolver().bar.Foo
        )

        f_resolver = name_resolver("foo")
        assert_raises_message(
            AttributeError, expected, lambda: f_resolver().bar.Foo
        )

    def test_module_reg_no_class(self):
        """Asking a module entry for a class name that was never
        registered raises ``AttributeError``."""
        registry = weakref.WeakValueDictionary()
        bar_foo = MockClass(registry, "foo.bar.Foo")
        clsregistry.add_class("Foo", bar_foo)
        module_registry = registry["_sa_module_registry"]
        bar_module = module_registry["foo"]["bar"]  # noqa
        name_resolver, resolver = clsregistry._resolver(bar_foo, MockProp())
        expected = (
            "Module 'bar' has no mapped classes registered "
            "under the name 'Bat'"
        )

        f_resolver = resolver("foo")
        assert_raises_message(
            AttributeError, expected, lambda: f_resolver().bar.Bat
        )

        f_resolver = name_resolver("foo")
        assert_raises_message(
            AttributeError, expected, lambda: f_resolver().bar.Bat
        )

    def test_module_reg_cleanout_two_sub(self):
        """Sub-module entries vanish one at a time as their classes are
        collected; the parent entry goes with the last one."""
        registry = weakref.WeakValueDictionary()
        bar_foo = MockClass(registry, "foo.bar.Foo")
        clsregistry.add_class("Foo", bar_foo)
        module_registry = registry["_sa_module_registry"]

        alt_bar = MockClass(registry, "foo.alt.Bar")
        clsregistry.add_class("Bar", alt_bar)
        assert module_registry["foo"]["bar"]

        # dropping the only class under foo.bar removes just that entry
        del bar_foo
        gc_collect()
        assert "bar" not in module_registry["foo"]
        assert "alt" in module_registry["foo"]

        # dropping the last class under foo removes the foo entry too
        del alt_bar
        gc_collect()
        assert "foo" not in module_registry.contents

    def test_module_reg_cleanout_sub_to_base(self):
        """Collecting the only class in a nested module removes the
        top-level module entry."""
        registry = weakref.WeakValueDictionary()
        hoho = MockClass(registry, "bat.bar.Hoho")
        clsregistry.add_class("Hoho", hoho)
        module_registry = registry["_sa_module_registry"]

        assert module_registry["bat"]["bar"]
        del hoho
        gc_collect()
        assert "bat" not in module_registry

    def test_module_reg_cleanout_cls_to_base(self):
        """Collecting the only class in a single-level module removes
        that module entry."""
        registry = weakref.WeakValueDictionary()
        blat = MockClass(registry, "single.Blat")
        clsregistry.add_class("Blat", blat)
        module_registry = registry["_sa_module_registry"]
        assert module_registry["single"]
        del blat
        gc_collect()
        assert "single" not in module_registry
316