1import pickle
2from sqlalchemy.orm import attributes, instrumentation, exc as orm_exc
3from sqlalchemy.orm.collections import collection
4from sqlalchemy.orm.interfaces import AttributeExtension
5from sqlalchemy import exc as sa_exc
6from sqlalchemy.testing import eq_, ne_, assert_raises, \
7    assert_raises_message
8from sqlalchemy.testing import fixtures
9from sqlalchemy.testing.util import gc_collect, all_partial_orderings
10from sqlalchemy.util import jython
11from sqlalchemy import event
12from sqlalchemy import testing
13from sqlalchemy.testing.mock import Mock, call
14from sqlalchemy.orm.state import InstanceState
15
16# global for pickling tests
17MyTest = None
18MyTest2 = None
19
20
21def _set_callable(state, dict_, key, callable_):
22    fn = InstanceState._instance_level_callable_processor(
23        state.manager, callable_, key)
24    fn(state, dict_, None)
25
26
27class AttributeImplAPITest(fixtures.MappedTest):
28    def _scalar_obj_fixture(self):
29        class A(object):
30            pass
31        class B(object):
32            pass
33        instrumentation.register_class(A)
34        instrumentation.register_class(B)
35        attributes.register_attribute(A, "b", uselist=False, useobject=True)
36        return A, B
37
38    def _collection_obj_fixture(self):
39        class A(object):
40            pass
41        class B(object):
42            pass
43        instrumentation.register_class(A)
44        instrumentation.register_class(B)
45        attributes.register_attribute(A, "b", uselist=True, useobject=True)
46        return A, B
47
48    def test_scalar_obj_remove_invalid(self):
49        A, B = self._scalar_obj_fixture()
50
51        a1 = A()
52        b1 = B()
53        b2 = B()
54
55        A.b.impl.append(
56            attributes.instance_state(a1),
57            attributes.instance_dict(a1), b1, None
58        )
59
60        assert a1.b is b1
61
62        assert_raises_message(
63            ValueError,
64            "Object <B at .*?> not "
65            "associated with <A at .*?> on attribute 'b'",
66            A.b.impl.remove,
67                attributes.instance_state(a1),
68                attributes.instance_dict(a1), b2, None
69        )
70
71    def test_scalar_obj_pop_invalid(self):
72        A, B = self._scalar_obj_fixture()
73
74        a1 = A()
75        b1 = B()
76        b2 = B()
77
78        A.b.impl.append(
79            attributes.instance_state(a1),
80            attributes.instance_dict(a1), b1, None
81        )
82
83        assert a1.b is b1
84
85        A.b.impl.pop(
86            attributes.instance_state(a1),
87            attributes.instance_dict(a1), b2, None
88        )
89        assert a1.b is b1
90
91    def test_scalar_obj_pop_valid(self):
92        A, B = self._scalar_obj_fixture()
93
94        a1 = A()
95        b1 = B()
96
97        A.b.impl.append(
98            attributes.instance_state(a1),
99            attributes.instance_dict(a1), b1, None
100        )
101
102        assert a1.b is b1
103
104        A.b.impl.pop(
105            attributes.instance_state(a1),
106            attributes.instance_dict(a1), b1, None
107        )
108        assert a1.b is None
109
110    def test_collection_obj_remove_invalid(self):
111        A, B = self._collection_obj_fixture()
112
113        a1 = A()
114        b1 = B()
115        b2 = B()
116
117        A.b.impl.append(
118            attributes.instance_state(a1),
119            attributes.instance_dict(a1), b1, None
120        )
121
122        assert a1.b == [b1]
123
124        assert_raises_message(
125            ValueError,
126            r"list.remove\(.*?\): .* not in list",
127            A.b.impl.remove,
128                attributes.instance_state(a1),
129                attributes.instance_dict(a1), b2, None
130        )
131
132    def test_collection_obj_pop_invalid(self):
133        A, B = self._collection_obj_fixture()
134
135        a1 = A()
136        b1 = B()
137        b2 = B()
138
139        A.b.impl.append(
140            attributes.instance_state(a1),
141            attributes.instance_dict(a1), b1, None
142        )
143
144        assert a1.b == [b1]
145
146        A.b.impl.pop(
147            attributes.instance_state(a1),
148            attributes.instance_dict(a1), b2, None
149        )
150        assert a1.b == [b1]
151
152    def test_collection_obj_pop_valid(self):
153        A, B = self._collection_obj_fixture()
154
155        a1 = A()
156        b1 = B()
157
158        A.b.impl.append(
159            attributes.instance_state(a1),
160            attributes.instance_dict(a1), b1, None
161        )
162
163        assert a1.b == [b1]
164
165        A.b.impl.pop(
166            attributes.instance_state(a1),
167            attributes.instance_dict(a1), b1, None
168        )
169        assert a1.b == []
170
171
172class AttributesTest(fixtures.ORMTest):
173    def setup(self):
174        global MyTest, MyTest2
175        class MyTest(object): pass
176        class MyTest2(object): pass
177
178    def teardown(self):
179        global MyTest, MyTest2
180        MyTest, MyTest2 = None, None
181
182    def test_basic(self):
183        class User(object):
184            pass
185
186        instrumentation.register_class(User)
187        attributes.register_attribute(User, 'user_id', uselist=False,
188                useobject=False)
189        attributes.register_attribute(User, 'user_name', uselist=False,
190                useobject=False)
191        attributes.register_attribute(User, 'email_address',
192                uselist=False, useobject=False)
193        u = User()
194        u.user_id = 7
195        u.user_name = 'john'
196        u.email_address = 'lala@123.com'
197        self.assert_(u.user_id == 7 and u.user_name == 'john'
198                     and u.email_address == 'lala@123.com')
199        attributes.instance_state(u)._commit_all(attributes.instance_dict(u))
200        self.assert_(u.user_id == 7 and u.user_name == 'john'
201                     and u.email_address == 'lala@123.com')
202        u.user_name = 'heythere'
203        u.email_address = 'foo@bar.com'
204        self.assert_(u.user_id == 7 and u.user_name == 'heythere'
205                     and u.email_address == 'foo@bar.com')
206
207    def test_pickleness(self):
208        instrumentation.register_class(MyTest)
209        instrumentation.register_class(MyTest2)
210        attributes.register_attribute(MyTest, 'user_id', uselist=False,
211                useobject=False)
212        attributes.register_attribute(MyTest, 'user_name',
213                uselist=False, useobject=False)
214        attributes.register_attribute(MyTest, 'email_address',
215                uselist=False, useobject=False)
216        attributes.register_attribute(MyTest2, 'a', uselist=False,
217                useobject=False)
218        attributes.register_attribute(MyTest2, 'b', uselist=False,
219                useobject=False)
220
221        # shouldn't be pickling callables at the class level
222
223        def somecallable(state, passive):
224            return None
225
226        attributes.register_attribute(MyTest, 'mt2', uselist=True,
227                trackparent=True, callable_=somecallable,
228                useobject=True)
229
230        o = MyTest()
231        o.mt2.append(MyTest2())
232        o.user_id=7
233        o.mt2[0].a = 'abcde'
234        pk_o = pickle.dumps(o)
235
236        o2 = pickle.loads(pk_o)
237        pk_o2 = pickle.dumps(o2)
238
239
240        # the above is kind of distrurbing, so let's do it again a little
241        # differently.  the string-id in serialization thing is just an
242        # artifact of pickling that comes up in the first round-trip.
243        # a -> b differs in pickle memoization of 'mt2', but b -> c will
244        # serialize identically.
245
246        o3 = pickle.loads(pk_o2)
247        pk_o3 = pickle.dumps(o3)
248        o4 = pickle.loads(pk_o3)
249
250        # and lastly make sure we still have our data after all that.
251        # identical serialzation is great, *if* it's complete :)
252        self.assert_(o4.user_id == 7)
253        self.assert_(o4.user_name is None)
254        self.assert_(o4.email_address is None)
255        self.assert_(len(o4.mt2) == 1)
256        self.assert_(o4.mt2[0].a == 'abcde')
257        self.assert_(o4.mt2[0].b is None)
258
259    @testing.requires.predictable_gc
260    def test_state_gc(self):
261        """test that InstanceState always has a dict, even after host
262        object gc'ed."""
263
264        class Foo(object):
265            pass
266
267        instrumentation.register_class(Foo)
268        f = Foo()
269        state = attributes.instance_state(f)
270        f.bar = "foo"
271        eq_(state.dict, {'bar': 'foo', state.manager.STATE_ATTR: state})
272        del f
273        gc_collect()
274        assert state.obj() is None
275        assert state.dict == {}
276
277    @testing.requires.predictable_gc
278    def test_object_dereferenced_error(self):
279        class Foo(object):
280            pass
281        class Bar(object):
282            def __init__(self):
283                gc_collect()
284
285        instrumentation.register_class(Foo)
286        instrumentation.register_class(Bar)
287        attributes.register_attribute(Foo,
288                                    'bars',
289                                    uselist=True,
290                                    useobject=True)
291
292        assert_raises_message(
293            orm_exc.ObjectDereferencedError,
294            "Can't emit change event for attribute "
295            "'Foo.bars' - parent object of type <Foo> "
296            "has been garbage collected.",
297            lambda: Foo().bars.append(Bar())
298        )
299
300    def test_deferred(self):
301        class Foo(object):
302            pass
303
304        data = {'a':'this is a', 'b':12}
305        def loader(state, keys):
306            for k in keys:
307                state.dict[k] = data[k]
308            return attributes.ATTR_WAS_SET
309
310        instrumentation.register_class(Foo)
311        manager = attributes.manager_of_class(Foo)
312        manager.deferred_scalar_loader = loader
313        attributes.register_attribute(Foo, 'a', uselist=False, useobject=False)
314        attributes.register_attribute(Foo, 'b', uselist=False, useobject=False)
315
316        f = Foo()
317        attributes.instance_state(f)._expire(attributes.instance_dict(f),
318                set())
319        eq_(f.a, 'this is a')
320        eq_(f.b, 12)
321        f.a = 'this is some new a'
322        attributes.instance_state(f)._expire(attributes.instance_dict(f),
323                set())
324        eq_(f.a, 'this is a')
325        eq_(f.b, 12)
326        attributes.instance_state(f)._expire(attributes.instance_dict(f),
327                set())
328        f.a = 'this is another new a'
329        eq_(f.a, 'this is another new a')
330        eq_(f.b, 12)
331        attributes.instance_state(f)._expire(attributes.instance_dict(f),
332                set())
333        eq_(f.a, 'this is a')
334        eq_(f.b, 12)
335        del f.a
336        eq_(f.a, None)
337        eq_(f.b, 12)
338        attributes.instance_state(f)._commit_all(attributes.instance_dict(f),
339                set())
340        eq_(f.a, None)
341        eq_(f.b, 12)
342
343    def test_deferred_pickleable(self):
344        data = {'a':'this is a', 'b':12}
345        def loader(state, keys):
346            for k in keys:
347                state.dict[k] = data[k]
348            return attributes.ATTR_WAS_SET
349
350        instrumentation.register_class(MyTest)
351        manager = attributes.manager_of_class(MyTest)
352        manager.deferred_scalar_loader=loader
353        attributes.register_attribute(MyTest, 'a', uselist=False, useobject=False)
354        attributes.register_attribute(MyTest, 'b', uselist=False, useobject=False)
355
356        m = MyTest()
357        attributes.instance_state(m)._expire(attributes.instance_dict(m), set())
358        assert 'a' not in m.__dict__
359        m2 = pickle.loads(pickle.dumps(m))
360        assert 'a' not in m2.__dict__
361        eq_(m2.a, "this is a")
362        eq_(m2.b, 12)
363
364    def test_list(self):
365        class User(object):pass
366        class Address(object):pass
367
368        instrumentation.register_class(User)
369        instrumentation.register_class(Address)
370        attributes.register_attribute(User, 'user_id', uselist=False,
371                useobject=False)
372        attributes.register_attribute(User, 'user_name', uselist=False,
373                useobject=False)
374        attributes.register_attribute(User, 'addresses', uselist=True,
375                useobject=True)
376        attributes.register_attribute(Address, 'address_id',
377                uselist=False, useobject=False)
378        attributes.register_attribute(Address, 'email_address',
379                uselist=False, useobject=False)
380
381        u = User()
382        u.user_id = 7
383        u.user_name = 'john'
384        u.addresses = []
385        a = Address()
386        a.address_id = 10
387        a.email_address = 'lala@123.com'
388        u.addresses.append(a)
389
390        self.assert_(u.user_id == 7 and u.user_name == 'john'
391                     and u.addresses[0].email_address == 'lala@123.com')
392        (u,
393         attributes.instance_state(a)._commit_all(attributes.instance_dict(a)))
394        self.assert_(u.user_id == 7 and u.user_name == 'john'
395                     and u.addresses[0].email_address == 'lala@123.com')
396
397        u.user_name = 'heythere'
398        a = Address()
399        a.address_id = 11
400        a.email_address = 'foo@bar.com'
401        u.addresses.append(a)
402
403        eq_(u.user_id, 7)
404        eq_(u.user_name, 'heythere')
405        eq_(u.addresses[0].email_address,'lala@123.com')
406        eq_(u.addresses[1].email_address,'foo@bar.com')
407
408    def test_extension_commit_attr(self):
409        """test that an extension which commits attribute history
410        maintains the end-result history.
411
412        This won't work in conjunction with some unitofwork extensions.
413
414        """
415
416        class Foo(fixtures.BasicEntity):
417            pass
418        class Bar(fixtures.BasicEntity):
419            pass
420
421        class ReceiveEvents(AttributeExtension):
422            def __init__(self, key):
423                self.key = key
424
425            def append(self, state, child, initiator):
426                if commit:
427                    state._commit_all(state.dict)
428                return child
429
430            def remove(self, state, child, initiator):
431                if commit:
432                    state._commit_all(state.dict)
433                return child
434
435            def set(self, state, child, oldchild, initiator):
436                if commit:
437                    state._commit_all(state.dict)
438                return child
439
440        instrumentation.register_class(Foo)
441        instrumentation.register_class(Bar)
442
443        b1, b2, b3, b4 = Bar(id='b1'), Bar(id='b2'), Bar(id='b3'), Bar(id='b4')
444
445        def loadcollection(state, passive):
446            if passive is attributes.PASSIVE_NO_FETCH:
447                return attributes.PASSIVE_NO_RESULT
448            return [b1, b2]
449
450        def loadscalar(state, passive):
451            if passive is attributes.PASSIVE_NO_FETCH:
452                return attributes.PASSIVE_NO_RESULT
453            return b2
454
455        attributes.register_attribute(Foo, 'bars',
456                               uselist=True,
457                               useobject=True,
458                               callable_=loadcollection,
459                               extension=[ReceiveEvents('bars')])
460
461        attributes.register_attribute(Foo, 'bar',
462                              uselist=False,
463                              useobject=True,
464                              callable_=loadscalar,
465                              extension=[ReceiveEvents('bar')])
466
467        attributes.register_attribute(Foo, 'scalar',
468                            uselist=False,
469                            useobject=False, extension=[ReceiveEvents('scalar')])
470
471
472        def create_hist():
473            def hist(key, shouldmatch, fn, *arg):
474                attributes.instance_state(f1)._commit_all(attributes.instance_dict(f1))
475                fn(*arg)
476                histories.append((shouldmatch,
477                                 attributes.get_history(f1, key)))
478
479            f1 = Foo()
480            hist('bars', True, f1.bars.append, b3)
481            hist('bars', True, f1.bars.append, b4)
482            hist('bars', False, f1.bars.remove, b2)
483            hist('bar', True, setattr, f1, 'bar', b3)
484            hist('bar', True, setattr, f1, 'bar', None)
485            hist('bar', True, setattr, f1, 'bar', b4)
486            hist('scalar', True, setattr, f1, 'scalar', 5)
487            hist('scalar', True, setattr, f1, 'scalar', None)
488            hist('scalar', True, setattr, f1, 'scalar', 4)
489
490        histories = []
491        commit = False
492        create_hist()
493        without_commit = list(histories)
494        histories[:] = []
495        commit = True
496        create_hist()
497        with_commit = histories
498        for without, with_ in zip(without_commit, with_commit):
499            shouldmatch, woc = without
500            shouldmatch, wic = with_
501            if shouldmatch:
502                eq_(woc, wic)
503            else:
504                ne_(woc, wic)
505
506    def test_extension_lazyload_assertion(self):
507        class Foo(fixtures.BasicEntity):
508            pass
509        class Bar(fixtures.BasicEntity):
510            pass
511
512        class ReceiveEvents(AttributeExtension):
513            def append(self, state, child, initiator):
514                state.obj().bars
515                return child
516
517            def remove(self, state, child, initiator):
518                state.obj().bars
519                return child
520
521            def set(self, state, child, oldchild, initiator):
522                return child
523
524        instrumentation.register_class(Foo)
525        instrumentation.register_class(Bar)
526
527        bar1, bar2, bar3 = [Bar(id=1), Bar(id=2), Bar(id=3)]
528        def func1(state, passive):
529            if passive is attributes.PASSIVE_NO_FETCH:
530                return attributes.PASSIVE_NO_RESULT
531
532            return [bar1, bar2, bar3]
533
534        attributes.register_attribute(Foo, 'bars', uselist=True,
535                callable_=func1, useobject=True,
536                extension=[ReceiveEvents()])
537        attributes.register_attribute(Bar, 'foos', uselist=True,
538                useobject=True, backref='bars')
539
540        x = Foo()
541        assert_raises(AssertionError, Bar(id=4).foos.append, x)
542
543        x.bars
544        b = Bar(id=4)
545        b.foos.append(x)
546        attributes.instance_state(x)._expire_attributes(attributes.instance_dict(x),
547                ['bars'])
548        assert_raises(AssertionError, b.foos.remove, x)
549
550
551    def test_scalar_listener(self):
552
553        # listeners on ScalarAttributeImpl aren't used normally. test that
554        # they work for the benefit of user extensions
555
556        class Foo(object):
557
558            pass
559
560        results = []
561        class ReceiveEvents(AttributeExtension):
562            def append(self, state, child, initiator):
563                assert False
564
565            def remove(self, state, child, initiator):
566                results.append(("remove", state.obj(), child))
567
568            def set(self, state, child, oldchild, initiator):
569                results.append(("set", state.obj(), child, oldchild))
570                return child
571
572        instrumentation.register_class(Foo)
573        attributes.register_attribute(Foo, 'x', uselist=False,
574                useobject=False,
575                extension=ReceiveEvents())
576
577        f = Foo()
578        f.x = 5
579        f.x = 17
580        del f.x
581
582        eq_(results, [
583            ('set', f, 5, attributes.NEVER_SET),
584            ('set', f, 17, 5),
585            ('remove', f, 17),
586        ])
587
588    def test_lazytrackparent(self):
589        """test that the "hasparent" flag works properly
590           when lazy loaders and backrefs are used
591
592        """
593
594        class Post(object):
595            pass
596        class Blog(object):
597            pass
598        instrumentation.register_class(Post)
599        instrumentation.register_class(Blog)
600
601        # set up instrumented attributes with backrefs
602        attributes.register_attribute(Post, 'blog', uselist=False,
603                                        backref='posts',
604                                        trackparent=True, useobject=True)
605        attributes.register_attribute(Blog, 'posts', uselist=True,
606                                        backref='blog',
607                                        trackparent=True, useobject=True)
608
609        # create objects as if they'd been freshly loaded from the database (without history)
610        b = Blog()
611        p1 = Post()
612        _set_callable(attributes.instance_state(b), attributes.instance_dict(b),
613                                                    'posts', lambda state, passive:[p1])
614        _set_callable(attributes.instance_state(p1), attributes.instance_dict(p1),
615                                                    'blog', lambda state, passive:b)
616        p1, attributes.instance_state(b)._commit_all(attributes.instance_dict(b))
617
618        # no orphans (called before the lazy loaders fire off)
619        assert attributes.has_parent(Blog, p1, 'posts', optimistic=True)
620        assert attributes.has_parent(Post, b, 'blog', optimistic=True)
621
622        # assert connections
623        assert p1.blog is b
624        assert p1 in b.posts
625
626        # manual connections
627        b2 = Blog()
628        p2 = Post()
629        b2.posts.append(p2)
630        assert attributes.has_parent(Blog, p2, 'posts')
631        assert attributes.has_parent(Post, b2, 'blog')
632
633    def test_illegal_trackparent(self):
634        class Post(object):pass
635        class Blog(object):pass
636        instrumentation.register_class(Post)
637        instrumentation.register_class(Blog)
638
639        attributes.register_attribute(Post, 'blog', useobject=True)
640        assert_raises_message(
641            AssertionError,
642            "This AttributeImpl is not configured to track parents.",
643            attributes.has_parent, Post, Blog(), 'blog'
644        )
645        assert_raises_message(
646            AssertionError,
647            "This AttributeImpl is not configured to track parents.",
648            Post.blog.impl.sethasparent, "x", "x", True
649        )
650
651
652    def test_inheritance(self):
653        """tests that attributes are polymorphic"""
654
655        class Foo(object):pass
656        class Bar(Foo):pass
657
658
659        instrumentation.register_class(Foo)
660        instrumentation.register_class(Bar)
661
662        def func1(state, passive):
663            return "this is the foo attr"
664        def func2(state, passive):
665            return "this is the bar attr"
666        def func3(state, passive):
667            return "this is the shared attr"
668        attributes.register_attribute(Foo, 'element', uselist=False,
669                                            callable_=func1, useobject=True)
670        attributes.register_attribute(Foo, 'element2', uselist=False,
671                                            callable_=func3, useobject=True)
672        attributes.register_attribute(Bar, 'element', uselist=False,
673                                            callable_=func2, useobject=True)
674
675        x = Foo()
676        y = Bar()
677        assert x.element == 'this is the foo attr'
678        assert y.element == 'this is the bar attr'
679        assert x.element2 == 'this is the shared attr'
680        assert y.element2 == 'this is the shared attr'
681
682    def test_no_double_state(self):
683        states = set()
684        class Foo(object):
685            def __init__(self):
686                states.add(attributes.instance_state(self))
687        class Bar(Foo):
688            def __init__(self):
689                states.add(attributes.instance_state(self))
690                Foo.__init__(self)
691
692
693        instrumentation.register_class(Foo)
694        instrumentation.register_class(Bar)
695
696        b = Bar()
697        eq_(len(states), 1)
698        eq_(list(states)[0].obj(), b)
699
700
701    def test_inheritance2(self):
702        """test that the attribute manager can properly traverse the
703        managed attributes of an object, if the object is of a
704        descendant class with managed attributes in the parent class"""
705
706        class Foo(object):
707            pass
708
709        class Bar(Foo):
710            pass
711
712        class Element(object):
713            _state = True
714
715        instrumentation.register_class(Foo)
716        instrumentation.register_class(Bar)
717        attributes.register_attribute(Foo, 'element', uselist=False,
718                useobject=True)
719        el = Element()
720        x = Bar()
721        x.element = el
722        eq_(attributes.get_state_history(attributes.instance_state(x),
723            'element'), ([el], (), ()))
724        attributes.instance_state(x)._commit_all(attributes.instance_dict(x))
725        added, unchanged, deleted = \
726            attributes.get_state_history(attributes.instance_state(x),
727                'element')
728        assert added == ()
729        assert unchanged == [el]
730
731    def test_lazyhistory(self):
732        """tests that history functions work with lazy-loading attributes"""
733
734        class Foo(fixtures.BasicEntity):
735            pass
736        class Bar(fixtures.BasicEntity):
737            pass
738
739        instrumentation.register_class(Foo)
740        instrumentation.register_class(Bar)
741        bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3),
742                                  Bar(id=4)]
743
744        def func1(state, passive):
745            return 'this is func 1'
746
747        def func2(state, passive):
748            return [bar1, bar2, bar3]
749
750        attributes.register_attribute(Foo, 'col1', uselist=False,
751                callable_=func1, useobject=True)
752        attributes.register_attribute(Foo, 'col2', uselist=True,
753                callable_=func2, useobject=True)
754        attributes.register_attribute(Bar, 'id', uselist=False,
755                useobject=True)
756        x = Foo()
757        attributes.instance_state(x)._commit_all(attributes.instance_dict(x))
758        x.col2.append(bar4)
759        eq_(attributes.get_state_history(attributes.instance_state(x),
760            'col2'), ([bar4], [bar1, bar2, bar3], []))
761
762    def test_parenttrack(self):
763        class Foo(object):
764            pass
765
766        class Bar(object):
767            pass
768
769        instrumentation.register_class(Foo)
770        instrumentation.register_class(Bar)
771        attributes.register_attribute(Foo, 'element', uselist=False,
772                trackparent=True, useobject=True)
773        attributes.register_attribute(Bar, 'element', uselist=False,
774                trackparent=True, useobject=True)
775        f1 = Foo()
776        f2 = Foo()
777        b1 = Bar()
778        b2 = Bar()
779        f1.element = b1
780        b2.element = f2
781        assert attributes.has_parent(Foo, b1, 'element')
782        assert not attributes.has_parent(Foo, b2, 'element')
783        assert not attributes.has_parent(Foo, f2, 'element')
784        assert attributes.has_parent(Bar, f2, 'element')
785        b2.element = None
786        assert not attributes.has_parent(Bar, f2, 'element')
787
788        # test that double assignment doesn't accidentally reset the
789        # 'parent' flag.
790
791        b3 = Bar()
792        f4 = Foo()
793        b3.element = f4
794        assert attributes.has_parent(Bar, f4, 'element')
795        b3.element = f4
796        assert attributes.has_parent(Bar, f4, 'element')
797
798    def test_descriptorattributes(self):
799        """changeset: 1633 broke ability to use ORM to map classes with
800        unusual descriptor attributes (for example, classes that inherit
801        from ones implementing zope.interface.Interface). This is a
802        simple regression test to prevent that defect. """
803
804        class des(object):
805
806            def __get__(self, instance, owner):
807                raise AttributeError('fake attribute')
808
809        class Foo(object):
810            A = des()
811
812        instrumentation.register_class(Foo)
813        instrumentation.unregister_class(Foo)
814
815    def test_collectionclasses(self):
816
817        class Foo(object):
818            pass
819
820        instrumentation.register_class(Foo)
821        attributes.register_attribute(Foo, 'collection', uselist=True,
822                typecallable=set, useobject=True)
823        assert attributes.manager_of_class(Foo).is_instrumented('collection'
824                )
825        assert isinstance(Foo().collection, set)
826        attributes.unregister_attribute(Foo, 'collection')
827        assert not attributes.manager_of_class(Foo).is_instrumented('collection'
828                )
829        try:
830            attributes.register_attribute(Foo, 'collection',
831                    uselist=True, typecallable=dict, useobject=True)
832            assert False
833        except sa_exc.ArgumentError as e:
834            assert str(e) \
835                == 'Type InstrumentedDict must elect an appender '\
836                'method to be a collection class'
837
838        class MyDict(dict):
839
840            @collection.appender
841            def append(self, item):
842                self[item.foo] = item
843
844            @collection.remover
845            def remove(self, item):
846                del self[item.foo]
847
848        attributes.register_attribute(Foo, 'collection', uselist=True,
849                typecallable=MyDict, useobject=True)
850        assert isinstance(Foo().collection, MyDict)
851        attributes.unregister_attribute(Foo, 'collection')
852
853        class MyColl(object):
854            pass
855
856        try:
857            attributes.register_attribute(Foo, 'collection',
858                    uselist=True, typecallable=MyColl, useobject=True)
859            assert False
860        except sa_exc.ArgumentError as e:
861            assert str(e) \
862                == 'Type MyColl must elect an appender method to be a '\
863                'collection class'
864
865        class MyColl(object):
866
867            @collection.iterator
868            def __iter__(self):
869                return iter([])
870
871            @collection.appender
872            def append(self, item):
873                pass
874
875            @collection.remover
876            def remove(self, item):
877                pass
878
879        attributes.register_attribute(Foo, 'collection', uselist=True,
880                typecallable=MyColl, useobject=True)
881        try:
882            Foo().collection
883            assert True
884        except sa_exc.ArgumentError as e:
885            assert False
886
887class GetNoValueTest(fixtures.ORMTest):
888    def _fixture(self, expected):
889        class Foo(object):
890            pass
891
892        class Bar(object):
893            pass
894
895        def lazy_callable(state, passive):
896            return expected
897
898        instrumentation.register_class(Foo)
899        instrumentation.register_class(Bar)
900        if expected is not None:
901            attributes.register_attribute(Foo,
902                        "attr", useobject=True,
903                        uselist=False, callable_=lazy_callable)
904        else:
905            attributes.register_attribute(Foo,
906                        "attr", useobject=True,
907                        uselist=False)
908
909        f1 = self.f1 = Foo()
910        return Foo.attr.impl,\
911                attributes.instance_state(f1), \
912                attributes.instance_dict(f1)
913
914
915    def test_passive_no_result(self):
916        attr, state, dict_ = self._fixture(attributes.PASSIVE_NO_RESULT)
917        eq_(
918            attr.get(state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE),
919            attributes.PASSIVE_NO_RESULT
920        )
921
922    def test_passive_no_result_never_set(self):
923        attr, state, dict_ = self._fixture(attributes.NEVER_SET)
924        eq_(
925            attr.get(state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE),
926            attributes.PASSIVE_NO_RESULT
927        )
928        assert 'attr' not in dict_
929
930    def test_passive_ret_never_set_never_set(self):
931        attr, state, dict_ = self._fixture(attributes.NEVER_SET)
932        eq_(
933            attr.get(state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET),
934            attributes.NEVER_SET
935        )
936        assert 'attr' not in dict_
937
938    def test_passive_ret_never_set_empty(self):
939        attr, state, dict_ = self._fixture(None)
940        eq_(
941            attr.get(state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET),
942            attributes.NEVER_SET
943        )
944        assert 'attr' not in dict_
945
946    def test_off_empty(self):
947        attr, state, dict_ = self._fixture(None)
948        eq_(
949            attr.get(state, dict_, passive=attributes.PASSIVE_OFF),
950            None
951        )
952        assert 'attr' not in dict_
953
954class UtilTest(fixtures.ORMTest):
955    def test_helpers(self):
956        class Foo(object):
957            pass
958
959        class Bar(object):
960            pass
961
962        instrumentation.register_class(Foo)
963        instrumentation.register_class(Bar)
964        attributes.register_attribute(Foo, "coll", uselist=True, useobject=True)
965
966        f1 = Foo()
967        b1 = Bar()
968        b2 = Bar()
969        coll = attributes.init_collection(f1, "coll")
970        assert coll.data is f1.coll
971        assert attributes.get_attribute(f1, "coll") is f1.coll
972        attributes.set_attribute(f1, "coll", [b1])
973        assert f1.coll == [b1]
974        eq_(attributes.get_history(f1, "coll"), ([b1], [], []))
975        attributes.set_committed_value(f1, "coll", [b2])
976        eq_(attributes.get_history(f1, "coll"), ((), [b2], ()))
977
978        attributes.del_attribute(f1, "coll")
979        assert "coll" not in f1.__dict__
980
981class BackrefTest(fixtures.ORMTest):
982
983    def test_m2m(self):
984        class Student(object):pass
985        class Course(object):pass
986
987        instrumentation.register_class(Student)
988        instrumentation.register_class(Course)
989        attributes.register_attribute(Student, 'courses', uselist=True,
990                backref="students", useobject=True)
991        attributes.register_attribute(Course, 'students', uselist=True,
992                backref="courses", useobject=True)
993
994        s = Student()
995        c = Course()
996        s.courses.append(c)
997        self.assert_(c.students == [s])
998        s.courses.remove(c)
999        self.assert_(c.students == [])
1000
1001        (s1, s2, s3) = (Student(), Student(), Student())
1002
1003        c.students = [s1, s2, s3]
1004        self.assert_(s2.courses == [c])
1005        self.assert_(s1.courses == [c])
1006        s1.courses.remove(c)
1007        self.assert_(c.students == [s2,s3])
1008
1009    def test_o2m(self):
1010        class Post(object):pass
1011        class Blog(object):pass
1012
1013        instrumentation.register_class(Post)
1014        instrumentation.register_class(Blog)
1015        attributes.register_attribute(Post, 'blog', uselist=False,
1016                backref='posts',
1017                trackparent=True, useobject=True)
1018        attributes.register_attribute(Blog, 'posts', uselist=True,
1019                backref='blog',
1020                trackparent=True, useobject=True)
1021        b = Blog()
1022        (p1, p2, p3) = (Post(), Post(), Post())
1023        b.posts.append(p1)
1024        b.posts.append(p2)
1025        b.posts.append(p3)
1026        self.assert_(b.posts == [p1, p2, p3])
1027        self.assert_(p2.blog is b)
1028
1029        p3.blog = None
1030        self.assert_(b.posts == [p1, p2])
1031        p4 = Post()
1032        p4.blog = b
1033        self.assert_(b.posts == [p1, p2, p4])
1034
1035        p4.blog = b
1036        p4.blog = b
1037        self.assert_(b.posts == [p1, p2, p4])
1038
1039        # assert no failure removing None
1040        p5 = Post()
1041        p5.blog = None
1042        del p5.blog
1043
1044    def test_o2o(self):
1045        class Port(object):pass
1046        class Jack(object):pass
1047        instrumentation.register_class(Port)
1048        instrumentation.register_class(Jack)
1049
1050        attributes.register_attribute(Port, 'jack', uselist=False,
1051                                            useobject=True, backref="port")
1052
1053        attributes.register_attribute(Jack, 'port', uselist=False,
1054                                            useobject=True, backref="jack")
1055
1056
1057        p = Port()
1058        j = Jack()
1059        p.jack = j
1060        self.assert_(j.port is p)
1061        self.assert_(p.jack is not None)
1062
1063        j.port = None
1064        self.assert_(p.jack is None)
1065
1066    def test_symmetric_o2o_inheritance(self):
1067        """Test that backref 'initiator' catching goes against
1068        a token that is global to all InstrumentedAttribute objects
1069        within a particular class, not just the indvidual IA object
1070        since we use distinct objects in an inheritance scenario.
1071
1072        """
1073
1074        class Parent(object):
1075            pass
1076        class Child(object):
1077            pass
1078        class SubChild(Child):
1079            pass
1080
1081        p_token = object()
1082        c_token = object()
1083
1084        instrumentation.register_class(Parent)
1085        instrumentation.register_class(Child)
1086        instrumentation.register_class(SubChild)
1087        attributes.register_attribute(Parent, 'child', uselist=False,
1088                backref="parent",
1089                parent_token = p_token,
1090                useobject=True)
1091        attributes.register_attribute(Child, 'parent', uselist=False,
1092                backref="child",
1093                parent_token = c_token,
1094                useobject=True)
1095        attributes.register_attribute(SubChild, 'parent',
1096                uselist=False,
1097                backref="child",
1098                parent_token = c_token,
1099                useobject=True)
1100
1101        p1 = Parent()
1102        c1 = Child()
1103        p1.child = c1
1104
1105        c2 = SubChild()
1106        c2.parent = p1
1107
1108    def test_symmetric_o2m_inheritance(self):
1109        class Parent(object):
1110            pass
1111        class SubParent(Parent):
1112            pass
1113        class Child(object):
1114            pass
1115
1116        p_token = object()
1117        c_token = object()
1118
1119        instrumentation.register_class(Parent)
1120        instrumentation.register_class(SubParent)
1121        instrumentation.register_class(Child)
1122        attributes.register_attribute(Parent, 'children', uselist=True,
1123                backref='parent',
1124                parent_token = p_token,
1125                useobject=True)
1126        attributes.register_attribute(SubParent, 'children', uselist=True,
1127                backref='parent',
1128                parent_token = p_token,
1129                useobject=True)
1130        attributes.register_attribute(Child, 'parent', uselist=False,
1131                backref='children',
1132                parent_token = c_token,
1133                useobject=True)
1134
1135        p1 = Parent()
1136        p2 = SubParent()
1137        c1 = Child()
1138
1139        p1.children.append(c1)
1140
1141        assert c1.parent is p1
1142        assert c1 in p1.children
1143
1144        p2.children.append(c1)
1145        assert c1.parent is p2
1146
1147        # event propagates to remove as of [ticket:2789]
1148        assert c1 not in p1.children
1149
1150class CyclicBackrefAssertionTest(fixtures.TestBase):
1151    """test that infinite recursion due to incorrect backref assignments
1152    is blocked.
1153
1154    """
1155    def test_scalar_set_type_assertion(self):
1156        A, B, C = self._scalar_fixture()
1157        c1 = C()
1158        b1 = B()
1159        assert_raises_message(
1160            ValueError,
1161            'Bidirectional attribute conflict detected: '
1162            'Passing object <B at .*> to attribute "C.a" '
1163            'triggers a modify event on attribute "C.b" '
1164            'via the backref "B.c".',
1165            setattr, c1, 'a', b1
1166        )
1167
1168    def test_collection_append_type_assertion(self):
1169        A, B, C = self._collection_fixture()
1170        c1 = C()
1171        b1 = B()
1172        assert_raises_message(
1173            ValueError,
1174            'Bidirectional attribute conflict detected: '
1175            'Passing object <B at .*> to attribute "C.a" '
1176            'triggers a modify event on attribute "C.b" '
1177            'via the backref "B.c".',
1178            c1.a.append, b1
1179        )
1180
1181
1182    def _scalar_fixture(self):
1183        class A(object):
1184            pass
1185        class B(object):
1186            pass
1187        class C(object):
1188            pass
1189        instrumentation.register_class(A)
1190        instrumentation.register_class(B)
1191        instrumentation.register_class(C)
1192        attributes.register_attribute(C, 'a', backref='c', useobject=True)
1193        attributes.register_attribute(C, 'b', backref='c', useobject=True)
1194
1195        attributes.register_attribute(A, 'c', backref='a', useobject=True,
1196                        uselist=True)
1197        attributes.register_attribute(B, 'c', backref='b', useobject=True,
1198                        uselist=True)
1199
1200        return A, B, C
1201
1202    def _collection_fixture(self):
1203        class A(object):
1204            pass
1205        class B(object):
1206            pass
1207        class C(object):
1208            pass
1209        instrumentation.register_class(A)
1210        instrumentation.register_class(B)
1211        instrumentation.register_class(C)
1212
1213        attributes.register_attribute(C, 'a', backref='c', useobject=True,
1214                                                uselist=True)
1215        attributes.register_attribute(C, 'b', backref='c', useobject=True,
1216                                                uselist=True)
1217
1218        attributes.register_attribute(A, 'c', backref='a', useobject=True)
1219        attributes.register_attribute(B, 'c', backref='b', useobject=True)
1220
1221        return A, B, C
1222
1223    def _broken_collection_fixture(self):
1224        class A(object):
1225            pass
1226        class B(object):
1227            pass
1228        instrumentation.register_class(A)
1229        instrumentation.register_class(B)
1230
1231        attributes.register_attribute(A, 'b', backref='a1', useobject=True)
1232        attributes.register_attribute(B, 'a1', backref='b', useobject=True,
1233                                                uselist=True)
1234
1235        attributes.register_attribute(B, 'a2', backref='b', useobject=True,
1236                                                uselist=True)
1237
1238        return A, B
1239
1240    def test_broken_collection_assertion(self):
1241        A, B = self._broken_collection_fixture()
1242        b1 = B()
1243        a1 = A()
1244        assert_raises_message(
1245            ValueError,
1246            'Bidirectional attribute conflict detected: '
1247            'Passing object <A at .*> to attribute "B.a2" '
1248            'triggers a modify event on attribute "B.a1" '
1249            'via the backref "A.b".',
1250            b1.a2.append, a1
1251        )
1252
1253class PendingBackrefTest(fixtures.ORMTest):
1254    def _fixture(self):
1255        class Post(object):
1256            def __init__(self, name):
1257                self.name = name
1258            __hash__ = None
1259            def __eq__(self, other):
1260                return other is not None and other.name == self.name
1261
1262        class Blog(object):
1263            def __init__(self, name):
1264                self.name = name
1265            __hash__ = None
1266            def __eq__(self, other):
1267                return other is not None and other.name == self.name
1268
1269        lazy_posts = Mock()
1270
1271        instrumentation.register_class(Post)
1272        instrumentation.register_class(Blog)
1273        attributes.register_attribute(Post, 'blog', uselist=False,
1274                backref='posts', trackparent=True, useobject=True)
1275        attributes.register_attribute(Blog, 'posts', uselist=True,
1276                backref='blog', callable_=lazy_posts, trackparent=True,
1277                useobject=True)
1278
1279        return Post, Blog, lazy_posts
1280
1281    def test_lazy_add(self):
1282        Post, Blog, lazy_posts = self._fixture()
1283
1284        p1, p2, p3 = Post("post 1"), Post("post 2"), Post("post 3")
1285        lazy_posts.return_value = attributes.PASSIVE_NO_RESULT
1286
1287        b = Blog("blog 1")
1288        b1_state = attributes.instance_state(b)
1289
1290        p = Post("post 4")
1291
1292        p.blog = b
1293        eq_(
1294            lazy_posts.mock_calls, [
1295                call(b1_state, attributes.PASSIVE_NO_FETCH)
1296            ]
1297        )
1298
1299        p = Post("post 5")
1300
1301        # setting blog doesn't call 'posts' callable, calls with no fetch
1302        p.blog = b
1303        eq_(
1304            lazy_posts.mock_calls, [
1305                call(b1_state, attributes.PASSIVE_NO_FETCH),
1306                call(b1_state, attributes.PASSIVE_NO_FETCH)
1307            ]
1308        )
1309
1310        lazy_posts.return_value = [p1, p2, p3]
1311
1312        # calling backref calls the callable, populates extra posts
1313        eq_(b.posts, [p1, p2, p3, Post("post 4"), Post("post 5")])
1314        eq_(
1315            lazy_posts.mock_calls, [
1316                call(b1_state, attributes.PASSIVE_NO_FETCH),
1317                call(b1_state, attributes.PASSIVE_NO_FETCH),
1318                call(b1_state, attributes.PASSIVE_OFF)
1319            ]
1320        )
1321
1322    def test_lazy_history_collection(self):
1323        Post, Blog, lazy_posts = self._fixture()
1324
1325        p1, p2, p3 = Post("post 1"), Post("post 2"), Post("post 3")
1326        lazy_posts.return_value = [p1, p2, p3]
1327
1328        b = Blog("blog 1")
1329        p = Post("post 4")
1330        p.blog = b
1331
1332        p4 = Post("post 5")
1333        p4.blog = b
1334
1335        eq_(lazy_posts.call_count, 1)
1336
1337        eq_(attributes.instance_state(b).
1338                get_history('posts', attributes.PASSIVE_OFF),
1339                            ([p, p4], [p1, p2, p3], []))
1340        eq_(lazy_posts.call_count, 1)
1341
1342    def test_passive_history_collection_never_set(self):
1343        Post, Blog, lazy_posts = self._fixture()
1344
1345        lazy_posts.return_value = attributes.PASSIVE_NO_RESULT
1346
1347        b = Blog("blog 1")
1348        p = Post("post 1")
1349
1350        state, dict_ = attributes.instance_state(b), attributes.instance_dict(b)
1351
1352        # this sets up NEVER_SET on b.posts
1353        p.blog = b
1354
1355        eq_(state.committed_state, {"posts": attributes.NEVER_SET})
1356        assert 'posts' not in dict_
1357
1358        # then suppose the object was made transient again,
1359        # the lazy loader would return this
1360        lazy_posts.return_value = attributes.ATTR_EMPTY
1361
1362        p2 = Post('asdf')
1363        p2.blog = b
1364
1365        eq_(state.committed_state, {"posts": attributes.NEVER_SET})
1366        eq_(dict_['posts'], [p2])
1367
1368        # then this would fail.
1369        eq_(
1370            Blog.posts.impl.get_history(state, dict_, passive=True),
1371            ([p2], (), ())
1372        )
1373
1374        eq_(
1375            Blog.posts.impl.get_all_pending(state, dict_),
1376            [(attributes.instance_state(p2), p2)]
1377        )
1378
1379    def test_state_on_add_remove(self):
1380        Post, Blog, lazy_posts = self._fixture()
1381        lazy_posts.return_value = attributes.PASSIVE_NO_RESULT
1382
1383        b = Blog("blog 1")
1384        b1_state = attributes.instance_state(b)
1385        p = Post("post 1")
1386        p.blog = b
1387        eq_(lazy_posts.mock_calls,
1388                [call(b1_state, attributes.PASSIVE_NO_FETCH)])
1389        p.blog = None
1390        eq_(lazy_posts.mock_calls,
1391                [call(b1_state, attributes.PASSIVE_NO_FETCH),
1392                call(b1_state, attributes.PASSIVE_NO_FETCH)])
1393        lazy_posts.return_value = []
1394        eq_(b.posts, [])
1395        eq_(lazy_posts.mock_calls,
1396                [call(b1_state, attributes.PASSIVE_NO_FETCH),
1397                call(b1_state, attributes.PASSIVE_NO_FETCH),
1398                call(b1_state, attributes.PASSIVE_OFF)])
1399
1400    def test_pending_combines_with_lazy(self):
1401        Post, Blog, lazy_posts = self._fixture()
1402
1403        lazy_posts.return_value = attributes.PASSIVE_NO_RESULT
1404
1405        b = Blog("blog 1")
1406        p = Post("post 1")
1407        p2 = Post("post 2")
1408        p.blog = b
1409        eq_(lazy_posts.call_count, 1)
1410
1411        lazy_posts.return_value = [p, p2]
1412
1413        # lazy loaded + pending get added together.
1414        # This isn't seen often with the ORM due
1415        # to usual practices surrounding the
1416        # load/flush/load cycle.
1417        eq_(b.posts, [p, p2, p])
1418        eq_(lazy_posts.call_count, 2)
1419
1420    def test_normal_load(self):
1421        Post, Blog, lazy_posts = self._fixture()
1422
1423        lazy_posts.return_value = \
1424            (p1, p2, p3) = [Post("post 1"), Post("post 2"), Post("post 3")]
1425
1426        b = Blog("blog 1")
1427
1428        # assign without using backref system
1429        p2.__dict__['blog'] = b
1430
1431        eq_(b.posts, [Post("post 1"), Post("post 2"), Post("post 3")])
1432
1433        eq_(lazy_posts.call_count, 1)
1434        p2.blog = None
1435        p4 = Post("post 4")
1436        p4.blog = b
1437        eq_(b.posts, [Post("post 1"), Post("post 3"), Post("post 4")])
1438
1439        b_state = attributes.instance_state(b)
1440
1441        eq_(lazy_posts.call_count, 1)
1442        eq_(lazy_posts.mock_calls,
1443            [call(b_state, attributes.PASSIVE_OFF)])
1444
1445
1446    def test_commit_removes_pending(self):
1447        Post, Blog, lazy_posts = self._fixture()
1448
1449        p1 = Post("post 1")
1450
1451        lazy_posts.return_value = attributes.PASSIVE_NO_RESULT
1452        b = Blog("blog 1")
1453        p1.blog = b
1454
1455        b_state = attributes.instance_state(b)
1456        p1_state = attributes.instance_state(p1)
1457        b_state._commit_all(attributes.instance_dict(b))
1458        p1_state._commit_all(attributes.instance_dict(p1))
1459        lazy_posts.return_value = [p1]
1460        eq_(b.posts, [Post("post 1")])
1461        eq_(lazy_posts.mock_calls,
1462            [call(b_state, attributes.PASSIVE_NO_FETCH),
1463            call(b_state, attributes.PASSIVE_OFF)])
1464
1465class HistoryTest(fixtures.TestBase):
1466
1467    def _fixture(self, uselist, useobject, active_history, **kw):
1468        class Foo(fixtures.BasicEntity):
1469            pass
1470
1471        instrumentation.register_class(Foo)
1472        attributes.register_attribute(
1473                    Foo, 'someattr',
1474                    uselist=uselist,
1475                    useobject=useobject,
1476                    active_history=active_history,
1477                    **kw)
1478        return Foo
1479
1480    def _two_obj_fixture(self, uselist):
1481        class Foo(fixtures.BasicEntity):
1482            pass
1483        class Bar(fixtures.BasicEntity):
1484            def __bool__(self):
1485                assert False
1486
1487        instrumentation.register_class(Foo)
1488        instrumentation.register_class(Bar)
1489        attributes.register_attribute(Foo, 'someattr', uselist=uselist,
1490                useobject=True)
1491        return Foo, Bar
1492
1493    def _someattr_history(self, f, **kw):
1494        passive = kw.pop('passive', None)
1495        if passive is True:
1496            kw['passive'] = attributes.PASSIVE_NO_INITIALIZE
1497        elif passive is False:
1498            kw['passive'] = attributes.PASSIVE_OFF
1499
1500        return attributes.get_state_history(
1501                    attributes.instance_state(f),
1502                    'someattr', **kw)
1503
1504    def _commit_someattr(self, f):
1505        attributes.instance_state(f)._commit(attributes.instance_dict(f),
1506                ['someattr'])
1507
1508    def _someattr_committed_state(self, f):
1509        Foo = f.__class__
1510        return Foo.someattr.impl.get_committed_value(
1511            attributes.instance_state(f),
1512            attributes.instance_dict(f))
1513
1514    def test_committed_value_init(self):
1515        Foo = self._fixture(uselist=False, useobject=False,
1516                                active_history=False)
1517        f = Foo()
1518        eq_(self._someattr_committed_state(f), None)
1519
1520    def test_committed_value_set(self):
1521        Foo = self._fixture(uselist=False, useobject=False,
1522                                active_history=False)
1523        f = Foo()
1524        f.someattr = 3
1525        eq_(self._someattr_committed_state(f), None)
1526
1527    def test_committed_value_set_active_hist(self):
1528        Foo = self._fixture(uselist=False, useobject=False,
1529                                active_history=True)
1530        f = Foo()
1531        f.someattr = 3
1532        eq_(self._someattr_committed_state(f), None)
1533
1534    def test_committed_value_set_commit(self):
1535        Foo = self._fixture(uselist=False, useobject=False,
1536                                active_history=False)
1537        f = Foo()
1538        f.someattr = 3
1539        self._commit_someattr(f)
1540        eq_(self._someattr_committed_state(f), 3)
1541
1542    def test_scalar_init(self):
1543        Foo = self._fixture(uselist=False, useobject=False,
1544                                active_history=False)
1545        f = Foo()
1546        eq_(self._someattr_history(f), ((), (), ()))
1547
1548    def test_object_init(self):
1549        Foo = self._fixture(uselist=False, useobject=True,
1550                                active_history=False)
1551        f = Foo()
1552        eq_(self._someattr_history(f), ((), (), ()))
1553
1554    def test_object_init_active_history(self):
1555        Foo = self._fixture(uselist=False, useobject=True,
1556                                active_history=True)
1557        f = Foo()
1558        eq_(self._someattr_history(f), ((), (), ()))
1559
1560    def test_scalar_no_init_side_effect(self):
1561        Foo = self._fixture(uselist=False, useobject=False,
1562                                active_history=False)
1563        f = Foo()
1564        self._someattr_history(f)
1565        # no side effects
1566        assert 'someattr' not in f.__dict__
1567        assert 'someattr' not in attributes.instance_state(f).committed_state
1568
1569    def test_scalar_set(self):
1570        Foo = self._fixture(uselist=False, useobject=False,
1571                                active_history=False)
1572        f = Foo()
1573        f.someattr = 'hi'
1574        eq_(self._someattr_history(f), (['hi'], (), ()))
1575
1576    def test_scalar_set_None(self):
1577        # note - compare:
1578        # test_scalar_set_None,
1579        # test_scalar_get_first_set_None,
1580        # test_use_object_set_None,
1581        # test_use_object_get_first_set_None
1582        Foo = self._fixture(uselist=False, useobject=False,
1583                                active_history=False)
1584        f = Foo()
1585        f.someattr = None
1586        eq_(self._someattr_history(f), ([None], (), ()))
1587
1588    def test_scalar_get_first_set_None(self):
1589        # note - compare:
1590        # test_scalar_set_None,
1591        # test_scalar_get_first_set_None,
1592        # test_use_object_set_None,
1593        # test_use_object_get_first_set_None
1594        Foo = self._fixture(uselist=False, useobject=False,
1595                                active_history=False)
1596        f = Foo()
1597        assert f.someattr is None
1598        f.someattr = None
1599        eq_(self._someattr_history(f), ([None], (), ()))
1600
1601    def test_scalar_set_commit(self):
1602        Foo = self._fixture(uselist=False, useobject=False,
1603                                active_history=False)
1604        f = Foo()
1605        f.someattr = 'hi'
1606        self._commit_someattr(f)
1607        eq_(self._someattr_history(f), ((), ['hi'], ()))
1608
1609    def test_scalar_set_commit_reset(self):
1610        Foo = self._fixture(uselist=False, useobject=False,
1611                                active_history=False)
1612        f = Foo()
1613        f.someattr = 'hi'
1614        self._commit_someattr(f)
1615        f.someattr = 'there'
1616        eq_(self._someattr_history(f), (['there'], (), ['hi']))
1617
1618    def test_scalar_set_commit_reset_commit(self):
1619        Foo = self._fixture(uselist=False, useobject=False,
1620                                active_history=False)
1621        f = Foo()
1622        f.someattr = 'hi'
1623        self._commit_someattr(f)
1624        f.someattr = 'there'
1625        self._commit_someattr(f)
1626        eq_(self._someattr_history(f), ((), ['there'], ()))
1627
1628    def test_scalar_set_commit_reset_commit_del(self):
1629        Foo = self._fixture(uselist=False, useobject=False,
1630                                active_history=False)
1631        f = Foo()
1632        f.someattr = 'there'
1633        self._commit_someattr(f)
1634        del f.someattr
1635        eq_(self._someattr_history(f), ((), (), ['there']))
1636
1637    def test_scalar_set_dict(self):
1638        Foo = self._fixture(uselist=False, useobject=False,
1639                                active_history=False)
1640        f = Foo()
1641        f.__dict__['someattr'] = 'new'
1642        eq_(self._someattr_history(f), ((), ['new'], ()))
1643
1644    def test_scalar_set_dict_set(self):
1645        Foo = self._fixture(uselist=False, useobject=False,
1646                                active_history=False)
1647        f = Foo()
1648        f.__dict__['someattr'] = 'new'
1649        self._someattr_history(f)
1650        f.someattr = 'old'
1651        eq_(self._someattr_history(f), (['old'], (), ['new']))
1652
1653    def test_scalar_set_dict_set_commit(self):
1654        Foo = self._fixture(uselist=False, useobject=False,
1655                                active_history=False)
1656        f = Foo()
1657        f.__dict__['someattr'] = 'new'
1658        self._someattr_history(f)
1659        f.someattr = 'old'
1660        self._commit_someattr(f)
1661        eq_(self._someattr_history(f), ((), ['old'], ()))
1662
1663    def test_scalar_set_None(self):
1664        Foo = self._fixture(uselist=False, useobject=False,
1665                                active_history=False)
1666        f = Foo()
1667        f.someattr = None
1668        eq_(self._someattr_history(f), ([None], (), ()))
1669
1670    def test_scalar_set_None_from_dict_set(self):
1671        Foo = self._fixture(uselist=False, useobject=False,
1672                                active_history=False)
1673        f = Foo()
1674        f.__dict__['someattr'] = 'new'
1675        f.someattr = None
1676        eq_(self._someattr_history(f), ([None], (), ['new']))
1677
1678    def test_scalar_set_twice_no_commit(self):
1679        Foo = self._fixture(uselist=False, useobject=False,
1680                                active_history=False)
1681        f = Foo()
1682        f.someattr = 'one'
1683        eq_(self._someattr_history(f), (['one'], (), ()))
1684        f.someattr = 'two'
1685        eq_(self._someattr_history(f), (['two'], (), ()))
1686
1687    def test_scalar_active_init(self):
1688        Foo = self._fixture(uselist=False, useobject=False,
1689                                active_history=True)
1690        f = Foo()
1691        eq_(self._someattr_history(f), ((), (), ()))
1692
1693    def test_scalar_active_no_init_side_effect(self):
1694        Foo = self._fixture(uselist=False, useobject=False,
1695                                active_history=True)
1696        f = Foo()
1697        self._someattr_history(f)
1698        # no side effects
1699        assert 'someattr' not in f.__dict__
1700        assert 'someattr' not in attributes.instance_state(f).committed_state
1701
1702    def test_collection_never_set(self):
1703        Foo = self._fixture(uselist=True, useobject=True,
1704                                active_history=True)
1705        f = Foo()
1706        eq_(self._someattr_history(f, passive=True), (None, None, None))
1707
1708    def test_scalar_obj_never_set(self):
1709        Foo = self._fixture(uselist=False, useobject=True,
1710                                active_history=True)
1711        f = Foo()
1712        eq_(self._someattr_history(f, passive=True), (None, None, None))
1713
1714    def test_scalar_never_set(self):
1715        Foo = self._fixture(uselist=False, useobject=False,
1716                                active_history=True)
1717        f = Foo()
1718        eq_(self._someattr_history(f, passive=True), (None, None, None))
1719
1720    def test_scalar_active_set(self):
1721        Foo = self._fixture(uselist=False, useobject=False,
1722                                active_history=True)
1723        f = Foo()
1724        f.someattr = 'hi'
1725        eq_(self._someattr_history(f), (['hi'], (), ()))
1726
1727    def test_scalar_active_set_commit(self):
1728        Foo = self._fixture(uselist=False, useobject=False,
1729                                active_history=True)
1730        f = Foo()
1731        f.someattr = 'hi'
1732        self._commit_someattr(f)
1733        eq_(self._someattr_history(f), ((), ['hi'], ()))
1734
1735    def test_scalar_active_set_commit_reset(self):
1736        Foo = self._fixture(uselist=False, useobject=False,
1737                                active_history=True)
1738        f = Foo()
1739        f.someattr = 'hi'
1740        self._commit_someattr(f)
1741        f.someattr = 'there'
1742        eq_(self._someattr_history(f), (['there'], (), ['hi']))
1743
1744    def test_scalar_active_set_commit_reset_commit(self):
1745        Foo = self._fixture(uselist=False, useobject=False,
1746                                active_history=True)
1747        f = Foo()
1748        f.someattr = 'hi'
1749        self._commit_someattr(f)
1750        f.someattr = 'there'
1751        self._commit_someattr(f)
1752        eq_(self._someattr_history(f), ((), ['there'], ()))
1753
1754    def test_scalar_active_set_commit_reset_commit_del(self):
1755        Foo = self._fixture(uselist=False, useobject=False,
1756                                active_history=True)
1757        f = Foo()
1758        f.someattr = 'there'
1759        self._commit_someattr(f)
1760        del f.someattr
1761        eq_(self._someattr_history(f), ((), (), ['there']))
1762
1763    def test_scalar_active_set_dict(self):
1764        Foo = self._fixture(uselist=False, useobject=False,
1765                                active_history=True)
1766        f = Foo()
1767        f.__dict__['someattr'] = 'new'
1768        eq_(self._someattr_history(f), ((), ['new'], ()))
1769
1770    def test_scalar_active_set_dict_set(self):
1771        Foo = self._fixture(uselist=False, useobject=False,
1772                                active_history=True)
1773        f = Foo()
1774        f.__dict__['someattr'] = 'new'
1775        self._someattr_history(f)
1776        f.someattr = 'old'
1777        eq_(self._someattr_history(f), (['old'], (), ['new']))
1778
1779    def test_scalar_active_set_dict_set_commit(self):
1780        Foo = self._fixture(uselist=False, useobject=False,
1781                                active_history=True)
1782        f = Foo()
1783        f.__dict__['someattr'] = 'new'
1784        self._someattr_history(f)
1785        f.someattr = 'old'
1786        self._commit_someattr(f)
1787        eq_(self._someattr_history(f), ((), ['old'], ()))
1788
1789    def test_scalar_active_set_None(self):
1790        Foo = self._fixture(uselist=False, useobject=False,
1791                                active_history=True)
1792        f = Foo()
1793        f.someattr = None
1794        eq_(self._someattr_history(f), ([None], (), ()))
1795
1796    def test_scalar_active_set_None_from_dict_set(self):
1797        Foo = self._fixture(uselist=False, useobject=False,
1798                                active_history=True)
1799        f = Foo()
1800        f.__dict__['someattr'] = 'new'
1801        f.someattr = None
1802        eq_(self._someattr_history(f), ([None], (), ['new']))
1803
1804    def test_scalar_active_set_twice_no_commit(self):
1805        Foo = self._fixture(uselist=False, useobject=False,
1806                                active_history=True)
1807        f = Foo()
1808        f.someattr = 'one'
1809        eq_(self._someattr_history(f), (['one'], (), ()))
1810        f.someattr = 'two'
1811        eq_(self._someattr_history(f), (['two'], (), ()))
1812
1813
1814    def test_scalar_passive_flag(self):
1815        Foo = self._fixture(uselist=False, useobject=False,
1816                                active_history=True)
1817        f = Foo()
1818        f.someattr = 'one'
1819        eq_(self._someattr_history(f), (['one'], (), ()))
1820
1821        self._commit_someattr(f)
1822
1823        state = attributes.instance_state(f)
1824        # do the same thing that
1825        # populators.expire.append((self.key, True))
1826        # does in loading.py
1827        state.dict.pop('someattr', None)
1828        state.expired_attributes.add('someattr')
1829
1830        def scalar_loader(state, toload):
1831            state.dict['someattr'] = 'one'
1832        state.manager.deferred_scalar_loader = scalar_loader
1833
1834        eq_(self._someattr_history(f), ((), ['one'], ()))
1835
1836
1837    def test_scalar_inplace_mutation_set(self):
1838        Foo = self._fixture(uselist=False, useobject=False,
1839                                active_history=False)
1840        f = Foo()
1841        f.someattr = {'a': 'b'}
1842        eq_(self._someattr_history(f), ([{'a': 'b'}], (), ()))
1843
1844    def test_scalar_inplace_mutation_set_commit(self):
1845        Foo = self._fixture(uselist=False, useobject=False,
1846                                active_history=False)
1847        f = Foo()
1848        f.someattr = {'a': 'b'}
1849        self._commit_someattr(f)
1850        eq_(self._someattr_history(f), ((), [{'a': 'b'}], ()))
1851
1852    def test_scalar_inplace_mutation_set_commit_set(self):
1853        Foo = self._fixture(uselist=False, useobject=False,
1854                                active_history=False)
1855        f = Foo()
1856        f.someattr = {'a': 'b'}
1857        self._commit_someattr(f)
1858        f.someattr['a'] = 'c'
1859        eq_(self._someattr_history(f), ((), [{'a': 'c'}], ()))
1860
1861    def test_scalar_inplace_mutation_set_commit_flag_modified(self):
1862        Foo = self._fixture(uselist=False, useobject=False,
1863                                active_history=False)
1864        f = Foo()
1865        f.someattr = {'a': 'b'}
1866        self._commit_someattr(f)
1867        attributes.flag_modified(f, 'someattr')
1868        eq_(self._someattr_history(f), ([{'a': 'b'}], (), ()))
1869
1870    def test_scalar_inplace_mutation_set_commit_set_flag_modified(self):
1871        Foo = self._fixture(uselist=False, useobject=False,
1872                                active_history=False)
1873        f = Foo()
1874        f.someattr = {'a': 'b'}
1875        self._commit_someattr(f)
1876        f.someattr['a'] = 'c'
1877        attributes.flag_modified(f, 'someattr')
1878        eq_(self._someattr_history(f), ([{'a': 'c'}], (), ()))
1879
1880    def test_scalar_inplace_mutation_set_commit_flag_modified_set(self):
1881        Foo = self._fixture(uselist=False, useobject=False,
1882                                active_history=False)
1883        f = Foo()
1884        f.someattr = {'a': 'b'}
1885        self._commit_someattr(f)
1886        attributes.flag_modified(f, 'someattr')
1887        eq_(self._someattr_history(f), ([{'a': 'b'}], (), ()))
1888        f.someattr = ['a']
1889        eq_(self._someattr_history(f), ([['a']], (), ()))
1890
1891    def test_scalar_inplace_mutation_replace_self_flag_modified_set(self):
1892        Foo = self._fixture(uselist=False, useobject=False,
1893                                active_history=False)
1894        f = Foo()
1895        f.someattr = {'a': 'b'}
1896        self._commit_someattr(f)
1897        eq_(self._someattr_history(f), ((), [{'a': 'b'}], ()))
1898
1899        # set the attribute to itself; this places a copy
1900        # in committed_state
1901        f.someattr = f.someattr
1902
1903        attributes.flag_modified(f, 'someattr')
1904        eq_(self._someattr_history(f), ([{'a': 'b'}], (), ()))
1905
1906
1907    def test_use_object_init(self):
1908        Foo, Bar = self._two_obj_fixture(uselist=False)
1909        f = Foo()
1910        eq_(self._someattr_history(f), ((), (), ()))
1911
1912    def test_use_object_no_init_side_effect(self):
1913        Foo, Bar = self._two_obj_fixture(uselist=False)
1914        f = Foo()
1915        self._someattr_history(f)
1916        assert 'someattr' not in f.__dict__
1917        assert 'someattr' not in attributes.instance_state(f).committed_state
1918
1919    def test_use_object_set(self):
1920        Foo, Bar = self._two_obj_fixture(uselist=False)
1921        f = Foo()
1922        hi = Bar(name='hi')
1923        f.someattr = hi
1924        eq_(self._someattr_history(f), ([hi], (), ()))
1925
1926    def test_use_object_set_commit(self):
1927        Foo, Bar = self._two_obj_fixture(uselist=False)
1928        f = Foo()
1929        hi = Bar(name='hi')
1930        f.someattr = hi
1931        self._commit_someattr(f)
1932        eq_(self._someattr_history(f), ((), [hi], ()))
1933
1934    def test_use_object_set_commit_set(self):
1935        Foo, Bar = self._two_obj_fixture(uselist=False)
1936        f = Foo()
1937        hi = Bar(name='hi')
1938        f.someattr = hi
1939        self._commit_someattr(f)
1940        there = Bar(name='there')
1941        f.someattr = there
1942        eq_(self._someattr_history(f), ([there], (), [hi]))
1943
1944    def test_use_object_set_commit_set_commit(self):
1945        Foo, Bar = self._two_obj_fixture(uselist=False)
1946        f = Foo()
1947        hi = Bar(name='hi')
1948        f.someattr = hi
1949        self._commit_someattr(f)
1950        there = Bar(name='there')
1951        f.someattr = there
1952        self._commit_someattr(f)
1953        eq_(self._someattr_history(f), ((), [there], ()))
1954
1955    def test_use_object_set_commit_del(self):
1956        Foo, Bar = self._two_obj_fixture(uselist=False)
1957        f = Foo()
1958        hi = Bar(name='hi')
1959        f.someattr = hi
1960        self._commit_someattr(f)
1961        del f.someattr
1962        eq_(self._someattr_history(f), ((), (), [hi]))
1963
1964    def test_use_object_set_dict(self):
1965        Foo, Bar = self._two_obj_fixture(uselist=False)
1966        f = Foo()
1967        hi = Bar(name='hi')
1968        f.__dict__['someattr'] = hi
1969        eq_(self._someattr_history(f), ((), [hi], ()))
1970
1971    def test_use_object_set_dict_set(self):
1972        Foo, Bar = self._two_obj_fixture(uselist=False)
1973        f = Foo()
1974        hi = Bar(name='hi')
1975        f.__dict__['someattr'] = hi
1976
1977        there = Bar(name='there')
1978        f.someattr = there
1979        eq_(self._someattr_history(f), ([there], (), [hi]))
1980
1981    def test_use_object_set_dict_set_commit(self):
1982        Foo, Bar = self._two_obj_fixture(uselist=False)
1983        f = Foo()
1984        hi = Bar(name='hi')
1985        f.__dict__['someattr'] = hi
1986
1987        there = Bar(name='there')
1988        f.someattr = there
1989        self._commit_someattr(f)
1990        eq_(self._someattr_history(f), ((), [there], ()))
1991
1992    def test_use_object_set_None(self):
1993        # note - compare:
1994        # test_scalar_set_None,
1995        # test_scalar_get_first_set_None,
1996        # test_use_object_set_None,
1997        # test_use_object_get_first_set_None
1998        Foo, Bar = self._two_obj_fixture(uselist=False)
1999        f = Foo()
2000        f.someattr = None
2001        eq_(self._someattr_history(f), ([None], (), ()))
2002
2003    def test_use_object_get_first_set_None(self):
2004        # note - compare:
2005        # test_scalar_set_None,
2006        # test_scalar_get_first_set_None,
2007        # test_use_object_set_None,
2008        # test_use_object_get_first_set_None
2009        Foo, Bar = self._two_obj_fixture(uselist=False)
2010        f = Foo()
2011        assert f.someattr is None
2012        f.someattr = None
2013        eq_(self._someattr_history(f), ([None], (), ()))
2014
2015    def test_use_object_set_dict_set_None(self):
2016        Foo, Bar = self._two_obj_fixture(uselist=False)
2017        f = Foo()
2018        hi = Bar(name='hi')
2019        f.__dict__['someattr'] = hi
2020        f.someattr = None
2021        eq_(self._someattr_history(f), ([None], (), [hi]))
2022
2023    def test_use_object_set_value_twice(self):
2024        Foo, Bar = self._two_obj_fixture(uselist=False)
2025        f = Foo()
2026        hi = Bar(name='hi')
2027        there = Bar(name='there')
2028        f.someattr = hi
2029        f.someattr = there
2030        eq_(self._someattr_history(f), ([there], (), ()))
2031
2032
2033    def test_object_collections_set(self):
2034        # TODO: break into individual tests
2035
2036        Foo, Bar = self._two_obj_fixture(uselist=True)
2037        hi = Bar(name='hi')
2038        there = Bar(name='there')
2039        old = Bar(name='old')
2040        new = Bar(name='new')
2041
2042        # case 1.  new object
2043
2044        f = Foo()
2045        eq_(attributes.get_state_history(attributes.instance_state(f),
2046            'someattr'), ((), [], ()))
2047        f.someattr = [hi]
2048        eq_(attributes.get_state_history(attributes.instance_state(f),
2049            'someattr'), ([hi], [], []))
2050        self._commit_someattr(f)
2051        eq_(attributes.get_state_history(attributes.instance_state(f),
2052            'someattr'), ((), [hi], ()))
2053        f.someattr = [there]
2054        eq_(attributes.get_state_history(attributes.instance_state(f),
2055            'someattr'), ([there], [], [hi]))
2056        self._commit_someattr(f)
2057        eq_(attributes.get_state_history(attributes.instance_state(f),
2058            'someattr'), ((), [there], ()))
2059        f.someattr = [hi]
2060        eq_(attributes.get_state_history(attributes.instance_state(f),
2061            'someattr'), ([hi], [], [there]))
2062        f.someattr = [old, new]
2063        eq_(attributes.get_state_history(attributes.instance_state(f),
2064            'someattr'), ([old, new], [], [there]))
2065
2066        # case 2.  object with direct settings (similar to a load
2067        # operation)
2068
2069        f = Foo()
2070        collection = attributes.init_collection(f, 'someattr')
2071        collection.append_without_event(new)
2072        attributes.instance_state(f)._commit_all(attributes.instance_dict(f))
2073        eq_(attributes.get_state_history(attributes.instance_state(f),
2074            'someattr'), ((), [new], ()))
2075        f.someattr = [old]
2076        eq_(attributes.get_state_history(attributes.instance_state(f),
2077            'someattr'), ([old], [], [new]))
2078        self._commit_someattr(f)
2079        eq_(attributes.get_state_history(attributes.instance_state(f),
2080            'someattr'), ((), [old], ()))
2081
2082    def test_dict_collections(self):
2083        # TODO: break into individual tests
2084
2085        class Foo(fixtures.BasicEntity):
2086            pass
2087        class Bar(fixtures.BasicEntity):
2088            pass
2089
2090        from sqlalchemy.orm.collections import attribute_mapped_collection
2091        instrumentation.register_class(Foo)
2092        instrumentation.register_class(Bar)
2093        attributes.register_attribute(Foo, 'someattr', uselist=True,
2094                useobject=True,
2095                typecallable=attribute_mapped_collection('name'))
2096        hi = Bar(name='hi')
2097        there = Bar(name='there')
2098        old = Bar(name='old')
2099        new = Bar(name='new')
2100        f = Foo()
2101        eq_(attributes.get_state_history(attributes.instance_state(f),
2102            'someattr'), ((), [], ()))
2103        f.someattr['hi'] = hi
2104        eq_(attributes.get_state_history(attributes.instance_state(f),
2105            'someattr'), ([hi], [], []))
2106        f.someattr['there'] = there
2107        eq_(tuple([set(x) for x in
2108            attributes.get_state_history(attributes.instance_state(f),
2109            'someattr')]), (set([hi, there]), set(), set()))
2110        self._commit_someattr(f)
2111        eq_(tuple([set(x) for x in
2112            attributes.get_state_history(attributes.instance_state(f),
2113            'someattr')]), (set(), set([hi, there]), set()))
2114
2115    def test_object_collections_mutate(self):
2116        # TODO: break into individual tests
2117
2118        class Foo(fixtures.BasicEntity):
2119            pass
2120        class Bar(fixtures.BasicEntity):
2121            pass
2122
2123        instrumentation.register_class(Foo)
2124        attributes.register_attribute(Foo, 'someattr', uselist=True,
2125                useobject=True)
2126        attributes.register_attribute(Foo, 'id', uselist=False,
2127                useobject=False)
2128        instrumentation.register_class(Bar)
2129        hi = Bar(name='hi')
2130        there = Bar(name='there')
2131        old = Bar(name='old')
2132        new = Bar(name='new')
2133
2134        # case 1.  new object
2135
2136        f = Foo(id=1)
2137        eq_(attributes.get_state_history(attributes.instance_state(f),
2138            'someattr'), ((), [], ()))
2139        f.someattr.append(hi)
2140        eq_(attributes.get_state_history(attributes.instance_state(f),
2141            'someattr'), ([hi], [], []))
2142        self._commit_someattr(f)
2143        eq_(attributes.get_state_history(attributes.instance_state(f),
2144            'someattr'), ((), [hi], ()))
2145        f.someattr.append(there)
2146        eq_(attributes.get_state_history(attributes.instance_state(f),
2147            'someattr'), ([there], [hi], []))
2148        self._commit_someattr(f)
2149        eq_(attributes.get_state_history(attributes.instance_state(f),
2150            'someattr'), ((), [hi, there], ()))
2151        f.someattr.remove(there)
2152        eq_(attributes.get_state_history(attributes.instance_state(f),
2153            'someattr'), ([], [hi], [there]))
2154        f.someattr.append(old)
2155        f.someattr.append(new)
2156        eq_(attributes.get_state_history(attributes.instance_state(f),
2157            'someattr'), ([old, new], [hi], [there]))
2158        attributes.instance_state(f)._commit(attributes.instance_dict(f),
2159                ['someattr'])
2160        eq_(attributes.get_state_history(attributes.instance_state(f),
2161            'someattr'), ((), [hi, old, new], ()))
2162        f.someattr.pop(0)
2163        eq_(attributes.get_state_history(attributes.instance_state(f),
2164            'someattr'), ([], [old, new], [hi]))
2165
2166        # case 2.  object with direct settings (similar to a load
2167        # operation)
2168
2169        f = Foo()
2170        f.__dict__['id'] = 1
2171        collection = attributes.init_collection(f, 'someattr')
2172        collection.append_without_event(new)
2173        attributes.instance_state(f)._commit_all(attributes.instance_dict(f))
2174        eq_(attributes.get_state_history(attributes.instance_state(f),
2175            'someattr'), ((), [new], ()))
2176        f.someattr.append(old)
2177        eq_(attributes.get_state_history(attributes.instance_state(f),
2178            'someattr'), ([old], [new], []))
2179        attributes.instance_state(f)._commit(attributes.instance_dict(f),
2180                ['someattr'])
2181        eq_(attributes.get_state_history(attributes.instance_state(f),
2182            'someattr'), ((), [new, old], ()))
2183        f = Foo()
2184        collection = attributes.init_collection(f, 'someattr')
2185        collection.append_without_event(new)
2186        attributes.instance_state(f)._commit_all(attributes.instance_dict(f))
2187        eq_(attributes.get_state_history(attributes.instance_state(f),
2188            'someattr'), ((), [new], ()))
2189        f.id = 1
2190        f.someattr.remove(new)
2191        eq_(attributes.get_state_history(attributes.instance_state(f),
2192            'someattr'), ([], [], [new]))
2193
2194        # case 3.  mixing appends with sets
2195
2196        f = Foo()
2197        f.someattr.append(hi)
2198        eq_(attributes.get_state_history(attributes.instance_state(f),
2199            'someattr'), ([hi], [], []))
2200        f.someattr.append(there)
2201        eq_(attributes.get_state_history(attributes.instance_state(f),
2202            'someattr'), ([hi, there], [], []))
2203        f.someattr = [there]
2204        eq_(attributes.get_state_history(attributes.instance_state(f),
2205            'someattr'), ([there], [], []))
2206
2207        # case 4.  ensure duplicates show up, order is maintained
2208
2209        f = Foo()
2210        f.someattr.append(hi)
2211        f.someattr.append(there)
2212        f.someattr.append(hi)
2213        eq_(attributes.get_state_history(attributes.instance_state(f),
2214            'someattr'), ([hi, there, hi], [], []))
2215        attributes.instance_state(f)._commit_all(attributes.instance_dict(f))
2216        eq_(attributes.get_state_history(attributes.instance_state(f),
2217            'someattr'), ((), [hi, there, hi], ()))
2218        f.someattr = []
2219        eq_(attributes.get_state_history(attributes.instance_state(f),
2220            'someattr'), ([], [], [hi, there, hi]))
2221
2222    def test_collections_via_backref(self):
2223        # TODO: break into individual tests
2224
2225        class Foo(fixtures.BasicEntity):
2226            pass
2227        class Bar(fixtures.BasicEntity):
2228            pass
2229
2230        instrumentation.register_class(Foo)
2231        instrumentation.register_class(Bar)
2232        attributes.register_attribute(Foo, 'bars', uselist=True,
2233                backref='foo', trackparent=True, useobject=True)
2234        attributes.register_attribute(Bar, 'foo', uselist=False,
2235                backref='bars', trackparent=True, useobject=True)
2236        f1 = Foo()
2237        b1 = Bar()
2238        eq_(attributes.get_state_history(attributes.instance_state(f1),
2239            'bars'), ((), [], ()))
2240        eq_(attributes.get_state_history(attributes.instance_state(b1),
2241            'foo'), ((), (), ()))
2242
2243        # b1.foo = f1
2244
2245        f1.bars.append(b1)
2246        eq_(attributes.get_state_history(attributes.instance_state(f1),
2247            'bars'), ([b1], [], []))
2248        eq_(attributes.get_state_history(attributes.instance_state(b1),
2249            'foo'), ([f1], (), ()))
2250        b2 = Bar()
2251        f1.bars.append(b2)
2252        eq_(attributes.get_state_history(attributes.instance_state(f1),
2253            'bars'), ([b1, b2], [], []))
2254        eq_(attributes.get_state_history(attributes.instance_state(b1),
2255            'foo'), ([f1], (), ()))
2256        eq_(attributes.get_state_history(attributes.instance_state(b2),
2257            'foo'), ([f1], (), ()))
2258
2259    def test_deprecated_flags(self):
2260        assert_raises_message(
2261            sa_exc.SADeprecationWarning,
2262            "Passing True for 'passive' is deprecated. "
2263            "Use attributes.PASSIVE_NO_INITIALIZE",
2264            attributes.get_history, object(), 'foo', True
2265        )
2266
2267        assert_raises_message(
2268            sa_exc.SADeprecationWarning,
2269            "Passing False for 'passive' is deprecated.  "
2270            "Use attributes.PASSIVE_OFF",
2271            attributes.get_history, object(), 'foo', False
2272        )
2273
2274
2275class LazyloadHistoryTest(fixtures.TestBase):
2276    def test_lazy_backref_collections(self):
2277        # TODO: break into individual tests
2278
2279        class Foo(fixtures.BasicEntity):
2280            pass
2281        class Bar(fixtures.BasicEntity):
2282            pass
2283
2284        lazy_load = []
2285        def lazyload(state, passive):
2286            return lazy_load
2287
2288        instrumentation.register_class(Foo)
2289        instrumentation.register_class(Bar)
2290        attributes.register_attribute(Foo, 'bars', uselist=True,
2291                backref='foo', trackparent=True, callable_=lazyload,
2292                useobject=True)
2293        attributes.register_attribute(Bar, 'foo', uselist=False,
2294                backref='bars', trackparent=True, useobject=True)
2295        bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3),
2296                                  Bar(id=4)]
2297        lazy_load = [bar1, bar2, bar3]
2298        f = Foo()
2299        bar4 = Bar()
2300        bar4.foo = f
2301        eq_(attributes.get_state_history(attributes.instance_state(f),
2302            'bars'), ([bar4], [bar1, bar2, bar3], []))
2303        lazy_load = None
2304        f = Foo()
2305        bar4 = Bar()
2306        bar4.foo = f
2307        eq_(attributes.get_state_history(attributes.instance_state(f),
2308            'bars'), ([bar4], [], []))
2309        lazy_load = [bar1, bar2, bar3]
2310        attributes.instance_state(f)._expire_attributes(attributes.instance_dict(f),
2311                ['bars'])
2312        eq_(attributes.get_state_history(attributes.instance_state(f),
2313            'bars'), ((), [bar1, bar2, bar3], ()))
2314
2315    def test_collections_via_lazyload(self):
2316        # TODO: break into individual tests
2317
2318        class Foo(fixtures.BasicEntity):
2319            pass
2320        class Bar(fixtures.BasicEntity):
2321            pass
2322
2323        lazy_load = []
2324        def lazyload(state, passive):
2325            return lazy_load
2326
2327        instrumentation.register_class(Foo)
2328        instrumentation.register_class(Bar)
2329        attributes.register_attribute(Foo, 'bars', uselist=True,
2330                callable_=lazyload, trackparent=True, useobject=True)
2331        bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3),
2332                                  Bar(id=4)]
2333        lazy_load = [bar1, bar2, bar3]
2334        f = Foo()
2335        f.bars = []
2336        eq_(attributes.get_state_history(attributes.instance_state(f),
2337            'bars'), ([], [], [bar1, bar2, bar3]))
2338        f = Foo()
2339        f.bars.append(bar4)
2340        eq_(attributes.get_state_history(attributes.instance_state(f),
2341            'bars'), ([bar4], [bar1, bar2, bar3], []))
2342        f = Foo()
2343        f.bars.remove(bar2)
2344        eq_(attributes.get_state_history(attributes.instance_state(f),
2345            'bars'), ([], [bar1, bar3], [bar2]))
2346        f.bars.append(bar4)
2347        eq_(attributes.get_state_history(attributes.instance_state(f),
2348            'bars'), ([bar4], [bar1, bar3], [bar2]))
2349        f = Foo()
2350        del f.bars[1]
2351        eq_(attributes.get_state_history(attributes.instance_state(f),
2352            'bars'), ([], [bar1, bar3], [bar2]))
2353        lazy_load = None
2354        f = Foo()
2355        f.bars.append(bar2)
2356        eq_(attributes.get_state_history(attributes.instance_state(f),
2357            'bars'), ([bar2], [], []))
2358
2359    def test_scalar_via_lazyload(self):
2360        # TODO: break into individual tests
2361
2362        class Foo(fixtures.BasicEntity):
2363            pass
2364
2365        lazy_load = None
2366
2367        def lazyload(state, passive):
2368            return lazy_load
2369
2370        instrumentation.register_class(Foo)
2371        attributes.register_attribute(Foo, 'bar', uselist=False,
2372                callable_=lazyload, useobject=False)
2373        lazy_load = 'hi'
2374
2375        # with scalar non-object and active_history=False, the lazy
2376        # callable is only executed on gets, not history operations
2377
2378        f = Foo()
2379        eq_(f.bar, 'hi')
2380        eq_(attributes.get_state_history(attributes.instance_state(f),
2381            'bar'), ((), ['hi'], ()))
2382        f = Foo()
2383        f.bar = None
2384        eq_(attributes.get_state_history(attributes.instance_state(f),
2385            'bar'), ([None], (), ()))
2386        f = Foo()
2387        f.bar = 'there'
2388        eq_(attributes.get_state_history(attributes.instance_state(f),
2389            'bar'), (['there'], (), ()))
2390        f.bar = 'hi'
2391        eq_(attributes.get_state_history(attributes.instance_state(f),
2392            'bar'), (['hi'], (), ()))
2393        f = Foo()
2394        eq_(f.bar, 'hi')
2395        del f.bar
2396        eq_(attributes.get_state_history(attributes.instance_state(f),
2397            'bar'), ((), (), ['hi']))
2398        assert f.bar is None
2399        eq_(attributes.get_state_history(attributes.instance_state(f),
2400            'bar'), ((), (), ['hi']))
2401
2402    def test_scalar_via_lazyload_with_active(self):
2403        # TODO: break into individual tests
2404
2405        class Foo(fixtures.BasicEntity):
2406            pass
2407
2408        lazy_load = None
2409
2410        def lazyload(state, passive):
2411            return lazy_load
2412
2413        instrumentation.register_class(Foo)
2414        attributes.register_attribute(Foo, 'bar', uselist=False,
2415                callable_=lazyload, useobject=False,
2416                active_history=True)
2417        lazy_load = 'hi'
2418
2419        # active_history=True means the lazy callable is executed on set
2420        # as well as get, causing the old value to appear in the history
2421
2422        f = Foo()
2423        eq_(f.bar, 'hi')
2424        eq_(attributes.get_state_history(attributes.instance_state(f),
2425            'bar'), ((), ['hi'], ()))
2426        f = Foo()
2427        f.bar = None
2428        eq_(attributes.get_state_history(attributes.instance_state(f),
2429            'bar'), ([None], (), ['hi']))
2430        f = Foo()
2431        f.bar = 'there'
2432        eq_(attributes.get_state_history(attributes.instance_state(f),
2433            'bar'), (['there'], (), ['hi']))
2434        f.bar = 'hi'
2435        eq_(attributes.get_state_history(attributes.instance_state(f),
2436            'bar'), ((), ['hi'], ()))
2437        f = Foo()
2438        eq_(f.bar, 'hi')
2439        del f.bar
2440        eq_(attributes.get_state_history(attributes.instance_state(f),
2441            'bar'), ((), (), ['hi']))
2442        assert f.bar is None
2443        eq_(attributes.get_state_history(attributes.instance_state(f),
2444            'bar'), ((), (), ['hi']))
2445
2446    def test_scalar_object_via_lazyload(self):
2447        # TODO: break into individual tests
2448
2449        class Foo(fixtures.BasicEntity):
2450            pass
2451        class Bar(fixtures.BasicEntity):
2452            pass
2453
2454        lazy_load = None
2455        def lazyload(state, passive):
2456            return lazy_load
2457
2458        instrumentation.register_class(Foo)
2459        instrumentation.register_class(Bar)
2460        attributes.register_attribute(Foo, 'bar', uselist=False,
2461                callable_=lazyload, trackparent=True, useobject=True)
2462        bar1, bar2 = [Bar(id=1), Bar(id=2)]
2463        lazy_load = bar1
2464
2465        # with scalar object, the lazy callable is only executed on gets
2466        # and history operations
2467
2468        f = Foo()
2469        eq_(attributes.get_state_history(attributes.instance_state(f),
2470            'bar'), ((), [bar1], ()))
2471        f = Foo()
2472        f.bar = None
2473        eq_(attributes.get_state_history(attributes.instance_state(f),
2474            'bar'), ([None], (), [bar1]))
2475        f = Foo()
2476        f.bar = bar2
2477        eq_(attributes.get_state_history(attributes.instance_state(f),
2478            'bar'), ([bar2], (), [bar1]))
2479        f.bar = bar1
2480        eq_(attributes.get_state_history(attributes.instance_state(f),
2481            'bar'), ((), [bar1], ()))
2482        f = Foo()
2483        eq_(f.bar, bar1)
2484        del f.bar
2485        eq_(attributes.get_state_history(attributes.instance_state(f),
2486            'bar'), ((), (), [bar1]))
2487        assert f.bar is None
2488        eq_(attributes.get_state_history(attributes.instance_state(f),
2489            'bar'), ((), (), [bar1]))
2490
2491class ListenerTest(fixtures.ORMTest):
2492    def test_receive_changes(self):
2493        """test that Listeners can mutate the given value."""
2494
2495        class Foo(object):
2496            pass
2497        class Bar(object):
2498            pass
2499
2500        def append(state, child, initiator):
2501            b2 = Bar()
2502            b2.data = b1.data + " appended"
2503            return b2
2504
2505        def on_set(state, value, oldvalue, initiator):
2506            return value + " modified"
2507
2508        instrumentation.register_class(Foo)
2509        instrumentation.register_class(Bar)
2510        attributes.register_attribute(Foo, 'data', uselist=False,
2511                useobject=False)
2512        attributes.register_attribute(Foo, 'barlist', uselist=True,
2513                useobject=True)
2514        attributes.register_attribute(Foo, 'barset', typecallable=set,
2515                uselist=True, useobject=True)
2516        attributes.register_attribute(Bar, 'data', uselist=False,
2517                useobject=False)
2518        event.listen(Foo.data, 'set', on_set, retval=True)
2519        event.listen(Foo.barlist, 'append', append, retval=True)
2520        event.listen(Foo.barset, 'append', append, retval=True)
2521        f1 = Foo()
2522        f1.data = 'some data'
2523        eq_(f1.data, 'some data modified')
2524        b1 = Bar()
2525        b1.data = 'some bar'
2526        f1.barlist.append(b1)
2527        assert b1.data == 'some bar'
2528        assert f1.barlist[0].data == 'some bar appended'
2529        f1.barset.add(b1)
2530        assert f1.barset.pop().data == 'some bar appended'
2531
2532    def test_named(self):
2533        canary = Mock()
2534
2535        class Foo(object):
2536            pass
2537
2538        class Bar(object):
2539            pass
2540
2541        instrumentation.register_class(Foo)
2542        instrumentation.register_class(Bar)
2543        attributes.register_attribute(
2544            Foo, 'data', uselist=False,
2545            useobject=False)
2546        attributes.register_attribute(
2547            Foo, 'barlist', uselist=True,
2548            useobject=True)
2549
2550        event.listen(Foo.data, 'set', canary.set, named=True)
2551        event.listen(Foo.barlist, 'append', canary.append, named=True)
2552        event.listen(Foo.barlist, 'remove', canary.remove, named=True)
2553
2554        f1 = Foo()
2555        b1 = Bar()
2556        f1.data = 5
2557        f1.barlist.append(b1)
2558        f1.barlist.remove(b1)
2559        eq_(
2560            canary.mock_calls,
2561            [
2562                call.set(
2563                    oldvalue=attributes.NO_VALUE,
2564                    initiator=attributes.Event(
2565                        Foo.data.impl, attributes.OP_REPLACE),
2566                    target=f1, value=5),
2567                call.append(
2568                    initiator=attributes.Event(
2569                        Foo.barlist.impl, attributes.OP_APPEND),
2570                    target=f1,
2571                    value=b1),
2572                call.remove(
2573                    initiator=attributes.Event(
2574                        Foo.barlist.impl, attributes.OP_REMOVE),
2575                    target=f1,
2576                    value=b1)]
2577        )
2578
2579    def test_collection_link_events(self):
2580        class Foo(object):
2581            pass
2582        class Bar(object):
2583            pass
2584        instrumentation.register_class(Foo)
2585        instrumentation.register_class(Bar)
2586        attributes.register_attribute(Foo, 'barlist', uselist=True,
2587                useobject=True)
2588
2589        canary = Mock()
2590        event.listen(Foo.barlist, "init_collection", canary.init)
2591        event.listen(Foo.barlist, "dispose_collection", canary.dispose)
2592
2593        f1 = Foo()
2594        eq_(f1.barlist, [])
2595        adapter_one = f1.barlist._sa_adapter
2596        eq_(canary.init.mock_calls, [call(f1, [], adapter_one)])
2597
2598        b1 = Bar()
2599        f1.barlist.append(b1)
2600
2601        b2 = Bar()
2602        f1.barlist = [b2]
2603        adapter_two = f1.barlist._sa_adapter
2604        eq_(canary.init.mock_calls, [
2605            call(f1, [], adapter_one),
2606            call(f1, [b2], adapter_two),
2607        ])
2608        eq_(
2609            canary.dispose.mock_calls,
2610            [
2611                call(f1, [], adapter_one)
2612            ]
2613        )
2614
2615
2616    def test_none_on_collection_event(self):
2617        """test that append/remove of None in collections emits events.
2618
2619        This is new behavior as of 0.8.
2620
2621        """
2622        class Foo(object):
2623            pass
2624        class Bar(object):
2625            pass
2626        instrumentation.register_class(Foo)
2627        instrumentation.register_class(Bar)
2628        attributes.register_attribute(Foo, 'barlist', uselist=True,
2629                useobject=True)
2630        canary = []
2631        def append(state, child, initiator):
2632            canary.append((state, child))
2633        def remove(state, child, initiator):
2634            canary.append((state, child))
2635        event.listen(Foo.barlist, 'append', append)
2636        event.listen(Foo.barlist, 'remove', remove)
2637
2638        b1, b2 = Bar(), Bar()
2639        f1 = Foo()
2640        f1.barlist.append(None)
2641        eq_(canary, [(f1, None)])
2642
2643        canary[:] = []
2644        f1 = Foo()
2645        f1.barlist = [None, b2]
2646        eq_(canary, [(f1, None), (f1, b2)])
2647
2648        canary[:] = []
2649        f1 = Foo()
2650        f1.barlist = [b1, None, b2]
2651        eq_(canary, [(f1, b1), (f1, None), (f1, b2)])
2652
2653        f1.barlist.remove(None)
2654        eq_(canary, [(f1, b1), (f1, None), (f1, b2), (f1, None)])
2655
2656    def test_none_init_scalar(self):
2657        canary = Mock()
2658        class Foo(object):
2659            pass
2660        instrumentation.register_class(Foo)
2661        attributes.register_attribute(Foo, 'bar')
2662
2663        event.listen(Foo.bar, "set", canary)
2664
2665        f1 = Foo()
2666        eq_(f1.bar, None)
2667        # reversal of approach in #3061
2668        eq_(canary.mock_calls, [])
2669
2670    def test_none_init_object(self):
2671        canary = Mock()
2672        class Foo(object):
2673            pass
2674        instrumentation.register_class(Foo)
2675        attributes.register_attribute(Foo, 'bar', useobject=True)
2676
2677        event.listen(Foo.bar, "set", canary)
2678
2679        f1 = Foo()
2680        eq_(f1.bar, None)
2681        # reversal of approach in #3061
2682        eq_(canary.mock_calls, [])
2683
2684    def test_none_init_collection(self):
2685        canary = Mock()
2686        class Foo(object):
2687            pass
2688        instrumentation.register_class(Foo)
2689        attributes.register_attribute(Foo, 'bar', useobject=True, uselist=True)
2690
2691        event.listen(Foo.bar, "set", canary)
2692
2693        f1 = Foo()
2694        eq_(f1.bar, [])
2695        # reversal of approach in #3061
2696        eq_(canary.mock_calls, [])
2697
2698
2699    def test_propagate(self):
2700        classes = [None, None, None]
2701        canary = []
2702        def make_a():
2703            class A(object):
2704                pass
2705            classes[0] = A
2706
2707        def make_b():
2708            class B(classes[0]):
2709                pass
2710            classes[1] = B
2711
2712        def make_c():
2713            class C(classes[1]):
2714                pass
2715            classes[2] = C
2716
2717        def instrument_a():
2718            instrumentation.register_class(classes[0])
2719
2720        def instrument_b():
2721            instrumentation.register_class(classes[1])
2722
2723        def instrument_c():
2724            instrumentation.register_class(classes[2])
2725
2726        def attr_a():
2727            attributes.register_attribute(classes[0], 'attrib',
2728                    uselist=False, useobject=False)
2729
2730        def attr_b():
2731            attributes.register_attribute(classes[1], 'attrib',
2732                    uselist=False, useobject=False)
2733
2734        def attr_c():
2735            attributes.register_attribute(classes[2], 'attrib',
2736                    uselist=False, useobject=False)
2737
2738        def set(state, value, oldvalue, initiator):
2739            canary.append(value)
2740
2741        def events_a():
2742            event.listen(classes[0].attrib, 'set', set, propagate=True)
2743
2744        def teardown():
2745            classes[:] = [None, None, None]
2746            canary[:] = []
2747
2748        ordering = [
2749            (instrument_a, instrument_b),
2750            (instrument_b, instrument_c),
2751            (attr_a, attr_b),
2752            (attr_b, attr_c),
2753            (make_a, instrument_a),
2754            (instrument_a, attr_a),
2755            (attr_a, events_a),
2756            (make_b, instrument_b),
2757            (instrument_b, attr_b),
2758            (make_c, instrument_c),
2759            (instrument_c, attr_c),
2760            (make_a, make_b),
2761            (make_b, make_c)
2762        ]
2763        elements = [make_a, make_b, make_c,
2764                    instrument_a, instrument_b, instrument_c,
2765                    attr_a, attr_b, attr_c, events_a]
2766
2767        for i, series in enumerate(all_partial_orderings(ordering, elements)):
2768            for fn in series:
2769                fn()
2770
2771            b = classes[1]()
2772            b.attrib = "foo"
2773            eq_(b.attrib, "foo")
2774            eq_(canary, ["foo"])
2775
2776            c = classes[2]()
2777            c.attrib = "bar"
2778            eq_(c.attrib, "bar")
2779            eq_(canary, ["foo", "bar"])
2780
2781            teardown()
2782
2783
2784class TestUnlink(fixtures.TestBase):
2785    def setUp(self):
2786        class A(object):
2787            pass
2788        class B(object):
2789            pass
2790        self.A = A
2791        self.B = B
2792        instrumentation.register_class(A)
2793        instrumentation.register_class(B)
2794        attributes.register_attribute(A, 'bs', uselist=True,
2795                useobject=True)
2796
2797    def test_expired(self):
2798        A, B = self.A, self.B
2799        a1 = A()
2800        coll = a1.bs
2801        a1.bs.append(B())
2802        state = attributes.instance_state(a1)
2803        state._expire(state.dict, set())
2804        assert_raises(
2805            Warning,
2806            coll.append, B()
2807        )
2808
2809    def test_replaced(self):
2810        A, B = self.A, self.B
2811        a1 = A()
2812        coll = a1.bs
2813        a1.bs.append(B())
2814        a1.bs = []
2815        # a bulk replace empties the old collection
2816        assert len(coll) == 0
2817        coll.append(B())
2818        assert len(coll) == 1
2819
2820    def test_pop_existing(self):
2821        A, B = self.A, self.B
2822        a1 = A()
2823        coll = a1.bs
2824        a1.bs.append(B())
2825        state = attributes.instance_state(a1)
2826        state._reset(state.dict, "bs")
2827        assert_raises(
2828            Warning,
2829            coll.append, B()
2830        )
2831
2832    def test_ad_hoc_lazy(self):
2833        A, B = self.A, self.B
2834        a1 = A()
2835        coll = a1.bs
2836        a1.bs.append(B())
2837        state = attributes.instance_state(a1)
2838        _set_callable(state, state.dict, "bs", lambda: B())
2839        assert_raises(
2840            Warning,
2841            coll.append, B()
2842        )
2843