import weakref

from sqlalchemy import exc
from sqlalchemy import MetaData
from sqlalchemy.ext.declarative import clsregistry
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing.util import gc_collect


class MockClass(object):
    """Lightweight stand-in for a declarative class.

    ``name`` is a dotted path such as ``"foo.bar.Foo"``; everything before
    the last dot becomes ``__module__`` and the last token becomes both
    ``name`` and ``__name__``.  ``base`` is stored as
    ``_decl_class_registry`` and is the mapping that
    ``clsregistry.add_class`` populates in the tests below.
    """

    def __init__(self, base, name):
        self._decl_class_registry = base
        tokens = name.split(".")
        self.__module__ = ".".join(tokens[0:-1])
        self.name = self.__name__ = tokens[-1]
        self.metadata = MetaData()


class MockProp(object):
    """Stand-in for the property object handed to ``clsregistry._resolver``.

    The ``parent`` value shows up in the "When initializing mapper
    some_parent" error messages asserted by the tests.
    """

    parent = "some_parent"


class ClsRegistryTest(fixtures.TestBase):
    """Tests for string-based class lookup in ``clsregistry``.

    Each test uses a ``weakref.WeakValueDictionary`` as the registry, so
    several tests depend on objects being collected as soon as
    ``gc_collect()`` runs; hence the ``predictable_gc`` requirement.
    """

    __requires__ = ("predictable_gc",)

    def test_same_module_same_name(self):
        """A second class with the same module and name raises a warning."""
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        f2 = MockClass(base, "foo.bar.Foo")
        clsregistry.add_class("Foo", f1)
        gc_collect()

        # NOTE(review): the warning is asserted as a raised exception;
        # presumably the test harness escalates SAWarning to an error.
        assert_raises_message(
            exc.SAWarning,
            "This declarative base already contains a class with the "
            "same class name and module name as foo.bar.Foo, and "
            "will be replaced in the string-lookup table.",
            clsregistry.add_class,
            "Foo",
            f2,
        )

    def test_resolve(self):
        """Fully module-qualified paths resolve to the matching class."""
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        f2 = MockClass(base, "foo.alt.Foo")
        clsregistry.add_class("Foo", f1)
        clsregistry.add_class("Foo", f2)
        name_resolver, resolver = clsregistry._resolver(f1, MockProp())

        gc_collect()

        is_(resolver("foo.bar.Foo")(), f1)
        is_(resolver("foo.alt.Foo")(), f2)

        is_(name_resolver("foo.bar.Foo")(), f1)
        is_(name_resolver("foo.alt.Foo")(), f2)

    def test_fragment_resolve(self):
        """A trailing fragment of the module path is enough when unique."""
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        f2 = MockClass(base, "foo.alt.Foo")
        f3 = MockClass(base, "bat.alt.Hoho")
        clsregistry.add_class("Foo", f1)
        clsregistry.add_class("Foo", f2)
        # registered under the same name the mock class carries ("Hoho")
        clsregistry.add_class("Hoho", f3)
        name_resolver, resolver = clsregistry._resolver(f1, MockProp())

        gc_collect()

        is_(resolver("bar.Foo")(), f1)
        is_(resolver("alt.Foo")(), f2)

        is_(name_resolver("bar.Foo")(), f1)
        is_(name_resolver("alt.Foo")(), f2)

    def test_fragment_ambiguous(self):
        """A fragment matching classes in two modules is rejected.

        ``"alt.Foo"`` matches both ``foo.alt.Foo`` and ``bat.alt.Foo``.
        """
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        f2 = MockClass(base, "foo.alt.Foo")
        f3 = MockClass(base, "bat.alt.Foo")
        clsregistry.add_class("Foo", f1)
        clsregistry.add_class("Foo", f2)
        clsregistry.add_class("Foo", f3)
        name_resolver, resolver = clsregistry._resolver(f1, MockProp())

        gc_collect()

        assert_raises_message(
            exc.InvalidRequestError,
            'Multiple classes found for path "alt.Foo" in the registry '
            "of this declarative base. Please use a fully "
            "module-qualified path.",
            resolver("alt.Foo"),
        )

        assert_raises_message(
            exc.InvalidRequestError,
            'Multiple classes found for path "alt.Foo" in the registry '
            "of this declarative base. Please use a fully "
            "module-qualified path.",
            name_resolver("alt.Foo"),
        )

    def test_no_fns_in_name_resolve(self):
        """``name_resolver`` refuses an expression ``resolver`` evaluates.

        The general resolver evaluates the ``__import__(...)`` expression
        and returns its value, while the name-only resolver raises
        ``InvalidRequestError`` for the same string.
        """
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        f2 = MockClass(base, "foo.alt.Foo")
        clsregistry.add_class("Foo", f1)
        clsregistry.add_class("Foo", f2)
        name_resolver, resolver = clsregistry._resolver(f1, MockProp())

        gc_collect()

        import sqlalchemy

        is_(
            resolver("__import__('sqlalchemy.util').util.EMPTY_SET")(),
            sqlalchemy.util.EMPTY_SET,
        )

        assert_raises_message(
            exc.InvalidRequestError,
            r"When initializing mapper some_parent, expression "
            r"\"__import__\('sqlalchemy.util'\).util.EMPTY_SET\" "
            "failed to locate a name",
            name_resolver("__import__('sqlalchemy.util').util.EMPTY_SET"),
        )

    def test_resolve_dupe_by_name(self):
        """A bare class name shared by two live classes is ambiguous."""
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        f2 = MockClass(base, "foo.alt.Foo")
        clsregistry.add_class("Foo", f1)
        clsregistry.add_class("Foo", f2)

        gc_collect()

        name_resolver, resolver = clsregistry._resolver(f1, MockProp())
        resolver = resolver("Foo")
        assert_raises_message(
            exc.InvalidRequestError,
            'Multiple classes found for path "Foo" in the '
            "registry of this declarative base. Please use a "
            "fully module-qualified path.",
            resolver,
        )

        resolver = name_resolver("Foo")
        assert_raises_message(
            exc.InvalidRequestError,
            'Multiple classes found for path "Foo" in the '
            "registry of this declarative base. Please use a "
            "fully module-qualified path.",
            resolver,
        )

    def test_dupe_classes_back_to_one(self):
        """Once one duplicate is collected, the bare name resolves again."""
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        f2 = MockClass(base, "foo.alt.Foo")
        clsregistry.add_class("Foo", f1)
        clsregistry.add_class("Foo", f2)

        del f2
        gc_collect()

        # registry restores itself to just the one class
        name_resolver, resolver = clsregistry._resolver(f1, MockProp())
        f_resolver = resolver("Foo")
        is_(f_resolver(), f1)

        f_resolver = name_resolver("Foo")
        is_(f_resolver(), f1)

    def test_dupe_classes_cleanout(self):
        """``clsregistry._registries`` shrinks once the classes are gone.

        The same pair of classes is registered and released three times;
        the expected sizes (11 while alive, 1 after collection) are the
        same on every pass.
        """
        # force this to maintain isolation between tests
        clsregistry._registries.clear()

        base = weakref.WeakValueDictionary()

        for _ in range(3):
            f1 = MockClass(base, "foo.bar.Foo")
            f2 = MockClass(base, "foo.alt.Foo")
            clsregistry.add_class("Foo", f1)
            clsregistry.add_class("Foo", f2)

            eq_(len(clsregistry._registries), 11)

            del f1
            del f2
            gc_collect()

            eq_(len(clsregistry._registries), 1)

    def test_dupe_classes_name_race(self):
        """test the race condition that the class was garbage
        collected while being resolved from a dupe class."""
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        f2 = MockClass(base, "foo.alt.Foo")
        clsregistry.add_class("Foo", f1)
        clsregistry.add_class("Foo", f2)

        dupe_reg = base["Foo"]
        # swap in a single callable returning None, imitating a reference
        # whose target has already been collected
        dupe_reg.contents = [lambda: None]
        name_resolver, resolver = clsregistry._resolver(f1, MockProp())
        f_resolver = resolver("Foo")
        assert_raises_message(
            exc.InvalidRequestError,
            r"When initializing mapper some_parent, expression "
            r"'Foo' failed to locate a name \('Foo'\).",
            f_resolver,
        )

        f_resolver = name_resolver("Foo")
        assert_raises_message(
            exc.InvalidRequestError,
            r"When initializing mapper some_parent, expression "
            r"'Foo' failed to locate a name \('Foo'\).",
            f_resolver,
        )

    def test_module_reg_cleanout_race(self):
        """test the race condition that a class was gc'ed as we tried
        to look it up by module name."""

        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        clsregistry.add_class("Foo", f1)
        reg = base["_sa_module_registry"]

        mod_entry = reg["foo"]["bar"]
        name_resolver, resolver = clsregistry._resolver(f1, MockProp())
        f_resolver = resolver("foo")
        # drop the class entry by hand after the resolver was created,
        # so the attribute walk below finds the module but not the class
        del mod_entry.contents["Foo"]
        assert_raises_message(
            AttributeError,
            "Module 'bar' has no mapped classes registered "
            "under the name 'Foo'",
            lambda: f_resolver().bar.Foo,
        )

        f_resolver = name_resolver("foo")
        assert_raises_message(
            AttributeError,
            "Module 'bar' has no mapped classes registered "
            "under the name 'Foo'",
            lambda: f_resolver().bar.Foo,
        )

    def test_module_reg_no_class(self):
        """Looking up an unregistered name on a module raises."""
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        clsregistry.add_class("Foo", f1)
        reg = base["_sa_module_registry"]
        mod_entry = reg["foo"]["bar"]  # noqa
        name_resolver, resolver = clsregistry._resolver(f1, MockProp())
        f_resolver = resolver("foo")
        assert_raises_message(
            AttributeError,
            "Module 'bar' has no mapped classes registered "
            "under the name 'Bat'",
            lambda: f_resolver().bar.Bat,
        )

        f_resolver = name_resolver("foo")
        assert_raises_message(
            AttributeError,
            "Module 'bar' has no mapped classes registered "
            "under the name 'Bat'",
            lambda: f_resolver().bar.Bat,
        )

    def test_module_reg_cleanout_two_sub(self):
        """Sub-module entries vanish one at a time as classes are collected.

        With classes in ``foo.bar`` and ``foo.alt``, collecting the first
        removes only ``bar``; collecting the second removes ``foo`` itself.
        """
        base = weakref.WeakValueDictionary()
        f1 = MockClass(base, "foo.bar.Foo")
        clsregistry.add_class("Foo", f1)
        reg = base["_sa_module_registry"]

        f2 = MockClass(base, "foo.alt.Bar")
        clsregistry.add_class("Bar", f2)
        assert reg["foo"]["bar"]
        del f1
        gc_collect()
        assert "bar" not in reg["foo"]
        assert "alt" in reg["foo"]

        del f2
        gc_collect()
        assert "foo" not in reg.contents

    def test_module_reg_cleanout_sub_to_base(self):
        """Collecting the only class in ``bat.bar`` removes ``bat`` too."""
        base = weakref.WeakValueDictionary()
        f3 = MockClass(base, "bat.bar.Hoho")
        clsregistry.add_class("Hoho", f3)
        reg = base["_sa_module_registry"]

        assert reg["bat"]["bar"]
        del f3
        gc_collect()
        assert "bat" not in reg

    def test_module_reg_cleanout_cls_to_base(self):
        """Collecting the only class in a one-level module removes it."""
        base = weakref.WeakValueDictionary()
        f4 = MockClass(base, "single.Blat")
        clsregistry.add_class("Blat", f4)
        reg = base["_sa_module_registry"]
        assert reg["single"]
        del f4
        gc_collect()
        assert "single" not in reg