1"""Tests cyclical mapper relationships. 2 3We might want to try an automated generate of much of this, all combos of 4T1<->T2, with o2m or m2o between them, and a third T3 with o2m/m2o to one/both 5T1/T2. 6 7""" 8from sqlalchemy import testing 9from sqlalchemy import Integer, String, ForeignKey 10from sqlalchemy.testing.schema import Table, Column 11from sqlalchemy.orm import mapper, relationship, backref, \ 12 create_session, sessionmaker 13from sqlalchemy.testing import eq_, is_ 14from sqlalchemy.testing.assertsql import RegexSQL, CompiledSQL, AllOf 15from sqlalchemy.testing import fixtures 16 17 18class SelfReferentialTest(fixtures.MappedTest): 19 """A self-referential mapper with an additional list of child objects.""" 20 21 @classmethod 22 def define_tables(cls, metadata): 23 Table('t1', metadata, 24 Column('c1', Integer, primary_key=True, 25 test_needs_autoincrement=True), 26 Column('parent_c1', Integer, ForeignKey('t1.c1')), 27 Column('data', String(20))) 28 Table('t2', metadata, 29 Column('c1', Integer, primary_key=True, 30 test_needs_autoincrement=True), 31 Column('c1id', Integer, ForeignKey('t1.c1')), 32 Column('data', String(20))) 33 34 @classmethod 35 def setup_classes(cls): 36 class C1(cls.Basic): 37 def __init__(self, data=None): 38 self.data = data 39 40 class C2(cls.Basic): 41 def __init__(self, data=None): 42 self.data = data 43 44 def test_single(self): 45 C1, t1 = self.classes.C1, self.tables.t1 46 47 mapper(C1, t1, properties = { 48 'c1s':relationship(C1, cascade="all"), 49 'parent':relationship(C1, 50 primaryjoin=t1.c.parent_c1 == t1.c.c1, 51 remote_side=t1.c.c1, 52 lazy='select', 53 uselist=False)}) 54 a = C1('head c1') 55 a.c1s.append(C1('another c1')) 56 57 sess = create_session( ) 58 sess.add(a) 59 sess.flush() 60 sess.delete(a) 61 sess.flush() 62 63 def test_many_to_one_only(self): 64 """ 65 66 test that the circular dependency sort can assemble a many-to-one 67 dependency processor when only the object on the "many" side is 68 actually in the list of modified objects. 69 70 """ 71 72 C1, t1 = self.classes.C1, self.tables.t1 73 74 mapper(C1, t1, properties={ 75 'parent':relationship(C1, 76 primaryjoin=t1.c.parent_c1 == t1.c.c1, 77 remote_side=t1.c.c1)}) 78 79 c1 = C1() 80 81 sess = create_session() 82 sess.add(c1) 83 sess.flush() 84 sess.expunge_all() 85 c1 = sess.query(C1).get(c1.c1) 86 c2 = C1() 87 c2.parent = c1 88 sess.add(c2) 89 sess.flush() 90 assert c2.parent_c1==c1.c1 91 92 def test_cycle(self): 93 C2, C1, t2, t1 = (self.classes.C2, 94 self.classes.C1, 95 self.tables.t2, 96 self.tables.t1) 97 98 mapper(C1, t1, properties = { 99 'c1s' : relationship(C1, cascade="all"), 100 'c2s' : relationship(mapper(C2, t2), cascade="all, delete-orphan")}) 101 102 a = C1('head c1') 103 a.c1s.append(C1('child1')) 104 a.c1s.append(C1('child2')) 105 a.c1s[0].c1s.append(C1('subchild1')) 106 a.c1s[0].c1s.append(C1('subchild2')) 107 a.c1s[1].c2s.append(C2('child2 data1')) 108 a.c1s[1].c2s.append(C2('child2 data2')) 109 sess = create_session( ) 110 sess.add(a) 111 sess.flush() 112 113 sess.delete(a) 114 sess.flush() 115 116 def test_setnull_ondelete(self): 117 C1, t1 = self.classes.C1, self.tables.t1 118 119 mapper(C1, t1, properties={ 120 'children':relationship(C1) 121 }) 122 123 sess = create_session() 124 c1 = C1() 125 c2 = C1() 126 c1.children.append(c2) 127 sess.add(c1) 128 sess.flush() 129 assert c2.parent_c1 == c1.c1 130 131 sess.delete(c1) 132 sess.flush() 133 assert c2.parent_c1 is None 134 135 sess.expire_all() 136 assert c2.parent_c1 is None 137 138class SelfReferentialNoPKTest(fixtures.MappedTest): 139 """A self-referential relationship that joins on a column other than the primary key column""" 140 141 @classmethod 142 def define_tables(cls, metadata): 143 Table('item', metadata, 144 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 145 Column('uuid', String(32), unique=True, nullable=False), 146 Column('parent_uuid', String(32), ForeignKey('item.uuid'), 147 nullable=True)) 148 149 @classmethod 150 def setup_classes(cls): 151 class TT(cls.Basic): 152 def __init__(self): 153 self.uuid = hex(id(self)) 154 155 @classmethod 156 def setup_mappers(cls): 157 item, TT = cls.tables.item, cls.classes.TT 158 159 mapper(TT, item, properties={ 160 'children': relationship( 161 TT, 162 remote_side=[item.c.parent_uuid], 163 backref=backref('parent', remote_side=[item.c.uuid]))}) 164 165 def test_basic(self): 166 TT = self.classes.TT 167 168 t1 = TT() 169 t1.children.append(TT()) 170 t1.children.append(TT()) 171 172 s = create_session() 173 s.add(t1) 174 s.flush() 175 s.expunge_all() 176 t = s.query(TT).filter_by(id=t1.id).one() 177 eq_(t.children[0].parent_uuid, t1.uuid) 178 179 def test_lazy_clause(self): 180 TT = self.classes.TT 181 182 s = create_session() 183 t1 = TT() 184 t2 = TT() 185 t1.children.append(t2) 186 s.add(t1) 187 s.flush() 188 s.expunge_all() 189 190 t = s.query(TT).filter_by(id=t2.id).one() 191 eq_(t.uuid, t2.uuid) 192 eq_(t.parent.uuid, t1.uuid) 193 194 195class InheritTestOne(fixtures.MappedTest): 196 @classmethod 197 def define_tables(cls, metadata): 198 Table("parent", metadata, 199 Column("id", Integer, primary_key=True, test_needs_autoincrement=True), 200 Column("parent_data", String(50)), 201 Column("type", String(10))) 202 203 Table("child1", metadata, 204 Column("id", Integer, ForeignKey("parent.id"), primary_key=True), 205 Column("child1_data", String(50))) 206 207 Table("child2", metadata, 208 Column("id", Integer, ForeignKey("parent.id"), primary_key=True), 209 Column("child1_id", Integer, ForeignKey("child1.id"), 210 nullable=False), 211 Column("child2_data", String(50))) 212 213 @classmethod 214 def setup_classes(cls): 215 class Parent(cls.Basic): 216 pass 217 218 class Child1(Parent): 219 pass 220 221 class Child2(Parent): 222 pass 223 224 @classmethod 225 def setup_mappers(cls): 226 child1, child2, parent, Parent, Child1, Child2 = (cls.tables.child1, 227 cls.tables.child2, 228 cls.tables.parent, 229 cls.classes.Parent, 230 cls.classes.Child1, 231 cls.classes.Child2) 232 233 mapper(Parent, parent) 234 mapper(Child1, child1, inherits=Parent) 235 mapper(Child2, child2, inherits=Parent, properties=dict( 236 child1=relationship(Child1, 237 primaryjoin=child2.c.child1_id == child1.c.id))) 238 239 def test_many_to_one_only(self): 240 """test similar to SelfReferentialTest.testmanytooneonly""" 241 242 Child1, Child2 = self.classes.Child1, self.classes.Child2 243 244 245 session = create_session() 246 247 c1 = Child1() 248 c1.child1_data = "qwerty" 249 session.add(c1) 250 session.flush() 251 session.expunge_all() 252 253 c1 = session.query(Child1).filter_by(child1_data="qwerty").one() 254 c2 = Child2() 255 c2.child1 = c1 256 c2.child2_data = "asdfgh" 257 session.add(c2) 258 259 # the flush will fail if the UOW does not set up a many-to-one DP 260 # attached to a task corresponding to c1, since "child1_id" is not 261 # nullable 262 session.flush() 263 264 265class InheritTestTwo(fixtures.MappedTest): 266 """ 267 268 The fix in BiDirectionalManyToOneTest raised this issue, regarding the 269 'circular sort' containing UOWTasks that were still polymorphic, which 270 could create duplicate entries in the final sort 271 272 """ 273 274 @classmethod 275 def define_tables(cls, metadata): 276 Table('a', metadata, 277 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 278 Column('cid', Integer, ForeignKey('c.id'))) 279 280 Table('b', metadata, 281 Column('id', Integer, ForeignKey("a.id"), primary_key=True), 282 ) 283 284 Table('c', metadata, 285 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 286 Column('aid', Integer, 287 ForeignKey('a.id', name="foo"))) 288 289 @classmethod 290 def setup_classes(cls): 291 class A(cls.Basic): 292 pass 293 294 class B(A): 295 pass 296 297 class C(cls.Basic): 298 pass 299 300 def test_flush(self): 301 a, A, c, b, C, B = (self.tables.a, 302 self.classes.A, 303 self.tables.c, 304 self.tables.b, 305 self.classes.C, 306 self.classes.B) 307 308 mapper(A, a, properties={ 309 'cs':relationship(C, primaryjoin=a.c.cid==c.c.id)}) 310 311 mapper(B, b, inherits=A, inherit_condition=b.c.id == a.c.id) 312 313 mapper(C, c, properties={ 314 'arel':relationship(A, primaryjoin=a.c.id == c.c.aid)}) 315 316 sess = create_session() 317 bobj = B() 318 sess.add(bobj) 319 cobj = C() 320 sess.add(cobj) 321 sess.flush() 322 323 324class BiDirectionalManyToOneTest(fixtures.MappedTest): 325 run_define_tables = 'each' 326 327 @classmethod 328 def define_tables(cls, metadata): 329 Table('t1', metadata, 330 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 331 Column('data', String(30)), 332 Column('t2id', Integer, ForeignKey('t2.id'))) 333 Table('t2', metadata, 334 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 335 Column('data', String(30)), 336 Column('t1id', Integer, 337 ForeignKey('t1.id', name="foo_fk"))) 338 Table('t3', metadata, 339 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 340 Column('data', String(30)), 341 Column('t1id', Integer, ForeignKey('t1.id'), nullable=False), 342 Column('t2id', Integer, ForeignKey('t2.id'), nullable=False)) 343 344 @classmethod 345 def setup_classes(cls): 346 class T1(cls.Basic): 347 pass 348 class T2(cls.Basic): 349 pass 350 class T3(cls.Basic): 351 pass 352 353 @classmethod 354 def setup_mappers(cls): 355 t2, T2, T3, t1, t3, T1 = (cls.tables.t2, 356 cls.classes.T2, 357 cls.classes.T3, 358 cls.tables.t1, 359 cls.tables.t3, 360 cls.classes.T1) 361 362 mapper(T1, t1, properties={ 363 't2':relationship(T2, primaryjoin=t1.c.t2id == t2.c.id)}) 364 mapper(T2, t2, properties={ 365 't1':relationship(T1, primaryjoin=t2.c.t1id == t1.c.id)}) 366 mapper(T3, t3, properties={ 367 't1':relationship(T1), 368 't2':relationship(T2)}) 369 370 def test_reflush(self): 371 T2, T3, T1 = (self.classes.T2, 372 self.classes.T3, 373 self.classes.T1) 374 375 o1 = T1() 376 o1.t2 = T2() 377 sess = create_session() 378 sess.add(o1) 379 sess.flush() 380 381 # the bug here is that the dependency sort comes up with T1/T2 in a 382 # cycle, but there are no T1/T2 objects to be saved. therefore no 383 # "cyclical subtree" gets generated, and one or the other of T1/T2 384 # gets lost, and processors on T3 don't fire off. the test will then 385 # fail because the FK's on T3 are not nullable. 386 o3 = T3() 387 o3.t1 = o1 388 o3.t2 = o1.t2 389 sess.add(o3) 390 sess.flush() 391 392 def test_reflush_2(self): 393 """A variant on test_reflush()""" 394 395 T2, T3, T1 = (self.classes.T2, 396 self.classes.T3, 397 self.classes.T1) 398 399 o1 = T1() 400 o1.t2 = T2() 401 sess = create_session() 402 sess.add(o1) 403 sess.flush() 404 405 # in this case, T1, T2, and T3 tasks will all be in the cyclical 406 # tree normally. the dependency processors for T3 are part of the 407 # 'extradeps' collection so they all get assembled into the tree 408 # as well. 409 o1a = T1() 410 o2a = T2() 411 sess.add(o1a) 412 sess.add(o2a) 413 o3b = T3() 414 o3b.t1 = o1a 415 o3b.t2 = o2a 416 sess.add(o3b) 417 418 o3 = T3() 419 o3.t1 = o1 420 o3.t2 = o1.t2 421 sess.add(o3) 422 sess.flush() 423 424 425class BiDirectionalOneToManyTest(fixtures.MappedTest): 426 """tests two mappers with a one-to-many relationship to each other.""" 427 428 run_define_tables = 'each' 429 430 @classmethod 431 def define_tables(cls, metadata): 432 Table('t1', metadata, 433 Column('c1', Integer, primary_key=True, test_needs_autoincrement=True), 434 Column('c2', Integer, ForeignKey('t2.c1'))) 435 436 Table('t2', metadata, 437 Column('c1', Integer, primary_key=True, test_needs_autoincrement=True), 438 Column('c2', Integer, 439 ForeignKey('t1.c1', name='t1c1_fk'))) 440 441 @classmethod 442 def setup_classes(cls): 443 class C1(cls.Basic): 444 pass 445 446 class C2(cls.Basic): 447 pass 448 449 def test_cycle(self): 450 C2, C1, t2, t1 = (self.classes.C2, 451 self.classes.C1, 452 self.tables.t2, 453 self.tables.t1) 454 455 mapper(C2, t2, properties={ 456 'c1s': relationship(C1, 457 primaryjoin=t2.c.c1 == t1.c.c2, 458 uselist=True)}) 459 mapper(C1, t1, properties={ 460 'c2s': relationship(C2, 461 primaryjoin=t1.c.c1 == t2.c.c2, 462 uselist=True)}) 463 464 a = C1() 465 b = C2() 466 c = C1() 467 d = C2() 468 e = C2() 469 f = C2() 470 a.c2s.append(b) 471 d.c1s.append(c) 472 b.c1s.append(c) 473 sess = create_session() 474 sess.add_all((a, b, c, d, e, f)) 475 sess.flush() 476 477 478class BiDirectionalOneToManyTest2(fixtures.MappedTest): 479 """Two mappers with a one-to-many relationship to each other, 480 with a second one-to-many on one of the mappers""" 481 482 run_define_tables = 'each' 483 484 @classmethod 485 def define_tables(cls, metadata): 486 Table('t1', metadata, 487 Column('c1', Integer, primary_key=True, test_needs_autoincrement=True), 488 Column('c2', Integer, ForeignKey('t2.c1')), 489 test_needs_autoincrement=True) 490 491 Table('t2', metadata, 492 Column('c1', Integer, primary_key=True, test_needs_autoincrement=True), 493 Column('c2', Integer, 494 ForeignKey('t1.c1', name='t1c1_fq')), 495 test_needs_autoincrement=True) 496 497 Table('t1_data', metadata, 498 Column('c1', Integer, primary_key=True, test_needs_autoincrement=True), 499 Column('t1id', Integer, ForeignKey('t1.c1')), 500 Column('data', String(20)), 501 test_needs_autoincrement=True) 502 503 @classmethod 504 def setup_classes(cls): 505 class C1(cls.Basic): 506 pass 507 508 class C2(cls.Basic): 509 pass 510 511 class C1Data(cls.Basic): 512 pass 513 514 @classmethod 515 def setup_mappers(cls): 516 t2, t1, C1Data, t1_data, C2, C1 = (cls.tables.t2, 517 cls.tables.t1, 518 cls.classes.C1Data, 519 cls.tables.t1_data, 520 cls.classes.C2, 521 cls.classes.C1) 522 523 mapper(C2, t2, properties={ 524 'c1s': relationship(C1, 525 primaryjoin=t2.c.c1 == t1.c.c2, 526 uselist=True)}) 527 mapper(C1, t1, properties={ 528 'c2s': relationship(C2, 529 primaryjoin=t1.c.c1 == t2.c.c2, 530 uselist=True), 531 'data': relationship(mapper(C1Data, t1_data))}) 532 533 def test_cycle(self): 534 C2, C1, C1Data = (self.classes.C2, 535 self.classes.C1, 536 self.classes.C1Data) 537 538 a = C1() 539 b = C2() 540 c = C1() 541 d = C2() 542 e = C2() 543 f = C2() 544 a.c2s.append(b) 545 d.c1s.append(c) 546 b.c1s.append(c) 547 a.data.append(C1Data(data='c1data1')) 548 a.data.append(C1Data(data='c1data2')) 549 c.data.append(C1Data(data='c1data3')) 550 sess = create_session() 551 sess.add_all((a, b, c, d, e, f)) 552 sess.flush() 553 554 sess.delete(d) 555 sess.delete(c) 556 sess.flush() 557 558class OneToManyManyToOneTest(fixtures.MappedTest): 559 """ 560 561 Tests two mappers, one has a one-to-many on the other mapper, the other 562 has a separate many-to-one relationship to the first. two tests will have 563 a row for each item that is dependent on the other. without the 564 "post_update" flag, such relationships raise an exception when 565 dependencies are sorted. 566 567 """ 568 run_define_tables = 'each' 569 570 @classmethod 571 def define_tables(cls, metadata): 572 Table('ball', metadata, 573 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 574 Column('person_id', Integer, 575 ForeignKey('person.id', name='fk_person_id')), 576 Column('data', String(30))) 577 578 Table('person', metadata, 579 Column('id', Integer, primary_key=True, test_needs_autoincrement=True), 580 Column('favorite_ball_id', Integer, ForeignKey('ball.id')), 581 Column('data', String(30))) 582 583 @classmethod 584 def setup_classes(cls): 585 class Person(cls.Basic): 586 pass 587 588 class Ball(cls.Basic): 589 pass 590 591 def test_cycle(self): 592 """ 593 This test has a peculiar aspect in that it doesn't create as many 594 dependent relationships as the other tests, and revealed a small 595 glitch in the circular dependency sorting. 596 597 """ 598 599 person, ball, Ball, Person = (self.tables.person, 600 self.tables.ball, 601 self.classes.Ball, 602 self.classes.Person) 603 604 mapper(Ball, ball) 605 mapper(Person, person, properties=dict( 606 balls=relationship(Ball, 607 primaryjoin=ball.c.person_id == person.c.id, 608 remote_side=ball.c.person_id), 609 favorite=relationship(Ball, 610 primaryjoin=person.c.favorite_ball_id == ball.c.id, 611 remote_side=ball.c.id))) 612 613 b = Ball() 614 p = Person() 615 p.balls.append(b) 616 sess = create_session() 617 sess.add(p) 618 sess.flush() 619 620 def test_post_update_m2o(self): 621 """A cycle between two rows, with a post_update on the many-to-one""" 622 623 person, ball, Ball, Person = (self.tables.person, 624 self.tables.ball, 625 self.classes.Ball, 626 self.classes.Person) 627 628 mapper(Ball, ball) 629 mapper(Person, person, properties=dict( 630 balls=relationship(Ball, 631 primaryjoin=ball.c.person_id == person.c.id, 632 remote_side=ball.c.person_id, 633 post_update=False, 634 cascade="all, delete-orphan"), 635 favorite=relationship(Ball, 636 primaryjoin=person.c.favorite_ball_id == ball.c.id, 637 remote_side=person.c.favorite_ball_id, 638 post_update=True))) 639 640 b = Ball(data='some data') 641 p = Person(data='some data') 642 p.balls.append(b) 643 p.balls.append(Ball(data='some data')) 644 p.balls.append(Ball(data='some data')) 645 p.balls.append(Ball(data='some data')) 646 p.favorite = b 647 sess = create_session() 648 sess.add(b) 649 sess.add(p) 650 651 self.assert_sql_execution( 652 testing.db, 653 sess.flush, 654 RegexSQL("^INSERT INTO person", {'data':'some data'}), 655 RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}), 656 RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}), 657 RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}), 658 RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}), 659 CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id " 660 "WHERE person.id = :person_id", 661 lambda ctx:{'favorite_ball_id':p.favorite.id, 'person_id':p.id} 662 ), 663 ) 664 665 sess.delete(p) 666 667 self.assert_sql_execution( 668 testing.db, 669 sess.flush, 670 CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id " 671 "WHERE person.id = :person_id", 672 lambda ctx: {'person_id': p.id, 'favorite_ball_id': None}), 673 CompiledSQL("DELETE FROM ball WHERE ball.id = :id", None), # lambda ctx:[{'id': 1L}, {'id': 4L}, {'id': 3L}, {'id': 2L}]) 674 CompiledSQL("DELETE FROM person WHERE person.id = :id", lambda ctx:[{'id': p.id}]) 675 ) 676 677 def test_post_update_backref(self): 678 """test bidirectional post_update.""" 679 680 person, ball, Ball, Person = (self.tables.person, 681 self.tables.ball, 682 self.classes.Ball, 683 self.classes.Person) 684 685 686 mapper(Ball, ball) 687 mapper(Person, person, properties=dict( 688 balls=relationship(Ball, 689 primaryjoin=ball.c.person_id == person.c.id, 690 remote_side=ball.c.person_id, post_update=True, 691 backref=backref('person', post_update=True) 692 ), 693 favorite=relationship(Ball, 694 primaryjoin=person.c.favorite_ball_id == ball.c.id, 695 remote_side=person.c.favorite_ball_id) 696 697 )) 698 699 sess = sessionmaker()() 700 p1 = Person(data='p1') 701 p2 = Person(data='p2') 702 p3 = Person(data='p3') 703 704 b1 = Ball(data='b1') 705 706 b1.person = p1 707 sess.add_all([p1, p2, p3]) 708 sess.commit() 709 710 # switch here. the post_update 711 # on ball.person can't get tripped up 712 # by the fact that there's a "reverse" prop. 713 b1.person = p2 714 sess.commit() 715 eq_( 716 p2, b1.person 717 ) 718 719 # do it the other way 720 p3.balls.append(b1) 721 sess.commit() 722 eq_( 723 p3, b1.person 724 ) 725 726 727 728 def test_post_update_o2m(self): 729 """A cycle between two rows, with a post_update on the one-to-many""" 730 731 person, ball, Ball, Person = (self.tables.person, 732 self.tables.ball, 733 self.classes.Ball, 734 self.classes.Person) 735 736 737 mapper(Ball, ball) 738 mapper(Person, person, properties=dict( 739 balls=relationship(Ball, 740 primaryjoin=ball.c.person_id == person.c.id, 741 remote_side=ball.c.person_id, 742 cascade="all, delete-orphan", 743 post_update=True, 744 backref='person'), 745 favorite=relationship(Ball, 746 primaryjoin=person.c.favorite_ball_id == ball.c.id, 747 remote_side=person.c.favorite_ball_id))) 748 749 b = Ball(data='some data') 750 p = Person(data='some data') 751 p.balls.append(b) 752 b2 = Ball(data='some data') 753 p.balls.append(b2) 754 b3 = Ball(data='some data') 755 p.balls.append(b3) 756 b4 = Ball(data='some data') 757 p.balls.append(b4) 758 p.favorite = b 759 sess = create_session() 760 sess.add_all((b,p,b2,b3,b4)) 761 762 self.assert_sql_execution( 763 testing.db, 764 sess.flush, 765 CompiledSQL("INSERT INTO ball (person_id, data) " 766 "VALUES (:person_id, :data)", 767 {'person_id':None, 'data':'some data'}), 768 769 CompiledSQL("INSERT INTO ball (person_id, data) " 770 "VALUES (:person_id, :data)", 771 {'person_id':None, 'data':'some data'}), 772 773 CompiledSQL("INSERT INTO ball (person_id, data) " 774 "VALUES (:person_id, :data)", 775 {'person_id':None, 'data':'some data'}), 776 777 CompiledSQL("INSERT INTO ball (person_id, data) " 778 "VALUES (:person_id, :data)", 779 {'person_id':None, 'data':'some data'}), 780 781 CompiledSQL("INSERT INTO person (favorite_ball_id, data) " 782 "VALUES (:favorite_ball_id, :data)", 783 lambda ctx:{'favorite_ball_id':b.id, 'data':'some data'}), 784 785 CompiledSQL("UPDATE ball SET person_id=:person_id " 786 "WHERE ball.id = :ball_id", 787 lambda ctx:[ 788 {'person_id':p.id,'ball_id':b.id}, 789 {'person_id':p.id,'ball_id':b2.id}, 790 {'person_id':p.id,'ball_id':b3.id}, 791 {'person_id':p.id,'ball_id':b4.id} 792 ] 793 ), 794 795 ) 796 797 sess.delete(p) 798 799 self.assert_sql_execution(testing.db, sess.flush, 800 CompiledSQL("UPDATE ball SET person_id=:person_id " 801 "WHERE ball.id = :ball_id", 802 lambda ctx:[ 803 {'person_id': None, 'ball_id': b.id}, 804 {'person_id': None, 'ball_id': b2.id}, 805 {'person_id': None, 'ball_id': b3.id}, 806 {'person_id': None, 'ball_id': b4.id} 807 ] 808 ), 809 CompiledSQL("DELETE FROM person WHERE person.id = :id", 810 lambda ctx:[{'id':p.id}]), 811 812 CompiledSQL("DELETE FROM ball WHERE ball.id = :id", 813 lambda ctx:[{'id': b.id}, 814 {'id': b2.id}, 815 {'id': b3.id}, 816 {'id': b4.id}]) 817 ) 818 819 def test_post_update_m2o_detect_none(self): 820 person, ball, Ball, Person = ( 821 self.tables.person, 822 self.tables.ball, 823 self.classes.Ball, 824 self.classes.Person) 825 826 mapper(Ball, ball, properties={ 827 'person': relationship( 828 Person, post_update=True, 829 primaryjoin=person.c.id == ball.c.person_id) 830 }) 831 mapper(Person, person) 832 833 sess = create_session(autocommit=False, expire_on_commit=True) 834 sess.add(Ball(person=Person())) 835 sess.commit() 836 b1 = sess.query(Ball).first() 837 838 # needs to be unloaded 839 assert 'person' not in b1.__dict__ 840 b1.person = None 841 842 self.assert_sql_execution( 843 testing.db, 844 sess.flush, 845 CompiledSQL( 846 "UPDATE ball SET person_id=:person_id WHERE ball.id = :ball_id", 847 lambda ctx: {'person_id': None, 'ball_id': b1.id}) 848 ) 849 850 is_(b1.person, None) 851 852 853class SelfReferentialPostUpdateTest(fixtures.MappedTest): 854 """Post_update on a single self-referential mapper. 855 856 857 """ 858 859 @classmethod 860 def define_tables(cls, metadata): 861 Table('node', metadata, 862 Column('id', Integer, primary_key=True, 863 test_needs_autoincrement=True), 864 Column('path', String(50), nullable=False), 865 Column('parent_id', Integer, 866 ForeignKey('node.id'), nullable=True), 867 Column('prev_sibling_id', Integer, 868 ForeignKey('node.id'), nullable=True), 869 Column('next_sibling_id', Integer, 870 ForeignKey('node.id'), nullable=True)) 871 872 @classmethod 873 def setup_classes(cls): 874 class Node(cls.Basic): 875 def __init__(self, path=''): 876 self.path = path 877 878 def test_one(self): 879 """Post_update only fires off when needed. 880 881 This test case used to produce many superfluous update statements, 882 particularly upon delete 883 884 """ 885 886 node, Node = self.tables.node, self.classes.Node 887 888 889 mapper(Node, node, properties={ 890 'children': relationship( 891 Node, 892 primaryjoin=node.c.id==node.c.parent_id, 893 cascade="all", 894 backref=backref("parent", remote_side=node.c.id) 895 ), 896 'prev_sibling': relationship( 897 Node, 898 primaryjoin=node.c.prev_sibling_id==node.c.id, 899 remote_side=node.c.id, 900 uselist=False), 901 'next_sibling': relationship( 902 Node, 903 primaryjoin=node.c.next_sibling_id==node.c.id, 904 remote_side=node.c.id, 905 uselist=False, 906 post_update=True)}) 907 908 session = create_session() 909 910 def append_child(parent, child): 911 if parent.children: 912 parent.children[-1].next_sibling = child 913 child.prev_sibling = parent.children[-1] 914 parent.children.append(child) 915 916 def remove_child(parent, child): 917 child.parent = None 918 node = child.next_sibling 919 node.prev_sibling = child.prev_sibling 920 child.prev_sibling.next_sibling = node 921 session.delete(child) 922 root = Node('root') 923 924 about = Node('about') 925 cats = Node('cats') 926 stories = Node('stories') 927 bruce = Node('bruce') 928 929 append_child(root, about) 930 assert(about.prev_sibling is None) 931 append_child(root, cats) 932 assert(cats.prev_sibling is about) 933 assert(cats.next_sibling is None) 934 assert(about.next_sibling is cats) 935 assert(about.prev_sibling is None) 936 append_child(root, stories) 937 append_child(root, bruce) 938 session.add(root) 939 session.flush() 940 941 remove_child(root, cats) 942 943 # pre-trigger lazy loader on 'cats' to make the test easier 944 cats.children 945 self.assert_sql_execution( 946 testing.db, 947 session.flush, 948 AllOf( 949 CompiledSQL("UPDATE node SET prev_sibling_id=:prev_sibling_id " 950 "WHERE node.id = :node_id", 951 lambda ctx:{'prev_sibling_id':about.id, 'node_id':stories.id}), 952 953 CompiledSQL("UPDATE node SET next_sibling_id=:next_sibling_id " 954 "WHERE node.id = :node_id", 955 lambda ctx:{'next_sibling_id':stories.id, 'node_id':about.id}), 956 957 CompiledSQL("UPDATE node SET next_sibling_id=:next_sibling_id " 958 "WHERE node.id = :node_id", 959 lambda ctx:{'next_sibling_id':None, 'node_id':cats.id}), 960 ), 961 962 CompiledSQL("DELETE FROM node WHERE node.id = :id", 963 lambda ctx:[{'id':cats.id}]) 964 ) 965 966 session.delete(root) 967 968 self.assert_sql_execution( 969 testing.db, 970 session.flush, 971 CompiledSQL("UPDATE node SET next_sibling_id=:next_sibling_id " 972 "WHERE node.id = :node_id", 973 lambda ctx: [ 974 {'node_id': about.id, 'next_sibling_id': None}, 975 {'node_id': stories.id, 'next_sibling_id': None} 976 ] 977 ), 978 AllOf( 979 CompiledSQL("DELETE FROM node WHERE node.id = :id", 980 lambda ctx:{'id':about.id} 981 ), 982 CompiledSQL("DELETE FROM node WHERE node.id = :id", 983 lambda ctx:{'id':stories.id} 984 ), 985 CompiledSQL("DELETE FROM node WHERE node.id = :id", 986 lambda ctx:{'id':bruce.id} 987 ), 988 ), 989 CompiledSQL("DELETE FROM node WHERE node.id = :id", 990 lambda ctx:{'id':root.id} 991 ), 992 ) 993 about = Node('about') 994 cats = Node('cats') 995 about.next_sibling = cats 996 cats.prev_sibling = about 997 session.add(about) 998 session.flush() 999 session.delete(about) 1000 cats.prev_sibling = None 1001 session.flush() 1002 1003class SelfReferentialPostUpdateTest2(fixtures.MappedTest): 1004 1005 @classmethod 1006 def define_tables(cls, metadata): 1007 Table("a_table", metadata, 1008 Column("id", Integer(), primary_key=True, test_needs_autoincrement=True), 1009 Column("fui", String(128)), 1010 Column("b", Integer(), ForeignKey("a_table.id"))) 1011 1012 @classmethod 1013 def setup_classes(cls): 1014 class A(cls.Basic): 1015 pass 1016 1017 def test_one(self): 1018 """ 1019 Test that post_update remembers to be involved in update operations as 1020 well, since it replaces the normal dependency processing completely 1021 [ticket:413] 1022 1023 """ 1024 1025 A, a_table = self.classes.A, self.tables.a_table 1026 1027 1028 mapper(A, a_table, properties={ 1029 'foo': relationship(A, 1030 remote_side=[a_table.c.id], 1031 post_update=True)}) 1032 1033 session = create_session() 1034 1035 f1 = A(fui="f1") 1036 session.add(f1) 1037 session.flush() 1038 1039 f2 = A(fui="f2", foo=f1) 1040 1041 # at this point f1 is already inserted. but we need post_update 1042 # to fire off anyway 1043 session.add(f2) 1044 session.flush() 1045 session.expunge_all() 1046 1047 f1 = session.query(A).get(f1.id) 1048 f2 = session.query(A).get(f2.id) 1049 assert f2.foo is f1 1050 1051 1052class SelfReferentialPostUpdateTest3(fixtures.MappedTest): 1053 @classmethod 1054 def define_tables(cls, metadata): 1055 Table('parent', metadata, 1056 Column('id', Integer, primary_key=True, 1057 test_needs_autoincrement=True), 1058 Column('name', String(50), nullable=False), 1059 Column('child_id', Integer, 1060 ForeignKey('child.id', name='c1'), nullable=True)) 1061 1062 Table('child', metadata, 1063 Column('id', Integer, primary_key=True, 1064 test_needs_autoincrement=True), 1065 Column('name', String(50), nullable=False), 1066 Column('child_id', Integer, 1067 ForeignKey('child.id')), 1068 Column('parent_id', Integer, 1069 ForeignKey('parent.id'), nullable=True)) 1070 1071 @classmethod 1072 def setup_classes(cls): 1073 class Parent(cls.Basic): 1074 def __init__(self, name=''): 1075 self.name = name 1076 1077 class Child(cls.Basic): 1078 def __init__(self, name=''): 1079 self.name = name 1080 1081 def test_one(self): 1082 Child, Parent, parent, child = (self.classes.Child, 1083 self.classes.Parent, 1084 self.tables.parent, 1085 self.tables.child) 1086 1087 mapper(Parent, parent, properties={ 1088 'children':relationship(Child, primaryjoin=parent.c.id==child.c.parent_id), 1089 'child':relationship(Child, primaryjoin=parent.c.child_id==child.c.id, post_update=True) 1090 }) 1091 mapper(Child, child, properties={ 1092 'parent':relationship(Child, remote_side=child.c.id) 1093 }) 1094 1095 session = create_session() 1096 p1 = Parent('p1') 1097 c1 = Child('c1') 1098 c2 = Child('c2') 1099 p1.children =[c1, c2] 1100 c2.parent = c1 1101 p1.child = c2 1102 1103 session.add_all([p1, c1, c2]) 1104 session.flush() 1105 1106 p2 = Parent('p2') 1107 c3 = Child('c3') 1108 p2.children = [c3] 1109 p2.child = c3 1110 session.add(p2) 1111 1112 session.delete(c2) 1113 p1.children.remove(c2) 1114 p1.child = None 1115 session.flush() 1116 1117 p2.child = None 1118 session.flush() 1119 1120class PostUpdateBatchingTest(fixtures.MappedTest): 1121 """test that lots of post update cols batch together into a single UPDATE.""" 1122 1123 @classmethod 1124 def define_tables(cls, metadata): 1125 Table('parent', metadata, 1126 Column('id', Integer, primary_key=True, 1127 test_needs_autoincrement=True), 1128 Column('name', String(50), nullable=False), 1129 Column('c1_id', Integer, 1130 ForeignKey('child1.id', name='c1'), nullable=True), 1131 Column('c2_id', Integer, 1132 ForeignKey('child2.id', name='c2'), nullable=True), 1133 Column('c3_id', Integer, 1134 ForeignKey('child3.id', name='c3'), nullable=True) 1135 ) 1136 1137 Table('child1', metadata, 1138 Column('id', Integer, primary_key=True, 1139 test_needs_autoincrement=True), 1140 Column('name', String(50), nullable=False), 1141 Column('parent_id', Integer, 1142 ForeignKey('parent.id'), nullable=False)) 1143 1144 Table('child2', metadata, 1145 Column('id', Integer, primary_key=True, 1146 test_needs_autoincrement=True), 1147 Column('name', String(50), nullable=False), 1148 Column('parent_id', Integer, 1149 ForeignKey('parent.id'), nullable=False)) 1150 1151 Table('child3', metadata, 1152 Column('id', Integer, primary_key=True, 1153 test_needs_autoincrement=True), 1154 Column('name', String(50), nullable=False), 1155 Column('parent_id', Integer, 1156 ForeignKey('parent.id'), nullable=False)) 1157 1158 @classmethod 1159 def setup_classes(cls): 1160 class Parent(cls.Basic): 1161 def __init__(self, name=''): 1162 self.name = name 1163 class Child1(cls.Basic): 1164 def __init__(self, name=''): 1165 self.name = name 1166 class Child2(cls.Basic): 1167 def __init__(self, name=''): 1168 self.name = name 1169 class Child3(cls.Basic): 1170 def __init__(self, name=''): 1171 self.name = name 1172 1173 def test_one(self): 1174 child1, child2, child3, Parent, parent, Child1, Child2, Child3 = (self.tables.child1, 1175 self.tables.child2, 1176 self.tables.child3, 1177 self.classes.Parent, 1178 self.tables.parent, 1179 self.classes.Child1, 1180 self.classes.Child2, 1181 self.classes.Child3) 1182 1183 mapper(Parent, parent, properties={ 1184 'c1s':relationship(Child1, primaryjoin=child1.c.parent_id==parent.c.id), 1185 'c2s':relationship(Child2, primaryjoin=child2.c.parent_id==parent.c.id), 1186 'c3s':relationship(Child3, primaryjoin=child3.c.parent_id==parent.c.id), 1187 1188 'c1':relationship(Child1, primaryjoin=child1.c.id==parent.c.c1_id, post_update=True), 1189 'c2':relationship(Child2, primaryjoin=child2.c.id==parent.c.c2_id, post_update=True), 1190 'c3':relationship(Child3, primaryjoin=child3.c.id==parent.c.c3_id, post_update=True), 1191 }) 1192 mapper(Child1, child1) 1193 mapper(Child2, child2) 1194 mapper(Child3, child3) 1195 1196 sess = create_session() 1197 1198 p1 = Parent('p1') 1199 c11, c12, c13 = Child1('c1'), Child1('c2'), Child1('c3') 1200 c21, c22, c23 = Child2('c1'), Child2('c2'), Child2('c3') 1201 c31, c32, c33 = Child3('c1'), Child3('c2'), Child3('c3') 1202 1203 p1.c1s = [c11, c12, c13] 1204 p1.c2s = [c21, c22, c23] 1205 p1.c3s = [c31, c32, c33] 1206 sess.add(p1) 1207 sess.flush() 1208 1209 p1.c1 = c12 1210 p1.c2 = c23 1211 p1.c3 = c31 1212 1213 self.assert_sql_execution( 1214 testing.db, 1215 sess.flush, 1216 CompiledSQL( 1217 "UPDATE parent SET c1_id=:c1_id, c2_id=:c2_id, c3_id=:c3_id " 1218 "WHERE parent.id = :parent_id", 1219 lambda ctx: {'c2_id': c23.id, 'parent_id': p1.id, 1220 'c1_id': c12.id, 'c3_id': c31.id} 1221 ) 1222 ) 1223 1224 p1.c1 = p1.c2 = p1.c3 = None 1225 1226 self.assert_sql_execution( 1227 testing.db, 1228 sess.flush, 1229 CompiledSQL( 1230 "UPDATE parent SET c1_id=:c1_id, c2_id=:c2_id, c3_id=:c3_id " 1231 "WHERE parent.id = :parent_id", 1232 lambda ctx: {'c2_id': None, 'parent_id': p1.id, 1233 'c1_id': None, 'c3_id': None} 1234 ) 1235 ) 1236