"""Tests for ORM class instrumentation.

Covers how ``instrumentation.register_class()`` and ``mapper()`` decorate
``__init__`` across class hierarchies, the warning emitted for classes
that define ``__del__``, reserved instrumentation attribute names, and
constructors that use Python 3 keyword-only arguments.
"""

from sqlalchemy.testing import assert_raises, assert_raises_message
import sqlalchemy as sa
from sqlalchemy import MetaData, Integer, ForeignKey, util, event
from sqlalchemy.orm import mapper, relationship, create_session, \
    attributes, class_mapper, clear_mappers, instrumentation, events
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing import eq_, ne_
from sqlalchemy.testing import fixtures
from sqlalchemy import testing


class InitTest(fixtures.ORMTest):
    """Check the ordering of 'init' events versus ``__init__`` calls.

    Test names encode the class hierarchy under test, one token per class
    from base to subclass:

    * an upper-case letter (``A``) is a class passed to ``register()``;
    * a lower-case letter (``a``) is a class that is not registered;
    * a trailing ``i`` means the class defines its own ``__init__``.

    Each test collects ``(cls, 'init', state.class_)`` tuples from the
    instrumentation event and ``(cls, '__init__')`` tuples from the plain
    constructors into one list and asserts on the resulting order.
    """

    def fixture(self):
        """Return a fresh four-column table bound to its own MetaData."""
        return Table('t', MetaData(),
                     Column('id', Integer, primary_key=True),
                     Column('type', Integer),
                     Column('x', Integer),
                     Column('y', Integer))

    def register(self, cls, canary):
        """Instrument ``cls`` and log its 'init' events into ``canary``.

        Also asserts that registration replaced ``cls.__init__``.  Every
        event appends ``(cls, 'init', state.class_)``, where ``cls`` is the
        class the listener was registered for and ``state.class_`` is the
        class reported by the state passed to the listener.
        """
        original_init = cls.__init__
        instrumentation.register_class(cls)
        ne_(cls.__init__, original_init)
        manager = instrumentation.manager_of_class(cls)

        def init(state, args, kwargs):
            canary.append((cls, 'init', state.class_))

        event.listen(manager, 'init', init, raw=True)

    def test_ai(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))

        A()
        eq_(inits, [(A, '__init__')])

    def test_A(self):
        inits = []

        class A(object):
            pass
        self.register(A, inits)

        A()
        eq_(inits, [(A, 'init', A)])

    def test_Ai(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))
        self.register(A, inits)

        A()
        eq_(inits, [(A, 'init', A), (A, '__init__')])

    def test_ai_B(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))

        class B(A):
            pass
        self.register(B, inits)

        A()
        eq_(inits, [(A, '__init__')])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B), (A, '__init__')])

    def test_ai_Bi(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))

        class B(A):
            def __init__(self):
                inits.append((B, '__init__'))
                super(B, self).__init__()
        self.register(B, inits)

        A()
        eq_(inits, [(A, '__init__')])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B), (B, '__init__'), (A, '__init__')])

    def test_Ai_bi(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))
        self.register(A, inits)

        class B(A):
            def __init__(self):
                inits.append((B, '__init__'))
                super(B, self).__init__()

        A()
        eq_(inits, [(A, 'init', A), (A, '__init__')])

        del inits[:]

        # B is unregistered, so the event only fires once B.__init__
        # chains up to A's instrumented __init__.
        B()
        eq_(inits, [(B, '__init__'), (A, 'init', B), (A, '__init__')])

    def test_Ai_Bi(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))
        self.register(A, inits)

        class B(A):
            def __init__(self):
                inits.append((B, '__init__'))
                super(B, self).__init__()
        self.register(B, inits)

        A()
        eq_(inits, [(A, 'init', A), (A, '__init__')])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B), (B, '__init__'), (A, '__init__')])

    def test_Ai_B(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))
        self.register(A, inits)

        class B(A):
            pass
        self.register(B, inits)

        A()
        eq_(inits, [(A, 'init', A), (A, '__init__')])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B), (A, '__init__')])

    def test_Ai_Bi_Ci(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))
        self.register(A, inits)

        class B(A):
            def __init__(self):
                inits.append((B, '__init__'))
                super(B, self).__init__()
        self.register(B, inits)

        class C(B):
            def __init__(self):
                inits.append((C, '__init__'))
                super(C, self).__init__()
        self.register(C, inits)

        A()
        eq_(inits, [(A, 'init', A), (A, '__init__')])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B), (B, '__init__'), (A, '__init__')])

        del inits[:]
        C()
        eq_(inits, [(C, 'init', C), (C, '__init__'), (B, '__init__'),
                    (A, '__init__')])

    def test_Ai_bi_Ci(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))
        self.register(A, inits)

        class B(A):
            def __init__(self):
                inits.append((B, '__init__'))
                super(B, self).__init__()

        class C(B):
            def __init__(self):
                inits.append((C, '__init__'))
                super(C, self).__init__()
        self.register(C, inits)

        A()
        eq_(inits, [(A, 'init', A), (A, '__init__')])

        del inits[:]

        B()
        eq_(inits, [(B, '__init__'), (A, 'init', B), (A, '__init__')])

        del inits[:]
        C()
        eq_(inits, [(C, 'init', C), (C, '__init__'), (B, '__init__'),
                    (A, '__init__')])

    def test_Ai_b_Ci(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))
        self.register(A, inits)

        class B(A):
            pass

        class C(B):
            def __init__(self):
                inits.append((C, '__init__'))
                super(C, self).__init__()
        self.register(C, inits)

        A()
        eq_(inits, [(A, 'init', A), (A, '__init__')])

        del inits[:]

        B()
        eq_(inits, [(A, 'init', B), (A, '__init__')])

        del inits[:]
        C()
        eq_(inits, [(C, 'init', C), (C, '__init__'), (A, '__init__')])

    def test_Ai_B_Ci(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))
        self.register(A, inits)

        class B(A):
            pass
        self.register(B, inits)

        class C(B):
            def __init__(self):
                inits.append((C, '__init__'))
                super(C, self).__init__()
        self.register(C, inits)

        A()
        eq_(inits, [(A, 'init', A), (A, '__init__')])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B), (A, '__init__')])

        del inits[:]
        C()
        eq_(inits, [(C, 'init', C), (C, '__init__'), (A, '__init__')])

    def test_Ai_B_C(self):
        inits = []

        class A(object):
            def __init__(self):
                inits.append((A, '__init__'))
        self.register(A, inits)

        class B(A):
            pass
        self.register(B, inits)

        class C(B):
            pass
        self.register(C, inits)

        A()
        eq_(inits, [(A, 'init', A), (A, '__init__')])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B), (A, '__init__')])

        del inits[:]
        C()
        eq_(inits, [(C, 'init', C), (A, '__init__')])

    def test_A_Bi_C(self):
        inits = []

        class A(object):
            pass
        self.register(A, inits)

        class B(A):
            def __init__(self):
                inits.append((B, '__init__'))
        self.register(B, inits)

        class C(B):
            pass
        self.register(C, inits)

        A()
        eq_(inits, [(A, 'init', A)])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B), (B, '__init__')])

        del inits[:]
        C()
        eq_(inits, [(C, 'init', C), (B, '__init__')])

    def test_A_B_Ci(self):
        inits = []

        class A(object):
            pass
        self.register(A, inits)

        class B(A):
            pass
        self.register(B, inits)

        class C(B):
            def __init__(self):
                inits.append((C, '__init__'))
        self.register(C, inits)

        A()
        eq_(inits, [(A, 'init', A)])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B)])

        del inits[:]
        C()
        eq_(inits, [(C, 'init', C), (C, '__init__')])

    def test_A_B_C(self):
        inits = []

        class A(object):
            pass
        self.register(A, inits)

        class B(A):
            pass
        self.register(B, inits)

        class C(B):
            pass
        self.register(C, inits)

        A()
        eq_(inits, [(A, 'init', A)])

        del inits[:]

        B()
        eq_(inits, [(B, 'init', B)])

        del inits[:]
        C()
        eq_(inits, [(C, 'init', C)])

    def test_defaulted_init(self):
        """Default argument values survive instrumentation of __init__."""
        class X(object):
            def __init__(self_, a, b=123, c='abc'):
                self_.a = a
                self_.b = b
                self_.c = c
        instrumentation.register_class(X)

        o = X('foo')
        eq_(o.a, 'foo')
        eq_(o.b, 123)
        eq_(o.c, 'abc')

        class Y(object):
            unique = object()

            class OutOfScopeForEval(object):
                def __repr__(self_):
                    # misleading repr
                    return '123'

            outofscope = OutOfScopeForEval()

            def __init__(self_, u=unique, o=outofscope):
                self_.u = u
                self_.o = o

        instrumentation.register_class(Y)

        # The defaults must be the very same objects, not values rebuilt
        # from their repr().
        o = Y()
        assert o.u is Y.unique
        assert o.o is Y.outofscope


class MapperInitTest(fixtures.ORMTest):
    """Instrumentation behavior when classes are set up via mapper()."""

    def fixture(self):
        """Return a fresh four-column table bound to its own MetaData."""
        return Table('t', MetaData(),
                     Column('id', Integer, primary_key=True),
                     Column('type', Integer),
                     Column('x', Integer),
                     Column('y', Integer))

    def test_partially_mapped_inheritance(self):
        class A(object):
            pass

        class B(A):
            pass

        class C(B):
            def __init__(self, x):
                pass

        mapper(A, self.fixture())

        # B is not mapped in the current implementation
        assert_raises(sa.orm.exc.UnmappedClassError, class_mapper, B)

        # C is not mapped in the current implementation
        assert_raises(sa.orm.exc.UnmappedClassError, class_mapper, C)

    def test_del_warning(self):
        """Mapping a class that defines __del__ raises an SAWarning."""
        class A(object):
            def __del__(self):
                pass

        # All fragments are raw strings so that the regex escapes are not
        # treated as (invalid) Python string escapes.  Whitespace after the
        # sentence break is matched with \s+ so the test does not depend on
        # the exact number of spaces in the message.
        assert_raises_message(
            sa.exc.SAWarning,
            r"__del__\(\) method on class "
            r"<class '.*\.A'> will cause "
            r"unreachable cycles and memory leaks, as SQLAlchemy "
            r"instrumentation often creates reference cycles\.\s+"
            r"Please remove this method\.",
            mapper, A, self.fixture()
        )


class OnLoadTest(fixtures.ORMTest):
    """Check that Events.load is not hit in regular attributes operations."""

    def test_basic(self):
        import pickle

        # The class is made a module global for the duration of the test
        # and removed again in the ``finally`` clause.
        global A

        class A(object):
            pass

        def canary(instance):
            assert False

        try:
            instrumentation.register_class(A)
            manager = instrumentation.manager_of_class(A)
            event.listen(manager, 'load', canary)

            # Construction plus a pickle round trip must not fire 'load'.
            # The pickled data is produced right here, so loading it is
            # safe.
            pickle.loads(pickle.dumps(A()))
        finally:
            del A


class NativeInstrumentationTest(fixtures.ORMTest):
    """Reserved ClassManager attribute names are rejected with KeyError."""

    def test_register_reserved_attribute(self):
        class T(object):
            pass

        instrumentation.register_class(T)
        manager = instrumentation.manager_of_class(T)

        # Named explicitly so the module-level ``sa`` import is not
        # shadowed.
        state_attr = instrumentation.ClassManager.STATE_ATTR
        manager_attr = instrumentation.ClassManager.MANAGER_ATTR

        def fails(method, attr):
            assert_raises(
                KeyError, getattr(manager, method), attr, property())

        fails('install_member', state_attr)
        fails('install_member', manager_attr)
        fails('install_descriptor', state_attr)
        fails('install_descriptor', manager_attr)

    def test_mapped_stateattr(self):
        t = Table('t', MetaData(),
                  Column('id', Integer, primary_key=True),
                  Column(instrumentation.ClassManager.STATE_ATTR, Integer))

        class T(object):
            pass

        assert_raises(KeyError, mapper, T, t)

    def test_mapped_managerattr(self):
        t = Table('t', MetaData(),
                  Column('id', Integer, primary_key=True),
                  Column(instrumentation.ClassManager.MANAGER_ATTR, Integer))

        class T(object):
            pass

        assert_raises(KeyError, mapper, T, t)


class Py3KFunctionInstTest(fixtures.ORMTest):
    """Instrumented __init__ keeps keyword-only argument signatures.

    The ``_kw_*_fixture`` methods use Python 3-only syntax; they are
    compiled with ``exec`` and attached to this class further down in the
    module, only when ``util.py3k`` is true.
    """

    __requires__ = ("python3", )

    def _instrument(self, cls):
        """Register ``cls`` and return ``(cls, canary)``.

        ``canary`` is a list receiving one ``(args, kwargs)`` tuple per
        'init' event fired for ``cls``.
        """
        manager = instrumentation.register_class(cls)
        canary = []

        def check(target, args, kwargs):
            canary.append((args, kwargs))

        event.listen(manager, "init", check)
        return cls, canary

    def test_kw_only_args(self):
        cls, canary = self._kw_only_fixture()

        cls("a", b="b", c="c")
        eq_(canary, [(('a', ), {'b': 'b', 'c': 'c'})])

    def test_kw_plus_posn_args(self):
        cls, canary = self._kw_plus_posn_fixture()

        cls("a", 1, 2, 3, b="b", c="c")
        eq_(canary, [(('a', 1, 2, 3), {'b': 'b', 'c': 'c'})])

    def test_kw_only_args_plus_opt(self):
        cls, canary = self._kw_opt_fixture()

        cls("a", b="b")
        eq_(canary, [(('a', ), {'b': 'b', 'c': 'c'})])

        canary[:] = []
        cls("a", b="b", c="d")
        eq_(canary, [(('a', ), {'b': 'b', 'c': 'd'})])

    def test_kw_only_sig(self):
        cls, canary = self._kw_only_fixture()
        assert_raises(
            TypeError,
            cls, "a", "b", "c"
        )

    def test_kw_plus_opt_sig(self):
        # Uses the optional-keyword fixture; previously this called
        # _kw_only_fixture() and merely repeated test_kw_only_sig.
        cls, canary = self._kw_opt_fixture()
        assert_raises(
            TypeError,
            cls, "a", "b", "c"
        )

        assert_raises(
            TypeError,
            cls, "a", "b", c="c"
        )


if util.py3k:
    _locals = {}
    exec("""
def _kw_only_fixture(self):
    class A(object):
        def __init__(self, a, *, b, c):
            self.a = a
            self.b = b
            self.c = c
    return self._instrument(A)

def _kw_plus_posn_fixture(self):
    class A(object):
        def __init__(self, a, *args, b, c):
            self.a = a
            self.b = b
            self.c = c
    return self._instrument(A)

def _kw_opt_fixture(self):
    class A(object):
        def __init__(self, a, *, b, c="c"):
            self.a = a
            self.b = b
            self.c = c
    return self._instrument(A)
""", _locals)
    for k in _locals:
        # exec() inserts '__builtins__' into the namespace dict it is
        # given; only the fixture functions belong on the test class.
        if k == '__builtins__':
            continue
        setattr(Py3KFunctionInstTest, k, _locals[k])


class MiscTest(fixtures.ORMTest):
    """Seems basic, but not directly covered elsewhere!"""

    def test_compileonattr(self):
        t = Table('t', MetaData(),
                  Column('id', Integer, primary_key=True),
                  Column('x', Integer))

        class A(object):
            pass
        mapper(A, t)

        a = A()
        assert a.id is None

    def test_compileonattr_rel(self):
        m = MetaData()
        t1 = Table('t1', m,
                   Column('id', Integer, primary_key=True),
                   Column('x', Integer))
        t2 = Table('t2', m,
                   Column('id', Integer, primary_key=True),
                   Column('t1_id', Integer, ForeignKey('t1.id')))

        class A(object):
            pass

        class B(object):
            pass
        mapper(A, t1, properties=dict(bs=relationship(B)))
        mapper(B, t2)

        a = A()
        assert not a.bs

    def test_uninstrument(self):
        class A(object):
            pass

        manager = instrumentation.register_class(A)
        attributes.register_attribute(A, 'x', uselist=False, useobject=False)

        assert instrumentation.manager_of_class(A) is manager
        instrumentation.unregister_class(A)
        assert instrumentation.manager_of_class(A) is None
        assert not hasattr(A, 'x')

        # I prefer 'is' here but on pypy
        # it seems only == works
        assert A.__init__ == object.__init__

    def test_compileonattr_rel_backref_a(self):
        m = MetaData()
        t1 = Table('t1', m,
                   Column('id', Integer, primary_key=True),
                   Column('x', Integer))
        t2 = Table('t2', m,
                   Column('id', Integer, primary_key=True),
                   Column('t1_id', Integer, ForeignKey('t1.id')))

        class Base(object):
            def __init__(self, *args, **kwargs):
                pass

        # Repeat the scenario with and without a user-defined __init__
        # on the base class.
        for base in object, Base:
            class A(base):
                pass

            class B(base):
                pass
            mapper(A, t1, properties=dict(bs=relationship(B, backref='a')))
            mapper(B, t2)

            b = B()
            assert b.a is None
            a = A()
            b.a = a

            session = create_session()
            session.add(b)
            assert a in session, "base is %s" % base

    def test_compileonattr_rel_backref_b(self):
        m = MetaData()
        t1 = Table('t1', m,
                   Column('id', Integer, primary_key=True),
                   Column('x', Integer))
        t2 = Table('t2', m,
                   Column('id', Integer, primary_key=True),
                   Column('t1_id', Integer, ForeignKey('t1.id')))

        class Base(object):
            def __init__(self):
                pass

        class Base_AKW(object):
            def __init__(self, *args, **kwargs):
                pass

        # Repeat the scenario for each style of base-class constructor.
        for base in object, Base, Base_AKW:
            class A(base):
                pass

            class B(base):
                pass
            mapper(A, t1)
            mapper(B, t2, properties=dict(a=relationship(A, backref='bs')))

            a = A()
            b = B()
            b.a = a

            session = create_session()
            session.add(a)
            assert b in session, 'base: %s' % base