1import pickle 2from sqlalchemy.orm import attributes, instrumentation, exc as orm_exc 3from sqlalchemy.orm.collections import collection 4from sqlalchemy.orm.interfaces import AttributeExtension 5from sqlalchemy import exc as sa_exc 6from sqlalchemy.testing import eq_, ne_, assert_raises, \ 7 assert_raises_message 8from sqlalchemy.testing import fixtures 9from sqlalchemy.testing.util import gc_collect, all_partial_orderings 10from sqlalchemy.util import jython 11from sqlalchemy import event 12from sqlalchemy import testing 13from sqlalchemy.testing.mock import Mock, call 14from sqlalchemy.orm.state import InstanceState 15 16# global for pickling tests 17MyTest = None 18MyTest2 = None 19 20 21def _set_callable(state, dict_, key, callable_): 22 fn = InstanceState._instance_level_callable_processor( 23 state.manager, callable_, key) 24 fn(state, dict_, None) 25 26 27class AttributeImplAPITest(fixtures.MappedTest): 28 def _scalar_obj_fixture(self): 29 class A(object): 30 pass 31 class B(object): 32 pass 33 instrumentation.register_class(A) 34 instrumentation.register_class(B) 35 attributes.register_attribute(A, "b", uselist=False, useobject=True) 36 return A, B 37 38 def _collection_obj_fixture(self): 39 class A(object): 40 pass 41 class B(object): 42 pass 43 instrumentation.register_class(A) 44 instrumentation.register_class(B) 45 attributes.register_attribute(A, "b", uselist=True, useobject=True) 46 return A, B 47 48 def test_scalar_obj_remove_invalid(self): 49 A, B = self._scalar_obj_fixture() 50 51 a1 = A() 52 b1 = B() 53 b2 = B() 54 55 A.b.impl.append( 56 attributes.instance_state(a1), 57 attributes.instance_dict(a1), b1, None 58 ) 59 60 assert a1.b is b1 61 62 assert_raises_message( 63 ValueError, 64 "Object <B at .*?> not " 65 "associated with <A at .*?> on attribute 'b'", 66 A.b.impl.remove, 67 attributes.instance_state(a1), 68 attributes.instance_dict(a1), b2, None 69 ) 70 71 def test_scalar_obj_pop_invalid(self): 72 A, B = self._scalar_obj_fixture() 73 74 a1 = A() 75 b1 = B() 76 b2 = B() 77 78 A.b.impl.append( 79 attributes.instance_state(a1), 80 attributes.instance_dict(a1), b1, None 81 ) 82 83 assert a1.b is b1 84 85 A.b.impl.pop( 86 attributes.instance_state(a1), 87 attributes.instance_dict(a1), b2, None 88 ) 89 assert a1.b is b1 90 91 def test_scalar_obj_pop_valid(self): 92 A, B = self._scalar_obj_fixture() 93 94 a1 = A() 95 b1 = B() 96 97 A.b.impl.append( 98 attributes.instance_state(a1), 99 attributes.instance_dict(a1), b1, None 100 ) 101 102 assert a1.b is b1 103 104 A.b.impl.pop( 105 attributes.instance_state(a1), 106 attributes.instance_dict(a1), b1, None 107 ) 108 assert a1.b is None 109 110 def test_collection_obj_remove_invalid(self): 111 A, B = self._collection_obj_fixture() 112 113 a1 = A() 114 b1 = B() 115 b2 = B() 116 117 A.b.impl.append( 118 attributes.instance_state(a1), 119 attributes.instance_dict(a1), b1, None 120 ) 121 122 assert a1.b == [b1] 123 124 assert_raises_message( 125 ValueError, 126 r"list.remove\(.*?\): .* not in list", 127 A.b.impl.remove, 128 attributes.instance_state(a1), 129 attributes.instance_dict(a1), b2, None 130 ) 131 132 def test_collection_obj_pop_invalid(self): 133 A, B = self._collection_obj_fixture() 134 135 a1 = A() 136 b1 = B() 137 b2 = B() 138 139 A.b.impl.append( 140 attributes.instance_state(a1), 141 attributes.instance_dict(a1), b1, None 142 ) 143 144 assert a1.b == [b1] 145 146 A.b.impl.pop( 147 attributes.instance_state(a1), 148 attributes.instance_dict(a1), b2, None 149 ) 150 assert a1.b == [b1] 151 152 def test_collection_obj_pop_valid(self): 153 A, B = self._collection_obj_fixture() 154 155 a1 = A() 156 b1 = B() 157 158 A.b.impl.append( 159 attributes.instance_state(a1), 160 attributes.instance_dict(a1), b1, None 161 ) 162 163 assert a1.b == [b1] 164 165 A.b.impl.pop( 166 attributes.instance_state(a1), 167 attributes.instance_dict(a1), b1, None 168 ) 169 assert a1.b == [] 170 171 172class AttributesTest(fixtures.ORMTest): 173 def setup(self): 174 global MyTest, MyTest2 175 class MyTest(object): pass 176 class MyTest2(object): pass 177 178 def teardown(self): 179 global MyTest, MyTest2 180 MyTest, MyTest2 = None, None 181 182 def test_basic(self): 183 class User(object): 184 pass 185 186 instrumentation.register_class(User) 187 attributes.register_attribute(User, 'user_id', uselist=False, 188 useobject=False) 189 attributes.register_attribute(User, 'user_name', uselist=False, 190 useobject=False) 191 attributes.register_attribute(User, 'email_address', 192 uselist=False, useobject=False) 193 u = User() 194 u.user_id = 7 195 u.user_name = 'john' 196 u.email_address = 'lala@123.com' 197 self.assert_(u.user_id == 7 and u.user_name == 'john' 198 and u.email_address == 'lala@123.com') 199 attributes.instance_state(u)._commit_all(attributes.instance_dict(u)) 200 self.assert_(u.user_id == 7 and u.user_name == 'john' 201 and u.email_address == 'lala@123.com') 202 u.user_name = 'heythere' 203 u.email_address = 'foo@bar.com' 204 self.assert_(u.user_id == 7 and u.user_name == 'heythere' 205 and u.email_address == 'foo@bar.com') 206 207 def test_pickleness(self): 208 instrumentation.register_class(MyTest) 209 instrumentation.register_class(MyTest2) 210 attributes.register_attribute(MyTest, 'user_id', uselist=False, 211 useobject=False) 212 attributes.register_attribute(MyTest, 'user_name', 213 uselist=False, useobject=False) 214 attributes.register_attribute(MyTest, 'email_address', 215 uselist=False, useobject=False) 216 attributes.register_attribute(MyTest2, 'a', uselist=False, 217 useobject=False) 218 attributes.register_attribute(MyTest2, 'b', uselist=False, 219 useobject=False) 220 221 # shouldn't be pickling callables at the class level 222 223 def somecallable(state, passive): 224 return None 225 226 attributes.register_attribute(MyTest, 'mt2', uselist=True, 227 trackparent=True, callable_=somecallable, 228 useobject=True) 229 230 o = MyTest() 231 o.mt2.append(MyTest2()) 232 o.user_id=7 233 o.mt2[0].a = 'abcde' 234 pk_o = pickle.dumps(o) 235 236 o2 = pickle.loads(pk_o) 237 pk_o2 = pickle.dumps(o2) 238 239 240 # the above is kind of distrurbing, so let's do it again a little 241 # differently. the string-id in serialization thing is just an 242 # artifact of pickling that comes up in the first round-trip. 243 # a -> b differs in pickle memoization of 'mt2', but b -> c will 244 # serialize identically. 245 246 o3 = pickle.loads(pk_o2) 247 pk_o3 = pickle.dumps(o3) 248 o4 = pickle.loads(pk_o3) 249 250 # and lastly make sure we still have our data after all that. 251 # identical serialzation is great, *if* it's complete :) 252 self.assert_(o4.user_id == 7) 253 self.assert_(o4.user_name is None) 254 self.assert_(o4.email_address is None) 255 self.assert_(len(o4.mt2) == 1) 256 self.assert_(o4.mt2[0].a == 'abcde') 257 self.assert_(o4.mt2[0].b is None) 258 259 @testing.requires.predictable_gc 260 def test_state_gc(self): 261 """test that InstanceState always has a dict, even after host 262 object gc'ed.""" 263 264 class Foo(object): 265 pass 266 267 instrumentation.register_class(Foo) 268 f = Foo() 269 state = attributes.instance_state(f) 270 f.bar = "foo" 271 eq_(state.dict, {'bar': 'foo', state.manager.STATE_ATTR: state}) 272 del f 273 gc_collect() 274 assert state.obj() is None 275 assert state.dict == {} 276 277 @testing.requires.predictable_gc 278 def test_object_dereferenced_error(self): 279 class Foo(object): 280 pass 281 class Bar(object): 282 def __init__(self): 283 gc_collect() 284 285 instrumentation.register_class(Foo) 286 instrumentation.register_class(Bar) 287 attributes.register_attribute(Foo, 288 'bars', 289 uselist=True, 290 useobject=True) 291 292 assert_raises_message( 293 orm_exc.ObjectDereferencedError, 294 "Can't emit change event for attribute " 295 "'Foo.bars' - parent object of type <Foo> " 296 "has been garbage collected.", 297 lambda: Foo().bars.append(Bar()) 298 ) 299 300 def test_deferred(self): 301 class Foo(object): 302 pass 303 304 data = {'a':'this is a', 'b':12} 305 def loader(state, keys): 306 for k in keys: 307 state.dict[k] = data[k] 308 return attributes.ATTR_WAS_SET 309 310 instrumentation.register_class(Foo) 311 manager = attributes.manager_of_class(Foo) 312 manager.deferred_scalar_loader = loader 313 attributes.register_attribute(Foo, 'a', uselist=False, useobject=False) 314 attributes.register_attribute(Foo, 'b', uselist=False, useobject=False) 315 316 f = Foo() 317 attributes.instance_state(f)._expire(attributes.instance_dict(f), 318 set()) 319 eq_(f.a, 'this is a') 320 eq_(f.b, 12) 321 f.a = 'this is some new a' 322 attributes.instance_state(f)._expire(attributes.instance_dict(f), 323 set()) 324 eq_(f.a, 'this is a') 325 eq_(f.b, 12) 326 attributes.instance_state(f)._expire(attributes.instance_dict(f), 327 set()) 328 f.a = 'this is another new a' 329 eq_(f.a, 'this is another new a') 330 eq_(f.b, 12) 331 attributes.instance_state(f)._expire(attributes.instance_dict(f), 332 set()) 333 eq_(f.a, 'this is a') 334 eq_(f.b, 12) 335 del f.a 336 eq_(f.a, None) 337 eq_(f.b, 12) 338 attributes.instance_state(f)._commit_all(attributes.instance_dict(f), 339 set()) 340 eq_(f.a, None) 341 eq_(f.b, 12) 342 343 def test_deferred_pickleable(self): 344 data = {'a':'this is a', 'b':12} 345 def loader(state, keys): 346 for k in keys: 347 state.dict[k] = data[k] 348 return attributes.ATTR_WAS_SET 349 350 instrumentation.register_class(MyTest) 351 manager = attributes.manager_of_class(MyTest) 352 manager.deferred_scalar_loader=loader 353 attributes.register_attribute(MyTest, 'a', uselist=False, useobject=False) 354 attributes.register_attribute(MyTest, 'b', uselist=False, useobject=False) 355 356 m = MyTest() 357 attributes.instance_state(m)._expire(attributes.instance_dict(m), set()) 358 assert 'a' not in m.__dict__ 359 m2 = pickle.loads(pickle.dumps(m)) 360 assert 'a' not in m2.__dict__ 361 eq_(m2.a, "this is a") 362 eq_(m2.b, 12) 363 364 def test_list(self): 365 class User(object):pass 366 class Address(object):pass 367 368 instrumentation.register_class(User) 369 instrumentation.register_class(Address) 370 attributes.register_attribute(User, 'user_id', uselist=False, 371 useobject=False) 372 attributes.register_attribute(User, 'user_name', uselist=False, 373 useobject=False) 374 attributes.register_attribute(User, 'addresses', uselist=True, 375 useobject=True) 376 attributes.register_attribute(Address, 'address_id', 377 uselist=False, useobject=False) 378 attributes.register_attribute(Address, 'email_address', 379 uselist=False, useobject=False) 380 381 u = User() 382 u.user_id = 7 383 u.user_name = 'john' 384 u.addresses = [] 385 a = Address() 386 a.address_id = 10 387 a.email_address = 'lala@123.com' 388 u.addresses.append(a) 389 390 self.assert_(u.user_id == 7 and u.user_name == 'john' 391 and u.addresses[0].email_address == 'lala@123.com') 392 (u, 393 attributes.instance_state(a)._commit_all(attributes.instance_dict(a))) 394 self.assert_(u.user_id == 7 and u.user_name == 'john' 395 and u.addresses[0].email_address == 'lala@123.com') 396 397 u.user_name = 'heythere' 398 a = Address() 399 a.address_id = 11 400 a.email_address = 'foo@bar.com' 401 u.addresses.append(a) 402 403 eq_(u.user_id, 7) 404 eq_(u.user_name, 'heythere') 405 eq_(u.addresses[0].email_address,'lala@123.com') 406 eq_(u.addresses[1].email_address,'foo@bar.com') 407 408 def test_extension_commit_attr(self): 409 """test that an extension which commits attribute history 410 maintains the end-result history. 411 412 This won't work in conjunction with some unitofwork extensions. 413 414 """ 415 416 class Foo(fixtures.BasicEntity): 417 pass 418 class Bar(fixtures.BasicEntity): 419 pass 420 421 class ReceiveEvents(AttributeExtension): 422 def __init__(self, key): 423 self.key = key 424 425 def append(self, state, child, initiator): 426 if commit: 427 state._commit_all(state.dict) 428 return child 429 430 def remove(self, state, child, initiator): 431 if commit: 432 state._commit_all(state.dict) 433 return child 434 435 def set(self, state, child, oldchild, initiator): 436 if commit: 437 state._commit_all(state.dict) 438 return child 439 440 instrumentation.register_class(Foo) 441 instrumentation.register_class(Bar) 442 443 b1, b2, b3, b4 = Bar(id='b1'), Bar(id='b2'), Bar(id='b3'), Bar(id='b4') 444 445 def loadcollection(state, passive): 446 if passive is attributes.PASSIVE_NO_FETCH: 447 return attributes.PASSIVE_NO_RESULT 448 return [b1, b2] 449 450 def loadscalar(state, passive): 451 if passive is attributes.PASSIVE_NO_FETCH: 452 return attributes.PASSIVE_NO_RESULT 453 return b2 454 455 attributes.register_attribute(Foo, 'bars', 456 uselist=True, 457 useobject=True, 458 callable_=loadcollection, 459 extension=[ReceiveEvents('bars')]) 460 461 attributes.register_attribute(Foo, 'bar', 462 uselist=False, 463 useobject=True, 464 callable_=loadscalar, 465 extension=[ReceiveEvents('bar')]) 466 467 attributes.register_attribute(Foo, 'scalar', 468 uselist=False, 469 useobject=False, extension=[ReceiveEvents('scalar')]) 470 471 472 def create_hist(): 473 def hist(key, shouldmatch, fn, *arg): 474 attributes.instance_state(f1)._commit_all(attributes.instance_dict(f1)) 475 fn(*arg) 476 histories.append((shouldmatch, 477 attributes.get_history(f1, key))) 478 479 f1 = Foo() 480 hist('bars', True, f1.bars.append, b3) 481 hist('bars', True, f1.bars.append, b4) 482 hist('bars', False, f1.bars.remove, b2) 483 hist('bar', True, setattr, f1, 'bar', b3) 484 hist('bar', True, setattr, f1, 'bar', None) 485 hist('bar', True, setattr, f1, 'bar', b4) 486 hist('scalar', True, setattr, f1, 'scalar', 5) 487 hist('scalar', True, setattr, f1, 'scalar', None) 488 hist('scalar', True, setattr, f1, 'scalar', 4) 489 490 histories = [] 491 commit = False 492 create_hist() 493 without_commit = list(histories) 494 histories[:] = [] 495 commit = True 496 create_hist() 497 with_commit = histories 498 for without, with_ in zip(without_commit, with_commit): 499 shouldmatch, woc = without 500 shouldmatch, wic = with_ 501 if shouldmatch: 502 eq_(woc, wic) 503 else: 504 ne_(woc, wic) 505 506 def test_extension_lazyload_assertion(self): 507 class Foo(fixtures.BasicEntity): 508 pass 509 class Bar(fixtures.BasicEntity): 510 pass 511 512 class ReceiveEvents(AttributeExtension): 513 def append(self, state, child, initiator): 514 state.obj().bars 515 return child 516 517 def remove(self, state, child, initiator): 518 state.obj().bars 519 return child 520 521 def set(self, state, child, oldchild, initiator): 522 return child 523 524 instrumentation.register_class(Foo) 525 instrumentation.register_class(Bar) 526 527 bar1, bar2, bar3 = [Bar(id=1), Bar(id=2), Bar(id=3)] 528 def func1(state, passive): 529 if passive is attributes.PASSIVE_NO_FETCH: 530 return attributes.PASSIVE_NO_RESULT 531 532 return [bar1, bar2, bar3] 533 534 attributes.register_attribute(Foo, 'bars', uselist=True, 535 callable_=func1, useobject=True, 536 extension=[ReceiveEvents()]) 537 attributes.register_attribute(Bar, 'foos', uselist=True, 538 useobject=True, backref='bars') 539 540 x = Foo() 541 assert_raises(AssertionError, Bar(id=4).foos.append, x) 542 543 x.bars 544 b = Bar(id=4) 545 b.foos.append(x) 546 attributes.instance_state(x)._expire_attributes(attributes.instance_dict(x), 547 ['bars']) 548 assert_raises(AssertionError, b.foos.remove, x) 549 550 551 def test_scalar_listener(self): 552 553 # listeners on ScalarAttributeImpl aren't used normally. test that 554 # they work for the benefit of user extensions 555 556 class Foo(object): 557 558 pass 559 560 results = [] 561 class ReceiveEvents(AttributeExtension): 562 def append(self, state, child, initiator): 563 assert False 564 565 def remove(self, state, child, initiator): 566 results.append(("remove", state.obj(), child)) 567 568 def set(self, state, child, oldchild, initiator): 569 results.append(("set", state.obj(), child, oldchild)) 570 return child 571 572 instrumentation.register_class(Foo) 573 attributes.register_attribute(Foo, 'x', uselist=False, 574 useobject=False, 575 extension=ReceiveEvents()) 576 577 f = Foo() 578 f.x = 5 579 f.x = 17 580 del f.x 581 582 eq_(results, [ 583 ('set', f, 5, attributes.NEVER_SET), 584 ('set', f, 17, 5), 585 ('remove', f, 17), 586 ]) 587 588 def test_lazytrackparent(self): 589 """test that the "hasparent" flag works properly 590 when lazy loaders and backrefs are used 591 592 """ 593 594 class Post(object): 595 pass 596 class Blog(object): 597 pass 598 instrumentation.register_class(Post) 599 instrumentation.register_class(Blog) 600 601 # set up instrumented attributes with backrefs 602 attributes.register_attribute(Post, 'blog', uselist=False, 603 backref='posts', 604 trackparent=True, useobject=True) 605 attributes.register_attribute(Blog, 'posts', uselist=True, 606 backref='blog', 607 trackparent=True, useobject=True) 608 609 # create objects as if they'd been freshly loaded from the database (without history) 610 b = Blog() 611 p1 = Post() 612 _set_callable(attributes.instance_state(b), attributes.instance_dict(b), 613 'posts', lambda state, passive:[p1]) 614 _set_callable(attributes.instance_state(p1), attributes.instance_dict(p1), 615 'blog', lambda state, passive:b) 616 p1, attributes.instance_state(b)._commit_all(attributes.instance_dict(b)) 617 618 # no orphans (called before the lazy loaders fire off) 619 assert attributes.has_parent(Blog, p1, 'posts', optimistic=True) 620 assert attributes.has_parent(Post, b, 'blog', optimistic=True) 621 622 # assert connections 623 assert p1.blog is b 624 assert p1 in b.posts 625 626 # manual connections 627 b2 = Blog() 628 p2 = Post() 629 b2.posts.append(p2) 630 assert attributes.has_parent(Blog, p2, 'posts') 631 assert attributes.has_parent(Post, b2, 'blog') 632 633 def test_illegal_trackparent(self): 634 class Post(object):pass 635 class Blog(object):pass 636 instrumentation.register_class(Post) 637 instrumentation.register_class(Blog) 638 639 attributes.register_attribute(Post, 'blog', useobject=True) 640 assert_raises_message( 641 AssertionError, 642 "This AttributeImpl is not configured to track parents.", 643 attributes.has_parent, Post, Blog(), 'blog' 644 ) 645 assert_raises_message( 646 AssertionError, 647 "This AttributeImpl is not configured to track parents.", 648 Post.blog.impl.sethasparent, "x", "x", True 649 ) 650 651 652 def test_inheritance(self): 653 """tests that attributes are polymorphic""" 654 655 class Foo(object):pass 656 class Bar(Foo):pass 657 658 659 instrumentation.register_class(Foo) 660 instrumentation.register_class(Bar) 661 662 def func1(state, passive): 663 return "this is the foo attr" 664 def func2(state, passive): 665 return "this is the bar attr" 666 def func3(state, passive): 667 return "this is the shared attr" 668 attributes.register_attribute(Foo, 'element', uselist=False, 669 callable_=func1, useobject=True) 670 attributes.register_attribute(Foo, 'element2', uselist=False, 671 callable_=func3, useobject=True) 672 attributes.register_attribute(Bar, 'element', uselist=False, 673 callable_=func2, useobject=True) 674 675 x = Foo() 676 y = Bar() 677 assert x.element == 'this is the foo attr' 678 assert y.element == 'this is the bar attr' 679 assert x.element2 == 'this is the shared attr' 680 assert y.element2 == 'this is the shared attr' 681 682 def test_no_double_state(self): 683 states = set() 684 class Foo(object): 685 def __init__(self): 686 states.add(attributes.instance_state(self)) 687 class Bar(Foo): 688 def __init__(self): 689 states.add(attributes.instance_state(self)) 690 Foo.__init__(self) 691 692 693 instrumentation.register_class(Foo) 694 instrumentation.register_class(Bar) 695 696 b = Bar() 697 eq_(len(states), 1) 698 eq_(list(states)[0].obj(), b) 699 700 701 def test_inheritance2(self): 702 """test that the attribute manager can properly traverse the 703 managed attributes of an object, if the object is of a 704 descendant class with managed attributes in the parent class""" 705 706 class Foo(object): 707 pass 708 709 class Bar(Foo): 710 pass 711 712 class Element(object): 713 _state = True 714 715 instrumentation.register_class(Foo) 716 instrumentation.register_class(Bar) 717 attributes.register_attribute(Foo, 'element', uselist=False, 718 useobject=True) 719 el = Element() 720 x = Bar() 721 x.element = el 722 eq_(attributes.get_state_history(attributes.instance_state(x), 723 'element'), ([el], (), ())) 724 attributes.instance_state(x)._commit_all(attributes.instance_dict(x)) 725 added, unchanged, deleted = \ 726 attributes.get_state_history(attributes.instance_state(x), 727 'element') 728 assert added == () 729 assert unchanged == [el] 730 731 def test_lazyhistory(self): 732 """tests that history functions work with lazy-loading attributes""" 733 734 class Foo(fixtures.BasicEntity): 735 pass 736 class Bar(fixtures.BasicEntity): 737 pass 738 739 instrumentation.register_class(Foo) 740 instrumentation.register_class(Bar) 741 bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), 742 Bar(id=4)] 743 744 def func1(state, passive): 745 return 'this is func 1' 746 747 def func2(state, passive): 748 return [bar1, bar2, bar3] 749 750 attributes.register_attribute(Foo, 'col1', uselist=False, 751 callable_=func1, useobject=True) 752 attributes.register_attribute(Foo, 'col2', uselist=True, 753 callable_=func2, useobject=True) 754 attributes.register_attribute(Bar, 'id', uselist=False, 755 useobject=True) 756 x = Foo() 757 attributes.instance_state(x)._commit_all(attributes.instance_dict(x)) 758 x.col2.append(bar4) 759 eq_(attributes.get_state_history(attributes.instance_state(x), 760 'col2'), ([bar4], [bar1, bar2, bar3], [])) 761 762 def test_parenttrack(self): 763 class Foo(object): 764 pass 765 766 class Bar(object): 767 pass 768 769 instrumentation.register_class(Foo) 770 instrumentation.register_class(Bar) 771 attributes.register_attribute(Foo, 'element', uselist=False, 772 trackparent=True, useobject=True) 773 attributes.register_attribute(Bar, 'element', uselist=False, 774 trackparent=True, useobject=True) 775 f1 = Foo() 776 f2 = Foo() 777 b1 = Bar() 778 b2 = Bar() 779 f1.element = b1 780 b2.element = f2 781 assert attributes.has_parent(Foo, b1, 'element') 782 assert not attributes.has_parent(Foo, b2, 'element') 783 assert not attributes.has_parent(Foo, f2, 'element') 784 assert attributes.has_parent(Bar, f2, 'element') 785 b2.element = None 786 assert not attributes.has_parent(Bar, f2, 'element') 787 788 # test that double assignment doesn't accidentally reset the 789 # 'parent' flag. 790 791 b3 = Bar() 792 f4 = Foo() 793 b3.element = f4 794 assert attributes.has_parent(Bar, f4, 'element') 795 b3.element = f4 796 assert attributes.has_parent(Bar, f4, 'element') 797 798 def test_descriptorattributes(self): 799 """changeset: 1633 broke ability to use ORM to map classes with 800 unusual descriptor attributes (for example, classes that inherit 801 from ones implementing zope.interface.Interface). This is a 802 simple regression test to prevent that defect. """ 803 804 class des(object): 805 806 def __get__(self, instance, owner): 807 raise AttributeError('fake attribute') 808 809 class Foo(object): 810 A = des() 811 812 instrumentation.register_class(Foo) 813 instrumentation.unregister_class(Foo) 814 815 def test_collectionclasses(self): 816 817 class Foo(object): 818 pass 819 820 instrumentation.register_class(Foo) 821 attributes.register_attribute(Foo, 'collection', uselist=True, 822 typecallable=set, useobject=True) 823 assert attributes.manager_of_class(Foo).is_instrumented('collection' 824 ) 825 assert isinstance(Foo().collection, set) 826 attributes.unregister_attribute(Foo, 'collection') 827 assert not attributes.manager_of_class(Foo).is_instrumented('collection' 828 ) 829 try: 830 attributes.register_attribute(Foo, 'collection', 831 uselist=True, typecallable=dict, useobject=True) 832 assert False 833 except sa_exc.ArgumentError as e: 834 assert str(e) \ 835 == 'Type InstrumentedDict must elect an appender '\ 836 'method to be a collection class' 837 838 class MyDict(dict): 839 840 @collection.appender 841 def append(self, item): 842 self[item.foo] = item 843 844 @collection.remover 845 def remove(self, item): 846 del self[item.foo] 847 848 attributes.register_attribute(Foo, 'collection', uselist=True, 849 typecallable=MyDict, useobject=True) 850 assert isinstance(Foo().collection, MyDict) 851 attributes.unregister_attribute(Foo, 'collection') 852 853 class MyColl(object): 854 pass 855 856 try: 857 attributes.register_attribute(Foo, 'collection', 858 uselist=True, typecallable=MyColl, useobject=True) 859 assert False 860 except sa_exc.ArgumentError as e: 861 assert str(e) \ 862 == 'Type MyColl must elect an appender method to be a '\ 863 'collection class' 864 865 class MyColl(object): 866 867 @collection.iterator 868 def __iter__(self): 869 return iter([]) 870 871 @collection.appender 872 def append(self, item): 873 pass 874 875 @collection.remover 876 def remove(self, item): 877 pass 878 879 attributes.register_attribute(Foo, 'collection', uselist=True, 880 typecallable=MyColl, useobject=True) 881 try: 882 Foo().collection 883 assert True 884 except sa_exc.ArgumentError as e: 885 assert False 886 887class GetNoValueTest(fixtures.ORMTest): 888 def _fixture(self, expected): 889 class Foo(object): 890 pass 891 892 class Bar(object): 893 pass 894 895 def lazy_callable(state, passive): 896 return expected 897 898 instrumentation.register_class(Foo) 899 instrumentation.register_class(Bar) 900 if expected is not None: 901 attributes.register_attribute(Foo, 902 "attr", useobject=True, 903 uselist=False, callable_=lazy_callable) 904 else: 905 attributes.register_attribute(Foo, 906 "attr", useobject=True, 907 uselist=False) 908 909 f1 = self.f1 = Foo() 910 return Foo.attr.impl,\ 911 attributes.instance_state(f1), \ 912 attributes.instance_dict(f1) 913 914 915 def test_passive_no_result(self): 916 attr, state, dict_ = self._fixture(attributes.PASSIVE_NO_RESULT) 917 eq_( 918 attr.get(state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE), 919 attributes.PASSIVE_NO_RESULT 920 ) 921 922 def test_passive_no_result_never_set(self): 923 attr, state, dict_ = self._fixture(attributes.NEVER_SET) 924 eq_( 925 attr.get(state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE), 926 attributes.PASSIVE_NO_RESULT 927 ) 928 assert 'attr' not in dict_ 929 930 def test_passive_ret_never_set_never_set(self): 931 attr, state, dict_ = self._fixture(attributes.NEVER_SET) 932 eq_( 933 attr.get(state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET), 934 attributes.NEVER_SET 935 ) 936 assert 'attr' not in dict_ 937 938 def test_passive_ret_never_set_empty(self): 939 attr, state, dict_ = self._fixture(None) 940 eq_( 941 attr.get(state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET), 942 attributes.NEVER_SET 943 ) 944 assert 'attr' not in dict_ 945 946 def test_off_empty(self): 947 attr, state, dict_ = self._fixture(None) 948 eq_( 949 attr.get(state, dict_, passive=attributes.PASSIVE_OFF), 950 None 951 ) 952 assert 'attr' not in dict_ 953 954class UtilTest(fixtures.ORMTest): 955 def test_helpers(self): 956 class Foo(object): 957 pass 958 959 class Bar(object): 960 pass 961 962 instrumentation.register_class(Foo) 963 instrumentation.register_class(Bar) 964 attributes.register_attribute(Foo, "coll", uselist=True, useobject=True) 965 966 f1 = Foo() 967 b1 = Bar() 968 b2 = Bar() 969 coll = attributes.init_collection(f1, "coll") 970 assert coll.data is f1.coll 971 assert attributes.get_attribute(f1, "coll") is f1.coll 972 attributes.set_attribute(f1, "coll", [b1]) 973 assert f1.coll == [b1] 974 eq_(attributes.get_history(f1, "coll"), ([b1], [], [])) 975 attributes.set_committed_value(f1, "coll", [b2]) 976 eq_(attributes.get_history(f1, "coll"), ((), [b2], ())) 977 978 attributes.del_attribute(f1, "coll") 979 assert "coll" not in f1.__dict__ 980 981class BackrefTest(fixtures.ORMTest): 982 983 def test_m2m(self): 984 class Student(object):pass 985 class Course(object):pass 986 987 instrumentation.register_class(Student) 988 instrumentation.register_class(Course) 989 attributes.register_attribute(Student, 'courses', uselist=True, 990 backref="students", useobject=True) 991 attributes.register_attribute(Course, 'students', uselist=True, 992 backref="courses", useobject=True) 993 994 s = Student() 995 c = Course() 996 s.courses.append(c) 997 self.assert_(c.students == [s]) 998 s.courses.remove(c) 999 self.assert_(c.students == []) 1000 1001 (s1, s2, s3) = (Student(), Student(), Student()) 1002 1003 c.students = [s1, s2, s3] 1004 self.assert_(s2.courses == [c]) 1005 self.assert_(s1.courses == [c]) 1006 s1.courses.remove(c) 1007 self.assert_(c.students == [s2,s3]) 1008 1009 def test_o2m(self): 1010 class Post(object):pass 1011 class Blog(object):pass 1012 1013 instrumentation.register_class(Post) 1014 instrumentation.register_class(Blog) 1015 attributes.register_attribute(Post, 'blog', uselist=False, 1016 backref='posts', 1017 trackparent=True, useobject=True) 1018 attributes.register_attribute(Blog, 'posts', uselist=True, 1019 backref='blog', 1020 trackparent=True, useobject=True) 1021 b = Blog() 1022 (p1, p2, p3) = (Post(), Post(), Post()) 1023 b.posts.append(p1) 1024 b.posts.append(p2) 1025 b.posts.append(p3) 1026 self.assert_(b.posts == [p1, p2, p3]) 1027 self.assert_(p2.blog is b) 1028 1029 p3.blog = None 1030 self.assert_(b.posts == [p1, p2]) 1031 p4 = Post() 1032 p4.blog = b 1033 self.assert_(b.posts == [p1, p2, p4]) 1034 1035 p4.blog = b 1036 p4.blog = b 1037 self.assert_(b.posts == [p1, p2, p4]) 1038 1039 # assert no failure removing None 1040 p5 = Post() 1041 p5.blog = None 1042 del p5.blog 1043 1044 def test_o2o(self): 1045 class Port(object):pass 1046 class Jack(object):pass 1047 instrumentation.register_class(Port) 1048 instrumentation.register_class(Jack) 1049 1050 attributes.register_attribute(Port, 'jack', uselist=False, 1051 useobject=True, backref="port") 1052 1053 attributes.register_attribute(Jack, 'port', uselist=False, 1054 useobject=True, backref="jack") 1055 1056 1057 p = Port() 1058 j = Jack() 1059 p.jack = j 1060 self.assert_(j.port is p) 1061 self.assert_(p.jack is not None) 1062 1063 j.port = None 1064 self.assert_(p.jack is None) 1065 1066 def test_symmetric_o2o_inheritance(self): 1067 """Test that backref 'initiator' catching goes against 1068 a token that is global to all InstrumentedAttribute objects 1069 within a particular class, not just the indvidual IA object 1070 since we use distinct objects in an inheritance scenario. 1071 1072 """ 1073 1074 class Parent(object): 1075 pass 1076 class Child(object): 1077 pass 1078 class SubChild(Child): 1079 pass 1080 1081 p_token = object() 1082 c_token = object() 1083 1084 instrumentation.register_class(Parent) 1085 instrumentation.register_class(Child) 1086 instrumentation.register_class(SubChild) 1087 attributes.register_attribute(Parent, 'child', uselist=False, 1088 backref="parent", 1089 parent_token = p_token, 1090 useobject=True) 1091 attributes.register_attribute(Child, 'parent', uselist=False, 1092 backref="child", 1093 parent_token = c_token, 1094 useobject=True) 1095 attributes.register_attribute(SubChild, 'parent', 1096 uselist=False, 1097 backref="child", 1098 parent_token = c_token, 1099 useobject=True) 1100 1101 p1 = Parent() 1102 c1 = Child() 1103 p1.child = c1 1104 1105 c2 = SubChild() 1106 c2.parent = p1 1107 1108 def test_symmetric_o2m_inheritance(self): 1109 class Parent(object): 1110 pass 1111 class SubParent(Parent): 1112 pass 1113 class Child(object): 1114 pass 1115 1116 p_token = object() 1117 c_token = object() 1118 1119 instrumentation.register_class(Parent) 1120 instrumentation.register_class(SubParent) 1121 instrumentation.register_class(Child) 1122 attributes.register_attribute(Parent, 'children', uselist=True, 1123 backref='parent', 1124 parent_token = p_token, 1125 useobject=True) 1126 attributes.register_attribute(SubParent, 'children', uselist=True, 1127 backref='parent', 1128 parent_token = p_token, 1129 useobject=True) 1130 attributes.register_attribute(Child, 'parent', uselist=False, 1131 backref='children', 1132 parent_token = c_token, 1133 useobject=True) 1134 1135 p1 = Parent() 1136 p2 = SubParent() 1137 c1 = Child() 1138 1139 p1.children.append(c1) 1140 1141 assert c1.parent is p1 1142 assert c1 in p1.children 1143 1144 p2.children.append(c1) 1145 assert c1.parent is p2 1146 1147 # event propagates to remove as of [ticket:2789] 1148 assert c1 not in p1.children 1149 1150class CyclicBackrefAssertionTest(fixtures.TestBase): 1151 """test that infinite recursion due to incorrect backref assignments 1152 is blocked. 1153 1154 """ 1155 def test_scalar_set_type_assertion(self): 1156 A, B, C = self._scalar_fixture() 1157 c1 = C() 1158 b1 = B() 1159 assert_raises_message( 1160 ValueError, 1161 'Bidirectional attribute conflict detected: ' 1162 'Passing object <B at .*> to attribute "C.a" ' 1163 'triggers a modify event on attribute "C.b" ' 1164 'via the backref "B.c".', 1165 setattr, c1, 'a', b1 1166 ) 1167 1168 def test_collection_append_type_assertion(self): 1169 A, B, C = self._collection_fixture() 1170 c1 = C() 1171 b1 = B() 1172 assert_raises_message( 1173 ValueError, 1174 'Bidirectional attribute conflict detected: ' 1175 'Passing object <B at .*> to attribute "C.a" ' 1176 'triggers a modify event on attribute "C.b" ' 1177 'via the backref "B.c".', 1178 c1.a.append, b1 1179 ) 1180 1181 1182 def _scalar_fixture(self): 1183 class A(object): 1184 pass 1185 class B(object): 1186 pass 1187 class C(object): 1188 pass 1189 instrumentation.register_class(A) 1190 instrumentation.register_class(B) 1191 instrumentation.register_class(C) 1192 attributes.register_attribute(C, 'a', backref='c', useobject=True) 1193 attributes.register_attribute(C, 'b', backref='c', useobject=True) 1194 1195 attributes.register_attribute(A, 'c', backref='a', useobject=True, 1196 uselist=True) 1197 attributes.register_attribute(B, 'c', backref='b', useobject=True, 1198 uselist=True) 1199 1200 return A, B, C 1201 1202 def _collection_fixture(self): 1203 class A(object): 1204 pass 1205 class B(object): 1206 pass 1207 class C(object): 1208 pass 1209 instrumentation.register_class(A) 1210 instrumentation.register_class(B) 1211 instrumentation.register_class(C) 1212 1213 attributes.register_attribute(C, 'a', backref='c', useobject=True, 1214 uselist=True) 1215 attributes.register_attribute(C, 'b', backref='c', useobject=True, 1216 uselist=True) 1217 1218 attributes.register_attribute(A, 'c', backref='a', useobject=True) 1219 attributes.register_attribute(B, 'c', backref='b', useobject=True) 1220 1221 return A, B, C 1222 1223 def _broken_collection_fixture(self): 1224 class A(object): 1225 pass 1226 class B(object): 1227 pass 1228 instrumentation.register_class(A) 1229 instrumentation.register_class(B) 1230 1231 attributes.register_attribute(A, 'b', backref='a1', useobject=True) 1232 attributes.register_attribute(B, 'a1', backref='b', useobject=True, 1233 uselist=True) 1234 1235 attributes.register_attribute(B, 'a2', backref='b', useobject=True, 1236 uselist=True) 1237 1238 return A, B 1239 1240 def test_broken_collection_assertion(self): 1241 A, B = self._broken_collection_fixture() 1242 b1 = B() 1243 a1 = A() 1244 assert_raises_message( 1245 ValueError, 1246 'Bidirectional attribute conflict detected: ' 1247 'Passing object <A at .*> to attribute "B.a2" ' 1248 'triggers a modify event on attribute "B.a1" ' 1249 'via the backref "A.b".', 1250 b1.a2.append, a1 1251 ) 1252 1253class PendingBackrefTest(fixtures.ORMTest): 1254 def _fixture(self): 1255 class Post(object): 1256 def __init__(self, name): 1257 self.name = name 1258 __hash__ = None 1259 def __eq__(self, other): 1260 return other is not None and other.name == self.name 1261 1262 class Blog(object): 1263 def __init__(self, name): 1264 self.name = name 1265 __hash__ = None 1266 def __eq__(self, other): 1267 return other is not None and other.name == self.name 1268 1269 lazy_posts = Mock() 1270 1271 instrumentation.register_class(Post) 1272 instrumentation.register_class(Blog) 1273 attributes.register_attribute(Post, 'blog', uselist=False, 1274 backref='posts', trackparent=True, useobject=True) 1275 attributes.register_attribute(Blog, 'posts', uselist=True, 1276 backref='blog', callable_=lazy_posts, trackparent=True, 1277 useobject=True) 1278 1279 return Post, Blog, lazy_posts 1280 1281 def test_lazy_add(self): 1282 Post, Blog, lazy_posts = self._fixture() 1283 1284 p1, p2, p3 = Post("post 1"), Post("post 2"), Post("post 3") 1285 lazy_posts.return_value = attributes.PASSIVE_NO_RESULT 1286 1287 b = Blog("blog 1") 1288 b1_state = attributes.instance_state(b) 1289 1290 p = Post("post 4") 1291 1292 p.blog = b 1293 eq_( 1294 lazy_posts.mock_calls, [ 1295 call(b1_state, attributes.PASSIVE_NO_FETCH) 1296 ] 1297 ) 1298 1299 p = Post("post 5") 1300 1301 # setting blog doesn't call 'posts' callable, calls with no fetch 1302 p.blog = b 1303 eq_( 1304 lazy_posts.mock_calls, [ 1305 call(b1_state, attributes.PASSIVE_NO_FETCH), 1306 call(b1_state, attributes.PASSIVE_NO_FETCH) 1307 ] 1308 ) 1309 1310 lazy_posts.return_value = [p1, p2, p3] 1311 1312 # calling backref calls the callable, populates extra posts 1313 eq_(b.posts, [p1, p2, p3, Post("post 4"), Post("post 5")]) 1314 eq_( 1315 lazy_posts.mock_calls, [ 1316 call(b1_state, attributes.PASSIVE_NO_FETCH), 1317 call(b1_state, attributes.PASSIVE_NO_FETCH), 1318 call(b1_state, attributes.PASSIVE_OFF) 1319 ] 1320 ) 1321 1322 def test_lazy_history_collection(self): 1323 Post, Blog, lazy_posts = self._fixture() 1324 1325 p1, p2, p3 = Post("post 1"), Post("post 2"), Post("post 3") 1326 lazy_posts.return_value = [p1, p2, p3] 1327 1328 b = Blog("blog 1") 1329 p = Post("post 4") 1330 p.blog = b 1331 1332 p4 = Post("post 5") 1333 p4.blog = b 1334 1335 eq_(lazy_posts.call_count, 1) 1336 1337 eq_(attributes.instance_state(b). 1338 get_history('posts', attributes.PASSIVE_OFF), 1339 ([p, p4], [p1, p2, p3], [])) 1340 eq_(lazy_posts.call_count, 1) 1341 1342 def test_passive_history_collection_never_set(self): 1343 Post, Blog, lazy_posts = self._fixture() 1344 1345 lazy_posts.return_value = attributes.PASSIVE_NO_RESULT 1346 1347 b = Blog("blog 1") 1348 p = Post("post 1") 1349 1350 state, dict_ = attributes.instance_state(b), attributes.instance_dict(b) 1351 1352 # this sets up NEVER_SET on b.posts 1353 p.blog = b 1354 1355 eq_(state.committed_state, {"posts": attributes.NEVER_SET}) 1356 assert 'posts' not in dict_ 1357 1358 # then suppose the object was made transient again, 1359 # the lazy loader would return this 1360 lazy_posts.return_value = attributes.ATTR_EMPTY 1361 1362 p2 = Post('asdf') 1363 p2.blog = b 1364 1365 eq_(state.committed_state, {"posts": attributes.NEVER_SET}) 1366 eq_(dict_['posts'], [p2]) 1367 1368 # then this would fail. 1369 eq_( 1370 Blog.posts.impl.get_history(state, dict_, passive=True), 1371 ([p2], (), ()) 1372 ) 1373 1374 eq_( 1375 Blog.posts.impl.get_all_pending(state, dict_), 1376 [(attributes.instance_state(p2), p2)] 1377 ) 1378 1379 def test_state_on_add_remove(self): 1380 Post, Blog, lazy_posts = self._fixture() 1381 lazy_posts.return_value = attributes.PASSIVE_NO_RESULT 1382 1383 b = Blog("blog 1") 1384 b1_state = attributes.instance_state(b) 1385 p = Post("post 1") 1386 p.blog = b 1387 eq_(lazy_posts.mock_calls, 1388 [call(b1_state, attributes.PASSIVE_NO_FETCH)]) 1389 p.blog = None 1390 eq_(lazy_posts.mock_calls, 1391 [call(b1_state, attributes.PASSIVE_NO_FETCH), 1392 call(b1_state, attributes.PASSIVE_NO_FETCH)]) 1393 lazy_posts.return_value = [] 1394 eq_(b.posts, []) 1395 eq_(lazy_posts.mock_calls, 1396 [call(b1_state, attributes.PASSIVE_NO_FETCH), 1397 call(b1_state, attributes.PASSIVE_NO_FETCH), 1398 call(b1_state, attributes.PASSIVE_OFF)]) 1399 1400 def test_pending_combines_with_lazy(self): 1401 Post, Blog, lazy_posts = self._fixture() 1402 1403 lazy_posts.return_value = attributes.PASSIVE_NO_RESULT 1404 1405 b = Blog("blog 1") 1406 p = Post("post 1") 1407 p2 = Post("post 2") 1408 p.blog = b 1409 eq_(lazy_posts.call_count, 1) 1410 1411 lazy_posts.return_value = [p, p2] 1412 1413 # lazy loaded + pending get added together. 1414 # This isn't seen often with the ORM due 1415 # to usual practices surrounding the 1416 # load/flush/load cycle. 1417 eq_(b.posts, [p, p2, p]) 1418 eq_(lazy_posts.call_count, 2) 1419 1420 def test_normal_load(self): 1421 Post, Blog, lazy_posts = self._fixture() 1422 1423 lazy_posts.return_value = \ 1424 (p1, p2, p3) = [Post("post 1"), Post("post 2"), Post("post 3")] 1425 1426 b = Blog("blog 1") 1427 1428 # assign without using backref system 1429 p2.__dict__['blog'] = b 1430 1431 eq_(b.posts, [Post("post 1"), Post("post 2"), Post("post 3")]) 1432 1433 eq_(lazy_posts.call_count, 1) 1434 p2.blog = None 1435 p4 = Post("post 4") 1436 p4.blog = b 1437 eq_(b.posts, [Post("post 1"), Post("post 3"), Post("post 4")]) 1438 1439 b_state = attributes.instance_state(b) 1440 1441 eq_(lazy_posts.call_count, 1) 1442 eq_(lazy_posts.mock_calls, 1443 [call(b_state, attributes.PASSIVE_OFF)]) 1444 1445 1446 def test_commit_removes_pending(self): 1447 Post, Blog, lazy_posts = self._fixture() 1448 1449 p1 = Post("post 1") 1450 1451 lazy_posts.return_value = attributes.PASSIVE_NO_RESULT 1452 b = Blog("blog 1") 1453 p1.blog = b 1454 1455 b_state = attributes.instance_state(b) 1456 p1_state = attributes.instance_state(p1) 1457 b_state._commit_all(attributes.instance_dict(b)) 1458 p1_state._commit_all(attributes.instance_dict(p1)) 1459 lazy_posts.return_value = [p1] 1460 eq_(b.posts, [Post("post 1")]) 1461 eq_(lazy_posts.mock_calls, 1462 [call(b_state, attributes.PASSIVE_NO_FETCH), 1463 call(b_state, attributes.PASSIVE_OFF)]) 1464 1465class HistoryTest(fixtures.TestBase): 1466 1467 def _fixture(self, uselist, useobject, active_history, **kw): 1468 class Foo(fixtures.BasicEntity): 1469 pass 1470 1471 instrumentation.register_class(Foo) 1472 attributes.register_attribute( 1473 Foo, 'someattr', 1474 uselist=uselist, 1475 useobject=useobject, 1476 active_history=active_history, 1477 **kw) 1478 return Foo 1479 1480 def _two_obj_fixture(self, uselist): 1481 class Foo(fixtures.BasicEntity): 1482 pass 1483 class Bar(fixtures.BasicEntity): 1484 def __bool__(self): 1485 assert False 1486 1487 instrumentation.register_class(Foo) 1488 instrumentation.register_class(Bar) 1489 attributes.register_attribute(Foo, 'someattr', uselist=uselist, 1490 useobject=True) 1491 return Foo, Bar 1492 1493 def _someattr_history(self, f, **kw): 1494 passive = kw.pop('passive', None) 1495 if passive is True: 1496 kw['passive'] = attributes.PASSIVE_NO_INITIALIZE 1497 elif passive is False: 1498 kw['passive'] = attributes.PASSIVE_OFF 1499 1500 return attributes.get_state_history( 1501 attributes.instance_state(f), 1502 'someattr', **kw) 1503 1504 def _commit_someattr(self, f): 1505 attributes.instance_state(f)._commit(attributes.instance_dict(f), 1506 ['someattr']) 1507 1508 def _someattr_committed_state(self, f): 1509 Foo = f.__class__ 1510 return Foo.someattr.impl.get_committed_value( 1511 attributes.instance_state(f), 1512 attributes.instance_dict(f)) 1513 1514 def test_committed_value_init(self): 1515 Foo = self._fixture(uselist=False, useobject=False, 1516 active_history=False) 1517 f = Foo() 1518 eq_(self._someattr_committed_state(f), None) 1519 1520 def test_committed_value_set(self): 1521 Foo = self._fixture(uselist=False, useobject=False, 1522 active_history=False) 1523 f = Foo() 1524 f.someattr = 3 1525 eq_(self._someattr_committed_state(f), None) 1526 1527 def test_committed_value_set_active_hist(self): 1528 Foo = self._fixture(uselist=False, useobject=False, 1529 active_history=True) 1530 f = Foo() 1531 f.someattr = 3 1532 eq_(self._someattr_committed_state(f), None) 1533 1534 def test_committed_value_set_commit(self): 1535 Foo = self._fixture(uselist=False, useobject=False, 1536 active_history=False) 1537 f = Foo() 1538 f.someattr = 3 1539 self._commit_someattr(f) 1540 eq_(self._someattr_committed_state(f), 3) 1541 1542 def test_scalar_init(self): 1543 Foo = self._fixture(uselist=False, useobject=False, 1544 active_history=False) 1545 f = Foo() 1546 eq_(self._someattr_history(f), ((), (), ())) 1547 1548 def test_object_init(self): 1549 Foo = self._fixture(uselist=False, useobject=True, 1550 active_history=False) 1551 f = Foo() 1552 eq_(self._someattr_history(f), ((), (), ())) 1553 1554 def test_object_init_active_history(self): 1555 Foo = self._fixture(uselist=False, useobject=True, 1556 active_history=True) 1557 f = Foo() 1558 eq_(self._someattr_history(f), ((), (), ())) 1559 1560 def test_scalar_no_init_side_effect(self): 1561 Foo = self._fixture(uselist=False, useobject=False, 1562 active_history=False) 1563 f = Foo() 1564 self._someattr_history(f) 1565 # no side effects 1566 assert 'someattr' not in f.__dict__ 1567 assert 'someattr' not in attributes.instance_state(f).committed_state 1568 1569 def test_scalar_set(self): 1570 Foo = self._fixture(uselist=False, useobject=False, 1571 active_history=False) 1572 f = Foo() 1573 f.someattr = 'hi' 1574 eq_(self._someattr_history(f), (['hi'], (), ())) 1575 1576 def test_scalar_set_None(self): 1577 # note - compare: 1578 # test_scalar_set_None, 1579 # test_scalar_get_first_set_None, 1580 # test_use_object_set_None, 1581 # test_use_object_get_first_set_None 1582 Foo = self._fixture(uselist=False, useobject=False, 1583 active_history=False) 1584 f = Foo() 1585 f.someattr = None 1586 eq_(self._someattr_history(f), ([None], (), ())) 1587 1588 def test_scalar_get_first_set_None(self): 1589 # note - compare: 1590 # test_scalar_set_None, 1591 # test_scalar_get_first_set_None, 1592 # test_use_object_set_None, 1593 # test_use_object_get_first_set_None 1594 Foo = self._fixture(uselist=False, useobject=False, 1595 active_history=False) 1596 f = Foo() 1597 assert f.someattr is None 1598 f.someattr = None 1599 eq_(self._someattr_history(f), ([None], (), ())) 1600 1601 def test_scalar_set_commit(self): 1602 Foo = self._fixture(uselist=False, useobject=False, 1603 active_history=False) 1604 f = Foo() 1605 f.someattr = 'hi' 1606 self._commit_someattr(f) 1607 eq_(self._someattr_history(f), ((), ['hi'], ())) 1608 1609 def test_scalar_set_commit_reset(self): 1610 Foo = self._fixture(uselist=False, useobject=False, 1611 active_history=False) 1612 f = Foo() 1613 f.someattr = 'hi' 1614 self._commit_someattr(f) 1615 f.someattr = 'there' 1616 eq_(self._someattr_history(f), (['there'], (), ['hi'])) 1617 1618 def test_scalar_set_commit_reset_commit(self): 1619 Foo = self._fixture(uselist=False, useobject=False, 1620 active_history=False) 1621 f = Foo() 1622 f.someattr = 'hi' 1623 self._commit_someattr(f) 1624 f.someattr = 'there' 1625 self._commit_someattr(f) 1626 eq_(self._someattr_history(f), ((), ['there'], ())) 1627 1628 def test_scalar_set_commit_reset_commit_del(self): 1629 Foo = self._fixture(uselist=False, useobject=False, 1630 active_history=False) 1631 f = Foo() 1632 f.someattr = 'there' 1633 self._commit_someattr(f) 1634 del f.someattr 1635 eq_(self._someattr_history(f), ((), (), ['there'])) 1636 1637 def test_scalar_set_dict(self): 1638 Foo = self._fixture(uselist=False, useobject=False, 1639 active_history=False) 1640 f = Foo() 1641 f.__dict__['someattr'] = 'new' 1642 eq_(self._someattr_history(f), ((), ['new'], ())) 1643 1644 def test_scalar_set_dict_set(self): 1645 Foo = self._fixture(uselist=False, useobject=False, 1646 active_history=False) 1647 f = Foo() 1648 f.__dict__['someattr'] = 'new' 1649 self._someattr_history(f) 1650 f.someattr = 'old' 1651 eq_(self._someattr_history(f), (['old'], (), ['new'])) 1652 1653 def test_scalar_set_dict_set_commit(self): 1654 Foo = self._fixture(uselist=False, useobject=False, 1655 active_history=False) 1656 f = Foo() 1657 f.__dict__['someattr'] = 'new' 1658 self._someattr_history(f) 1659 f.someattr = 'old' 1660 self._commit_someattr(f) 1661 eq_(self._someattr_history(f), ((), ['old'], ())) 1662 1663 def test_scalar_set_None(self): 1664 Foo = self._fixture(uselist=False, useobject=False, 1665 active_history=False) 1666 f = Foo() 1667 f.someattr = None 1668 eq_(self._someattr_history(f), ([None], (), ())) 1669 1670 def test_scalar_set_None_from_dict_set(self): 1671 Foo = self._fixture(uselist=False, useobject=False, 1672 active_history=False) 1673 f = Foo() 1674 f.__dict__['someattr'] = 'new' 1675 f.someattr = None 1676 eq_(self._someattr_history(f), ([None], (), ['new'])) 1677 1678 def test_scalar_set_twice_no_commit(self): 1679 Foo = self._fixture(uselist=False, useobject=False, 1680 active_history=False) 1681 f = Foo() 1682 f.someattr = 'one' 1683 eq_(self._someattr_history(f), (['one'], (), ())) 1684 f.someattr = 'two' 1685 eq_(self._someattr_history(f), (['two'], (), ())) 1686 1687 def test_scalar_active_init(self): 1688 Foo = self._fixture(uselist=False, useobject=False, 1689 active_history=True) 1690 f = Foo() 1691 eq_(self._someattr_history(f), ((), (), ())) 1692 1693 def test_scalar_active_no_init_side_effect(self): 1694 Foo = self._fixture(uselist=False, useobject=False, 1695 active_history=True) 1696 f = Foo() 1697 self._someattr_history(f) 1698 # no side effects 1699 assert 'someattr' not in f.__dict__ 1700 assert 'someattr' not in attributes.instance_state(f).committed_state 1701 1702 def test_collection_never_set(self): 1703 Foo = self._fixture(uselist=True, useobject=True, 1704 active_history=True) 1705 f = Foo() 1706 eq_(self._someattr_history(f, passive=True), (None, None, None)) 1707 1708 def test_scalar_obj_never_set(self): 1709 Foo = self._fixture(uselist=False, useobject=True, 1710 active_history=True) 1711 f = Foo() 1712 eq_(self._someattr_history(f, passive=True), (None, None, None)) 1713 1714 def test_scalar_never_set(self): 1715 Foo = self._fixture(uselist=False, useobject=False, 1716 active_history=True) 1717 f = Foo() 1718 eq_(self._someattr_history(f, passive=True), (None, None, None)) 1719 1720 def test_scalar_active_set(self): 1721 Foo = self._fixture(uselist=False, useobject=False, 1722 active_history=True) 1723 f = Foo() 1724 f.someattr = 'hi' 1725 eq_(self._someattr_history(f), (['hi'], (), ())) 1726 1727 def test_scalar_active_set_commit(self): 1728 Foo = self._fixture(uselist=False, useobject=False, 1729 active_history=True) 1730 f = Foo() 1731 f.someattr = 'hi' 1732 self._commit_someattr(f) 1733 eq_(self._someattr_history(f), ((), ['hi'], ())) 1734 1735 def test_scalar_active_set_commit_reset(self): 1736 Foo = self._fixture(uselist=False, useobject=False, 1737 active_history=True) 1738 f = Foo() 1739 f.someattr = 'hi' 1740 self._commit_someattr(f) 1741 f.someattr = 'there' 1742 eq_(self._someattr_history(f), (['there'], (), ['hi'])) 1743 1744 def test_scalar_active_set_commit_reset_commit(self): 1745 Foo = self._fixture(uselist=False, useobject=False, 1746 active_history=True) 1747 f = Foo() 1748 f.someattr = 'hi' 1749 self._commit_someattr(f) 1750 f.someattr = 'there' 1751 self._commit_someattr(f) 1752 eq_(self._someattr_history(f), ((), ['there'], ())) 1753 1754 def test_scalar_active_set_commit_reset_commit_del(self): 1755 Foo = self._fixture(uselist=False, useobject=False, 1756 active_history=True) 1757 f = Foo() 1758 f.someattr = 'there' 1759 self._commit_someattr(f) 1760 del f.someattr 1761 eq_(self._someattr_history(f), ((), (), ['there'])) 1762 1763 def test_scalar_active_set_dict(self): 1764 Foo = self._fixture(uselist=False, useobject=False, 1765 active_history=True) 1766 f = Foo() 1767 f.__dict__['someattr'] = 'new' 1768 eq_(self._someattr_history(f), ((), ['new'], ())) 1769 1770 def test_scalar_active_set_dict_set(self): 1771 Foo = self._fixture(uselist=False, useobject=False, 1772 active_history=True) 1773 f = Foo() 1774 f.__dict__['someattr'] = 'new' 1775 self._someattr_history(f) 1776 f.someattr = 'old' 1777 eq_(self._someattr_history(f), (['old'], (), ['new'])) 1778 1779 def test_scalar_active_set_dict_set_commit(self): 1780 Foo = self._fixture(uselist=False, useobject=False, 1781 active_history=True) 1782 f = Foo() 1783 f.__dict__['someattr'] = 'new' 1784 self._someattr_history(f) 1785 f.someattr = 'old' 1786 self._commit_someattr(f) 1787 eq_(self._someattr_history(f), ((), ['old'], ())) 1788 1789 def test_scalar_active_set_None(self): 1790 Foo = self._fixture(uselist=False, useobject=False, 1791 active_history=True) 1792 f = Foo() 1793 f.someattr = None 1794 eq_(self._someattr_history(f), ([None], (), ())) 1795 1796 def test_scalar_active_set_None_from_dict_set(self): 1797 Foo = self._fixture(uselist=False, useobject=False, 1798 active_history=True) 1799 f = Foo() 1800 f.__dict__['someattr'] = 'new' 1801 f.someattr = None 1802 eq_(self._someattr_history(f), ([None], (), ['new'])) 1803 1804 def test_scalar_active_set_twice_no_commit(self): 1805 Foo = self._fixture(uselist=False, useobject=False, 1806 active_history=True) 1807 f = Foo() 1808 f.someattr = 'one' 1809 eq_(self._someattr_history(f), (['one'], (), ())) 1810 f.someattr = 'two' 1811 eq_(self._someattr_history(f), (['two'], (), ())) 1812 1813 1814 def test_scalar_passive_flag(self): 1815 Foo = self._fixture(uselist=False, useobject=False, 1816 active_history=True) 1817 f = Foo() 1818 f.someattr = 'one' 1819 eq_(self._someattr_history(f), (['one'], (), ())) 1820 1821 self._commit_someattr(f) 1822 1823 state = attributes.instance_state(f) 1824 # do the same thing that 1825 # populators.expire.append((self.key, True)) 1826 # does in loading.py 1827 state.dict.pop('someattr', None) 1828 state.expired_attributes.add('someattr') 1829 1830 def scalar_loader(state, toload): 1831 state.dict['someattr'] = 'one' 1832 state.manager.deferred_scalar_loader = scalar_loader 1833 1834 eq_(self._someattr_history(f), ((), ['one'], ())) 1835 1836 1837 def test_scalar_inplace_mutation_set(self): 1838 Foo = self._fixture(uselist=False, useobject=False, 1839 active_history=False) 1840 f = Foo() 1841 f.someattr = {'a': 'b'} 1842 eq_(self._someattr_history(f), ([{'a': 'b'}], (), ())) 1843 1844 def test_scalar_inplace_mutation_set_commit(self): 1845 Foo = self._fixture(uselist=False, useobject=False, 1846 active_history=False) 1847 f = Foo() 1848 f.someattr = {'a': 'b'} 1849 self._commit_someattr(f) 1850 eq_(self._someattr_history(f), ((), [{'a': 'b'}], ())) 1851 1852 def test_scalar_inplace_mutation_set_commit_set(self): 1853 Foo = self._fixture(uselist=False, useobject=False, 1854 active_history=False) 1855 f = Foo() 1856 f.someattr = {'a': 'b'} 1857 self._commit_someattr(f) 1858 f.someattr['a'] = 'c' 1859 eq_(self._someattr_history(f), ((), [{'a': 'c'}], ())) 1860 1861 def test_scalar_inplace_mutation_set_commit_flag_modified(self): 1862 Foo = self._fixture(uselist=False, useobject=False, 1863 active_history=False) 1864 f = Foo() 1865 f.someattr = {'a': 'b'} 1866 self._commit_someattr(f) 1867 attributes.flag_modified(f, 'someattr') 1868 eq_(self._someattr_history(f), ([{'a': 'b'}], (), ())) 1869 1870 def test_scalar_inplace_mutation_set_commit_set_flag_modified(self): 1871 Foo = self._fixture(uselist=False, useobject=False, 1872 active_history=False) 1873 f = Foo() 1874 f.someattr = {'a': 'b'} 1875 self._commit_someattr(f) 1876 f.someattr['a'] = 'c' 1877 attributes.flag_modified(f, 'someattr') 1878 eq_(self._someattr_history(f), ([{'a': 'c'}], (), ())) 1879 1880 def test_scalar_inplace_mutation_set_commit_flag_modified_set(self): 1881 Foo = self._fixture(uselist=False, useobject=False, 1882 active_history=False) 1883 f = Foo() 1884 f.someattr = {'a': 'b'} 1885 self._commit_someattr(f) 1886 attributes.flag_modified(f, 'someattr') 1887 eq_(self._someattr_history(f), ([{'a': 'b'}], (), ())) 1888 f.someattr = ['a'] 1889 eq_(self._someattr_history(f), ([['a']], (), ())) 1890 1891 def test_scalar_inplace_mutation_replace_self_flag_modified_set(self): 1892 Foo = self._fixture(uselist=False, useobject=False, 1893 active_history=False) 1894 f = Foo() 1895 f.someattr = {'a': 'b'} 1896 self._commit_someattr(f) 1897 eq_(self._someattr_history(f), ((), [{'a': 'b'}], ())) 1898 1899 # set the attribute to itself; this places a copy 1900 # in committed_state 1901 f.someattr = f.someattr 1902 1903 attributes.flag_modified(f, 'someattr') 1904 eq_(self._someattr_history(f), ([{'a': 'b'}], (), ())) 1905 1906 1907 def test_use_object_init(self): 1908 Foo, Bar = self._two_obj_fixture(uselist=False) 1909 f = Foo() 1910 eq_(self._someattr_history(f), ((), (), ())) 1911 1912 def test_use_object_no_init_side_effect(self): 1913 Foo, Bar = self._two_obj_fixture(uselist=False) 1914 f = Foo() 1915 self._someattr_history(f) 1916 assert 'someattr' not in f.__dict__ 1917 assert 'someattr' not in attributes.instance_state(f).committed_state 1918 1919 def test_use_object_set(self): 1920 Foo, Bar = self._two_obj_fixture(uselist=False) 1921 f = Foo() 1922 hi = Bar(name='hi') 1923 f.someattr = hi 1924 eq_(self._someattr_history(f), ([hi], (), ())) 1925 1926 def test_use_object_set_commit(self): 1927 Foo, Bar = self._two_obj_fixture(uselist=False) 1928 f = Foo() 1929 hi = Bar(name='hi') 1930 f.someattr = hi 1931 self._commit_someattr(f) 1932 eq_(self._someattr_history(f), ((), [hi], ())) 1933 1934 def test_use_object_set_commit_set(self): 1935 Foo, Bar = self._two_obj_fixture(uselist=False) 1936 f = Foo() 1937 hi = Bar(name='hi') 1938 f.someattr = hi 1939 self._commit_someattr(f) 1940 there = Bar(name='there') 1941 f.someattr = there 1942 eq_(self._someattr_history(f), ([there], (), [hi])) 1943 1944 def test_use_object_set_commit_set_commit(self): 1945 Foo, Bar = self._two_obj_fixture(uselist=False) 1946 f = Foo() 1947 hi = Bar(name='hi') 1948 f.someattr = hi 1949 self._commit_someattr(f) 1950 there = Bar(name='there') 1951 f.someattr = there 1952 self._commit_someattr(f) 1953 eq_(self._someattr_history(f), ((), [there], ())) 1954 1955 def test_use_object_set_commit_del(self): 1956 Foo, Bar = self._two_obj_fixture(uselist=False) 1957 f = Foo() 1958 hi = Bar(name='hi') 1959 f.someattr = hi 1960 self._commit_someattr(f) 1961 del f.someattr 1962 eq_(self._someattr_history(f), ((), (), [hi])) 1963 1964 def test_use_object_set_dict(self): 1965 Foo, Bar = self._two_obj_fixture(uselist=False) 1966 f = Foo() 1967 hi = Bar(name='hi') 1968 f.__dict__['someattr'] = hi 1969 eq_(self._someattr_history(f), ((), [hi], ())) 1970 1971 def test_use_object_set_dict_set(self): 1972 Foo, Bar = self._two_obj_fixture(uselist=False) 1973 f = Foo() 1974 hi = Bar(name='hi') 1975 f.__dict__['someattr'] = hi 1976 1977 there = Bar(name='there') 1978 f.someattr = there 1979 eq_(self._someattr_history(f), ([there], (), [hi])) 1980 1981 def test_use_object_set_dict_set_commit(self): 1982 Foo, Bar = self._two_obj_fixture(uselist=False) 1983 f = Foo() 1984 hi = Bar(name='hi') 1985 f.__dict__['someattr'] = hi 1986 1987 there = Bar(name='there') 1988 f.someattr = there 1989 self._commit_someattr(f) 1990 eq_(self._someattr_history(f), ((), [there], ())) 1991 1992 def test_use_object_set_None(self): 1993 # note - compare: 1994 # test_scalar_set_None, 1995 # test_scalar_get_first_set_None, 1996 # test_use_object_set_None, 1997 # test_use_object_get_first_set_None 1998 Foo, Bar = self._two_obj_fixture(uselist=False) 1999 f = Foo() 2000 f.someattr = None 2001 eq_(self._someattr_history(f), ([None], (), ())) 2002 2003 def test_use_object_get_first_set_None(self): 2004 # note - compare: 2005 # test_scalar_set_None, 2006 # test_scalar_get_first_set_None, 2007 # test_use_object_set_None, 2008 # test_use_object_get_first_set_None 2009 Foo, Bar = self._two_obj_fixture(uselist=False) 2010 f = Foo() 2011 assert f.someattr is None 2012 f.someattr = None 2013 eq_(self._someattr_history(f), ([None], (), ())) 2014 2015 def test_use_object_set_dict_set_None(self): 2016 Foo, Bar = self._two_obj_fixture(uselist=False) 2017 f = Foo() 2018 hi = Bar(name='hi') 2019 f.__dict__['someattr'] = hi 2020 f.someattr = None 2021 eq_(self._someattr_history(f), ([None], (), [hi])) 2022 2023 def test_use_object_set_value_twice(self): 2024 Foo, Bar = self._two_obj_fixture(uselist=False) 2025 f = Foo() 2026 hi = Bar(name='hi') 2027 there = Bar(name='there') 2028 f.someattr = hi 2029 f.someattr = there 2030 eq_(self._someattr_history(f), ([there], (), ())) 2031 2032 2033 def test_object_collections_set(self): 2034 # TODO: break into individual tests 2035 2036 Foo, Bar = self._two_obj_fixture(uselist=True) 2037 hi = Bar(name='hi') 2038 there = Bar(name='there') 2039 old = Bar(name='old') 2040 new = Bar(name='new') 2041 2042 # case 1. new object 2043 2044 f = Foo() 2045 eq_(attributes.get_state_history(attributes.instance_state(f), 2046 'someattr'), ((), [], ())) 2047 f.someattr = [hi] 2048 eq_(attributes.get_state_history(attributes.instance_state(f), 2049 'someattr'), ([hi], [], [])) 2050 self._commit_someattr(f) 2051 eq_(attributes.get_state_history(attributes.instance_state(f), 2052 'someattr'), ((), [hi], ())) 2053 f.someattr = [there] 2054 eq_(attributes.get_state_history(attributes.instance_state(f), 2055 'someattr'), ([there], [], [hi])) 2056 self._commit_someattr(f) 2057 eq_(attributes.get_state_history(attributes.instance_state(f), 2058 'someattr'), ((), [there], ())) 2059 f.someattr = [hi] 2060 eq_(attributes.get_state_history(attributes.instance_state(f), 2061 'someattr'), ([hi], [], [there])) 2062 f.someattr = [old, new] 2063 eq_(attributes.get_state_history(attributes.instance_state(f), 2064 'someattr'), ([old, new], [], [there])) 2065 2066 # case 2. object with direct settings (similar to a load 2067 # operation) 2068 2069 f = Foo() 2070 collection = attributes.init_collection(f, 'someattr') 2071 collection.append_without_event(new) 2072 attributes.instance_state(f)._commit_all(attributes.instance_dict(f)) 2073 eq_(attributes.get_state_history(attributes.instance_state(f), 2074 'someattr'), ((), [new], ())) 2075 f.someattr = [old] 2076 eq_(attributes.get_state_history(attributes.instance_state(f), 2077 'someattr'), ([old], [], [new])) 2078 self._commit_someattr(f) 2079 eq_(attributes.get_state_history(attributes.instance_state(f), 2080 'someattr'), ((), [old], ())) 2081 2082 def test_dict_collections(self): 2083 # TODO: break into individual tests 2084 2085 class Foo(fixtures.BasicEntity): 2086 pass 2087 class Bar(fixtures.BasicEntity): 2088 pass 2089 2090 from sqlalchemy.orm.collections import attribute_mapped_collection 2091 instrumentation.register_class(Foo) 2092 instrumentation.register_class(Bar) 2093 attributes.register_attribute(Foo, 'someattr', uselist=True, 2094 useobject=True, 2095 typecallable=attribute_mapped_collection('name')) 2096 hi = Bar(name='hi') 2097 there = Bar(name='there') 2098 old = Bar(name='old') 2099 new = Bar(name='new') 2100 f = Foo() 2101 eq_(attributes.get_state_history(attributes.instance_state(f), 2102 'someattr'), ((), [], ())) 2103 f.someattr['hi'] = hi 2104 eq_(attributes.get_state_history(attributes.instance_state(f), 2105 'someattr'), ([hi], [], [])) 2106 f.someattr['there'] = there 2107 eq_(tuple([set(x) for x in 2108 attributes.get_state_history(attributes.instance_state(f), 2109 'someattr')]), (set([hi, there]), set(), set())) 2110 self._commit_someattr(f) 2111 eq_(tuple([set(x) for x in 2112 attributes.get_state_history(attributes.instance_state(f), 2113 'someattr')]), (set(), set([hi, there]), set())) 2114 2115 def test_object_collections_mutate(self): 2116 # TODO: break into individual tests 2117 2118 class Foo(fixtures.BasicEntity): 2119 pass 2120 class Bar(fixtures.BasicEntity): 2121 pass 2122 2123 instrumentation.register_class(Foo) 2124 attributes.register_attribute(Foo, 'someattr', uselist=True, 2125 useobject=True) 2126 attributes.register_attribute(Foo, 'id', uselist=False, 2127 useobject=False) 2128 instrumentation.register_class(Bar) 2129 hi = Bar(name='hi') 2130 there = Bar(name='there') 2131 old = Bar(name='old') 2132 new = Bar(name='new') 2133 2134 # case 1. new object 2135 2136 f = Foo(id=1) 2137 eq_(attributes.get_state_history(attributes.instance_state(f), 2138 'someattr'), ((), [], ())) 2139 f.someattr.append(hi) 2140 eq_(attributes.get_state_history(attributes.instance_state(f), 2141 'someattr'), ([hi], [], [])) 2142 self._commit_someattr(f) 2143 eq_(attributes.get_state_history(attributes.instance_state(f), 2144 'someattr'), ((), [hi], ())) 2145 f.someattr.append(there) 2146 eq_(attributes.get_state_history(attributes.instance_state(f), 2147 'someattr'), ([there], [hi], [])) 2148 self._commit_someattr(f) 2149 eq_(attributes.get_state_history(attributes.instance_state(f), 2150 'someattr'), ((), [hi, there], ())) 2151 f.someattr.remove(there) 2152 eq_(attributes.get_state_history(attributes.instance_state(f), 2153 'someattr'), ([], [hi], [there])) 2154 f.someattr.append(old) 2155 f.someattr.append(new) 2156 eq_(attributes.get_state_history(attributes.instance_state(f), 2157 'someattr'), ([old, new], [hi], [there])) 2158 attributes.instance_state(f)._commit(attributes.instance_dict(f), 2159 ['someattr']) 2160 eq_(attributes.get_state_history(attributes.instance_state(f), 2161 'someattr'), ((), [hi, old, new], ())) 2162 f.someattr.pop(0) 2163 eq_(attributes.get_state_history(attributes.instance_state(f), 2164 'someattr'), ([], [old, new], [hi])) 2165 2166 # case 2. object with direct settings (similar to a load 2167 # operation) 2168 2169 f = Foo() 2170 f.__dict__['id'] = 1 2171 collection = attributes.init_collection(f, 'someattr') 2172 collection.append_without_event(new) 2173 attributes.instance_state(f)._commit_all(attributes.instance_dict(f)) 2174 eq_(attributes.get_state_history(attributes.instance_state(f), 2175 'someattr'), ((), [new], ())) 2176 f.someattr.append(old) 2177 eq_(attributes.get_state_history(attributes.instance_state(f), 2178 'someattr'), ([old], [new], [])) 2179 attributes.instance_state(f)._commit(attributes.instance_dict(f), 2180 ['someattr']) 2181 eq_(attributes.get_state_history(attributes.instance_state(f), 2182 'someattr'), ((), [new, old], ())) 2183 f = Foo() 2184 collection = attributes.init_collection(f, 'someattr') 2185 collection.append_without_event(new) 2186 attributes.instance_state(f)._commit_all(attributes.instance_dict(f)) 2187 eq_(attributes.get_state_history(attributes.instance_state(f), 2188 'someattr'), ((), [new], ())) 2189 f.id = 1 2190 f.someattr.remove(new) 2191 eq_(attributes.get_state_history(attributes.instance_state(f), 2192 'someattr'), ([], [], [new])) 2193 2194 # case 3. mixing appends with sets 2195 2196 f = Foo() 2197 f.someattr.append(hi) 2198 eq_(attributes.get_state_history(attributes.instance_state(f), 2199 'someattr'), ([hi], [], [])) 2200 f.someattr.append(there) 2201 eq_(attributes.get_state_history(attributes.instance_state(f), 2202 'someattr'), ([hi, there], [], [])) 2203 f.someattr = [there] 2204 eq_(attributes.get_state_history(attributes.instance_state(f), 2205 'someattr'), ([there], [], [])) 2206 2207 # case 4. ensure duplicates show up, order is maintained 2208 2209 f = Foo() 2210 f.someattr.append(hi) 2211 f.someattr.append(there) 2212 f.someattr.append(hi) 2213 eq_(attributes.get_state_history(attributes.instance_state(f), 2214 'someattr'), ([hi, there, hi], [], [])) 2215 attributes.instance_state(f)._commit_all(attributes.instance_dict(f)) 2216 eq_(attributes.get_state_history(attributes.instance_state(f), 2217 'someattr'), ((), [hi, there, hi], ())) 2218 f.someattr = [] 2219 eq_(attributes.get_state_history(attributes.instance_state(f), 2220 'someattr'), ([], [], [hi, there, hi])) 2221 2222 def test_collections_via_backref(self): 2223 # TODO: break into individual tests 2224 2225 class Foo(fixtures.BasicEntity): 2226 pass 2227 class Bar(fixtures.BasicEntity): 2228 pass 2229 2230 instrumentation.register_class(Foo) 2231 instrumentation.register_class(Bar) 2232 attributes.register_attribute(Foo, 'bars', uselist=True, 2233 backref='foo', trackparent=True, useobject=True) 2234 attributes.register_attribute(Bar, 'foo', uselist=False, 2235 backref='bars', trackparent=True, useobject=True) 2236 f1 = Foo() 2237 b1 = Bar() 2238 eq_(attributes.get_state_history(attributes.instance_state(f1), 2239 'bars'), ((), [], ())) 2240 eq_(attributes.get_state_history(attributes.instance_state(b1), 2241 'foo'), ((), (), ())) 2242 2243 # b1.foo = f1 2244 2245 f1.bars.append(b1) 2246 eq_(attributes.get_state_history(attributes.instance_state(f1), 2247 'bars'), ([b1], [], [])) 2248 eq_(attributes.get_state_history(attributes.instance_state(b1), 2249 'foo'), ([f1], (), ())) 2250 b2 = Bar() 2251 f1.bars.append(b2) 2252 eq_(attributes.get_state_history(attributes.instance_state(f1), 2253 'bars'), ([b1, b2], [], [])) 2254 eq_(attributes.get_state_history(attributes.instance_state(b1), 2255 'foo'), ([f1], (), ())) 2256 eq_(attributes.get_state_history(attributes.instance_state(b2), 2257 'foo'), ([f1], (), ())) 2258 2259 def test_deprecated_flags(self): 2260 assert_raises_message( 2261 sa_exc.SADeprecationWarning, 2262 "Passing True for 'passive' is deprecated. " 2263 "Use attributes.PASSIVE_NO_INITIALIZE", 2264 attributes.get_history, object(), 'foo', True 2265 ) 2266 2267 assert_raises_message( 2268 sa_exc.SADeprecationWarning, 2269 "Passing False for 'passive' is deprecated. " 2270 "Use attributes.PASSIVE_OFF", 2271 attributes.get_history, object(), 'foo', False 2272 ) 2273 2274 2275class LazyloadHistoryTest(fixtures.TestBase): 2276 def test_lazy_backref_collections(self): 2277 # TODO: break into individual tests 2278 2279 class Foo(fixtures.BasicEntity): 2280 pass 2281 class Bar(fixtures.BasicEntity): 2282 pass 2283 2284 lazy_load = [] 2285 def lazyload(state, passive): 2286 return lazy_load 2287 2288 instrumentation.register_class(Foo) 2289 instrumentation.register_class(Bar) 2290 attributes.register_attribute(Foo, 'bars', uselist=True, 2291 backref='foo', trackparent=True, callable_=lazyload, 2292 useobject=True) 2293 attributes.register_attribute(Bar, 'foo', uselist=False, 2294 backref='bars', trackparent=True, useobject=True) 2295 bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), 2296 Bar(id=4)] 2297 lazy_load = [bar1, bar2, bar3] 2298 f = Foo() 2299 bar4 = Bar() 2300 bar4.foo = f 2301 eq_(attributes.get_state_history(attributes.instance_state(f), 2302 'bars'), ([bar4], [bar1, bar2, bar3], [])) 2303 lazy_load = None 2304 f = Foo() 2305 bar4 = Bar() 2306 bar4.foo = f 2307 eq_(attributes.get_state_history(attributes.instance_state(f), 2308 'bars'), ([bar4], [], [])) 2309 lazy_load = [bar1, bar2, bar3] 2310 attributes.instance_state(f)._expire_attributes(attributes.instance_dict(f), 2311 ['bars']) 2312 eq_(attributes.get_state_history(attributes.instance_state(f), 2313 'bars'), ((), [bar1, bar2, bar3], ())) 2314 2315 def test_collections_via_lazyload(self): 2316 # TODO: break into individual tests 2317 2318 class Foo(fixtures.BasicEntity): 2319 pass 2320 class Bar(fixtures.BasicEntity): 2321 pass 2322 2323 lazy_load = [] 2324 def lazyload(state, passive): 2325 return lazy_load 2326 2327 instrumentation.register_class(Foo) 2328 instrumentation.register_class(Bar) 2329 attributes.register_attribute(Foo, 'bars', uselist=True, 2330 callable_=lazyload, trackparent=True, useobject=True) 2331 bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), 2332 Bar(id=4)] 2333 lazy_load = [bar1, bar2, bar3] 2334 f = Foo() 2335 f.bars = [] 2336 eq_(attributes.get_state_history(attributes.instance_state(f), 2337 'bars'), ([], [], [bar1, bar2, bar3])) 2338 f = Foo() 2339 f.bars.append(bar4) 2340 eq_(attributes.get_state_history(attributes.instance_state(f), 2341 'bars'), ([bar4], [bar1, bar2, bar3], [])) 2342 f = Foo() 2343 f.bars.remove(bar2) 2344 eq_(attributes.get_state_history(attributes.instance_state(f), 2345 'bars'), ([], [bar1, bar3], [bar2])) 2346 f.bars.append(bar4) 2347 eq_(attributes.get_state_history(attributes.instance_state(f), 2348 'bars'), ([bar4], [bar1, bar3], [bar2])) 2349 f = Foo() 2350 del f.bars[1] 2351 eq_(attributes.get_state_history(attributes.instance_state(f), 2352 'bars'), ([], [bar1, bar3], [bar2])) 2353 lazy_load = None 2354 f = Foo() 2355 f.bars.append(bar2) 2356 eq_(attributes.get_state_history(attributes.instance_state(f), 2357 'bars'), ([bar2], [], [])) 2358 2359 def test_scalar_via_lazyload(self): 2360 # TODO: break into individual tests 2361 2362 class Foo(fixtures.BasicEntity): 2363 pass 2364 2365 lazy_load = None 2366 2367 def lazyload(state, passive): 2368 return lazy_load 2369 2370 instrumentation.register_class(Foo) 2371 attributes.register_attribute(Foo, 'bar', uselist=False, 2372 callable_=lazyload, useobject=False) 2373 lazy_load = 'hi' 2374 2375 # with scalar non-object and active_history=False, the lazy 2376 # callable is only executed on gets, not history operations 2377 2378 f = Foo() 2379 eq_(f.bar, 'hi') 2380 eq_(attributes.get_state_history(attributes.instance_state(f), 2381 'bar'), ((), ['hi'], ())) 2382 f = Foo() 2383 f.bar = None 2384 eq_(attributes.get_state_history(attributes.instance_state(f), 2385 'bar'), ([None], (), ())) 2386 f = Foo() 2387 f.bar = 'there' 2388 eq_(attributes.get_state_history(attributes.instance_state(f), 2389 'bar'), (['there'], (), ())) 2390 f.bar = 'hi' 2391 eq_(attributes.get_state_history(attributes.instance_state(f), 2392 'bar'), (['hi'], (), ())) 2393 f = Foo() 2394 eq_(f.bar, 'hi') 2395 del f.bar 2396 eq_(attributes.get_state_history(attributes.instance_state(f), 2397 'bar'), ((), (), ['hi'])) 2398 assert f.bar is None 2399 eq_(attributes.get_state_history(attributes.instance_state(f), 2400 'bar'), ((), (), ['hi'])) 2401 2402 def test_scalar_via_lazyload_with_active(self): 2403 # TODO: break into individual tests 2404 2405 class Foo(fixtures.BasicEntity): 2406 pass 2407 2408 lazy_load = None 2409 2410 def lazyload(state, passive): 2411 return lazy_load 2412 2413 instrumentation.register_class(Foo) 2414 attributes.register_attribute(Foo, 'bar', uselist=False, 2415 callable_=lazyload, useobject=False, 2416 active_history=True) 2417 lazy_load = 'hi' 2418 2419 # active_history=True means the lazy callable is executed on set 2420 # as well as get, causing the old value to appear in the history 2421 2422 f = Foo() 2423 eq_(f.bar, 'hi') 2424 eq_(attributes.get_state_history(attributes.instance_state(f), 2425 'bar'), ((), ['hi'], ())) 2426 f = Foo() 2427 f.bar = None 2428 eq_(attributes.get_state_history(attributes.instance_state(f), 2429 'bar'), ([None], (), ['hi'])) 2430 f = Foo() 2431 f.bar = 'there' 2432 eq_(attributes.get_state_history(attributes.instance_state(f), 2433 'bar'), (['there'], (), ['hi'])) 2434 f.bar = 'hi' 2435 eq_(attributes.get_state_history(attributes.instance_state(f), 2436 'bar'), ((), ['hi'], ())) 2437 f = Foo() 2438 eq_(f.bar, 'hi') 2439 del f.bar 2440 eq_(attributes.get_state_history(attributes.instance_state(f), 2441 'bar'), ((), (), ['hi'])) 2442 assert f.bar is None 2443 eq_(attributes.get_state_history(attributes.instance_state(f), 2444 'bar'), ((), (), ['hi'])) 2445 2446 def test_scalar_object_via_lazyload(self): 2447 # TODO: break into individual tests 2448 2449 class Foo(fixtures.BasicEntity): 2450 pass 2451 class Bar(fixtures.BasicEntity): 2452 pass 2453 2454 lazy_load = None 2455 def lazyload(state, passive): 2456 return lazy_load 2457 2458 instrumentation.register_class(Foo) 2459 instrumentation.register_class(Bar) 2460 attributes.register_attribute(Foo, 'bar', uselist=False, 2461 callable_=lazyload, trackparent=True, useobject=True) 2462 bar1, bar2 = [Bar(id=1), Bar(id=2)] 2463 lazy_load = bar1 2464 2465 # with scalar object, the lazy callable is only executed on gets 2466 # and history operations 2467 2468 f = Foo() 2469 eq_(attributes.get_state_history(attributes.instance_state(f), 2470 'bar'), ((), [bar1], ())) 2471 f = Foo() 2472 f.bar = None 2473 eq_(attributes.get_state_history(attributes.instance_state(f), 2474 'bar'), ([None], (), [bar1])) 2475 f = Foo() 2476 f.bar = bar2 2477 eq_(attributes.get_state_history(attributes.instance_state(f), 2478 'bar'), ([bar2], (), [bar1])) 2479 f.bar = bar1 2480 eq_(attributes.get_state_history(attributes.instance_state(f), 2481 'bar'), ((), [bar1], ())) 2482 f = Foo() 2483 eq_(f.bar, bar1) 2484 del f.bar 2485 eq_(attributes.get_state_history(attributes.instance_state(f), 2486 'bar'), ((), (), [bar1])) 2487 assert f.bar is None 2488 eq_(attributes.get_state_history(attributes.instance_state(f), 2489 'bar'), ((), (), [bar1])) 2490 2491class ListenerTest(fixtures.ORMTest): 2492 def test_receive_changes(self): 2493 """test that Listeners can mutate the given value.""" 2494 2495 class Foo(object): 2496 pass 2497 class Bar(object): 2498 pass 2499 2500 def append(state, child, initiator): 2501 b2 = Bar() 2502 b2.data = b1.data + " appended" 2503 return b2 2504 2505 def on_set(state, value, oldvalue, initiator): 2506 return value + " modified" 2507 2508 instrumentation.register_class(Foo) 2509 instrumentation.register_class(Bar) 2510 attributes.register_attribute(Foo, 'data', uselist=False, 2511 useobject=False) 2512 attributes.register_attribute(Foo, 'barlist', uselist=True, 2513 useobject=True) 2514 attributes.register_attribute(Foo, 'barset', typecallable=set, 2515 uselist=True, useobject=True) 2516 attributes.register_attribute(Bar, 'data', uselist=False, 2517 useobject=False) 2518 event.listen(Foo.data, 'set', on_set, retval=True) 2519 event.listen(Foo.barlist, 'append', append, retval=True) 2520 event.listen(Foo.barset, 'append', append, retval=True) 2521 f1 = Foo() 2522 f1.data = 'some data' 2523 eq_(f1.data, 'some data modified') 2524 b1 = Bar() 2525 b1.data = 'some bar' 2526 f1.barlist.append(b1) 2527 assert b1.data == 'some bar' 2528 assert f1.barlist[0].data == 'some bar appended' 2529 f1.barset.add(b1) 2530 assert f1.barset.pop().data == 'some bar appended' 2531 2532 def test_named(self): 2533 canary = Mock() 2534 2535 class Foo(object): 2536 pass 2537 2538 class Bar(object): 2539 pass 2540 2541 instrumentation.register_class(Foo) 2542 instrumentation.register_class(Bar) 2543 attributes.register_attribute( 2544 Foo, 'data', uselist=False, 2545 useobject=False) 2546 attributes.register_attribute( 2547 Foo, 'barlist', uselist=True, 2548 useobject=True) 2549 2550 event.listen(Foo.data, 'set', canary.set, named=True) 2551 event.listen(Foo.barlist, 'append', canary.append, named=True) 2552 event.listen(Foo.barlist, 'remove', canary.remove, named=True) 2553 2554 f1 = Foo() 2555 b1 = Bar() 2556 f1.data = 5 2557 f1.barlist.append(b1) 2558 f1.barlist.remove(b1) 2559 eq_( 2560 canary.mock_calls, 2561 [ 2562 call.set( 2563 oldvalue=attributes.NO_VALUE, 2564 initiator=attributes.Event( 2565 Foo.data.impl, attributes.OP_REPLACE), 2566 target=f1, value=5), 2567 call.append( 2568 initiator=attributes.Event( 2569 Foo.barlist.impl, attributes.OP_APPEND), 2570 target=f1, 2571 value=b1), 2572 call.remove( 2573 initiator=attributes.Event( 2574 Foo.barlist.impl, attributes.OP_REMOVE), 2575 target=f1, 2576 value=b1)] 2577 ) 2578 2579 def test_collection_link_events(self): 2580 class Foo(object): 2581 pass 2582 class Bar(object): 2583 pass 2584 instrumentation.register_class(Foo) 2585 instrumentation.register_class(Bar) 2586 attributes.register_attribute(Foo, 'barlist', uselist=True, 2587 useobject=True) 2588 2589 canary = Mock() 2590 event.listen(Foo.barlist, "init_collection", canary.init) 2591 event.listen(Foo.barlist, "dispose_collection", canary.dispose) 2592 2593 f1 = Foo() 2594 eq_(f1.barlist, []) 2595 adapter_one = f1.barlist._sa_adapter 2596 eq_(canary.init.mock_calls, [call(f1, [], adapter_one)]) 2597 2598 b1 = Bar() 2599 f1.barlist.append(b1) 2600 2601 b2 = Bar() 2602 f1.barlist = [b2] 2603 adapter_two = f1.barlist._sa_adapter 2604 eq_(canary.init.mock_calls, [ 2605 call(f1, [], adapter_one), 2606 call(f1, [b2], adapter_two), 2607 ]) 2608 eq_( 2609 canary.dispose.mock_calls, 2610 [ 2611 call(f1, [], adapter_one) 2612 ] 2613 ) 2614 2615 2616 def test_none_on_collection_event(self): 2617 """test that append/remove of None in collections emits events. 2618 2619 This is new behavior as of 0.8. 2620 2621 """ 2622 class Foo(object): 2623 pass 2624 class Bar(object): 2625 pass 2626 instrumentation.register_class(Foo) 2627 instrumentation.register_class(Bar) 2628 attributes.register_attribute(Foo, 'barlist', uselist=True, 2629 useobject=True) 2630 canary = [] 2631 def append(state, child, initiator): 2632 canary.append((state, child)) 2633 def remove(state, child, initiator): 2634 canary.append((state, child)) 2635 event.listen(Foo.barlist, 'append', append) 2636 event.listen(Foo.barlist, 'remove', remove) 2637 2638 b1, b2 = Bar(), Bar() 2639 f1 = Foo() 2640 f1.barlist.append(None) 2641 eq_(canary, [(f1, None)]) 2642 2643 canary[:] = [] 2644 f1 = Foo() 2645 f1.barlist = [None, b2] 2646 eq_(canary, [(f1, None), (f1, b2)]) 2647 2648 canary[:] = [] 2649 f1 = Foo() 2650 f1.barlist = [b1, None, b2] 2651 eq_(canary, [(f1, b1), (f1, None), (f1, b2)]) 2652 2653 f1.barlist.remove(None) 2654 eq_(canary, [(f1, b1), (f1, None), (f1, b2), (f1, None)]) 2655 2656 def test_none_init_scalar(self): 2657 canary = Mock() 2658 class Foo(object): 2659 pass 2660 instrumentation.register_class(Foo) 2661 attributes.register_attribute(Foo, 'bar') 2662 2663 event.listen(Foo.bar, "set", canary) 2664 2665 f1 = Foo() 2666 eq_(f1.bar, None) 2667 # reversal of approach in #3061 2668 eq_(canary.mock_calls, []) 2669 2670 def test_none_init_object(self): 2671 canary = Mock() 2672 class Foo(object): 2673 pass 2674 instrumentation.register_class(Foo) 2675 attributes.register_attribute(Foo, 'bar', useobject=True) 2676 2677 event.listen(Foo.bar, "set", canary) 2678 2679 f1 = Foo() 2680 eq_(f1.bar, None) 2681 # reversal of approach in #3061 2682 eq_(canary.mock_calls, []) 2683 2684 def test_none_init_collection(self): 2685 canary = Mock() 2686 class Foo(object): 2687 pass 2688 instrumentation.register_class(Foo) 2689 attributes.register_attribute(Foo, 'bar', useobject=True, uselist=True) 2690 2691 event.listen(Foo.bar, "set", canary) 2692 2693 f1 = Foo() 2694 eq_(f1.bar, []) 2695 # reversal of approach in #3061 2696 eq_(canary.mock_calls, []) 2697 2698 2699 def test_propagate(self): 2700 classes = [None, None, None] 2701 canary = [] 2702 def make_a(): 2703 class A(object): 2704 pass 2705 classes[0] = A 2706 2707 def make_b(): 2708 class B(classes[0]): 2709 pass 2710 classes[1] = B 2711 2712 def make_c(): 2713 class C(classes[1]): 2714 pass 2715 classes[2] = C 2716 2717 def instrument_a(): 2718 instrumentation.register_class(classes[0]) 2719 2720 def instrument_b(): 2721 instrumentation.register_class(classes[1]) 2722 2723 def instrument_c(): 2724 instrumentation.register_class(classes[2]) 2725 2726 def attr_a(): 2727 attributes.register_attribute(classes[0], 'attrib', 2728 uselist=False, useobject=False) 2729 2730 def attr_b(): 2731 attributes.register_attribute(classes[1], 'attrib', 2732 uselist=False, useobject=False) 2733 2734 def attr_c(): 2735 attributes.register_attribute(classes[2], 'attrib', 2736 uselist=False, useobject=False) 2737 2738 def set(state, value, oldvalue, initiator): 2739 canary.append(value) 2740 2741 def events_a(): 2742 event.listen(classes[0].attrib, 'set', set, propagate=True) 2743 2744 def teardown(): 2745 classes[:] = [None, None, None] 2746 canary[:] = [] 2747 2748 ordering = [ 2749 (instrument_a, instrument_b), 2750 (instrument_b, instrument_c), 2751 (attr_a, attr_b), 2752 (attr_b, attr_c), 2753 (make_a, instrument_a), 2754 (instrument_a, attr_a), 2755 (attr_a, events_a), 2756 (make_b, instrument_b), 2757 (instrument_b, attr_b), 2758 (make_c, instrument_c), 2759 (instrument_c, attr_c), 2760 (make_a, make_b), 2761 (make_b, make_c) 2762 ] 2763 elements = [make_a, make_b, make_c, 2764 instrument_a, instrument_b, instrument_c, 2765 attr_a, attr_b, attr_c, events_a] 2766 2767 for i, series in enumerate(all_partial_orderings(ordering, elements)): 2768 for fn in series: 2769 fn() 2770 2771 b = classes[1]() 2772 b.attrib = "foo" 2773 eq_(b.attrib, "foo") 2774 eq_(canary, ["foo"]) 2775 2776 c = classes[2]() 2777 c.attrib = "bar" 2778 eq_(c.attrib, "bar") 2779 eq_(canary, ["foo", "bar"]) 2780 2781 teardown() 2782 2783 2784class TestUnlink(fixtures.TestBase): 2785 def setUp(self): 2786 class A(object): 2787 pass 2788 class B(object): 2789 pass 2790 self.A = A 2791 self.B = B 2792 instrumentation.register_class(A) 2793 instrumentation.register_class(B) 2794 attributes.register_attribute(A, 'bs', uselist=True, 2795 useobject=True) 2796 2797 def test_expired(self): 2798 A, B = self.A, self.B 2799 a1 = A() 2800 coll = a1.bs 2801 a1.bs.append(B()) 2802 state = attributes.instance_state(a1) 2803 state._expire(state.dict, set()) 2804 assert_raises( 2805 Warning, 2806 coll.append, B() 2807 ) 2808 2809 def test_replaced(self): 2810 A, B = self.A, self.B 2811 a1 = A() 2812 coll = a1.bs 2813 a1.bs.append(B()) 2814 a1.bs = [] 2815 # a bulk replace empties the old collection 2816 assert len(coll) == 0 2817 coll.append(B()) 2818 assert len(coll) == 1 2819 2820 def test_pop_existing(self): 2821 A, B = self.A, self.B 2822 a1 = A() 2823 coll = a1.bs 2824 a1.bs.append(B()) 2825 state = attributes.instance_state(a1) 2826 state._reset(state.dict, "bs") 2827 assert_raises( 2828 Warning, 2829 coll.append, B() 2830 ) 2831 2832 def test_ad_hoc_lazy(self): 2833 A, B = self.A, self.B 2834 a1 = A() 2835 coll = a1.bs 2836 a1.bs.append(B()) 2837 state = attributes.instance_state(a1) 2838 _set_callable(state, state.dict, "bs", lambda: B()) 2839 assert_raises( 2840 Warning, 2841 coll.append, B() 2842 ) 2843