1"""Tests cyclical mapper relationships.
2
We might want to try automated generation of much of this: all combos of
T1<->T2, with o2m or m2o between them, and a third T3 with o2m/m2o to one or
both of T1/T2.
6
7"""
8from sqlalchemy import testing
9from sqlalchemy import Integer, String, ForeignKey
10from sqlalchemy.testing.schema import Table, Column
11from sqlalchemy.orm import mapper, relationship, backref, \
12                            create_session, sessionmaker
13from sqlalchemy.testing import eq_, is_
14from sqlalchemy.testing.assertsql import RegexSQL, CompiledSQL, AllOf
15from sqlalchemy.testing import fixtures
16
17
class SelfReferentialTest(fixtures.MappedTest):
    """Flush and delete scenarios for a mapper that relates to itself.

    ``t1`` refers to itself through ``parent_c1``; ``t2`` carries a foreign
    key to ``t1`` and supplies the additional child collection exercised in
    ``test_cycle``.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create the self-referential ``t1`` table and its dependent ``t2``."""
        Table(
            't1', metadata,
            Column('c1', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('parent_c1', Integer, ForeignKey('t1.c1')),
            Column('data', String(20)),
        )
        Table(
            't2', metadata,
            Column('c1', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('c1id', Integer, ForeignKey('t1.c1')),
            Column('data', String(20)),
        )

    @classmethod
    def setup_classes(cls):
        """Declare ``C1`` and ``C2``; both accept an optional ``data`` value."""

        class C1(cls.Basic):
            def __init__(self, data=None):
                self.data = data

        class C2(cls.Basic):
            def __init__(self, data=None):
                self.data = data

    def test_single(self):
        """Persist a head row with a single child, then delete the head."""
        C1 = self.classes.C1
        t1 = self.tables.t1

        children_rel = relationship(C1, cascade="all")
        parent_rel = relationship(
            C1,
            primaryjoin=t1.c.parent_c1 == t1.c.c1,
            remote_side=t1.c.c1,
            lazy='select',
            uselist=False)
        mapper(C1, t1, properties={
            'c1s': children_rel,
            'parent': parent_rel,
        })

        head = C1('head c1')
        head.c1s.append(C1('another c1'))

        session = create_session()
        session.add(head)
        session.flush()
        session.delete(head)
        session.flush()

    def test_many_to_one_only(self):
        """Flush a many-to-one when only the "many" side is pending.

        The circular dependency sort must be able to assemble a many-to-one
        dependency processor when only the object on the "many" side is
        actually in the list of modified objects.
        """
        C1 = self.classes.C1
        t1 = self.tables.t1

        mapper(C1, t1, properties={
            'parent': relationship(
                C1,
                primaryjoin=t1.c.parent_c1 == t1.c.c1,
                remote_side=t1.c.c1),
        })

        parent = C1()

        session = create_session()
        session.add(parent)
        session.flush()
        session.expunge_all()

        # reload the parent so that it is persistent and unmodified
        parent = session.query(C1).get(parent.c1)
        child = C1()
        child.parent = parent
        session.add(child)
        session.flush()
        assert child.parent_c1 == parent.c1

    def test_cycle(self):
        """Persist and delete a tree of ``C1`` rows with ``C2`` leaves."""
        classes, tables = self.classes, self.tables
        C2, C1 = classes.C2, classes.C1
        t2, t1 = tables.t2, tables.t1

        # build the properties one at a time, in the same order as a dict
        # literal would evaluate them; C2's mapper is created inline
        props = {}
        props['c1s'] = relationship(C1, cascade="all")
        props['c2s'] = relationship(
            mapper(C2, t2), cascade="all, delete-orphan")
        mapper(C1, t1, properties=props)

        head = C1('head c1')
        first_child = C1('child1')
        head.c1s.append(first_child)
        second_child = C1('child2')
        head.c1s.append(second_child)
        first_child.c1s.append(C1('subchild1'))
        first_child.c1s.append(C1('subchild2'))
        second_child.c2s.append(C2('child2 data1'))
        second_child.c2s.append(C2('child2 data2'))

        session = create_session()
        session.add(head)
        session.flush()

        session.delete(head)
        session.flush()

    def test_setnull_ondelete(self):
        """Deleting the parent sets the child's foreign key to None."""
        C1 = self.classes.C1
        t1 = self.tables.t1

        mapper(C1, t1, properties={'children': relationship(C1)})

        session = create_session()
        parent = C1()
        child = C1()
        parent.children.append(child)
        session.add(parent)
        session.flush()
        assert child.parent_c1 == parent.c1

        session.delete(parent)
        session.flush()
        assert child.parent_c1 is None

        # still None after the attribute state has been expired
        session.expire_all()
        assert child.parent_c1 is None
137
class SelfReferentialNoPKTest(fixtures.MappedTest):
    """Self-referential relationship joined on a non-primary-key column.

    ``item.parent_uuid`` references the unique ``item.uuid`` column rather
    than the ``item.id`` primary key.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``item`` with a unique ``uuid`` and nullable ``parent_uuid``."""
        Table(
            'item', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('uuid', String(32), unique=True, nullable=False),
            Column('parent_uuid', String(32), ForeignKey('item.uuid'),
                   nullable=True),
        )

    @classmethod
    def setup_classes(cls):
        """Declare ``TT``, which assigns itself a ``uuid`` on construction."""

        class TT(cls.Basic):
            def __init__(self):
                # the hex of the object's id() is used as the uuid value
                self.uuid = hex(id(self))

    @classmethod
    def setup_mappers(cls):
        """Map ``TT`` with a ``children`` collection and ``parent`` backref."""
        item = cls.tables.item
        TT = cls.classes.TT

        parent_backref = backref('parent', remote_side=[item.c.uuid])
        mapper(TT, item, properties={
            'children': relationship(
                TT,
                remote_side=[item.c.parent_uuid],
                backref=parent_backref),
        })

    def test_basic(self):
        """Children loaded from the database carry the parent's uuid."""
        TT = self.classes.TT

        root = TT()
        for _ in range(2):
            root.children.append(TT())

        session = create_session()
        session.add(root)
        session.flush()
        session.expunge_all()

        loaded = session.query(TT).filter_by(id=root.id).one()
        eq_(loaded.children[0].parent_uuid, root.uuid)

    def test_lazy_clause(self):
        """The ``parent`` backref loads the row matching ``parent_uuid``."""
        TT = self.classes.TT

        session = create_session()
        root = TT()
        leaf = TT()
        root.children.append(leaf)
        session.add(root)
        session.flush()
        session.expunge_all()

        loaded = session.query(TT).filter_by(id=leaf.id).one()
        eq_(loaded.uuid, leaf.uuid)
        eq_(loaded.parent.uuid, root.uuid)
193
194
class InheritTestOne(fixtures.MappedTest):
    """Many-to-one between two subclasses that inherit from one parent.

    ``child1`` and ``child2`` each have a primary key that is also a foreign
    key to ``parent``; ``child2`` additionally has a non-nullable foreign
    key to ``child1``.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create the ``parent``, ``child1`` and ``child2`` tables."""
        Table(
            "parent", metadata,
            Column("id", Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column("parent_data", String(50)),
            Column("type", String(10)),
        )

        Table(
            "child1", metadata,
            Column("id", Integer, ForeignKey("parent.id"), primary_key=True),
            Column("child1_data", String(50)),
        )

        Table(
            "child2", metadata,
            Column("id", Integer, ForeignKey("parent.id"), primary_key=True),
            Column("child1_id", Integer, ForeignKey("child1.id"),
                   nullable=False),
            Column("child2_data", String(50)),
        )

    @classmethod
    def setup_classes(cls):
        """Declare ``Parent`` and its subclasses ``Child1`` and ``Child2``."""

        class Parent(cls.Basic):
            pass

        class Child1(Parent):
            pass

        class Child2(Parent):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map the hierarchy and give ``Child2`` a ``child1`` many-to-one."""
        tables, classes = cls.tables, cls.classes
        child1, child2, parent = tables.child1, tables.child2, tables.parent
        Parent, Child1, Child2 = (
            classes.Parent, classes.Child1, classes.Child2)

        mapper(Parent, parent)
        mapper(Child1, child1, inherits=Parent)
        mapper(Child2, child2, inherits=Parent, properties={
            'child1': relationship(
                Child1,
                primaryjoin=child2.c.child1_id == child1.c.id),
        })

    def test_many_to_one_only(self):
        """Similar to ``SelfReferentialTest.test_many_to_one_only``."""
        Child1 = self.classes.Child1
        Child2 = self.classes.Child2

        session = create_session()

        target = Child1()
        target.child1_data = "qwerty"
        session.add(target)
        session.flush()
        session.expunge_all()

        target = session.query(Child1).filter_by(child1_data="qwerty").one()
        referrer = Child2()
        referrer.child1 = target
        referrer.child2_data = "asdfgh"
        session.add(referrer)

        # "child1_id" is not nullable, so this flush fails unless the UOW
        # sets up a many-to-one dependency processor attached to a task
        # corresponding to the Child1 object
        session.flush()
263
264
class InheritTestTwo(fixtures.MappedTest):
    """Flush with a cycle between ``a`` and ``c`` plus a subclass of ``A``.

    The fix in BiDirectionalManyToOneTest raised this issue: the 'circular
    sort' contained UOWTasks that were still polymorphic, which could create
    duplicate entries in the final sort.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``a`` and ``c`` referencing each other, and ``b`` on ``a``."""
        Table(
            'a', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('cid', Integer, ForeignKey('c.id')),
        )

        Table(
            'b', metadata,
            Column('id', Integer, ForeignKey("a.id"), primary_key=True),
        )

        Table(
            'c', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('aid', Integer, ForeignKey('a.id', name="foo")),
        )

    @classmethod
    def setup_classes(cls):
        """Declare ``A``, its subclass ``B``, and the unrelated ``C``."""

        class A(cls.Basic):
            pass

        class B(A):
            pass

        class C(cls.Basic):
            pass

    def test_flush(self):
        """Flush one pending ``B`` and one pending ``C`` together."""
        tables, classes = self.tables, self.classes
        a, b, c = tables.a, tables.b, tables.c
        A, B, C = classes.A, classes.B, classes.C

        mapper(A, a, properties={
            'cs': relationship(C, primaryjoin=a.c.cid == c.c.id),
        })

        mapper(B, b, inherits=A, inherit_condition=b.c.id == a.c.id)

        mapper(C, c, properties={
            'arel': relationship(A, primaryjoin=a.c.id == c.c.aid),
        })

        session = create_session()
        b_obj = B()
        session.add(b_obj)
        c_obj = C()
        session.add(c_obj)
        session.flush()
322
323
class BiDirectionalManyToOneTest(fixtures.MappedTest):
    """Two mappers with a many-to-one to each other, plus a third to both.

    ``t1`` and ``t2`` each hold a foreign key to the other; ``t3`` holds
    non-nullable foreign keys to both of them.
    """

    run_define_tables = 'each'

    @classmethod
    def define_tables(cls, metadata):
        """Create the mutually dependent ``t1``/``t2`` and the ``t3`` table."""
        Table(
            't1', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', String(30)),
            Column('t2id', Integer, ForeignKey('t2.id')),
        )
        Table(
            't2', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', String(30)),
            Column('t1id', Integer, ForeignKey('t1.id', name="foo_fk")),
        )
        Table(
            't3', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('data', String(30)),
            Column('t1id', Integer, ForeignKey('t1.id'), nullable=False),
            Column('t2id', Integer, ForeignKey('t2.id'), nullable=False),
        )

    @classmethod
    def setup_classes(cls):
        """Declare the plain ``T1``, ``T2`` and ``T3`` classes."""

        class T1(cls.Basic):
            pass

        class T2(cls.Basic):
            pass

        class T3(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map ``T1`` <-> ``T2`` many-to-ones and ``T3`` references to both."""
        tables, classes = cls.tables, cls.classes
        t1, t2, t3 = tables.t1, tables.t2, tables.t3
        T1, T2, T3 = classes.T1, classes.T2, classes.T3

        mapper(T1, t1, properties={
            't2': relationship(T2, primaryjoin=t1.c.t2id == t2.c.id),
        })
        mapper(T2, t2, properties={
            't1': relationship(T1, primaryjoin=t2.c.t1id == t1.c.id),
        })
        mapper(T3, t3, properties={
            't1': relationship(T1),
            't2': relationship(T2),
        })

    def test_reflush(self):
        """Flush a ``T3`` that refers to already-persisted ``T1``/``T2``."""
        classes = self.classes
        T1, T2, T3 = classes.T1, classes.T2, classes.T3

        first = T1()
        first.t2 = T2()
        session = create_session()
        session.add(first)
        session.flush()

        # The bug covered here: the dependency sort comes up with T1/T2 in
        # a cycle, yet there are no T1/T2 objects to be saved.  No "cyclical
        # subtree" is generated as a result, one or the other of T1/T2 gets
        # lost, and the processors on T3 don't fire off.  The test then
        # fails because the FKs on T3 are not nullable.
        third = T3()
        third.t1 = first
        third.t2 = first.t2
        session.add(third)
        session.flush()

    def test_reflush_2(self):
        """A variant on test_reflush() with new T1/T2 objects pending too."""
        classes = self.classes
        T1, T2, T3 = classes.T1, classes.T2, classes.T3

        first = T1()
        first.t2 = T2()
        session = create_session()
        session.add(first)
        session.flush()

        # Here the T1, T2 and T3 tasks will normally all be in the cyclical
        # tree.  The dependency processors for T3 belong to the 'extradeps'
        # collection, so they all get assembled into the tree as well.
        new_t1 = T1()
        new_t2 = T2()
        session.add(new_t1)
        session.add(new_t2)
        third_new = T3()
        third_new.t1 = new_t1
        third_new.t2 = new_t2
        session.add(third_new)

        third = T3()
        third.t1 = first
        third.t2 = first.t2
        session.add(third)
        session.flush()
423
424
class BiDirectionalOneToManyTest(fixtures.MappedTest):
    """Tests two mappers with a one-to-many relationship to each other."""

    run_define_tables = 'each'

    @classmethod
    def define_tables(cls, metadata):
        """Create ``t1`` and ``t2``, each with a foreign key to the other."""
        Table(
            't1', metadata,
            Column('c1', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('c2', Integer, ForeignKey('t2.c1')),
        )

        Table(
            't2', metadata,
            Column('c1', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('c2', Integer, ForeignKey('t1.c1', name='t1c1_fk')),
        )

    @classmethod
    def setup_classes(cls):
        """Declare the plain ``C1`` and ``C2`` classes."""

        class C1(cls.Basic):
            pass

        class C2(cls.Basic):
            pass

    def test_cycle(self):
        """Flush a set of ``C1``/``C2`` objects linked in both directions."""
        classes, tables = self.classes, self.tables
        C2, C1 = classes.C2, classes.C1
        t2, t1 = tables.t2, tables.t1

        mapper(C2, t2, properties={
            'c1s': relationship(
                C1,
                primaryjoin=t2.c.c1 == t1.c.c2,
                uselist=True),
        })
        mapper(C1, t1, properties={
            'c2s': relationship(
                C2,
                primaryjoin=t1.c.c1 == t2.c.c2,
                uselist=True),
        })

        c1_first = C1()
        c2_first = C2()
        c1_second = C1()
        c2_second = C2()
        c2_third = C2()
        c2_fourth = C2()
        c1_first.c2s.append(c2_first)
        c2_second.c1s.append(c1_second)
        c2_first.c1s.append(c1_second)

        session = create_session()
        session.add_all((
            c1_first, c2_first, c1_second,
            c2_second, c2_third, c2_fourth))
        session.flush()
476
477
class BiDirectionalOneToManyTest2(fixtures.MappedTest):
    """Two mappers with a one-to-many relationship to each other,
    with a second one-to-many on one of the mappers.

    The second one-to-many is ``C1.data``, a collection of ``C1Data``.
    """

    run_define_tables = 'each'

    @classmethod
    def define_tables(cls, metadata):
        """Create the cyclic ``t1``/``t2`` tables and ``t1_data``."""
        Table(
            't1', metadata,
            Column('c1', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('c2', Integer, ForeignKey('t2.c1')),
            test_needs_autoincrement=True)

        Table(
            't2', metadata,
            Column('c1', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('c2', Integer, ForeignKey('t1.c1', name='t1c1_fq')),
            test_needs_autoincrement=True)

        Table(
            't1_data', metadata,
            Column('c1', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('t1id', Integer, ForeignKey('t1.c1')),
            Column('data', String(20)),
            test_needs_autoincrement=True)

    @classmethod
    def setup_classes(cls):
        """Declare the plain ``C1``, ``C2`` and ``C1Data`` classes."""

        class C1(cls.Basic):
            pass

        class C2(cls.Basic):
            pass

        class C1Data(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map the ``C1`` <-> ``C2`` collections and the ``C1.data`` list."""
        tables, classes = cls.tables, cls.classes
        t2, t1, t1_data = tables.t2, tables.t1, tables.t1_data
        C1Data, C2, C1 = classes.C1Data, classes.C2, classes.C1

        mapper(C2, t2, properties={
            'c1s': relationship(
                C1,
                primaryjoin=t2.c.c1 == t1.c.c2,
                uselist=True),
        })

        # assemble C1's properties in order; C1Data's mapper is created
        # inline as the argument to the 'data' relationship
        c1_props = {}
        c1_props['c2s'] = relationship(
            C2,
            primaryjoin=t1.c.c1 == t2.c.c2,
            uselist=True)
        c1_props['data'] = relationship(mapper(C1Data, t1_data))
        mapper(C1, t1, properties=c1_props)

    def test_cycle(self):
        """Flush the linked objects, then delete one ``C2`` and one ``C1``."""
        classes = self.classes
        C2, C1, C1Data = classes.C2, classes.C1, classes.C1Data

        c1_first = C1()
        c2_first = C2()
        c1_second = C1()
        c2_second = C2()
        c2_third = C2()
        c2_fourth = C2()
        c1_first.c2s.append(c2_first)
        c2_second.c1s.append(c1_second)
        c2_first.c1s.append(c1_second)
        c1_first.data.append(C1Data(data='c1data1'))
        c1_first.data.append(C1Data(data='c1data2'))
        c1_second.data.append(C1Data(data='c1data3'))

        session = create_session()
        session.add_all((
            c1_first, c2_first, c1_second,
            c2_second, c2_third, c2_fourth))
        session.flush()

        session.delete(c2_second)
        session.delete(c1_second)
        session.flush()
557
class OneToManyManyToOneTest(fixtures.MappedTest):
    """A one-to-many one way and a separate many-to-one the other way.

    Tests two mappers: one has a one-to-many on the other mapper, and the
    other has a separate many-to-one relationship to the first.  Two tests
    will have a row for each item that is dependent on the other.  Without
    the "post_update" flag, such relationships raise an exception when
    dependencies are sorted.
    """

    run_define_tables = 'each'

    @classmethod
    def define_tables(cls, metadata):
        """Create ``ball`` and ``person``, each with a key to the other."""
        Table(
            'ball', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('person_id', Integer,
                   ForeignKey('person.id', name='fk_person_id')),
            Column('data', String(30)),
        )

        Table(
            'person', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('favorite_ball_id', Integer, ForeignKey('ball.id')),
            Column('data', String(30)),
        )

    @classmethod
    def setup_classes(cls):
        """Declare the plain ``Person`` and ``Ball`` classes."""

        class Person(cls.Basic):
            pass

        class Ball(cls.Basic):
            pass

    def test_cycle(self):
        """Flush a person owning one ball, without post_update.

        This test is peculiar in that it doesn't create as many dependent
        relationships as the other tests; it revealed a small glitch in the
        circular dependency sorting.
        """
        tables, classes = self.tables, self.classes
        person, ball = tables.person, tables.ball
        Ball, Person = classes.Ball, classes.Person

        mapper(Ball, ball)
        mapper(Person, person, properties={
            'balls': relationship(
                Ball,
                primaryjoin=ball.c.person_id == person.c.id,
                remote_side=ball.c.person_id),
            'favorite': relationship(
                Ball,
                primaryjoin=person.c.favorite_ball_id == ball.c.id,
                remote_side=ball.c.id),
        })

        the_ball = Ball()
        owner = Person()
        owner.balls.append(the_ball)
        session = create_session()
        session.add(owner)
        session.flush()

    def test_post_update_m2o(self):
        """A cycle between two rows, with a post_update on the many-to-one."""
        tables, classes = self.tables, self.classes
        person, ball = tables.person, tables.ball
        Ball, Person = classes.Ball, classes.Person

        mapper(Ball, ball)
        mapper(Person, person, properties={
            'balls': relationship(
                Ball,
                primaryjoin=ball.c.person_id == person.c.id,
                remote_side=ball.c.person_id,
                post_update=False,
                cascade="all, delete-orphan"),
            'favorite': relationship(
                Ball,
                primaryjoin=person.c.favorite_ball_id == ball.c.id,
                remote_side=person.c.favorite_ball_id,
                post_update=True),
        })

        favorite_ball = Ball(data='some data')
        owner = Person(data='some data')
        owner.balls.append(favorite_ball)
        for _ in range(3):
            owner.balls.append(Ball(data='some data'))
        owner.favorite = favorite_ball

        session = create_session()
        session.add(favorite_ball)
        session.add(owner)

        # expected on flush: the person row, then the four ball rows, then
        # the post_update UPDATE that fills in favorite_ball_id
        expected = [RegexSQL("^INSERT INTO person", {'data': 'some data'})]
        expected.extend(
            RegexSQL(
                "^INSERT INTO ball",
                lambda c: {'person_id': owner.id, 'data': 'some data'})
            for _ in range(4))
        expected.append(CompiledSQL(
            "UPDATE person SET favorite_ball_id=:favorite_ball_id "
            "WHERE person.id = :person_id",
            lambda ctx: {'favorite_ball_id': owner.favorite.id,
                         'person_id': owner.id}))
        self.assert_sql_execution(testing.db, session.flush, *expected)

        session.delete(owner)

        self.assert_sql_execution(
            testing.db,
            session.flush,
            CompiledSQL(
                "UPDATE person SET favorite_ball_id=:favorite_ball_id "
                "WHERE person.id = :person_id",
                lambda ctx: {'person_id': owner.id,
                             'favorite_ball_id': None}),
            # None is passed as the parameters for the ball DELETE; the
            # individual ball ids are not spelled out here
            CompiledSQL("DELETE FROM ball WHERE ball.id = :id", None),
            CompiledSQL(
                "DELETE FROM person WHERE person.id = :id",
                lambda ctx: [{'id': owner.id}])
        )

    def test_post_update_backref(self):
        """Test bidirectional post_update."""
        tables, classes = self.tables, self.classes
        person, ball = tables.person, tables.ball
        Ball, Person = classes.Ball, classes.Person

        mapper(Ball, ball)
        mapper(Person, person, properties={
            'balls': relationship(
                Ball,
                primaryjoin=ball.c.person_id == person.c.id,
                remote_side=ball.c.person_id,
                post_update=True,
                backref=backref('person', post_update=True)),
            'favorite': relationship(
                Ball,
                primaryjoin=person.c.favorite_ball_id == ball.c.id,
                remote_side=person.c.favorite_ball_id),
        })

        session = sessionmaker()()
        person_one = Person(data='p1')
        person_two = Person(data='p2')
        person_three = Person(data='p3')

        the_ball = Ball(data='b1')

        the_ball.person = person_one
        session.add_all([person_one, person_two, person_three])
        session.commit()

        # Switch owners from the many-to-one side.  The post_update on
        # ball.person must not get tripped up by the fact that there is a
        # "reverse" prop.
        the_ball.person = person_two
        session.commit()
        eq_(person_two, the_ball.person)

        # now switch from the collection side
        person_three.balls.append(the_ball)
        session.commit()
        eq_(person_three, the_ball.person)

    def test_post_update_o2m(self):
        """A cycle between two rows, with a post_update on the one-to-many."""
        tables, classes = self.tables, self.classes
        person, ball = tables.person, tables.ball
        Ball, Person = classes.Ball, classes.Person

        mapper(Ball, ball)
        mapper(Person, person, properties={
            'balls': relationship(
                Ball,
                primaryjoin=ball.c.person_id == person.c.id,
                remote_side=ball.c.person_id,
                cascade="all, delete-orphan",
                post_update=True,
                backref='person'),
            'favorite': relationship(
                Ball,
                primaryjoin=person.c.favorite_ball_id == ball.c.id,
                remote_side=person.c.favorite_ball_id),
        })

        ball_a = Ball(data='some data')
        owner = Person(data='some data')
        owner.balls.append(ball_a)
        ball_b = Ball(data='some data')
        owner.balls.append(ball_b)
        ball_c = Ball(data='some data')
        owner.balls.append(ball_c)
        ball_d = Ball(data='some data')
        owner.balls.append(ball_d)
        owner.favorite = ball_a
        session = create_session()
        session.add_all((ball_a, owner, ball_b, ball_c, ball_d))

        every_ball = (ball_a, ball_b, ball_c, ball_d)

        # expected on flush: four ball INSERTs with person_id of None, the
        # person INSERT, then one UPDATE assigning person_id on the balls
        on_insert = [
            CompiledSQL(
                "INSERT INTO ball (person_id, data) "
                "VALUES (:person_id, :data)",
                {'person_id': None, 'data': 'some data'})
            for _ in range(4)
        ]
        on_insert.append(CompiledSQL(
            "INSERT INTO person (favorite_ball_id, data) "
            "VALUES (:favorite_ball_id, :data)",
            lambda ctx: {'favorite_ball_id': ball_a.id,
                         'data': 'some data'}))
        on_insert.append(CompiledSQL(
            "UPDATE ball SET person_id=:person_id "
            "WHERE ball.id = :ball_id",
            lambda ctx: [
                {'person_id': owner.id, 'ball_id': each.id}
                for each in every_ball]))
        self.assert_sql_execution(testing.db, session.flush, *on_insert)

        session.delete(owner)

        # expected on delete: person_id is set to None on the balls first,
        # then the person row is deleted, then the ball rows
        self.assert_sql_execution(
            testing.db,
            session.flush,
            CompiledSQL(
                "UPDATE ball SET person_id=:person_id "
                "WHERE ball.id = :ball_id",
                lambda ctx: [
                    {'person_id': None, 'ball_id': each.id}
                    for each in every_ball]),
            CompiledSQL(
                "DELETE FROM person WHERE person.id = :id",
                lambda ctx: [{'id': owner.id}]),
            CompiledSQL(
                "DELETE FROM ball WHERE ball.id = :id",
                lambda ctx: [{'id': each.id} for each in every_ball])
        )

    def test_post_update_m2o_detect_none(self):
        """Setting an unloaded post_update many-to-one to None emits UPDATE."""
        tables, classes = self.tables, self.classes
        person, ball = tables.person, tables.ball
        Ball, Person = classes.Ball, classes.Person

        mapper(Ball, ball, properties={
            'person': relationship(
                Person,
                post_update=True,
                primaryjoin=person.c.id == ball.c.person_id),
        })
        mapper(Person, person)

        session = create_session(autocommit=False, expire_on_commit=True)
        session.add(Ball(person=Person()))
        session.commit()
        loaded_ball = session.query(Ball).first()

        # the 'person' attribute needs to be unloaded at this point
        assert 'person' not in loaded_ball.__dict__
        loaded_ball.person = None

        self.assert_sql_execution(
            testing.db,
            session.flush,
            CompiledSQL(
                "UPDATE ball SET person_id=:person_id "
                "WHERE ball.id = :ball_id",
                lambda ctx: {'person_id': None, 'ball_id': loaded_ball.id})
        )

        is_(loaded_ball.person, None)
851
852
class SelfReferentialPostUpdateTest(fixtures.MappedTest):
    """Post_update on a single self-referential mapper.

    The ``node`` table carries three nullable foreign keys back to its own
    primary key: ``parent_id``, ``prev_sibling_id`` and ``next_sibling_id``.

    """

    @classmethod
    def define_tables(cls, metadata):
        """Create the ``node`` table with its three self-referencing FKs."""

        def self_reference(column_name):
            # every link column is a nullable FK pointing back at node.id
            return Column(column_name, Integer,
                          ForeignKey('node.id'), nullable=True)

        Table(
            'node', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('path', String(50), nullable=False),
            self_reference('parent_id'),
            self_reference('prev_sibling_id'),
            self_reference('next_sibling_id'))

    @classmethod
    def setup_classes(cls):
        """Declare ``Node``, which is constructed from a ``path`` string."""

        class Node(cls.Basic):
            def __init__(self, path=''):
                self.path = path

    def test_one(self):
        """Post_update only fires off when needed.

        A root with four chained children is flushed; one child is then
        unlinked and deleted, and finally the whole tree is deleted, with
        the statements emitted by each of those flushes asserted.  This
        scenario used to produce many superfluous UPDATE statements,
        particularly upon delete.

        """
        node_table = self.tables.node
        Node = self.classes.Node

        # the relationships are built in the same order in which they are
        # keyed in the properties dictionary below
        children_rel = relationship(
            Node,
            primaryjoin=node_table.c.id == node_table.c.parent_id,
            cascade="all",
            backref=backref("parent", remote_side=node_table.c.id))
        prev_sibling_rel = relationship(
            Node,
            primaryjoin=node_table.c.prev_sibling_id == node_table.c.id,
            remote_side=node_table.c.id,
            uselist=False)
        # only the forward sibling link is configured with post_update
        next_sibling_rel = relationship(
            Node,
            primaryjoin=node_table.c.next_sibling_id == node_table.c.id,
            remote_side=node_table.c.id,
            uselist=False,
            post_update=True)
        mapper(Node, node_table, properties={
            'children': children_rel,
            'prev_sibling': prev_sibling_rel,
            'next_sibling': next_sibling_rel})

        session = create_session()

        def append_child(parent, child):
            # chain the new child onto the current last child, if any
            siblings = parent.children
            if siblings:
                last = siblings[-1]
                last.next_sibling = child
                child.prev_sibling = last
            siblings.append(child)

        def remove_child(parent, child):
            # detach from the parent, splice the neighbours together,
            # then mark the child itself for deletion
            child.parent = None
            following = child.next_sibling
            preceding = child.prev_sibling
            following.prev_sibling = preceding
            preceding.next_sibling = following
            session.delete(child)

        root, about, cats, stories, bruce = [
            Node(path)
            for path in ('root', 'about', 'cats', 'stories', 'bruce')]

        append_child(root, about)
        assert about.prev_sibling is None
        append_child(root, cats)
        assert cats.prev_sibling is about
        assert cats.next_sibling is None
        assert about.next_sibling is cats
        assert about.prev_sibling is None
        append_child(root, stories)
        append_child(root, bruce)
        session.add(root)
        session.flush()

        remove_child(root, cats)

        # pre-trigger lazy loader on 'cats' to make the test easier
        cats.children

        set_prev_sibling = ("UPDATE node SET prev_sibling_id=:prev_sibling_id "
                            "WHERE node.id = :node_id")
        set_next_sibling = ("UPDATE node SET next_sibling_id=:next_sibling_id "
                            "WHERE node.id = :node_id")
        delete_node = "DELETE FROM node WHERE node.id = :id"

        # removing 'cats': relink its two neighbours, null out its own
        # forward link, then delete the row
        self.assert_sql_execution(
            testing.db,
            session.flush,
            AllOf(
                CompiledSQL(
                    set_prev_sibling,
                    lambda ctx: {'prev_sibling_id': about.id,
                                 'node_id': stories.id}),
                CompiledSQL(
                    set_next_sibling,
                    lambda ctx: {'next_sibling_id': stories.id,
                                 'node_id': about.id}),
                CompiledSQL(
                    set_next_sibling,
                    lambda ctx: {'next_sibling_id': None,
                                 'node_id': cats.id}),
            ),
            CompiledSQL(delete_node, lambda ctx: [{'id': cats.id}])
        )

        session.delete(root)

        # deleting the tree: the remaining forward links are cleared in one
        # UPDATE, the three children are deleted, and the root goes last
        self.assert_sql_execution(
            testing.db,
            session.flush,
            CompiledSQL(
                set_next_sibling,
                lambda ctx: [
                    {'node_id': about.id, 'next_sibling_id': None},
                    {'node_id': stories.id, 'next_sibling_id': None}
                ]),
            AllOf(
                CompiledSQL(delete_node, lambda ctx: {'id': about.id}),
                CompiledSQL(delete_node, lambda ctx: {'id': stories.id}),
                CompiledSQL(delete_node, lambda ctx: {'id': bruce.id}),
            ),
            CompiledSQL(delete_node, lambda ctx: {'id': root.id}),
        )

        # a fresh two-node chain: delete the head while clearing the tail's
        # back link; nothing is asserted beyond the flushes completing
        about, cats = Node('about'), Node('cats')
        about.next_sibling = cats
        cats.prev_sibling = about
        session.add(about)
        session.flush()
        session.delete(about)
        cats.prev_sibling = None
        session.flush()
1002
class SelfReferentialPostUpdateTest2(fixtures.MappedTest):
    """Post_update on a self-referential relationship whose target row has
    already been flushed.

    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``a_table``, whose ``b`` column refers to its own ``id``."""
        id_column = Column("id", Integer(), primary_key=True,
                           test_needs_autoincrement=True)
        fui_column = Column("fui", String(128))
        b_column = Column("b", Integer(), ForeignKey("a_table.id"))
        Table("a_table", metadata, id_column, fui_column, b_column)

    @classmethod
    def setup_classes(cls):
        """Declare the plain ``A`` class mapped by the test."""

        class A(cls.Basic):
            pass

    def test_one(self):
        """Post_update still runs when the referenced row already exists.

        Post_update replaces the normal dependency processing completely,
        so it has to remember to take part in update operations as well
        [ticket:413].

        """
        a_table = self.tables.a_table
        A = self.classes.A

        foo_rel = relationship(A,
                               remote_side=[a_table.c.id],
                               post_update=True)
        mapper(A, a_table, properties={'foo': foo_rel})

        session = create_session()

        target = A(fui="f1")
        session.add(target)
        session.flush()

        # 'target' is already inserted at this point, but post_update
        # needs to fire off anyway for the new row that refers to it
        referrer = A(fui="f2", foo=target)
        session.add(referrer)
        session.flush()
        session.expunge_all()

        # reload both rows and check that the link was persisted
        loaded_target = session.query(A).get(target.id)
        loaded_referrer = session.query(A).get(referrer.id)
        assert loaded_referrer.foo is loaded_target
1050
1051
class SelfReferentialPostUpdateTest3(fixtures.MappedTest):
    """Post_update between ``parent`` and ``child`` tables that refer to
    each other, where ``child`` additionally refers to itself.

    """

    @classmethod
    def define_tables(cls, metadata):
        """Create the ``parent`` and ``child`` tables.

        ``parent.child_id`` points at ``child.id`` (constraint named
        ``c1``); ``child`` points back via ``parent_id`` and at itself via
        ``child_id``.

        """
        Table(
            'parent', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(50), nullable=False),
            Column('child_id', Integer,
                   ForeignKey('child.id', name='c1'), nullable=True))

        Table(
            'child', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(50), nullable=False),
            Column('child_id', Integer, ForeignKey('child.id')),
            Column('parent_id', Integer,
                   ForeignKey('parent.id'), nullable=True))

    @classmethod
    def setup_classes(cls):
        """Declare ``Parent`` and ``Child``, each constructed from a name."""

        class Parent(cls.Basic):
            def __init__(self, name=''):
                self.name = name

        class Child(cls.Basic):
            def __init__(self, name=''):
                self.name = name

    def test_one(self):
        """Flush deletes and un-links across the post_update relationship.

        Nothing is asserted about the emitted SQL; the test consists of
        the three flushes completing.  Note that ``Child.parent`` is
        mapped against ``Child`` itself, not against ``Parent``.

        """
        parent_table = self.tables.parent
        child_table = self.tables.child
        Parent = self.classes.Parent
        Child = self.classes.Child

        mapper(Parent, parent_table, properties={
            'children': relationship(
                Child,
                primaryjoin=parent_table.c.id == child_table.c.parent_id),
            'child': relationship(
                Child,
                primaryjoin=parent_table.c.child_id == child_table.c.id,
                post_update=True)
        })
        mapper(Child, child_table, properties={
            'parent': relationship(Child, remote_side=child_table.c.id)
        })

        session = create_session()

        # first parent: two children, the second linked to the first and
        # also chosen as the parent's single 'child'
        first_parent = Parent('p1')
        kid_one = Child('c1')
        kid_two = Child('c2')
        first_parent.children = [kid_one, kid_two]
        kid_two.parent = kid_one
        first_parent.child = kid_two

        session.add_all([first_parent, kid_one, kid_two])
        session.flush()

        # second parent is added in the same flush that removes 'c2'
        second_parent = Parent('p2')
        kid_three = Child('c3')
        second_parent.children = [kid_three]
        second_parent.child = kid_three
        session.add(second_parent)

        session.delete(kid_two)
        first_parent.children.remove(kid_two)
        first_parent.child = None
        session.flush()

        second_parent.child = None
        session.flush()
1119
class PostUpdateBatchingTest(fixtures.MappedTest):
    """Test that several post_update columns on one row are batched
    together into a single UPDATE statement."""

    @classmethod
    def define_tables(cls, metadata):
        """Create ``parent`` plus three identically shaped child tables.

        ``parent`` has one nullable FK per child table (``c1_id``,
        ``c2_id``, ``c3_id``); every child table has a non-nullable
        ``parent_id`` FK back to ``parent``.

        """
        Table(
            'parent', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(50), nullable=False),
            Column('c1_id', Integer,
                   ForeignKey('child1.id', name='c1'), nullable=True),
            Column('c2_id', Integer,
                   ForeignKey('child2.id', name='c2'), nullable=True),
            Column('c3_id', Integer,
                   ForeignKey('child3.id', name='c3'), nullable=True))

        # the child tables differ only in their name
        for child_name in ('child1', 'child2', 'child3'):
            Table(
                child_name, metadata,
                Column('id', Integer, primary_key=True,
                       test_needs_autoincrement=True),
                Column('name', String(50), nullable=False),
                Column('parent_id', Integer,
                       ForeignKey('parent.id'), nullable=False))

    @classmethod
    def setup_classes(cls):
        """Declare ``Parent`` and ``Child1``..``Child3``, each constructed
        from a name."""

        class Parent(cls.Basic):
            def __init__(self, name=''):
                self.name = name

        class Child1(cls.Basic):
            def __init__(self, name=''):
                self.name = name

        class Child2(cls.Basic):
            def __init__(self, name=''):
                self.name = name

        class Child3(cls.Basic):
            def __init__(self, name=''):
                self.name = name

    def test_one(self):
        """Setting, then clearing, three post_update links emits one UPDATE
        per flush that covers all three columns."""
        parent_table = self.tables.parent
        child1_table = self.tables.child1
        child2_table = self.tables.child2
        child3_table = self.tables.child3

        Parent = self.classes.Parent
        Child1 = self.classes.Child1
        Child2 = self.classes.Child2
        Child3 = self.classes.Child3

        mapper(Parent, parent_table, properties={
            # collections, joined on each child's parent_id
            'c1s': relationship(
                Child1,
                primaryjoin=child1_table.c.parent_id == parent_table.c.id),
            'c2s': relationship(
                Child2,
                primaryjoin=child2_table.c.parent_id == parent_table.c.id),
            'c3s': relationship(
                Child3,
                primaryjoin=child3_table.c.parent_id == parent_table.c.id),

            # single post_update links, joined on the parent's cN_id
            'c1': relationship(
                Child1,
                primaryjoin=child1_table.c.id == parent_table.c.c1_id,
                post_update=True),
            'c2': relationship(
                Child2,
                primaryjoin=child2_table.c.id == parent_table.c.c2_id,
                post_update=True),
            'c3': relationship(
                Child3,
                primaryjoin=child3_table.c.id == parent_table.c.c3_id,
                post_update=True),
        })
        mapper(Child1, child1_table)
        mapper(Child2, child2_table)
        mapper(Child3, child3_table)

        sess = create_session()

        child_names = ('c1', 'c2', 'c3')
        p1 = Parent('p1')
        c11, c12, c13 = [Child1(label) for label in child_names]
        c21, c22, c23 = [Child2(label) for label in child_names]
        c31, c32, c33 = [Child3(label) for label in child_names]

        p1.c1s = [c11, c12, c13]
        p1.c2s = [c21, c22, c23]
        p1.c3s = [c31, c32, c33]
        sess.add(p1)
        sess.flush()

        batched_update = (
            "UPDATE parent SET c1_id=:c1_id, c2_id=:c2_id, c3_id=:c3_id "
            "WHERE parent.id = :parent_id")

        # point each post_update link at one of the stored children
        p1.c1 = c12
        p1.c2 = c23
        p1.c3 = c31

        self.assert_sql_execution(
            testing.db,
            sess.flush,
            CompiledSQL(
                batched_update,
                lambda ctx: {'c1_id': c12.id, 'c2_id': c23.id,
                             'c3_id': c31.id, 'parent_id': p1.id}
            )
        )

        # clear all three links again
        p1.c1 = None
        p1.c2 = None
        p1.c3 = None

        self.assert_sql_execution(
            testing.db,
            sess.flush,
            CompiledSQL(
                batched_update,
                lambda ctx: {'c1_id': None, 'c2_id': None,
                             'c3_id': None, 'parent_id': p1.id}
            )
        )
1236