1"""Miscellaneous inheritance-related tests, many very old. 2These are generally tests derived from specific user issues. 3 4""" 5 6from sqlalchemy import exists 7from sqlalchemy import ForeignKey 8from sqlalchemy import func 9from sqlalchemy import Integer 10from sqlalchemy import select 11from sqlalchemy import Sequence 12from sqlalchemy import String 13from sqlalchemy import testing 14from sqlalchemy import Unicode 15from sqlalchemy import util 16from sqlalchemy.orm import class_mapper 17from sqlalchemy.orm import column_property 18from sqlalchemy.orm import contains_eager 19from sqlalchemy.orm import create_session 20from sqlalchemy.orm import join 21from sqlalchemy.orm import joinedload 22from sqlalchemy.orm import mapper 23from sqlalchemy.orm import polymorphic_union 24from sqlalchemy.orm import relationship 25from sqlalchemy.orm import Session 26from sqlalchemy.orm import with_polymorphic 27from sqlalchemy.orm.interfaces import MANYTOONE 28from sqlalchemy.testing import AssertsExecutionResults 29from sqlalchemy.testing import eq_ 30from sqlalchemy.testing import fixtures 31from sqlalchemy.testing.schema import Column 32from sqlalchemy.testing.schema import Table 33 34 35class RelationshipTest1(fixtures.MappedTest): 36 """test self-referential relationships on polymorphic mappers""" 37 38 @classmethod 39 def define_tables(cls, metadata): 40 global people, managers 41 42 people = Table( 43 "people", 44 metadata, 45 Column( 46 "person_id", 47 Integer, 48 Sequence("person_id_seq", optional=True), 49 primary_key=True, 50 ), 51 Column( 52 "manager_id", 53 Integer, 54 ForeignKey( 55 "managers.person_id", use_alter=True, name="mpid_fq" 56 ), 57 ), 58 Column("name", String(50)), 59 Column("type", String(30)), 60 ) 61 62 managers = Table( 63 "managers", 64 metadata, 65 Column( 66 "person_id", 67 Integer, 68 ForeignKey("people.person_id"), 69 primary_key=True, 70 ), 71 Column("status", String(30)), 72 Column("manager_name", String(50)), 73 ) 74 75 @classmethod 76 def setup_classes(cls): 77 class Person(cls.Comparable): 78 pass 79 80 class Manager(Person): 81 pass 82 83 def test_parent_refs_descendant(self): 84 Person, Manager = self.classes("Person", "Manager") 85 86 mapper( 87 Person, 88 people, 89 properties={ 90 "manager": relationship( 91 Manager, 92 primaryjoin=(people.c.manager_id == managers.c.person_id), 93 uselist=False, 94 post_update=True, 95 ) 96 }, 97 ) 98 mapper( 99 Manager, 100 managers, 101 inherits=Person, 102 inherit_condition=people.c.person_id == managers.c.person_id, 103 ) 104 105 eq_( 106 class_mapper(Person).get_property("manager").synchronize_pairs, 107 [(managers.c.person_id, people.c.manager_id)], 108 ) 109 110 session = create_session() 111 p = Person(name="some person") 112 m = Manager(name="some manager") 113 p.manager = m 114 session.add(p) 115 session.flush() 116 session.expunge_all() 117 118 p = session.query(Person).get(p.person_id) 119 m = session.query(Manager).get(m.person_id) 120 assert p.manager is m 121 122 def test_descendant_refs_parent(self): 123 Person, Manager = self.classes("Person", "Manager") 124 125 mapper(Person, people) 126 mapper( 127 Manager, 128 managers, 129 inherits=Person, 130 inherit_condition=people.c.person_id == managers.c.person_id, 131 properties={ 132 "employee": relationship( 133 Person, 134 primaryjoin=(people.c.manager_id == managers.c.person_id), 135 foreign_keys=[people.c.manager_id], 136 uselist=False, 137 post_update=True, 138 ) 139 }, 140 ) 141 142 session = create_session() 143 p = Person(name="some person") 144 m = Manager(name="some manager") 145 m.employee = p 146 session.add(m) 147 session.flush() 148 session.expunge_all() 149 150 p = session.query(Person).get(p.person_id) 151 m = session.query(Manager).get(m.person_id) 152 assert m.employee is p 153 154 155class RelationshipTest2(fixtures.MappedTest): 156 """test self-referential relationships on polymorphic mappers""" 157 158 @classmethod 159 def define_tables(cls, metadata): 160 global people, managers, data 161 people = Table( 162 "people", 163 metadata, 164 Column( 165 "person_id", 166 Integer, 167 primary_key=True, 168 test_needs_autoincrement=True, 169 ), 170 Column("name", String(50)), 171 Column("type", String(30)), 172 ) 173 174 managers = Table( 175 "managers", 176 metadata, 177 Column( 178 "person_id", 179 Integer, 180 ForeignKey("people.person_id"), 181 primary_key=True, 182 ), 183 Column("manager_id", Integer, ForeignKey("people.person_id")), 184 Column("status", String(30)), 185 ) 186 187 data = Table( 188 "data", 189 metadata, 190 Column( 191 "person_id", 192 Integer, 193 ForeignKey("managers.person_id"), 194 primary_key=True, 195 ), 196 Column("data", String(30)), 197 ) 198 199 @classmethod 200 def setup_classes(cls): 201 class Person(cls.Comparable): 202 pass 203 204 class Manager(Person): 205 pass 206 207 @testing.combinations( 208 ("join1",), ("join2",), ("join3",), argnames="jointype" 209 ) 210 @testing.combinations( 211 ("usedata", True), ("nodata", False), id_="ia", argnames="usedata" 212 ) 213 def test_relationshiponsubclass(self, jointype, usedata): 214 Person, Manager = self.classes("Person", "Manager") 215 if jointype == "join1": 216 poly_union = polymorphic_union( 217 { 218 "person": people.select(people.c.type == "person"), 219 "manager": join( 220 people, 221 managers, 222 people.c.person_id == managers.c.person_id, 223 ), 224 }, 225 None, 226 ) 227 polymorphic_on = poly_union.c.type 228 elif jointype == "join2": 229 poly_union = polymorphic_union( 230 { 231 "person": people.select(people.c.type == "person"), 232 "manager": managers.join( 233 people, people.c.person_id == managers.c.person_id 234 ), 235 }, 236 None, 237 ) 238 polymorphic_on = poly_union.c.type 239 elif jointype == "join3": 240 poly_union = None 241 polymorphic_on = people.c.type 242 243 if usedata: 244 245 class Data(object): 246 def __init__(self, data): 247 self.data = data 248 249 mapper(Data, data) 250 251 mapper( 252 Person, 253 people, 254 with_polymorphic=("*", poly_union), 255 polymorphic_identity="person", 256 polymorphic_on=polymorphic_on, 257 ) 258 259 if usedata: 260 mapper( 261 Manager, 262 managers, 263 inherits=Person, 264 inherit_condition=people.c.person_id == managers.c.person_id, 265 polymorphic_identity="manager", 266 properties={ 267 "colleague": relationship( 268 Person, 269 primaryjoin=managers.c.manager_id 270 == people.c.person_id, 271 lazy="select", 272 uselist=False, 273 ), 274 "data": relationship(Data, uselist=False), 275 }, 276 ) 277 else: 278 mapper( 279 Manager, 280 managers, 281 inherits=Person, 282 inherit_condition=people.c.person_id == managers.c.person_id, 283 polymorphic_identity="manager", 284 properties={ 285 "colleague": relationship( 286 Person, 287 primaryjoin=managers.c.manager_id 288 == people.c.person_id, 289 lazy="select", 290 uselist=False, 291 ) 292 }, 293 ) 294 295 sess = create_session() 296 p = Person(name="person1") 297 m = Manager(name="manager1") 298 m.colleague = p 299 if usedata: 300 m.data = Data("ms data") 301 sess.add(m) 302 sess.flush() 303 304 sess.expunge_all() 305 p = sess.query(Person).get(p.person_id) 306 m = sess.query(Manager).get(m.person_id) 307 assert m.colleague is p 308 if usedata: 309 assert m.data.data == "ms data" 310 311 312class RelationshipTest3(fixtures.MappedTest): 313 """test self-referential relationships on polymorphic mappers""" 314 315 @classmethod 316 def define_tables(cls, metadata): 317 global people, managers, data 318 people = Table( 319 "people", 320 metadata, 321 Column( 322 "person_id", 323 Integer, 324 primary_key=True, 325 test_needs_autoincrement=True, 326 ), 327 Column("colleague_id", Integer, ForeignKey("people.person_id")), 328 Column("name", String(50)), 329 Column("type", String(30)), 330 ) 331 332 managers = Table( 333 "managers", 334 metadata, 335 Column( 336 "person_id", 337 Integer, 338 ForeignKey("people.person_id"), 339 primary_key=True, 340 ), 341 Column("status", String(30)), 342 ) 343 344 data = Table( 345 "data", 346 metadata, 347 Column( 348 "person_id", 349 Integer, 350 ForeignKey("people.person_id"), 351 primary_key=True, 352 ), 353 Column("data", String(30)), 354 ) 355 356 @classmethod 357 def setup_classes(cls): 358 class Person(cls.Comparable): 359 pass 360 361 class Manager(Person): 362 pass 363 364 class Data(cls.Comparable): 365 def __init__(self, data): 366 self.data = data 367 368 def _setup_mappings(self, jointype, usedata): 369 Person, Manager, Data = self.classes("Person", "Manager", "Data") 370 if jointype == "join1": 371 poly_union = polymorphic_union( 372 { 373 "manager": managers.join( 374 people, people.c.person_id == managers.c.person_id 375 ), 376 "person": people.select(people.c.type == "person"), 377 }, 378 None, 379 ) 380 elif jointype == "join2": 381 poly_union = polymorphic_union( 382 { 383 "manager": join( 384 people, 385 managers, 386 people.c.person_id == managers.c.person_id, 387 ), 388 "person": people.select(people.c.type == "person"), 389 }, 390 None, 391 ) 392 elif jointype == "join3": 393 poly_union = people.outerjoin(managers) 394 elif jointype == "join4": 395 poly_union = None 396 else: 397 assert False 398 399 if usedata: 400 mapper(Data, data) 401 402 if usedata: 403 mapper( 404 Person, 405 people, 406 with_polymorphic=("*", poly_union), 407 polymorphic_identity="person", 408 polymorphic_on=people.c.type, 409 properties={ 410 "colleagues": relationship( 411 Person, 412 primaryjoin=people.c.colleague_id 413 == people.c.person_id, 414 remote_side=people.c.colleague_id, 415 uselist=True, 416 ), 417 "data": relationship(Data, uselist=False), 418 }, 419 ) 420 else: 421 mapper( 422 Person, 423 people, 424 with_polymorphic=("*", poly_union), 425 polymorphic_identity="person", 426 polymorphic_on=people.c.type, 427 properties={ 428 "colleagues": relationship( 429 Person, 430 primaryjoin=people.c.colleague_id 431 == people.c.person_id, 432 remote_side=people.c.colleague_id, 433 uselist=True, 434 ) 435 }, 436 ) 437 438 mapper( 439 Manager, 440 managers, 441 inherits=Person, 442 inherit_condition=people.c.person_id == managers.c.person_id, 443 polymorphic_identity="manager", 444 ) 445 446 @testing.combinations( 447 ("join1",), ("join2",), ("join3",), ("join4",), argnames="jointype" 448 ) 449 @testing.combinations( 450 ("usedata", True), ("nodata", False), id_="ia", argnames="usedata" 451 ) 452 def test_relationship_on_base_class(self, jointype, usedata): 453 self._setup_mappings(jointype, usedata) 454 Person, Manager, Data = self.classes("Person", "Manager", "Data") 455 456 sess = create_session() 457 p = Person(name="person1") 458 p2 = Person(name="person2") 459 p3 = Person(name="person3") 460 m = Manager(name="manager1") 461 p.colleagues.append(p2) 462 m.colleagues.append(p3) 463 if usedata: 464 p.data = Data("ps data") 465 m.data = Data("ms data") 466 467 sess.add(m) 468 sess.add(p) 469 sess.flush() 470 471 sess.expunge_all() 472 p = sess.query(Person).get(p.person_id) 473 p2 = sess.query(Person).get(p2.person_id) 474 p3 = sess.query(Person).get(p3.person_id) 475 m = sess.query(Person).get(m.person_id) 476 assert len(p.colleagues) == 1 477 assert p.colleagues == [p2] 478 assert m.colleagues == [p3] 479 if usedata: 480 assert p.data.data == "ps data" 481 assert m.data.data == "ms data" 482 483 484class RelationshipTest4(fixtures.MappedTest): 485 @classmethod 486 def define_tables(cls, metadata): 487 global people, engineers, managers, cars 488 people = Table( 489 "people", 490 metadata, 491 Column( 492 "person_id", 493 Integer, 494 primary_key=True, 495 test_needs_autoincrement=True, 496 ), 497 Column("name", String(50)), 498 ) 499 500 engineers = Table( 501 "engineers", 502 metadata, 503 Column( 504 "person_id", 505 Integer, 506 ForeignKey("people.person_id"), 507 primary_key=True, 508 ), 509 Column("status", String(30)), 510 ) 511 512 managers = Table( 513 "managers", 514 metadata, 515 Column( 516 "person_id", 517 Integer, 518 ForeignKey("people.person_id"), 519 primary_key=True, 520 ), 521 Column("longer_status", String(70)), 522 ) 523 524 cars = Table( 525 "cars", 526 metadata, 527 Column( 528 "car_id", 529 Integer, 530 primary_key=True, 531 test_needs_autoincrement=True, 532 ), 533 Column("owner", Integer, ForeignKey("people.person_id")), 534 ) 535 536 def test_many_to_one_polymorphic(self): 537 """in this test, the polymorphic union is between two subclasses, but 538 does not include the base table by itself in the union. however, the 539 primaryjoin condition is going to be against the base table, and its a 540 many-to-one relationship (unlike the test in polymorph.py) so the 541 column in the base table is explicit. Can the ClauseAdapter figure out 542 how to alias the primaryjoin to the polymorphic union ?""" 543 544 # class definitions 545 class Person(object): 546 def __init__(self, **kwargs): 547 for key, value in kwargs.items(): 548 setattr(self, key, value) 549 550 def __repr__(self): 551 return "Ordinary person %s" % self.name 552 553 class Engineer(Person): 554 def __repr__(self): 555 return "Engineer %s, status %s" % (self.name, self.status) 556 557 class Manager(Person): 558 def __repr__(self): 559 return "Manager %s, status %s" % ( 560 self.name, 561 self.longer_status, 562 ) 563 564 class Car(object): 565 def __init__(self, **kwargs): 566 for key, value in kwargs.items(): 567 setattr(self, key, value) 568 569 def __repr__(self): 570 return "Car number %d" % self.car_id 571 572 # create a union that represents both types of joins. 573 employee_join = polymorphic_union( 574 { 575 "engineer": people.join(engineers), 576 "manager": people.join(managers), 577 }, 578 "type", 579 "employee_join", 580 ) 581 582 person_mapper = mapper( 583 Person, 584 people, 585 with_polymorphic=("*", employee_join), 586 polymorphic_on=employee_join.c.type, 587 polymorphic_identity="person", 588 ) 589 mapper( 590 Engineer, 591 engineers, 592 inherits=person_mapper, 593 polymorphic_identity="engineer", 594 ) 595 mapper( 596 Manager, 597 managers, 598 inherits=person_mapper, 599 polymorphic_identity="manager", 600 ) 601 mapper(Car, cars, properties={"employee": relationship(person_mapper)}) 602 603 session = create_session() 604 605 # creating 5 managers named from M1 to E5 606 for i in range(1, 5): 607 session.add(Manager(name="M%d" % i, longer_status="YYYYYYYYY")) 608 # creating 5 engineers named from E1 to E5 609 for i in range(1, 5): 610 session.add(Engineer(name="E%d" % i, status="X")) 611 612 session.flush() 613 614 engineer4 = ( 615 session.query(Engineer).filter(Engineer.name == "E4").first() 616 ) 617 manager3 = session.query(Manager).filter(Manager.name == "M3").first() 618 619 car1 = Car(employee=engineer4) 620 session.add(car1) 621 car2 = Car(employee=manager3) 622 session.add(car2) 623 session.flush() 624 625 session.expunge_all() 626 627 def go(): 628 testcar = ( 629 session.query(Car) 630 .options(joinedload("employee")) 631 .get(car1.car_id) 632 ) 633 assert str(testcar.employee) == "Engineer E4, status X" 634 635 self.assert_sql_count(testing.db, go, 1) 636 637 car1 = session.query(Car).get(car1.car_id) 638 usingGet = session.query(person_mapper).get(car1.owner) 639 usingProperty = car1.employee 640 641 assert str(engineer4) == "Engineer E4, status X" 642 assert str(usingGet) == "Engineer E4, status X" 643 assert str(usingProperty) == "Engineer E4, status X" 644 645 session.expunge_all() 646 # and now for the lightning round, eager ! 647 648 def go(): 649 testcar = ( 650 session.query(Car) 651 .options(joinedload("employee")) 652 .get(car1.car_id) 653 ) 654 assert str(testcar.employee) == "Engineer E4, status X" 655 656 self.assert_sql_count(testing.db, go, 1) 657 658 session.expunge_all() 659 s = session.query(Car) 660 c = s.join("employee").filter(Person.name == "E4")[0] 661 assert c.car_id == car1.car_id 662 663 664class RelationshipTest5(fixtures.MappedTest): 665 @classmethod 666 def define_tables(cls, metadata): 667 global people, engineers, managers, cars 668 people = Table( 669 "people", 670 metadata, 671 Column( 672 "person_id", 673 Integer, 674 primary_key=True, 675 test_needs_autoincrement=True, 676 ), 677 Column("name", String(50)), 678 Column("type", String(50)), 679 ) 680 681 engineers = Table( 682 "engineers", 683 metadata, 684 Column( 685 "person_id", 686 Integer, 687 ForeignKey("people.person_id"), 688 primary_key=True, 689 ), 690 Column("status", String(30)), 691 ) 692 693 managers = Table( 694 "managers", 695 metadata, 696 Column( 697 "person_id", 698 Integer, 699 ForeignKey("people.person_id"), 700 primary_key=True, 701 ), 702 Column("longer_status", String(70)), 703 ) 704 705 cars = Table( 706 "cars", 707 metadata, 708 Column( 709 "car_id", 710 Integer, 711 primary_key=True, 712 test_needs_autoincrement=True, 713 ), 714 Column("owner", Integer, ForeignKey("people.person_id")), 715 ) 716 717 def test_eager_empty(self): 718 """test parent object with child relationship to an inheriting mapper, 719 using eager loads, works when there are no child objects present""" 720 721 class Person(object): 722 def __init__(self, **kwargs): 723 for key, value in kwargs.items(): 724 setattr(self, key, value) 725 726 def __repr__(self): 727 return "Ordinary person %s" % self.name 728 729 class Engineer(Person): 730 def __repr__(self): 731 return "Engineer %s, status %s" % (self.name, self.status) 732 733 class Manager(Person): 734 def __repr__(self): 735 return "Manager %s, status %s" % ( 736 self.name, 737 self.longer_status, 738 ) 739 740 class Car(object): 741 def __init__(self, **kwargs): 742 for key, value in kwargs.items(): 743 setattr(self, key, value) 744 745 def __repr__(self): 746 return "Car number %d" % self.car_id 747 748 person_mapper = mapper( 749 Person, 750 people, 751 polymorphic_on=people.c.type, 752 polymorphic_identity="person", 753 ) 754 mapper( 755 Engineer, 756 engineers, 757 inherits=person_mapper, 758 polymorphic_identity="engineer", 759 ) 760 manager_mapper = mapper( 761 Manager, 762 managers, 763 inherits=person_mapper, 764 polymorphic_identity="manager", 765 ) 766 mapper( 767 Car, 768 cars, 769 properties={ 770 "manager": relationship(manager_mapper, lazy="joined") 771 }, 772 ) 773 774 sess = create_session() 775 car1 = Car() 776 car2 = Car() 777 car2.manager = Manager() 778 sess.add(car1) 779 sess.add(car2) 780 sess.flush() 781 sess.expunge_all() 782 783 carlist = sess.query(Car).all() 784 assert carlist[0].manager is None 785 assert carlist[1].manager.person_id == car2.manager.person_id 786 787 788class RelationshipTest6(fixtures.MappedTest): 789 """test self-referential relationships on a single joined-table 790 inheritance mapper""" 791 792 @classmethod 793 def define_tables(cls, metadata): 794 global people, managers, data 795 people = Table( 796 "people", 797 metadata, 798 Column( 799 "person_id", 800 Integer, 801 primary_key=True, 802 test_needs_autoincrement=True, 803 ), 804 Column("name", String(50)), 805 ) 806 807 managers = Table( 808 "managers", 809 metadata, 810 Column( 811 "person_id", 812 Integer, 813 ForeignKey("people.person_id"), 814 primary_key=True, 815 ), 816 Column("colleague_id", Integer, ForeignKey("managers.person_id")), 817 Column("status", String(30)), 818 ) 819 820 @classmethod 821 def setup_classes(cls): 822 class Person(cls.Comparable): 823 pass 824 825 class Manager(Person): 826 pass 827 828 def test_basic(self): 829 Person, Manager = self.classes("Person", "Manager") 830 831 mapper(Person, people) 832 833 mapper( 834 Manager, 835 managers, 836 inherits=Person, 837 inherit_condition=people.c.person_id == managers.c.person_id, 838 properties={ 839 "colleague": relationship( 840 Manager, 841 primaryjoin=managers.c.colleague_id 842 == managers.c.person_id, 843 lazy="select", 844 uselist=False, 845 ) 846 }, 847 ) 848 849 sess = create_session() 850 m = Manager(name="manager1") 851 m2 = Manager(name="manager2") 852 m.colleague = m2 853 sess.add(m) 854 sess.flush() 855 856 sess.expunge_all() 857 m = sess.query(Manager).get(m.person_id) 858 m2 = sess.query(Manager).get(m2.person_id) 859 assert m.colleague is m2 860 861 862class RelationshipTest7(fixtures.MappedTest): 863 @classmethod 864 def define_tables(cls, metadata): 865 global people, engineers, managers, cars, offroad_cars 866 cars = Table( 867 "cars", 868 metadata, 869 Column( 870 "car_id", 871 Integer, 872 primary_key=True, 873 test_needs_autoincrement=True, 874 ), 875 Column("name", String(30)), 876 ) 877 878 offroad_cars = Table( 879 "offroad_cars", 880 metadata, 881 Column( 882 "car_id", 883 Integer, 884 ForeignKey("cars.car_id"), 885 nullable=False, 886 primary_key=True, 887 ), 888 ) 889 890 people = Table( 891 "people", 892 metadata, 893 Column( 894 "person_id", 895 Integer, 896 primary_key=True, 897 test_needs_autoincrement=True, 898 ), 899 Column( 900 "car_id", Integer, ForeignKey("cars.car_id"), nullable=False 901 ), 902 Column("name", String(50)), 903 ) 904 905 engineers = Table( 906 "engineers", 907 metadata, 908 Column( 909 "person_id", 910 Integer, 911 ForeignKey("people.person_id"), 912 primary_key=True, 913 ), 914 Column("field", String(30)), 915 ) 916 917 managers = Table( 918 "managers", 919 metadata, 920 Column( 921 "person_id", 922 Integer, 923 ForeignKey("people.person_id"), 924 primary_key=True, 925 ), 926 Column("category", String(70)), 927 ) 928 929 def test_manytoone_lazyload(self): 930 """test that lazy load clause to a polymorphic child mapper generates 931 correctly [ticket:493]""" 932 933 class PersistentObject(object): 934 def __init__(self, **kwargs): 935 for key, value in kwargs.items(): 936 setattr(self, key, value) 937 938 class Status(PersistentObject): 939 def __repr__(self): 940 return "Status %s" % self.name 941 942 class Person(PersistentObject): 943 def __repr__(self): 944 return "Ordinary person %s" % self.name 945 946 class Engineer(Person): 947 def __repr__(self): 948 return "Engineer %s, field %s" % (self.name, self.field) 949 950 class Manager(Person): 951 def __repr__(self): 952 return "Manager %s, category %s" % (self.name, self.category) 953 954 class Car(PersistentObject): 955 def __repr__(self): 956 return "Car number %d, name %s" % (self.car_id, self.name) 957 958 class Offraod_Car(Car): 959 def __repr__(self): 960 return "Offroad Car number %d, name %s" % ( 961 self.car_id, 962 self.name, 963 ) 964 965 employee_join = polymorphic_union( 966 { 967 "engineer": people.join(engineers), 968 "manager": people.join(managers), 969 }, 970 "type", 971 "employee_join", 972 ) 973 974 car_join = polymorphic_union( 975 { 976 "car": cars.outerjoin(offroad_cars) 977 .select(offroad_cars.c.car_id == None) 978 .reduce_columns(), # noqa 979 "offroad": cars.join(offroad_cars), 980 }, 981 "type", 982 "car_join", 983 ) 984 985 car_mapper = mapper( 986 Car, 987 cars, 988 with_polymorphic=("*", car_join), 989 polymorphic_on=car_join.c.type, 990 polymorphic_identity="car", 991 ) 992 mapper( 993 Offraod_Car, 994 offroad_cars, 995 inherits=car_mapper, 996 polymorphic_identity="offroad", 997 ) 998 person_mapper = mapper( 999 Person, 1000 people, 1001 with_polymorphic=("*", employee_join), 1002 polymorphic_on=employee_join.c.type, 1003 polymorphic_identity="person", 1004 properties={"car": relationship(car_mapper)}, 1005 ) 1006 mapper( 1007 Engineer, 1008 engineers, 1009 inherits=person_mapper, 1010 polymorphic_identity="engineer", 1011 ) 1012 mapper( 1013 Manager, 1014 managers, 1015 inherits=person_mapper, 1016 polymorphic_identity="manager", 1017 ) 1018 1019 session = create_session() 1020 1021 for i in range(1, 4): 1022 if i % 2: 1023 car = Car() 1024 else: 1025 car = Offraod_Car() 1026 session.add(Manager(name="M%d" % i, category="YYYYYYYYY", car=car)) 1027 session.add(Engineer(name="E%d" % i, field="X", car=car)) 1028 session.flush() 1029 session.expunge_all() 1030 1031 r = session.query(Person).all() 1032 for p in r: 1033 assert p.car_id == p.car.car_id 1034 1035 1036class RelationshipTest8(fixtures.MappedTest): 1037 @classmethod 1038 def define_tables(cls, metadata): 1039 global taggable, users 1040 taggable = Table( 1041 "taggable", 1042 metadata, 1043 Column( 1044 "id", Integer, primary_key=True, test_needs_autoincrement=True 1045 ), 1046 Column("type", String(30)), 1047 Column("owner_id", Integer, ForeignKey("taggable.id")), 1048 ) 1049 users = Table( 1050 "users", 1051 metadata, 1052 Column("id", Integer, ForeignKey("taggable.id"), primary_key=True), 1053 Column("data", String(50)), 1054 ) 1055 1056 def test_selfref_onjoined(self): 1057 class Taggable(fixtures.ComparableEntity): 1058 pass 1059 1060 class User(Taggable): 1061 pass 1062 1063 mapper( 1064 Taggable, 1065 taggable, 1066 polymorphic_on=taggable.c.type, 1067 polymorphic_identity="taggable", 1068 properties={ 1069 "owner": relationship( 1070 User, 1071 primaryjoin=taggable.c.owner_id == taggable.c.id, 1072 remote_side=taggable.c.id, 1073 ) 1074 }, 1075 ) 1076 1077 mapper( 1078 User, 1079 users, 1080 inherits=Taggable, 1081 polymorphic_identity="user", 1082 inherit_condition=users.c.id == taggable.c.id, 1083 ) 1084 1085 u1 = User(data="u1") 1086 t1 = Taggable(owner=u1) 1087 sess = create_session() 1088 sess.add(t1) 1089 sess.flush() 1090 1091 sess.expunge_all() 1092 eq_( 1093 sess.query(Taggable).order_by(Taggable.id).all(), 1094 [User(data="u1"), Taggable(owner=User(data="u1"))], 1095 ) 1096 1097 1098class GenerativeTest(fixtures.MappedTest, AssertsExecutionResults): 1099 @classmethod 1100 def define_tables(cls, metadata): 1101 # cars---owned by--- people (abstract) --- has a --- status 1102 # | ^ ^ | 1103 # | | | | 1104 # | engineers managers | 1105 # | | 1106 # +--------------------------------------- has a ------+ 1107 1108 # table definitions 1109 Table( 1110 "status", 1111 metadata, 1112 Column( 1113 "status_id", 1114 Integer, 1115 primary_key=True, 1116 test_needs_autoincrement=True, 1117 ), 1118 Column("name", String(20)), 1119 ) 1120 1121 Table( 1122 "people", 1123 metadata, 1124 Column( 1125 "person_id", 1126 Integer, 1127 primary_key=True, 1128 test_needs_autoincrement=True, 1129 ), 1130 Column( 1131 "status_id", 1132 Integer, 1133 ForeignKey("status.status_id"), 1134 nullable=False, 1135 ), 1136 Column("name", String(50)), 1137 ) 1138 1139 Table( 1140 "engineers", 1141 metadata, 1142 Column( 1143 "person_id", 1144 Integer, 1145 ForeignKey("people.person_id"), 1146 primary_key=True, 1147 ), 1148 Column("field", String(30)), 1149 ) 1150 1151 Table( 1152 "managers", 1153 metadata, 1154 Column( 1155 "person_id", 1156 Integer, 1157 ForeignKey("people.person_id"), 1158 primary_key=True, 1159 ), 1160 Column("category", String(70)), 1161 ) 1162 1163 Table( 1164 "cars", 1165 metadata, 1166 Column( 1167 "car_id", 1168 Integer, 1169 primary_key=True, 1170 test_needs_autoincrement=True, 1171 ), 1172 Column( 1173 "status_id", 1174 Integer, 1175 ForeignKey("status.status_id"), 1176 nullable=False, 1177 ), 1178 Column( 1179 "owner", 1180 Integer, 1181 ForeignKey("people.person_id"), 1182 nullable=False, 1183 ), 1184 ) 1185 1186 @classmethod 1187 def setup_classes(cls): 1188 class Status(cls.Comparable): 1189 pass 1190 1191 class Person(cls.Comparable): 1192 pass 1193 1194 class Engineer(Person): 1195 pass 1196 1197 class Manager(Person): 1198 pass 1199 1200 class Car(cls.Comparable): 1201 pass 1202 1203 @classmethod 1204 def setup_mappers(cls): 1205 status, people, engineers, managers, cars = cls.tables( 1206 "status", "people", "engineers", "managers", "cars" 1207 ) 1208 Status, Person, Engineer, Manager, Car = cls.classes( 1209 "Status", "Person", "Engineer", "Manager", "Car" 1210 ) 1211 # create a union that represents both types of joins. 1212 employee_join = polymorphic_union( 1213 { 1214 "engineer": people.join(engineers), 1215 "manager": people.join(managers), 1216 }, 1217 "type", 1218 "employee_join", 1219 ) 1220 1221 status_mapper = mapper(Status, status) 1222 person_mapper = mapper( 1223 Person, 1224 people, 1225 with_polymorphic=("*", employee_join), 1226 polymorphic_on=employee_join.c.type, 1227 polymorphic_identity="person", 1228 properties={"status": relationship(status_mapper)}, 1229 ) 1230 mapper( 1231 Engineer, 1232 engineers, 1233 inherits=person_mapper, 1234 polymorphic_identity="engineer", 1235 ) 1236 mapper( 1237 Manager, 1238 managers, 1239 inherits=person_mapper, 1240 polymorphic_identity="manager", 1241 ) 1242 mapper( 1243 Car, 1244 cars, 1245 properties={ 1246 "employee": relationship(person_mapper), 1247 "status": relationship(status_mapper), 1248 }, 1249 ) 1250 1251 @classmethod 1252 def insert_data(cls, connection): 1253 Status, Person, Engineer, Manager, Car = cls.classes( 1254 "Status", "Person", "Engineer", "Manager", "Car" 1255 ) 1256 session = create_session(connection) 1257 1258 active = Status(name="active") 1259 dead = Status(name="dead") 1260 1261 session.add(active) 1262 session.add(dead) 1263 session.flush() 1264 1265 # TODO: we haven't created assertions for all 1266 # the data combinations created here 1267 1268 # creating 5 managers named from M1 to M5 1269 # and 5 engineers named from E1 to E5 1270 # M4, M5, E4 and E5 are dead 1271 for i in range(1, 5): 1272 if i < 4: 1273 st = active 1274 else: 1275 st = dead 1276 session.add( 1277 Manager(name="M%d" % i, category="YYYYYYYYY", status=st) 1278 ) 1279 session.add(Engineer(name="E%d" % i, field="X", status=st)) 1280 1281 session.flush() 1282 1283 # get E4 1284 engineer4 = session.query(Engineer).filter_by(name="E4").one() 1285 1286 # create 2 cars for E4, one active and one dead 1287 car1 = Car(employee=engineer4, status=active) 1288 car2 = Car(employee=engineer4, status=dead) 1289 session.add(car1) 1290 session.add(car2) 1291 session.flush() 1292 1293 def test_join_to_q_person(self): 1294 Status, Person, Engineer, Manager, Car = self.classes( 1295 "Status", "Person", "Engineer", "Manager", "Car" 1296 ) 1297 session = create_session() 1298 1299 r = ( 1300 session.query(Person) 1301 .filter(Person.name.like("%2")) 1302 .join("status") 1303 .filter_by(name="active") 1304 .order_by(Person.person_id) 1305 ) 1306 eq_( 1307 list(r), 1308 [ 1309 Manager( 1310 name="M2", 1311 category="YYYYYYYYY", 1312 status=Status(name="active"), 1313 ), 1314 Engineer(name="E2", field="X", status=Status(name="active")), 1315 ], 1316 ) 1317 1318 def test_join_to_q_engineer(self): 1319 Status, Person, Engineer, Manager, Car = self.classes( 1320 "Status", "Person", "Engineer", "Manager", "Car" 1321 ) 1322 session = create_session() 1323 r = ( 1324 session.query(Engineer) 1325 .join("status") 1326 .filter( 1327 Person.name.in_(["E2", "E3", "E4", "M4", "M2", "M1"]) 1328 & (Status.name == "active") 1329 ) 1330 .order_by(Person.name) 1331 ) 1332 eq_( 1333 list(r), 1334 [ 1335 Engineer(name="E2", field="X", status=Status(name="active")), 1336 Engineer(name="E3", field="X", status=Status(name="active")), 1337 ], 1338 ) 1339 1340 def test_join_to_q_person_car(self): 1341 Status, Person, Engineer, Manager, Car = self.classes( 1342 "Status", "Person", "Engineer", "Manager", "Car" 1343 ) 1344 session = create_session() 1345 r = session.query(Person).filter( 1346 exists([1], Car.owner == Person.person_id) 1347 ) 1348 1349 eq_( 1350 list(r), 1351 [Engineer(name="E4", field="X", status=Status(name="dead"))], 1352 ) 1353 1354 1355class MultiLevelTest(fixtures.MappedTest): 1356 @classmethod 1357 def define_tables(cls, metadata): 1358 global table_Employee, table_Engineer, table_Manager 1359 table_Employee = Table( 1360 "Employee", 1361 metadata, 1362 Column("name", type_=String(100)), 1363 Column( 1364 "id", 1365 primary_key=True, 1366 type_=Integer, 1367 test_needs_autoincrement=True, 1368 ), 1369 Column("atype", type_=String(100)), 1370 ) 1371 1372 table_Engineer = Table( 1373 "Engineer", 1374 metadata, 1375 Column("machine", type_=String(100)), 1376 Column("id", Integer, ForeignKey("Employee.id"), primary_key=True), 1377 ) 1378 1379 table_Manager = Table( 1380 "Manager", 1381 metadata, 1382 Column("duties", type_=String(100)), 1383 Column("id", Integer, ForeignKey("Engineer.id"), primary_key=True), 1384 ) 1385 1386 def test_threelevels(self): 1387 class Employee(object): 1388 def set(me, **kargs): 1389 for k, v in kargs.items(): 1390 setattr(me, k, v) 1391 return me 1392 1393 def __str__(me): 1394 return str(me.__class__.__name__) + ":" + str(me.name) 1395 1396 __repr__ = __str__ 1397 1398 class Engineer(Employee): 1399 pass 1400 1401 class Manager(Engineer): 1402 pass 1403 1404 pu_Employee = polymorphic_union( 1405 { 1406 "Manager": table_Employee.join(table_Engineer).join( 1407 table_Manager 1408 ), 1409 "Engineer": select( 1410 [table_Employee, table_Engineer.c.machine], 1411 table_Employee.c.atype == "Engineer", 1412 from_obj=[table_Employee.join(table_Engineer)], 1413 ), 1414 "Employee": table_Employee.select( 1415 table_Employee.c.atype == "Employee" 1416 ), 1417 }, 1418 None, 1419 "pu_employee", 1420 ) 1421 1422 mapper_Employee = mapper( 1423 Employee, 1424 table_Employee, 1425 polymorphic_identity="Employee", 1426 polymorphic_on=pu_Employee.c.atype, 1427 with_polymorphic=("*", pu_Employee), 1428 ) 1429 1430 pu_Engineer = polymorphic_union( 1431 { 1432 "Manager": table_Employee.join(table_Engineer).join( 1433 table_Manager 1434 ), 1435 "Engineer": select( 1436 [table_Employee, table_Engineer.c.machine], 1437 table_Employee.c.atype == "Engineer", 1438 from_obj=[table_Employee.join(table_Engineer)], 1439 ), 1440 }, 1441 None, 1442 "pu_engineer", 1443 ) 1444 mapper_Engineer = mapper( 1445 Engineer, 1446 table_Engineer, 1447 inherit_condition=table_Engineer.c.id == table_Employee.c.id, 1448 inherits=mapper_Employee, 1449 polymorphic_identity="Engineer", 1450 polymorphic_on=pu_Engineer.c.atype, 1451 with_polymorphic=("*", pu_Engineer), 1452 ) 1453 1454 mapper( 1455 Manager, 1456 table_Manager, 1457 inherit_condition=table_Manager.c.id == table_Engineer.c.id, 1458 inherits=mapper_Engineer, 1459 polymorphic_identity="Manager", 1460 ) 1461 1462 a = Employee().set(name="one") 1463 b = Engineer().set(egn="two", machine="any") 1464 c = Manager().set(name="head", machine="fast", duties="many") 1465 1466 session = create_session() 1467 session.add(a) 1468 session.add(b) 1469 session.add(c) 1470 session.flush() 1471 assert set(session.query(Employee).all()) == set([a, b, c]) 1472 assert set(session.query(Engineer).all()) == set([b, c]) 1473 assert session.query(Manager).all() == [c] 1474 1475 1476class ManyToManyPolyTest(fixtures.MappedTest): 1477 @classmethod 1478 def define_tables(cls, metadata): 1479 global base_item_table, item_table 1480 global base_item_collection_table, collection_table 1481 base_item_table = Table( 1482 "base_item", 1483 metadata, 1484 Column( 1485 "id", Integer, primary_key=True, test_needs_autoincrement=True 1486 ), 1487 Column("child_name", String(255), default=None), 1488 ) 1489 1490 item_table = Table( 1491 "item", 1492 metadata, 1493 Column( 1494 "id", Integer, ForeignKey("base_item.id"), primary_key=True 1495 ), 1496 Column("dummy", Integer, default=0), 1497 ) 1498 1499 base_item_collection_table = Table( 1500 "base_item_collection", 1501 metadata, 1502 Column("item_id", Integer, ForeignKey("base_item.id")), 1503 Column("collection_id", Integer, ForeignKey("collection.id")), 1504 ) 1505 1506 collection_table = Table( 1507 "collection", 1508 metadata, 1509 Column( 1510 "id", Integer, primary_key=True, test_needs_autoincrement=True 1511 ), 1512 Column("name", Unicode(255)), 1513 ) 1514 1515 def test_pjoin_compile(self): 1516 """test that remote_side columns in the secondary join table 1517 aren't attempted to be matched to the target polymorphic 1518 selectable""" 1519 1520 class BaseItem(object): 1521 pass 1522 1523 class Item(BaseItem): 1524 pass 1525 1526 class Collection(object): 1527 pass 1528 1529 item_join = polymorphic_union( 1530 { 1531 "BaseItem": base_item_table.select( 1532 base_item_table.c.child_name == "BaseItem" 1533 ), 1534 "Item": base_item_table.join(item_table), 1535 }, 1536 None, 1537 "item_join", 1538 ) 1539 1540 mapper( 1541 BaseItem, 1542 base_item_table, 1543 with_polymorphic=("*", item_join), 1544 polymorphic_on=base_item_table.c.child_name, 1545 polymorphic_identity="BaseItem", 1546 properties=dict( 1547 collections=relationship( 1548 Collection, 1549 secondary=base_item_collection_table, 1550 backref="items", 1551 ) 1552 ), 1553 ) 1554 1555 mapper( 1556 Item, item_table, inherits=BaseItem, polymorphic_identity="Item" 1557 ) 1558 1559 mapper(Collection, collection_table) 1560 1561 class_mapper(BaseItem) 1562 1563 1564class CustomPKTest(fixtures.MappedTest): 1565 @classmethod 1566 def define_tables(cls, metadata): 1567 global t1, t2 1568 t1 = Table( 1569 "t1", 1570 metadata, 1571 Column( 1572 "id", Integer, primary_key=True, test_needs_autoincrement=True 1573 ), 1574 Column("type", String(30), nullable=False), 1575 Column("data", String(30)), 1576 ) 1577 # note that the primary key column in t2 is named differently 1578 t2 = Table( 1579 "t2", 1580 metadata, 1581 Column("t2id", Integer, ForeignKey("t1.id"), primary_key=True), 1582 Column("t2data", String(30)), 1583 ) 1584 1585 def test_custompk(self): 1586 """test that the primary_key attribute is propagated to the 1587 polymorphic mapper""" 1588 1589 class T1(object): 1590 pass 1591 1592 class T2(T1): 1593 pass 1594 1595 # create a polymorphic union with the select against the base table 1596 # first. with the join being second, the alias of the union will 1597 # pick up two "primary key" columns. technically the alias should have 1598 # a 2-col pk in any case but the leading select has a NULL for the 1599 # "t2id" column 1600 d = util.OrderedDict() 1601 d["t1"] = t1.select(t1.c.type == "t1") 1602 d["t2"] = t1.join(t2) 1603 pjoin = polymorphic_union(d, None, "pjoin") 1604 1605 mapper( 1606 T1, 1607 t1, 1608 polymorphic_on=t1.c.type, 1609 polymorphic_identity="t1", 1610 with_polymorphic=("*", pjoin), 1611 primary_key=[pjoin.c.id], 1612 ) 1613 mapper(T2, t2, inherits=T1, polymorphic_identity="t2") 1614 ot1 = T1() 1615 ot2 = T2() 1616 sess = create_session() 1617 sess.add(ot1) 1618 sess.add(ot2) 1619 sess.flush() 1620 sess.expunge_all() 1621 1622 # query using get(), using only one value. 1623 # this requires the select_table mapper 1624 # has the same single-col primary key. 1625 assert sess.query(T1).get(ot1.id).id == ot1.id 1626 1627 ot1 = sess.query(T1).get(ot1.id) 1628 ot1.data = "hi" 1629 sess.flush() 1630 1631 def test_pk_collapses(self): 1632 """test that a composite primary key attribute formed by a join 1633 is "collapsed" into its minimal columns""" 1634 1635 class T1(object): 1636 pass 1637 1638 class T2(T1): 1639 pass 1640 1641 # create a polymorphic union with the select against the base table 1642 # first. with the join being second, the alias of the union will 1643 # pick up two "primary key" columns. technically the alias should have 1644 # a 2-col pk in any case but the leading select has a NULL for the 1645 # "t2id" column 1646 d = util.OrderedDict() 1647 d["t1"] = t1.select(t1.c.type == "t1") 1648 d["t2"] = t1.join(t2) 1649 pjoin = polymorphic_union(d, None, "pjoin") 1650 1651 mapper( 1652 T1, 1653 t1, 1654 polymorphic_on=t1.c.type, 1655 polymorphic_identity="t1", 1656 with_polymorphic=("*", pjoin), 1657 ) 1658 mapper(T2, t2, inherits=T1, polymorphic_identity="t2") 1659 assert len(class_mapper(T1).primary_key) == 1 1660 1661 ot1 = T1() 1662 ot2 = T2() 1663 sess = create_session() 1664 sess.add(ot1) 1665 sess.add(ot2) 1666 sess.flush() 1667 sess.expunge_all() 1668 1669 # query using get(), using only one value. this requires the 1670 # select_table mapper 1671 # has the same single-col primary key. 1672 assert sess.query(T1).get(ot1.id).id == ot1.id 1673 1674 ot1 = sess.query(T1).get(ot1.id) 1675 ot1.data = "hi" 1676 sess.flush() 1677 1678 1679class InheritingEagerTest(fixtures.MappedTest): 1680 @classmethod 1681 def define_tables(cls, metadata): 1682 global people, employees, tags, peopleTags 1683 1684 people = Table( 1685 "people", 1686 metadata, 1687 Column( 1688 "id", Integer, primary_key=True, test_needs_autoincrement=True 1689 ), 1690 Column("_type", String(30), nullable=False), 1691 ) 1692 1693 employees = Table( 1694 "employees", 1695 metadata, 1696 Column("id", Integer, ForeignKey("people.id"), primary_key=True), 1697 ) 1698 1699 tags = Table( 1700 "tags", 1701 metadata, 1702 Column( 1703 "id", Integer, primary_key=True, test_needs_autoincrement=True 1704 ), 1705 Column("label", String(50), nullable=False), 1706 ) 1707 1708 peopleTags = Table( 1709 "peopleTags", 1710 metadata, 1711 Column("person_id", Integer, ForeignKey("people.id")), 1712 Column("tag_id", Integer, ForeignKey("tags.id")), 1713 ) 1714 1715 def test_basic(self): 1716 """test that Query uses the full set of mapper._eager_loaders 1717 when generating SQL""" 1718 1719 class Person(fixtures.ComparableEntity): 1720 pass 1721 1722 class Employee(Person): 1723 def __init__(self, name="bob"): 1724 self.name = name 1725 1726 class Tag(fixtures.ComparableEntity): 1727 def __init__(self, label): 1728 self.label = label 1729 1730 mapper( 1731 Person, 1732 people, 1733 polymorphic_on=people.c._type, 1734 polymorphic_identity="person", 1735 properties={ 1736 "tags": relationship( 1737 Tag, secondary=peopleTags, backref="people", lazy="joined" 1738 ) 1739 }, 1740 ) 1741 mapper( 1742 Employee, 1743 employees, 1744 inherits=Person, 1745 polymorphic_identity="employee", 1746 ) 1747 mapper(Tag, tags) 1748 1749 session = create_session() 1750 1751 bob = Employee() 1752 session.add(bob) 1753 1754 tag = Tag("crazy") 1755 bob.tags.append(tag) 1756 1757 tag = Tag("funny") 1758 bob.tags.append(tag) 1759 session.flush() 1760 1761 session.expunge_all() 1762 # query from Employee with limit, query needs to apply eager limiting 1763 # subquery 1764 instance = session.query(Employee).filter_by(id=1).limit(1).first() 1765 assert len(instance.tags) == 2 1766 1767 1768class MissingPolymorphicOnTest(fixtures.MappedTest): 1769 @classmethod 1770 def define_tables(cls, metadata): 1771 Table( 1772 "tablea", 1773 metadata, 1774 Column( 1775 "id", Integer, primary_key=True, test_needs_autoincrement=True 1776 ), 1777 Column("adata", String(50)), 1778 ) 1779 Table( 1780 "tableb", 1781 metadata, 1782 Column( 1783 "id", Integer, primary_key=True, test_needs_autoincrement=True 1784 ), 1785 Column("aid", Integer, ForeignKey("tablea.id")), 1786 Column("data", String(50)), 1787 ) 1788 Table( 1789 "tablec", 1790 metadata, 1791 Column("id", Integer, ForeignKey("tablea.id"), primary_key=True), 1792 Column("cdata", String(50)), 1793 ) 1794 Table( 1795 "tabled", 1796 metadata, 1797 Column("id", Integer, ForeignKey("tablec.id"), primary_key=True), 1798 Column("ddata", String(50)), 1799 ) 1800 1801 @classmethod 1802 def setup_classes(cls): 1803 class A(cls.Comparable): 1804 pass 1805 1806 class B(cls.Comparable): 1807 pass 1808 1809 class C(A): 1810 pass 1811 1812 class D(C): 1813 pass 1814 1815 def test_polyon_col_setsup(self): 1816 tablea, tableb, tablec, tabled = ( 1817 self.tables.tablea, 1818 self.tables.tableb, 1819 self.tables.tablec, 1820 self.tables.tabled, 1821 ) 1822 A, B, C, D = ( 1823 self.classes.A, 1824 self.classes.B, 1825 self.classes.C, 1826 self.classes.D, 1827 ) 1828 poly_select = select( 1829 [tablea, tableb.c.data.label("discriminator")], 1830 from_obj=tablea.join(tableb), 1831 ).alias("poly") 1832 1833 mapper(B, tableb) 1834 mapper( 1835 A, 1836 tablea, 1837 with_polymorphic=("*", poly_select), 1838 polymorphic_on=poly_select.c.discriminator, 1839 properties={"b": relationship(B, uselist=False)}, 1840 ) 1841 mapper(C, tablec, inherits=A, polymorphic_identity="c") 1842 mapper(D, tabled, inherits=C, polymorphic_identity="d") 1843 1844 c = C(cdata="c1", adata="a1", b=B(data="c")) 1845 d = D(cdata="c2", adata="a2", ddata="d2", b=B(data="d")) 1846 sess = create_session() 1847 sess.add(c) 1848 sess.add(d) 1849 sess.flush() 1850 sess.expunge_all() 1851 eq_( 1852 sess.query(A).all(), 1853 [C(cdata="c1", adata="a1"), D(cdata="c2", adata="a2", ddata="d2")], 1854 ) 1855 1856 1857class JoinedInhAdjacencyTest(fixtures.MappedTest): 1858 @classmethod 1859 def define_tables(cls, metadata): 1860 Table( 1861 "people", 1862 metadata, 1863 Column( 1864 "id", Integer, primary_key=True, test_needs_autoincrement=True 1865 ), 1866 Column("type", String(30)), 1867 ) 1868 Table( 1869 "users", 1870 metadata, 1871 Column("id", Integer, ForeignKey("people.id"), primary_key=True), 1872 Column("supervisor_id", Integer, ForeignKey("people.id")), 1873 ) 1874 Table( 1875 "dudes", 1876 metadata, 1877 Column("id", Integer, ForeignKey("users.id"), primary_key=True), 1878 ) 1879 1880 @classmethod 1881 def setup_classes(cls): 1882 class Person(cls.Comparable): 1883 pass 1884 1885 class User(Person): 1886 pass 1887 1888 class Dude(User): 1889 pass 1890 1891 def _roundtrip(self): 1892 User = self.classes.User 1893 sess = Session() 1894 u1 = User() 1895 u2 = User() 1896 u2.supervisor = u1 1897 sess.add_all([u1, u2]) 1898 sess.commit() 1899 1900 assert u2.supervisor is u1 1901 1902 def _dude_roundtrip(self): 1903 Dude, User = self.classes.Dude, self.classes.User 1904 sess = Session() 1905 u1 = User() 1906 d1 = Dude() 1907 d1.supervisor = u1 1908 sess.add_all([u1, d1]) 1909 sess.commit() 1910 1911 assert d1.supervisor is u1 1912 1913 def test_joined_to_base(self): 1914 people, users = self.tables.people, self.tables.users 1915 Person, User = self.classes.Person, self.classes.User 1916 1917 mapper( 1918 Person, 1919 people, 1920 polymorphic_on=people.c.type, 1921 polymorphic_identity="person", 1922 ) 1923 mapper( 1924 User, 1925 users, 1926 inherits=Person, 1927 polymorphic_identity="user", 1928 inherit_condition=(users.c.id == people.c.id), 1929 properties={ 1930 "supervisor": relationship( 1931 Person, primaryjoin=users.c.supervisor_id == people.c.id 1932 ) 1933 }, 1934 ) 1935 1936 assert User.supervisor.property.direction is MANYTOONE 1937 self._roundtrip() 1938 1939 def test_joined_to_same_subclass(self): 1940 people, users = self.tables.people, self.tables.users 1941 Person, User = self.classes.Person, self.classes.User 1942 1943 mapper( 1944 Person, 1945 people, 1946 polymorphic_on=people.c.type, 1947 polymorphic_identity="person", 1948 ) 1949 mapper( 1950 User, 1951 users, 1952 inherits=Person, 1953 polymorphic_identity="user", 1954 inherit_condition=(users.c.id == people.c.id), 1955 properties={ 1956 "supervisor": relationship( 1957 User, 1958 primaryjoin=users.c.supervisor_id == people.c.id, 1959 remote_side=people.c.id, 1960 foreign_keys=[users.c.supervisor_id], 1961 ) 1962 }, 1963 ) 1964 assert User.supervisor.property.direction is MANYTOONE 1965 self._roundtrip() 1966 1967 def test_joined_subclass_to_superclass(self): 1968 people, users, dudes = ( 1969 self.tables.people, 1970 self.tables.users, 1971 self.tables.dudes, 1972 ) 1973 Person, User, Dude = ( 1974 self.classes.Person, 1975 self.classes.User, 1976 self.classes.Dude, 1977 ) 1978 1979 mapper( 1980 Person, 1981 people, 1982 polymorphic_on=people.c.type, 1983 polymorphic_identity="person", 1984 ) 1985 mapper( 1986 User, 1987 users, 1988 inherits=Person, 1989 polymorphic_identity="user", 1990 inherit_condition=(users.c.id == people.c.id), 1991 ) 1992 mapper( 1993 Dude, 1994 dudes, 1995 inherits=User, 1996 polymorphic_identity="dude", 1997 inherit_condition=(dudes.c.id == users.c.id), 1998 properties={ 1999 "supervisor": relationship( 2000 User, 2001 primaryjoin=users.c.supervisor_id == people.c.id, 2002 remote_side=people.c.id, 2003 foreign_keys=[users.c.supervisor_id], 2004 ) 2005 }, 2006 ) 2007 assert Dude.supervisor.property.direction is MANYTOONE 2008 self._dude_roundtrip() 2009 2010 2011class Ticket2419Test(fixtures.DeclarativeMappedTest): 2012 """Test [ticket:2419]'s test case.""" 2013 2014 @classmethod 2015 def setup_classes(cls): 2016 Base = cls.DeclarativeBasic 2017 2018 class A(Base): 2019 __tablename__ = "a" 2020 2021 id = Column( 2022 Integer, primary_key=True, test_needs_autoincrement=True 2023 ) 2024 2025 class B(Base): 2026 __tablename__ = "b" 2027 2028 id = Column( 2029 Integer, primary_key=True, test_needs_autoincrement=True 2030 ) 2031 ds = relationship("D") 2032 es = relationship("E") 2033 2034 class C(A): 2035 __tablename__ = "c" 2036 2037 id = Column(Integer, ForeignKey("a.id"), primary_key=True) 2038 b_id = Column(Integer, ForeignKey("b.id")) 2039 b = relationship("B", primaryjoin=b_id == B.id) 2040 2041 class D(Base): 2042 __tablename__ = "d" 2043 2044 id = Column( 2045 Integer, primary_key=True, test_needs_autoincrement=True 2046 ) 2047 b_id = Column(Integer, ForeignKey("b.id")) 2048 2049 class E(Base): 2050 __tablename__ = "e" 2051 id = Column( 2052 Integer, primary_key=True, test_needs_autoincrement=True 2053 ) 2054 b_id = Column(Integer, ForeignKey("b.id")) 2055 2056 @testing.fails_on( 2057 ["oracle", "mssql"], 2058 "Oracle / SQL server engines can't handle this, " 2059 "not clear if there's an expression-level bug on our " 2060 "end though", 2061 ) 2062 def test_join_w_eager_w_any(self): 2063 B, C, D = (self.classes.B, self.classes.C, self.classes.D) 2064 s = Session(testing.db) 2065 2066 b = B(ds=[D()]) 2067 s.add_all([C(b=b)]) 2068 2069 s.commit() 2070 2071 q = s.query(B, B.ds.any(D.id == 1)).options(joinedload("es")) 2072 q = q.join(C, C.b_id == B.id) 2073 q = q.limit(5) 2074 eq_(q.all(), [(b, True)]) 2075 2076 2077class ColSubclassTest( 2078 fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL 2079): 2080 """Test [ticket:2918]'s test case.""" 2081 2082 run_create_tables = run_deletes = None 2083 __dialect__ = "default" 2084 2085 @classmethod 2086 def setup_classes(cls): 2087 from sqlalchemy.schema import Column 2088 2089 Base = cls.DeclarativeBasic 2090 2091 class A(Base): 2092 __tablename__ = "a" 2093 2094 id = Column(Integer, primary_key=True) 2095 2096 class MySpecialColumn(Column): 2097 pass 2098 2099 class B(A): 2100 __tablename__ = "b" 2101 2102 id = Column(ForeignKey("a.id"), primary_key=True) 2103 x = MySpecialColumn(String) 2104 2105 def test_polymorphic_adaptation(self): 2106 A, B = self.classes.A, self.classes.B 2107 2108 s = Session() 2109 self.assert_compile( 2110 s.query(A).join(B).filter(B.x == "test"), 2111 "SELECT a.id AS a_id FROM a JOIN " 2112 "(a AS a_1 JOIN b AS b_1 ON a_1.id = b_1.id) " 2113 "ON a.id = b_1.id WHERE b_1.x = :x_1", 2114 ) 2115 2116 2117class CorrelateExceptWPolyAdaptTest( 2118 fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL 2119): 2120 # test [ticket:4537]'s test case. 2121 2122 run_create_tables = run_deletes = None 2123 run_setup_classes = run_setup_mappers = run_define_tables = "each" 2124 __dialect__ = "default" 2125 2126 def _fixture(self, use_correlate_except): 2127 2128 Base = self.DeclarativeBasic 2129 2130 class Superclass(Base): 2131 __tablename__ = "s1" 2132 id = Column(Integer, primary_key=True) 2133 common_id = Column(ForeignKey("c.id")) 2134 common_relationship = relationship( 2135 "Common", uselist=False, innerjoin=True, lazy="noload" 2136 ) 2137 discriminator_field = Column(String) 2138 __mapper_args__ = { 2139 "polymorphic_identity": "superclass", 2140 "polymorphic_on": discriminator_field, 2141 } 2142 2143 class Subclass(Superclass): 2144 __tablename__ = "s2" 2145 id = Column(ForeignKey("s1.id"), primary_key=True) 2146 __mapper_args__ = {"polymorphic_identity": "subclass"} 2147 2148 class Common(Base): 2149 __tablename__ = "c" 2150 id = Column(Integer, primary_key=True) 2151 2152 if use_correlate_except: 2153 num_superclass = column_property( 2154 select([func.count(Superclass.id)]) 2155 .where(Superclass.common_id == id) 2156 .correlate_except(Superclass) 2157 ) 2158 2159 if not use_correlate_except: 2160 Common.num_superclass = column_property( 2161 select([func.count(Superclass.id)]) 2162 .where(Superclass.common_id == Common.id) 2163 .correlate(Common) 2164 ) 2165 2166 return Common, Superclass 2167 2168 def test_poly_query_on_correlate(self): 2169 Common, Superclass = self._fixture(False) 2170 2171 poly = with_polymorphic(Superclass, "*") 2172 2173 s = Session() 2174 q = ( 2175 s.query(poly) 2176 .options(contains_eager(poly.common_relationship)) 2177 .join(poly.common_relationship) 2178 .filter(Common.id == 1) 2179 ) 2180 2181 # note the order of c.id, subquery changes based on if we 2182 # used correlate or correlate_except; this is only with the 2183 # patch in place. Not sure why this happens. 2184 self.assert_compile( 2185 q, 2186 "SELECT c.id AS c_id, (SELECT count(s1.id) AS count_1 " 2187 "FROM s1 LEFT OUTER JOIN s2 ON s1.id = s2.id " 2188 "WHERE s1.common_id = c.id) AS anon_1, " 2189 "s1.id AS s1_id, " 2190 "s1.common_id AS s1_common_id, " 2191 "s1.discriminator_field AS s1_discriminator_field, " 2192 "s2.id AS s2_id FROM s1 " 2193 "LEFT OUTER JOIN s2 ON s1.id = s2.id " 2194 "JOIN c ON c.id = s1.common_id WHERE c.id = :id_1", 2195 ) 2196 2197 def test_poly_query_on_correlate_except(self): 2198 Common, Superclass = self._fixture(True) 2199 2200 poly = with_polymorphic(Superclass, "*") 2201 2202 s = Session() 2203 q = ( 2204 s.query(poly) 2205 .options(contains_eager(poly.common_relationship)) 2206 .join(poly.common_relationship) 2207 .filter(Common.id == 1) 2208 ) 2209 2210 # c.id, subquery are reversed. 2211 self.assert_compile( 2212 q, 2213 "SELECT (SELECT count(s1.id) AS count_1 " 2214 "FROM s1 LEFT OUTER JOIN s2 ON s1.id = s2.id " 2215 "WHERE s1.common_id = c.id) AS anon_1, " 2216 "c.id AS c_id, s1.id AS s1_id, " 2217 "s1.common_id AS s1_common_id, " 2218 "s1.discriminator_field AS s1_discriminator_field, " 2219 "s2.id AS s2_id FROM s1 " 2220 "LEFT OUTER JOIN s2 ON s1.id = s2.id " 2221 "JOIN c ON c.id = s1.common_id WHERE c.id = :id_1", 2222 ) 2223