1"""Miscellaneous inheritance-related tests, many very old.
2These are generally tests derived from specific user issues.
3
4"""
5
6from sqlalchemy import exists
7from sqlalchemy import ForeignKey
8from sqlalchemy import func
9from sqlalchemy import Integer
10from sqlalchemy import select
11from sqlalchemy import Sequence
12from sqlalchemy import String
13from sqlalchemy import testing
14from sqlalchemy import Unicode
15from sqlalchemy import util
16from sqlalchemy.orm import class_mapper
17from sqlalchemy.orm import column_property
18from sqlalchemy.orm import contains_eager
19from sqlalchemy.orm import create_session
20from sqlalchemy.orm import join
21from sqlalchemy.orm import joinedload
22from sqlalchemy.orm import mapper
23from sqlalchemy.orm import polymorphic_union
24from sqlalchemy.orm import relationship
25from sqlalchemy.orm import Session
26from sqlalchemy.orm import with_polymorphic
27from sqlalchemy.orm.interfaces import MANYTOONE
28from sqlalchemy.testing import AssertsExecutionResults
29from sqlalchemy.testing import eq_
30from sqlalchemy.testing import fixtures
31from sqlalchemy.testing.schema import Column
32from sqlalchemy.testing.schema import Table
33
34
35class RelationshipTest1(fixtures.MappedTest):
36    """test self-referential relationships on polymorphic mappers"""
37
38    @classmethod
39    def define_tables(cls, metadata):
40        global people, managers
41
42        people = Table(
43            "people",
44            metadata,
45            Column(
46                "person_id",
47                Integer,
48                Sequence("person_id_seq", optional=True),
49                primary_key=True,
50            ),
51            Column(
52                "manager_id",
53                Integer,
54                ForeignKey(
55                    "managers.person_id", use_alter=True, name="mpid_fq"
56                ),
57            ),
58            Column("name", String(50)),
59            Column("type", String(30)),
60        )
61
62        managers = Table(
63            "managers",
64            metadata,
65            Column(
66                "person_id",
67                Integer,
68                ForeignKey("people.person_id"),
69                primary_key=True,
70            ),
71            Column("status", String(30)),
72            Column("manager_name", String(50)),
73        )
74
75    @classmethod
76    def setup_classes(cls):
77        class Person(cls.Comparable):
78            pass
79
80        class Manager(Person):
81            pass
82
83    def test_parent_refs_descendant(self):
84        Person, Manager = self.classes("Person", "Manager")
85
86        mapper(
87            Person,
88            people,
89            properties={
90                "manager": relationship(
91                    Manager,
92                    primaryjoin=(people.c.manager_id == managers.c.person_id),
93                    uselist=False,
94                    post_update=True,
95                )
96            },
97        )
98        mapper(
99            Manager,
100            managers,
101            inherits=Person,
102            inherit_condition=people.c.person_id == managers.c.person_id,
103        )
104
105        eq_(
106            class_mapper(Person).get_property("manager").synchronize_pairs,
107            [(managers.c.person_id, people.c.manager_id)],
108        )
109
110        session = create_session()
111        p = Person(name="some person")
112        m = Manager(name="some manager")
113        p.manager = m
114        session.add(p)
115        session.flush()
116        session.expunge_all()
117
118        p = session.query(Person).get(p.person_id)
119        m = session.query(Manager).get(m.person_id)
120        assert p.manager is m
121
122    def test_descendant_refs_parent(self):
123        Person, Manager = self.classes("Person", "Manager")
124
125        mapper(Person, people)
126        mapper(
127            Manager,
128            managers,
129            inherits=Person,
130            inherit_condition=people.c.person_id == managers.c.person_id,
131            properties={
132                "employee": relationship(
133                    Person,
134                    primaryjoin=(people.c.manager_id == managers.c.person_id),
135                    foreign_keys=[people.c.manager_id],
136                    uselist=False,
137                    post_update=True,
138                )
139            },
140        )
141
142        session = create_session()
143        p = Person(name="some person")
144        m = Manager(name="some manager")
145        m.employee = p
146        session.add(m)
147        session.flush()
148        session.expunge_all()
149
150        p = session.query(Person).get(p.person_id)
151        m = session.query(Manager).get(m.person_id)
152        assert m.employee is p
153
154
155class RelationshipTest2(fixtures.MappedTest):
156    """test self-referential relationships on polymorphic mappers"""
157
158    @classmethod
159    def define_tables(cls, metadata):
160        global people, managers, data
161        people = Table(
162            "people",
163            metadata,
164            Column(
165                "person_id",
166                Integer,
167                primary_key=True,
168                test_needs_autoincrement=True,
169            ),
170            Column("name", String(50)),
171            Column("type", String(30)),
172        )
173
174        managers = Table(
175            "managers",
176            metadata,
177            Column(
178                "person_id",
179                Integer,
180                ForeignKey("people.person_id"),
181                primary_key=True,
182            ),
183            Column("manager_id", Integer, ForeignKey("people.person_id")),
184            Column("status", String(30)),
185        )
186
187        data = Table(
188            "data",
189            metadata,
190            Column(
191                "person_id",
192                Integer,
193                ForeignKey("managers.person_id"),
194                primary_key=True,
195            ),
196            Column("data", String(30)),
197        )
198
199    @classmethod
200    def setup_classes(cls):
201        class Person(cls.Comparable):
202            pass
203
204        class Manager(Person):
205            pass
206
207    @testing.combinations(
208        ("join1",), ("join2",), ("join3",), argnames="jointype"
209    )
210    @testing.combinations(
211        ("usedata", True), ("nodata", False), id_="ia", argnames="usedata"
212    )
213    def test_relationshiponsubclass(self, jointype, usedata):
214        Person, Manager = self.classes("Person", "Manager")
215        if jointype == "join1":
216            poly_union = polymorphic_union(
217                {
218                    "person": people.select(people.c.type == "person"),
219                    "manager": join(
220                        people,
221                        managers,
222                        people.c.person_id == managers.c.person_id,
223                    ),
224                },
225                None,
226            )
227            polymorphic_on = poly_union.c.type
228        elif jointype == "join2":
229            poly_union = polymorphic_union(
230                {
231                    "person": people.select(people.c.type == "person"),
232                    "manager": managers.join(
233                        people, people.c.person_id == managers.c.person_id
234                    ),
235                },
236                None,
237            )
238            polymorphic_on = poly_union.c.type
239        elif jointype == "join3":
240            poly_union = None
241            polymorphic_on = people.c.type
242
243        if usedata:
244
245            class Data(object):
246                def __init__(self, data):
247                    self.data = data
248
249            mapper(Data, data)
250
251        mapper(
252            Person,
253            people,
254            with_polymorphic=("*", poly_union),
255            polymorphic_identity="person",
256            polymorphic_on=polymorphic_on,
257        )
258
259        if usedata:
260            mapper(
261                Manager,
262                managers,
263                inherits=Person,
264                inherit_condition=people.c.person_id == managers.c.person_id,
265                polymorphic_identity="manager",
266                properties={
267                    "colleague": relationship(
268                        Person,
269                        primaryjoin=managers.c.manager_id
270                        == people.c.person_id,
271                        lazy="select",
272                        uselist=False,
273                    ),
274                    "data": relationship(Data, uselist=False),
275                },
276            )
277        else:
278            mapper(
279                Manager,
280                managers,
281                inherits=Person,
282                inherit_condition=people.c.person_id == managers.c.person_id,
283                polymorphic_identity="manager",
284                properties={
285                    "colleague": relationship(
286                        Person,
287                        primaryjoin=managers.c.manager_id
288                        == people.c.person_id,
289                        lazy="select",
290                        uselist=False,
291                    )
292                },
293            )
294
295        sess = create_session()
296        p = Person(name="person1")
297        m = Manager(name="manager1")
298        m.colleague = p
299        if usedata:
300            m.data = Data("ms data")
301        sess.add(m)
302        sess.flush()
303
304        sess.expunge_all()
305        p = sess.query(Person).get(p.person_id)
306        m = sess.query(Manager).get(m.person_id)
307        assert m.colleague is p
308        if usedata:
309            assert m.data.data == "ms data"
310
311
312class RelationshipTest3(fixtures.MappedTest):
313    """test self-referential relationships on polymorphic mappers"""
314
315    @classmethod
316    def define_tables(cls, metadata):
317        global people, managers, data
318        people = Table(
319            "people",
320            metadata,
321            Column(
322                "person_id",
323                Integer,
324                primary_key=True,
325                test_needs_autoincrement=True,
326            ),
327            Column("colleague_id", Integer, ForeignKey("people.person_id")),
328            Column("name", String(50)),
329            Column("type", String(30)),
330        )
331
332        managers = Table(
333            "managers",
334            metadata,
335            Column(
336                "person_id",
337                Integer,
338                ForeignKey("people.person_id"),
339                primary_key=True,
340            ),
341            Column("status", String(30)),
342        )
343
344        data = Table(
345            "data",
346            metadata,
347            Column(
348                "person_id",
349                Integer,
350                ForeignKey("people.person_id"),
351                primary_key=True,
352            ),
353            Column("data", String(30)),
354        )
355
356    @classmethod
357    def setup_classes(cls):
358        class Person(cls.Comparable):
359            pass
360
361        class Manager(Person):
362            pass
363
364        class Data(cls.Comparable):
365            def __init__(self, data):
366                self.data = data
367
368    def _setup_mappings(self, jointype, usedata):
369        Person, Manager, Data = self.classes("Person", "Manager", "Data")
370        if jointype == "join1":
371            poly_union = polymorphic_union(
372                {
373                    "manager": managers.join(
374                        people, people.c.person_id == managers.c.person_id
375                    ),
376                    "person": people.select(people.c.type == "person"),
377                },
378                None,
379            )
380        elif jointype == "join2":
381            poly_union = polymorphic_union(
382                {
383                    "manager": join(
384                        people,
385                        managers,
386                        people.c.person_id == managers.c.person_id,
387                    ),
388                    "person": people.select(people.c.type == "person"),
389                },
390                None,
391            )
392        elif jointype == "join3":
393            poly_union = people.outerjoin(managers)
394        elif jointype == "join4":
395            poly_union = None
396        else:
397            assert False
398
399        if usedata:
400            mapper(Data, data)
401
402        if usedata:
403            mapper(
404                Person,
405                people,
406                with_polymorphic=("*", poly_union),
407                polymorphic_identity="person",
408                polymorphic_on=people.c.type,
409                properties={
410                    "colleagues": relationship(
411                        Person,
412                        primaryjoin=people.c.colleague_id
413                        == people.c.person_id,
414                        remote_side=people.c.colleague_id,
415                        uselist=True,
416                    ),
417                    "data": relationship(Data, uselist=False),
418                },
419            )
420        else:
421            mapper(
422                Person,
423                people,
424                with_polymorphic=("*", poly_union),
425                polymorphic_identity="person",
426                polymorphic_on=people.c.type,
427                properties={
428                    "colleagues": relationship(
429                        Person,
430                        primaryjoin=people.c.colleague_id
431                        == people.c.person_id,
432                        remote_side=people.c.colleague_id,
433                        uselist=True,
434                    )
435                },
436            )
437
438        mapper(
439            Manager,
440            managers,
441            inherits=Person,
442            inherit_condition=people.c.person_id == managers.c.person_id,
443            polymorphic_identity="manager",
444        )
445
446    @testing.combinations(
447        ("join1",), ("join2",), ("join3",), ("join4",), argnames="jointype"
448    )
449    @testing.combinations(
450        ("usedata", True), ("nodata", False), id_="ia", argnames="usedata"
451    )
452    def test_relationship_on_base_class(self, jointype, usedata):
453        self._setup_mappings(jointype, usedata)
454        Person, Manager, Data = self.classes("Person", "Manager", "Data")
455
456        sess = create_session()
457        p = Person(name="person1")
458        p2 = Person(name="person2")
459        p3 = Person(name="person3")
460        m = Manager(name="manager1")
461        p.colleagues.append(p2)
462        m.colleagues.append(p3)
463        if usedata:
464            p.data = Data("ps data")
465            m.data = Data("ms data")
466
467        sess.add(m)
468        sess.add(p)
469        sess.flush()
470
471        sess.expunge_all()
472        p = sess.query(Person).get(p.person_id)
473        p2 = sess.query(Person).get(p2.person_id)
474        p3 = sess.query(Person).get(p3.person_id)
475        m = sess.query(Person).get(m.person_id)
476        assert len(p.colleagues) == 1
477        assert p.colleagues == [p2]
478        assert m.colleagues == [p3]
479        if usedata:
480            assert p.data.data == "ps data"
481            assert m.data.data == "ms data"
482
483
484class RelationshipTest4(fixtures.MappedTest):
485    @classmethod
486    def define_tables(cls, metadata):
487        global people, engineers, managers, cars
488        people = Table(
489            "people",
490            metadata,
491            Column(
492                "person_id",
493                Integer,
494                primary_key=True,
495                test_needs_autoincrement=True,
496            ),
497            Column("name", String(50)),
498        )
499
500        engineers = Table(
501            "engineers",
502            metadata,
503            Column(
504                "person_id",
505                Integer,
506                ForeignKey("people.person_id"),
507                primary_key=True,
508            ),
509            Column("status", String(30)),
510        )
511
512        managers = Table(
513            "managers",
514            metadata,
515            Column(
516                "person_id",
517                Integer,
518                ForeignKey("people.person_id"),
519                primary_key=True,
520            ),
521            Column("longer_status", String(70)),
522        )
523
524        cars = Table(
525            "cars",
526            metadata,
527            Column(
528                "car_id",
529                Integer,
530                primary_key=True,
531                test_needs_autoincrement=True,
532            ),
533            Column("owner", Integer, ForeignKey("people.person_id")),
534        )
535
536    def test_many_to_one_polymorphic(self):
537        """in this test, the polymorphic union is between two subclasses, but
538        does not include the base table by itself in the union. however, the
539        primaryjoin condition is going to be against the base table, and its a
540        many-to-one relationship (unlike the test in polymorph.py) so the
541        column in the base table is explicit. Can the ClauseAdapter figure out
542        how to alias the primaryjoin to the polymorphic union ?"""
543
544        # class definitions
545        class Person(object):
546            def __init__(self, **kwargs):
547                for key, value in kwargs.items():
548                    setattr(self, key, value)
549
550            def __repr__(self):
551                return "Ordinary person %s" % self.name
552
553        class Engineer(Person):
554            def __repr__(self):
555                return "Engineer %s, status %s" % (self.name, self.status)
556
557        class Manager(Person):
558            def __repr__(self):
559                return "Manager %s, status %s" % (
560                    self.name,
561                    self.longer_status,
562                )
563
564        class Car(object):
565            def __init__(self, **kwargs):
566                for key, value in kwargs.items():
567                    setattr(self, key, value)
568
569            def __repr__(self):
570                return "Car number %d" % self.car_id
571
572        # create a union that represents both types of joins.
573        employee_join = polymorphic_union(
574            {
575                "engineer": people.join(engineers),
576                "manager": people.join(managers),
577            },
578            "type",
579            "employee_join",
580        )
581
582        person_mapper = mapper(
583            Person,
584            people,
585            with_polymorphic=("*", employee_join),
586            polymorphic_on=employee_join.c.type,
587            polymorphic_identity="person",
588        )
589        mapper(
590            Engineer,
591            engineers,
592            inherits=person_mapper,
593            polymorphic_identity="engineer",
594        )
595        mapper(
596            Manager,
597            managers,
598            inherits=person_mapper,
599            polymorphic_identity="manager",
600        )
601        mapper(Car, cars, properties={"employee": relationship(person_mapper)})
602
603        session = create_session()
604
605        # creating 5 managers named from M1 to E5
606        for i in range(1, 5):
607            session.add(Manager(name="M%d" % i, longer_status="YYYYYYYYY"))
608        # creating 5 engineers named from E1 to E5
609        for i in range(1, 5):
610            session.add(Engineer(name="E%d" % i, status="X"))
611
612        session.flush()
613
614        engineer4 = (
615            session.query(Engineer).filter(Engineer.name == "E4").first()
616        )
617        manager3 = session.query(Manager).filter(Manager.name == "M3").first()
618
619        car1 = Car(employee=engineer4)
620        session.add(car1)
621        car2 = Car(employee=manager3)
622        session.add(car2)
623        session.flush()
624
625        session.expunge_all()
626
627        def go():
628            testcar = (
629                session.query(Car)
630                .options(joinedload("employee"))
631                .get(car1.car_id)
632            )
633            assert str(testcar.employee) == "Engineer E4, status X"
634
635        self.assert_sql_count(testing.db, go, 1)
636
637        car1 = session.query(Car).get(car1.car_id)
638        usingGet = session.query(person_mapper).get(car1.owner)
639        usingProperty = car1.employee
640
641        assert str(engineer4) == "Engineer E4, status X"
642        assert str(usingGet) == "Engineer E4, status X"
643        assert str(usingProperty) == "Engineer E4, status X"
644
645        session.expunge_all()
646        # and now for the lightning round, eager !
647
648        def go():
649            testcar = (
650                session.query(Car)
651                .options(joinedload("employee"))
652                .get(car1.car_id)
653            )
654            assert str(testcar.employee) == "Engineer E4, status X"
655
656        self.assert_sql_count(testing.db, go, 1)
657
658        session.expunge_all()
659        s = session.query(Car)
660        c = s.join("employee").filter(Person.name == "E4")[0]
661        assert c.car_id == car1.car_id
662
663
664class RelationshipTest5(fixtures.MappedTest):
665    @classmethod
666    def define_tables(cls, metadata):
667        global people, engineers, managers, cars
668        people = Table(
669            "people",
670            metadata,
671            Column(
672                "person_id",
673                Integer,
674                primary_key=True,
675                test_needs_autoincrement=True,
676            ),
677            Column("name", String(50)),
678            Column("type", String(50)),
679        )
680
681        engineers = Table(
682            "engineers",
683            metadata,
684            Column(
685                "person_id",
686                Integer,
687                ForeignKey("people.person_id"),
688                primary_key=True,
689            ),
690            Column("status", String(30)),
691        )
692
693        managers = Table(
694            "managers",
695            metadata,
696            Column(
697                "person_id",
698                Integer,
699                ForeignKey("people.person_id"),
700                primary_key=True,
701            ),
702            Column("longer_status", String(70)),
703        )
704
705        cars = Table(
706            "cars",
707            metadata,
708            Column(
709                "car_id",
710                Integer,
711                primary_key=True,
712                test_needs_autoincrement=True,
713            ),
714            Column("owner", Integer, ForeignKey("people.person_id")),
715        )
716
717    def test_eager_empty(self):
718        """test parent object with child relationship to an inheriting mapper,
719        using eager loads, works when there are no child objects present"""
720
721        class Person(object):
722            def __init__(self, **kwargs):
723                for key, value in kwargs.items():
724                    setattr(self, key, value)
725
726            def __repr__(self):
727                return "Ordinary person %s" % self.name
728
729        class Engineer(Person):
730            def __repr__(self):
731                return "Engineer %s, status %s" % (self.name, self.status)
732
733        class Manager(Person):
734            def __repr__(self):
735                return "Manager %s, status %s" % (
736                    self.name,
737                    self.longer_status,
738                )
739
740        class Car(object):
741            def __init__(self, **kwargs):
742                for key, value in kwargs.items():
743                    setattr(self, key, value)
744
745            def __repr__(self):
746                return "Car number %d" % self.car_id
747
748        person_mapper = mapper(
749            Person,
750            people,
751            polymorphic_on=people.c.type,
752            polymorphic_identity="person",
753        )
754        mapper(
755            Engineer,
756            engineers,
757            inherits=person_mapper,
758            polymorphic_identity="engineer",
759        )
760        manager_mapper = mapper(
761            Manager,
762            managers,
763            inherits=person_mapper,
764            polymorphic_identity="manager",
765        )
766        mapper(
767            Car,
768            cars,
769            properties={
770                "manager": relationship(manager_mapper, lazy="joined")
771            },
772        )
773
774        sess = create_session()
775        car1 = Car()
776        car2 = Car()
777        car2.manager = Manager()
778        sess.add(car1)
779        sess.add(car2)
780        sess.flush()
781        sess.expunge_all()
782
783        carlist = sess.query(Car).all()
784        assert carlist[0].manager is None
785        assert carlist[1].manager.person_id == car2.manager.person_id
786
787
788class RelationshipTest6(fixtures.MappedTest):
789    """test self-referential relationships on a single joined-table
790    inheritance mapper"""
791
792    @classmethod
793    def define_tables(cls, metadata):
794        global people, managers, data
795        people = Table(
796            "people",
797            metadata,
798            Column(
799                "person_id",
800                Integer,
801                primary_key=True,
802                test_needs_autoincrement=True,
803            ),
804            Column("name", String(50)),
805        )
806
807        managers = Table(
808            "managers",
809            metadata,
810            Column(
811                "person_id",
812                Integer,
813                ForeignKey("people.person_id"),
814                primary_key=True,
815            ),
816            Column("colleague_id", Integer, ForeignKey("managers.person_id")),
817            Column("status", String(30)),
818        )
819
820    @classmethod
821    def setup_classes(cls):
822        class Person(cls.Comparable):
823            pass
824
825        class Manager(Person):
826            pass
827
828    def test_basic(self):
829        Person, Manager = self.classes("Person", "Manager")
830
831        mapper(Person, people)
832
833        mapper(
834            Manager,
835            managers,
836            inherits=Person,
837            inherit_condition=people.c.person_id == managers.c.person_id,
838            properties={
839                "colleague": relationship(
840                    Manager,
841                    primaryjoin=managers.c.colleague_id
842                    == managers.c.person_id,
843                    lazy="select",
844                    uselist=False,
845                )
846            },
847        )
848
849        sess = create_session()
850        m = Manager(name="manager1")
851        m2 = Manager(name="manager2")
852        m.colleague = m2
853        sess.add(m)
854        sess.flush()
855
856        sess.expunge_all()
857        m = sess.query(Manager).get(m.person_id)
858        m2 = sess.query(Manager).get(m2.person_id)
859        assert m.colleague is m2
860
861
862class RelationshipTest7(fixtures.MappedTest):
863    @classmethod
864    def define_tables(cls, metadata):
865        global people, engineers, managers, cars, offroad_cars
866        cars = Table(
867            "cars",
868            metadata,
869            Column(
870                "car_id",
871                Integer,
872                primary_key=True,
873                test_needs_autoincrement=True,
874            ),
875            Column("name", String(30)),
876        )
877
878        offroad_cars = Table(
879            "offroad_cars",
880            metadata,
881            Column(
882                "car_id",
883                Integer,
884                ForeignKey("cars.car_id"),
885                nullable=False,
886                primary_key=True,
887            ),
888        )
889
890        people = Table(
891            "people",
892            metadata,
893            Column(
894                "person_id",
895                Integer,
896                primary_key=True,
897                test_needs_autoincrement=True,
898            ),
899            Column(
900                "car_id", Integer, ForeignKey("cars.car_id"), nullable=False
901            ),
902            Column("name", String(50)),
903        )
904
905        engineers = Table(
906            "engineers",
907            metadata,
908            Column(
909                "person_id",
910                Integer,
911                ForeignKey("people.person_id"),
912                primary_key=True,
913            ),
914            Column("field", String(30)),
915        )
916
917        managers = Table(
918            "managers",
919            metadata,
920            Column(
921                "person_id",
922                Integer,
923                ForeignKey("people.person_id"),
924                primary_key=True,
925            ),
926            Column("category", String(70)),
927        )
928
929    def test_manytoone_lazyload(self):
930        """test that lazy load clause to a polymorphic child mapper generates
931        correctly [ticket:493]"""
932
933        class PersistentObject(object):
934            def __init__(self, **kwargs):
935                for key, value in kwargs.items():
936                    setattr(self, key, value)
937
938        class Status(PersistentObject):
939            def __repr__(self):
940                return "Status %s" % self.name
941
942        class Person(PersistentObject):
943            def __repr__(self):
944                return "Ordinary person %s" % self.name
945
946        class Engineer(Person):
947            def __repr__(self):
948                return "Engineer %s, field %s" % (self.name, self.field)
949
950        class Manager(Person):
951            def __repr__(self):
952                return "Manager %s, category %s" % (self.name, self.category)
953
954        class Car(PersistentObject):
955            def __repr__(self):
956                return "Car number %d, name %s" % (self.car_id, self.name)
957
958        class Offraod_Car(Car):
959            def __repr__(self):
960                return "Offroad Car number %d, name %s" % (
961                    self.car_id,
962                    self.name,
963                )
964
965        employee_join = polymorphic_union(
966            {
967                "engineer": people.join(engineers),
968                "manager": people.join(managers),
969            },
970            "type",
971            "employee_join",
972        )
973
974        car_join = polymorphic_union(
975            {
976                "car": cars.outerjoin(offroad_cars)
977                .select(offroad_cars.c.car_id == None)
978                .reduce_columns(),  # noqa
979                "offroad": cars.join(offroad_cars),
980            },
981            "type",
982            "car_join",
983        )
984
985        car_mapper = mapper(
986            Car,
987            cars,
988            with_polymorphic=("*", car_join),
989            polymorphic_on=car_join.c.type,
990            polymorphic_identity="car",
991        )
992        mapper(
993            Offraod_Car,
994            offroad_cars,
995            inherits=car_mapper,
996            polymorphic_identity="offroad",
997        )
998        person_mapper = mapper(
999            Person,
1000            people,
1001            with_polymorphic=("*", employee_join),
1002            polymorphic_on=employee_join.c.type,
1003            polymorphic_identity="person",
1004            properties={"car": relationship(car_mapper)},
1005        )
1006        mapper(
1007            Engineer,
1008            engineers,
1009            inherits=person_mapper,
1010            polymorphic_identity="engineer",
1011        )
1012        mapper(
1013            Manager,
1014            managers,
1015            inherits=person_mapper,
1016            polymorphic_identity="manager",
1017        )
1018
1019        session = create_session()
1020
1021        for i in range(1, 4):
1022            if i % 2:
1023                car = Car()
1024            else:
1025                car = Offraod_Car()
1026            session.add(Manager(name="M%d" % i, category="YYYYYYYYY", car=car))
1027            session.add(Engineer(name="E%d" % i, field="X", car=car))
1028            session.flush()
1029            session.expunge_all()
1030
1031        r = session.query(Person).all()
1032        for p in r:
1033            assert p.car_id == p.car.car_id
1034
1035
1036class RelationshipTest8(fixtures.MappedTest):
1037    @classmethod
1038    def define_tables(cls, metadata):
1039        global taggable, users
1040        taggable = Table(
1041            "taggable",
1042            metadata,
1043            Column(
1044                "id", Integer, primary_key=True, test_needs_autoincrement=True
1045            ),
1046            Column("type", String(30)),
1047            Column("owner_id", Integer, ForeignKey("taggable.id")),
1048        )
1049        users = Table(
1050            "users",
1051            metadata,
1052            Column("id", Integer, ForeignKey("taggable.id"), primary_key=True),
1053            Column("data", String(50)),
1054        )
1055
1056    def test_selfref_onjoined(self):
1057        class Taggable(fixtures.ComparableEntity):
1058            pass
1059
1060        class User(Taggable):
1061            pass
1062
1063        mapper(
1064            Taggable,
1065            taggable,
1066            polymorphic_on=taggable.c.type,
1067            polymorphic_identity="taggable",
1068            properties={
1069                "owner": relationship(
1070                    User,
1071                    primaryjoin=taggable.c.owner_id == taggable.c.id,
1072                    remote_side=taggable.c.id,
1073                )
1074            },
1075        )
1076
1077        mapper(
1078            User,
1079            users,
1080            inherits=Taggable,
1081            polymorphic_identity="user",
1082            inherit_condition=users.c.id == taggable.c.id,
1083        )
1084
1085        u1 = User(data="u1")
1086        t1 = Taggable(owner=u1)
1087        sess = create_session()
1088        sess.add(t1)
1089        sess.flush()
1090
1091        sess.expunge_all()
1092        eq_(
1093            sess.query(Taggable).order_by(Taggable.id).all(),
1094            [User(data="u1"), Taggable(owner=User(data="u1"))],
1095        )
1096
1097
1098class GenerativeTest(fixtures.MappedTest, AssertsExecutionResults):
1099    @classmethod
1100    def define_tables(cls, metadata):
1101        #  cars---owned by---  people (abstract) --- has a --- status
1102        #   |                  ^    ^                            |
1103        #   |                  |    |                            |
1104        #   |          engineers    managers                     |
1105        #   |                                                    |
1106        #   +--------------------------------------- has a ------+
1107
1108        # table definitions
1109        Table(
1110            "status",
1111            metadata,
1112            Column(
1113                "status_id",
1114                Integer,
1115                primary_key=True,
1116                test_needs_autoincrement=True,
1117            ),
1118            Column("name", String(20)),
1119        )
1120
1121        Table(
1122            "people",
1123            metadata,
1124            Column(
1125                "person_id",
1126                Integer,
1127                primary_key=True,
1128                test_needs_autoincrement=True,
1129            ),
1130            Column(
1131                "status_id",
1132                Integer,
1133                ForeignKey("status.status_id"),
1134                nullable=False,
1135            ),
1136            Column("name", String(50)),
1137        )
1138
1139        Table(
1140            "engineers",
1141            metadata,
1142            Column(
1143                "person_id",
1144                Integer,
1145                ForeignKey("people.person_id"),
1146                primary_key=True,
1147            ),
1148            Column("field", String(30)),
1149        )
1150
1151        Table(
1152            "managers",
1153            metadata,
1154            Column(
1155                "person_id",
1156                Integer,
1157                ForeignKey("people.person_id"),
1158                primary_key=True,
1159            ),
1160            Column("category", String(70)),
1161        )
1162
1163        Table(
1164            "cars",
1165            metadata,
1166            Column(
1167                "car_id",
1168                Integer,
1169                primary_key=True,
1170                test_needs_autoincrement=True,
1171            ),
1172            Column(
1173                "status_id",
1174                Integer,
1175                ForeignKey("status.status_id"),
1176                nullable=False,
1177            ),
1178            Column(
1179                "owner",
1180                Integer,
1181                ForeignKey("people.person_id"),
1182                nullable=False,
1183            ),
1184        )
1185
1186    @classmethod
1187    def setup_classes(cls):
1188        class Status(cls.Comparable):
1189            pass
1190
1191        class Person(cls.Comparable):
1192            pass
1193
1194        class Engineer(Person):
1195            pass
1196
1197        class Manager(Person):
1198            pass
1199
1200        class Car(cls.Comparable):
1201            pass
1202
1203    @classmethod
1204    def setup_mappers(cls):
1205        status, people, engineers, managers, cars = cls.tables(
1206            "status", "people", "engineers", "managers", "cars"
1207        )
1208        Status, Person, Engineer, Manager, Car = cls.classes(
1209            "Status", "Person", "Engineer", "Manager", "Car"
1210        )
1211        # create a union that represents both types of joins.
1212        employee_join = polymorphic_union(
1213            {
1214                "engineer": people.join(engineers),
1215                "manager": people.join(managers),
1216            },
1217            "type",
1218            "employee_join",
1219        )
1220
1221        status_mapper = mapper(Status, status)
1222        person_mapper = mapper(
1223            Person,
1224            people,
1225            with_polymorphic=("*", employee_join),
1226            polymorphic_on=employee_join.c.type,
1227            polymorphic_identity="person",
1228            properties={"status": relationship(status_mapper)},
1229        )
1230        mapper(
1231            Engineer,
1232            engineers,
1233            inherits=person_mapper,
1234            polymorphic_identity="engineer",
1235        )
1236        mapper(
1237            Manager,
1238            managers,
1239            inherits=person_mapper,
1240            polymorphic_identity="manager",
1241        )
1242        mapper(
1243            Car,
1244            cars,
1245            properties={
1246                "employee": relationship(person_mapper),
1247                "status": relationship(status_mapper),
1248            },
1249        )
1250
1251    @classmethod
1252    def insert_data(cls, connection):
1253        Status, Person, Engineer, Manager, Car = cls.classes(
1254            "Status", "Person", "Engineer", "Manager", "Car"
1255        )
1256        session = create_session(connection)
1257
1258        active = Status(name="active")
1259        dead = Status(name="dead")
1260
1261        session.add(active)
1262        session.add(dead)
1263        session.flush()
1264
1265        # TODO: we haven't created assertions for all
1266        # the data combinations created here
1267
1268        # creating 5 managers named from M1 to M5
1269        # and 5 engineers named from E1 to E5
1270        # M4, M5, E4 and E5 are dead
1271        for i in range(1, 5):
1272            if i < 4:
1273                st = active
1274            else:
1275                st = dead
1276            session.add(
1277                Manager(name="M%d" % i, category="YYYYYYYYY", status=st)
1278            )
1279            session.add(Engineer(name="E%d" % i, field="X", status=st))
1280
1281        session.flush()
1282
1283        # get E4
1284        engineer4 = session.query(Engineer).filter_by(name="E4").one()
1285
1286        # create 2 cars for E4, one active and one dead
1287        car1 = Car(employee=engineer4, status=active)
1288        car2 = Car(employee=engineer4, status=dead)
1289        session.add(car1)
1290        session.add(car2)
1291        session.flush()
1292
1293    def test_join_to_q_person(self):
1294        Status, Person, Engineer, Manager, Car = self.classes(
1295            "Status", "Person", "Engineer", "Manager", "Car"
1296        )
1297        session = create_session()
1298
1299        r = (
1300            session.query(Person)
1301            .filter(Person.name.like("%2"))
1302            .join("status")
1303            .filter_by(name="active")
1304            .order_by(Person.person_id)
1305        )
1306        eq_(
1307            list(r),
1308            [
1309                Manager(
1310                    name="M2",
1311                    category="YYYYYYYYY",
1312                    status=Status(name="active"),
1313                ),
1314                Engineer(name="E2", field="X", status=Status(name="active")),
1315            ],
1316        )
1317
1318    def test_join_to_q_engineer(self):
1319        Status, Person, Engineer, Manager, Car = self.classes(
1320            "Status", "Person", "Engineer", "Manager", "Car"
1321        )
1322        session = create_session()
1323        r = (
1324            session.query(Engineer)
1325            .join("status")
1326            .filter(
1327                Person.name.in_(["E2", "E3", "E4", "M4", "M2", "M1"])
1328                & (Status.name == "active")
1329            )
1330            .order_by(Person.name)
1331        )
1332        eq_(
1333            list(r),
1334            [
1335                Engineer(name="E2", field="X", status=Status(name="active")),
1336                Engineer(name="E3", field="X", status=Status(name="active")),
1337            ],
1338        )
1339
1340    def test_join_to_q_person_car(self):
1341        Status, Person, Engineer, Manager, Car = self.classes(
1342            "Status", "Person", "Engineer", "Manager", "Car"
1343        )
1344        session = create_session()
1345        r = session.query(Person).filter(
1346            exists([1], Car.owner == Person.person_id)
1347        )
1348
1349        eq_(
1350            list(r),
1351            [Engineer(name="E4", field="X", status=Status(name="dead"))],
1352        )
1353
1354
1355class MultiLevelTest(fixtures.MappedTest):
1356    @classmethod
1357    def define_tables(cls, metadata):
1358        global table_Employee, table_Engineer, table_Manager
1359        table_Employee = Table(
1360            "Employee",
1361            metadata,
1362            Column("name", type_=String(100)),
1363            Column(
1364                "id",
1365                primary_key=True,
1366                type_=Integer,
1367                test_needs_autoincrement=True,
1368            ),
1369            Column("atype", type_=String(100)),
1370        )
1371
1372        table_Engineer = Table(
1373            "Engineer",
1374            metadata,
1375            Column("machine", type_=String(100)),
1376            Column("id", Integer, ForeignKey("Employee.id"), primary_key=True),
1377        )
1378
1379        table_Manager = Table(
1380            "Manager",
1381            metadata,
1382            Column("duties", type_=String(100)),
1383            Column("id", Integer, ForeignKey("Engineer.id"), primary_key=True),
1384        )
1385
1386    def test_threelevels(self):
1387        class Employee(object):
1388            def set(me, **kargs):
1389                for k, v in kargs.items():
1390                    setattr(me, k, v)
1391                return me
1392
1393            def __str__(me):
1394                return str(me.__class__.__name__) + ":" + str(me.name)
1395
1396            __repr__ = __str__
1397
1398        class Engineer(Employee):
1399            pass
1400
1401        class Manager(Engineer):
1402            pass
1403
1404        pu_Employee = polymorphic_union(
1405            {
1406                "Manager": table_Employee.join(table_Engineer).join(
1407                    table_Manager
1408                ),
1409                "Engineer": select(
1410                    [table_Employee, table_Engineer.c.machine],
1411                    table_Employee.c.atype == "Engineer",
1412                    from_obj=[table_Employee.join(table_Engineer)],
1413                ),
1414                "Employee": table_Employee.select(
1415                    table_Employee.c.atype == "Employee"
1416                ),
1417            },
1418            None,
1419            "pu_employee",
1420        )
1421
1422        mapper_Employee = mapper(
1423            Employee,
1424            table_Employee,
1425            polymorphic_identity="Employee",
1426            polymorphic_on=pu_Employee.c.atype,
1427            with_polymorphic=("*", pu_Employee),
1428        )
1429
1430        pu_Engineer = polymorphic_union(
1431            {
1432                "Manager": table_Employee.join(table_Engineer).join(
1433                    table_Manager
1434                ),
1435                "Engineer": select(
1436                    [table_Employee, table_Engineer.c.machine],
1437                    table_Employee.c.atype == "Engineer",
1438                    from_obj=[table_Employee.join(table_Engineer)],
1439                ),
1440            },
1441            None,
1442            "pu_engineer",
1443        )
1444        mapper_Engineer = mapper(
1445            Engineer,
1446            table_Engineer,
1447            inherit_condition=table_Engineer.c.id == table_Employee.c.id,
1448            inherits=mapper_Employee,
1449            polymorphic_identity="Engineer",
1450            polymorphic_on=pu_Engineer.c.atype,
1451            with_polymorphic=("*", pu_Engineer),
1452        )
1453
1454        mapper(
1455            Manager,
1456            table_Manager,
1457            inherit_condition=table_Manager.c.id == table_Engineer.c.id,
1458            inherits=mapper_Engineer,
1459            polymorphic_identity="Manager",
1460        )
1461
1462        a = Employee().set(name="one")
1463        b = Engineer().set(egn="two", machine="any")
1464        c = Manager().set(name="head", machine="fast", duties="many")
1465
1466        session = create_session()
1467        session.add(a)
1468        session.add(b)
1469        session.add(c)
1470        session.flush()
1471        assert set(session.query(Employee).all()) == set([a, b, c])
1472        assert set(session.query(Engineer).all()) == set([b, c])
1473        assert session.query(Manager).all() == [c]
1474
1475
1476class ManyToManyPolyTest(fixtures.MappedTest):
1477    @classmethod
1478    def define_tables(cls, metadata):
1479        global base_item_table, item_table
1480        global base_item_collection_table, collection_table
1481        base_item_table = Table(
1482            "base_item",
1483            metadata,
1484            Column(
1485                "id", Integer, primary_key=True, test_needs_autoincrement=True
1486            ),
1487            Column("child_name", String(255), default=None),
1488        )
1489
1490        item_table = Table(
1491            "item",
1492            metadata,
1493            Column(
1494                "id", Integer, ForeignKey("base_item.id"), primary_key=True
1495            ),
1496            Column("dummy", Integer, default=0),
1497        )
1498
1499        base_item_collection_table = Table(
1500            "base_item_collection",
1501            metadata,
1502            Column("item_id", Integer, ForeignKey("base_item.id")),
1503            Column("collection_id", Integer, ForeignKey("collection.id")),
1504        )
1505
1506        collection_table = Table(
1507            "collection",
1508            metadata,
1509            Column(
1510                "id", Integer, primary_key=True, test_needs_autoincrement=True
1511            ),
1512            Column("name", Unicode(255)),
1513        )
1514
1515    def test_pjoin_compile(self):
1516        """test that remote_side columns in the secondary join table
1517        aren't attempted to be matched to the target polymorphic
1518        selectable"""
1519
1520        class BaseItem(object):
1521            pass
1522
1523        class Item(BaseItem):
1524            pass
1525
1526        class Collection(object):
1527            pass
1528
1529        item_join = polymorphic_union(
1530            {
1531                "BaseItem": base_item_table.select(
1532                    base_item_table.c.child_name == "BaseItem"
1533                ),
1534                "Item": base_item_table.join(item_table),
1535            },
1536            None,
1537            "item_join",
1538        )
1539
1540        mapper(
1541            BaseItem,
1542            base_item_table,
1543            with_polymorphic=("*", item_join),
1544            polymorphic_on=base_item_table.c.child_name,
1545            polymorphic_identity="BaseItem",
1546            properties=dict(
1547                collections=relationship(
1548                    Collection,
1549                    secondary=base_item_collection_table,
1550                    backref="items",
1551                )
1552            ),
1553        )
1554
1555        mapper(
1556            Item, item_table, inherits=BaseItem, polymorphic_identity="Item"
1557        )
1558
1559        mapper(Collection, collection_table)
1560
1561        class_mapper(BaseItem)
1562
1563
1564class CustomPKTest(fixtures.MappedTest):
1565    @classmethod
1566    def define_tables(cls, metadata):
1567        global t1, t2
1568        t1 = Table(
1569            "t1",
1570            metadata,
1571            Column(
1572                "id", Integer, primary_key=True, test_needs_autoincrement=True
1573            ),
1574            Column("type", String(30), nullable=False),
1575            Column("data", String(30)),
1576        )
1577        # note that the primary key column in t2 is named differently
1578        t2 = Table(
1579            "t2",
1580            metadata,
1581            Column("t2id", Integer, ForeignKey("t1.id"), primary_key=True),
1582            Column("t2data", String(30)),
1583        )
1584
1585    def test_custompk(self):
1586        """test that the primary_key attribute is propagated to the
1587        polymorphic mapper"""
1588
1589        class T1(object):
1590            pass
1591
1592        class T2(T1):
1593            pass
1594
1595        # create a polymorphic union with the select against the base table
1596        # first. with the join being second, the alias of the union will
1597        # pick up two "primary key" columns.  technically the alias should have
1598        # a 2-col pk in any case but the leading select has a NULL for the
1599        # "t2id" column
1600        d = util.OrderedDict()
1601        d["t1"] = t1.select(t1.c.type == "t1")
1602        d["t2"] = t1.join(t2)
1603        pjoin = polymorphic_union(d, None, "pjoin")
1604
1605        mapper(
1606            T1,
1607            t1,
1608            polymorphic_on=t1.c.type,
1609            polymorphic_identity="t1",
1610            with_polymorphic=("*", pjoin),
1611            primary_key=[pjoin.c.id],
1612        )
1613        mapper(T2, t2, inherits=T1, polymorphic_identity="t2")
1614        ot1 = T1()
1615        ot2 = T2()
1616        sess = create_session()
1617        sess.add(ot1)
1618        sess.add(ot2)
1619        sess.flush()
1620        sess.expunge_all()
1621
1622        # query using get(), using only one value.
1623        # this requires the select_table mapper
1624        # has the same single-col primary key.
1625        assert sess.query(T1).get(ot1.id).id == ot1.id
1626
1627        ot1 = sess.query(T1).get(ot1.id)
1628        ot1.data = "hi"
1629        sess.flush()
1630
1631    def test_pk_collapses(self):
1632        """test that a composite primary key attribute formed by a join
1633        is "collapsed" into its minimal columns"""
1634
1635        class T1(object):
1636            pass
1637
1638        class T2(T1):
1639            pass
1640
1641        # create a polymorphic union with the select against the base table
1642        # first. with the join being second, the alias of the union will
1643        # pick up two "primary key" columns.  technically the alias should have
1644        # a 2-col pk in any case but the leading select has a NULL for the
1645        # "t2id" column
1646        d = util.OrderedDict()
1647        d["t1"] = t1.select(t1.c.type == "t1")
1648        d["t2"] = t1.join(t2)
1649        pjoin = polymorphic_union(d, None, "pjoin")
1650
1651        mapper(
1652            T1,
1653            t1,
1654            polymorphic_on=t1.c.type,
1655            polymorphic_identity="t1",
1656            with_polymorphic=("*", pjoin),
1657        )
1658        mapper(T2, t2, inherits=T1, polymorphic_identity="t2")
1659        assert len(class_mapper(T1).primary_key) == 1
1660
1661        ot1 = T1()
1662        ot2 = T2()
1663        sess = create_session()
1664        sess.add(ot1)
1665        sess.add(ot2)
1666        sess.flush()
1667        sess.expunge_all()
1668
1669        # query using get(), using only one value.  this requires the
1670        # select_table mapper
1671        # has the same single-col primary key.
1672        assert sess.query(T1).get(ot1.id).id == ot1.id
1673
1674        ot1 = sess.query(T1).get(ot1.id)
1675        ot1.data = "hi"
1676        sess.flush()
1677
1678
1679class InheritingEagerTest(fixtures.MappedTest):
1680    @classmethod
1681    def define_tables(cls, metadata):
1682        global people, employees, tags, peopleTags
1683
1684        people = Table(
1685            "people",
1686            metadata,
1687            Column(
1688                "id", Integer, primary_key=True, test_needs_autoincrement=True
1689            ),
1690            Column("_type", String(30), nullable=False),
1691        )
1692
1693        employees = Table(
1694            "employees",
1695            metadata,
1696            Column("id", Integer, ForeignKey("people.id"), primary_key=True),
1697        )
1698
1699        tags = Table(
1700            "tags",
1701            metadata,
1702            Column(
1703                "id", Integer, primary_key=True, test_needs_autoincrement=True
1704            ),
1705            Column("label", String(50), nullable=False),
1706        )
1707
1708        peopleTags = Table(
1709            "peopleTags",
1710            metadata,
1711            Column("person_id", Integer, ForeignKey("people.id")),
1712            Column("tag_id", Integer, ForeignKey("tags.id")),
1713        )
1714
1715    def test_basic(self):
1716        """test that Query uses the full set of mapper._eager_loaders
1717        when generating SQL"""
1718
1719        class Person(fixtures.ComparableEntity):
1720            pass
1721
1722        class Employee(Person):
1723            def __init__(self, name="bob"):
1724                self.name = name
1725
1726        class Tag(fixtures.ComparableEntity):
1727            def __init__(self, label):
1728                self.label = label
1729
1730        mapper(
1731            Person,
1732            people,
1733            polymorphic_on=people.c._type,
1734            polymorphic_identity="person",
1735            properties={
1736                "tags": relationship(
1737                    Tag, secondary=peopleTags, backref="people", lazy="joined"
1738                )
1739            },
1740        )
1741        mapper(
1742            Employee,
1743            employees,
1744            inherits=Person,
1745            polymorphic_identity="employee",
1746        )
1747        mapper(Tag, tags)
1748
1749        session = create_session()
1750
1751        bob = Employee()
1752        session.add(bob)
1753
1754        tag = Tag("crazy")
1755        bob.tags.append(tag)
1756
1757        tag = Tag("funny")
1758        bob.tags.append(tag)
1759        session.flush()
1760
1761        session.expunge_all()
1762        # query from Employee with limit, query needs to apply eager limiting
1763        # subquery
1764        instance = session.query(Employee).filter_by(id=1).limit(1).first()
1765        assert len(instance.tags) == 2
1766
1767
1768class MissingPolymorphicOnTest(fixtures.MappedTest):
1769    @classmethod
1770    def define_tables(cls, metadata):
1771        Table(
1772            "tablea",
1773            metadata,
1774            Column(
1775                "id", Integer, primary_key=True, test_needs_autoincrement=True
1776            ),
1777            Column("adata", String(50)),
1778        )
1779        Table(
1780            "tableb",
1781            metadata,
1782            Column(
1783                "id", Integer, primary_key=True, test_needs_autoincrement=True
1784            ),
1785            Column("aid", Integer, ForeignKey("tablea.id")),
1786            Column("data", String(50)),
1787        )
1788        Table(
1789            "tablec",
1790            metadata,
1791            Column("id", Integer, ForeignKey("tablea.id"), primary_key=True),
1792            Column("cdata", String(50)),
1793        )
1794        Table(
1795            "tabled",
1796            metadata,
1797            Column("id", Integer, ForeignKey("tablec.id"), primary_key=True),
1798            Column("ddata", String(50)),
1799        )
1800
1801    @classmethod
1802    def setup_classes(cls):
1803        class A(cls.Comparable):
1804            pass
1805
1806        class B(cls.Comparable):
1807            pass
1808
1809        class C(A):
1810            pass
1811
1812        class D(C):
1813            pass
1814
1815    def test_polyon_col_setsup(self):
1816        tablea, tableb, tablec, tabled = (
1817            self.tables.tablea,
1818            self.tables.tableb,
1819            self.tables.tablec,
1820            self.tables.tabled,
1821        )
1822        A, B, C, D = (
1823            self.classes.A,
1824            self.classes.B,
1825            self.classes.C,
1826            self.classes.D,
1827        )
1828        poly_select = select(
1829            [tablea, tableb.c.data.label("discriminator")],
1830            from_obj=tablea.join(tableb),
1831        ).alias("poly")
1832
1833        mapper(B, tableb)
1834        mapper(
1835            A,
1836            tablea,
1837            with_polymorphic=("*", poly_select),
1838            polymorphic_on=poly_select.c.discriminator,
1839            properties={"b": relationship(B, uselist=False)},
1840        )
1841        mapper(C, tablec, inherits=A, polymorphic_identity="c")
1842        mapper(D, tabled, inherits=C, polymorphic_identity="d")
1843
1844        c = C(cdata="c1", adata="a1", b=B(data="c"))
1845        d = D(cdata="c2", adata="a2", ddata="d2", b=B(data="d"))
1846        sess = create_session()
1847        sess.add(c)
1848        sess.add(d)
1849        sess.flush()
1850        sess.expunge_all()
1851        eq_(
1852            sess.query(A).all(),
1853            [C(cdata="c1", adata="a1"), D(cdata="c2", adata="a2", ddata="d2")],
1854        )
1855
1856
1857class JoinedInhAdjacencyTest(fixtures.MappedTest):
1858    @classmethod
1859    def define_tables(cls, metadata):
1860        Table(
1861            "people",
1862            metadata,
1863            Column(
1864                "id", Integer, primary_key=True, test_needs_autoincrement=True
1865            ),
1866            Column("type", String(30)),
1867        )
1868        Table(
1869            "users",
1870            metadata,
1871            Column("id", Integer, ForeignKey("people.id"), primary_key=True),
1872            Column("supervisor_id", Integer, ForeignKey("people.id")),
1873        )
1874        Table(
1875            "dudes",
1876            metadata,
1877            Column("id", Integer, ForeignKey("users.id"), primary_key=True),
1878        )
1879
1880    @classmethod
1881    def setup_classes(cls):
1882        class Person(cls.Comparable):
1883            pass
1884
1885        class User(Person):
1886            pass
1887
1888        class Dude(User):
1889            pass
1890
1891    def _roundtrip(self):
1892        User = self.classes.User
1893        sess = Session()
1894        u1 = User()
1895        u2 = User()
1896        u2.supervisor = u1
1897        sess.add_all([u1, u2])
1898        sess.commit()
1899
1900        assert u2.supervisor is u1
1901
1902    def _dude_roundtrip(self):
1903        Dude, User = self.classes.Dude, self.classes.User
1904        sess = Session()
1905        u1 = User()
1906        d1 = Dude()
1907        d1.supervisor = u1
1908        sess.add_all([u1, d1])
1909        sess.commit()
1910
1911        assert d1.supervisor is u1
1912
1913    def test_joined_to_base(self):
1914        people, users = self.tables.people, self.tables.users
1915        Person, User = self.classes.Person, self.classes.User
1916
1917        mapper(
1918            Person,
1919            people,
1920            polymorphic_on=people.c.type,
1921            polymorphic_identity="person",
1922        )
1923        mapper(
1924            User,
1925            users,
1926            inherits=Person,
1927            polymorphic_identity="user",
1928            inherit_condition=(users.c.id == people.c.id),
1929            properties={
1930                "supervisor": relationship(
1931                    Person, primaryjoin=users.c.supervisor_id == people.c.id
1932                )
1933            },
1934        )
1935
1936        assert User.supervisor.property.direction is MANYTOONE
1937        self._roundtrip()
1938
1939    def test_joined_to_same_subclass(self):
1940        people, users = self.tables.people, self.tables.users
1941        Person, User = self.classes.Person, self.classes.User
1942
1943        mapper(
1944            Person,
1945            people,
1946            polymorphic_on=people.c.type,
1947            polymorphic_identity="person",
1948        )
1949        mapper(
1950            User,
1951            users,
1952            inherits=Person,
1953            polymorphic_identity="user",
1954            inherit_condition=(users.c.id == people.c.id),
1955            properties={
1956                "supervisor": relationship(
1957                    User,
1958                    primaryjoin=users.c.supervisor_id == people.c.id,
1959                    remote_side=people.c.id,
1960                    foreign_keys=[users.c.supervisor_id],
1961                )
1962            },
1963        )
1964        assert User.supervisor.property.direction is MANYTOONE
1965        self._roundtrip()
1966
1967    def test_joined_subclass_to_superclass(self):
1968        people, users, dudes = (
1969            self.tables.people,
1970            self.tables.users,
1971            self.tables.dudes,
1972        )
1973        Person, User, Dude = (
1974            self.classes.Person,
1975            self.classes.User,
1976            self.classes.Dude,
1977        )
1978
1979        mapper(
1980            Person,
1981            people,
1982            polymorphic_on=people.c.type,
1983            polymorphic_identity="person",
1984        )
1985        mapper(
1986            User,
1987            users,
1988            inherits=Person,
1989            polymorphic_identity="user",
1990            inherit_condition=(users.c.id == people.c.id),
1991        )
1992        mapper(
1993            Dude,
1994            dudes,
1995            inherits=User,
1996            polymorphic_identity="dude",
1997            inherit_condition=(dudes.c.id == users.c.id),
1998            properties={
1999                "supervisor": relationship(
2000                    User,
2001                    primaryjoin=users.c.supervisor_id == people.c.id,
2002                    remote_side=people.c.id,
2003                    foreign_keys=[users.c.supervisor_id],
2004                )
2005            },
2006        )
2007        assert Dude.supervisor.property.direction is MANYTOONE
2008        self._dude_roundtrip()
2009
2010
2011class Ticket2419Test(fixtures.DeclarativeMappedTest):
2012    """Test [ticket:2419]'s test case."""
2013
2014    @classmethod
2015    def setup_classes(cls):
2016        Base = cls.DeclarativeBasic
2017
2018        class A(Base):
2019            __tablename__ = "a"
2020
2021            id = Column(
2022                Integer, primary_key=True, test_needs_autoincrement=True
2023            )
2024
2025        class B(Base):
2026            __tablename__ = "b"
2027
2028            id = Column(
2029                Integer, primary_key=True, test_needs_autoincrement=True
2030            )
2031            ds = relationship("D")
2032            es = relationship("E")
2033
2034        class C(A):
2035            __tablename__ = "c"
2036
2037            id = Column(Integer, ForeignKey("a.id"), primary_key=True)
2038            b_id = Column(Integer, ForeignKey("b.id"))
2039            b = relationship("B", primaryjoin=b_id == B.id)
2040
2041        class D(Base):
2042            __tablename__ = "d"
2043
2044            id = Column(
2045                Integer, primary_key=True, test_needs_autoincrement=True
2046            )
2047            b_id = Column(Integer, ForeignKey("b.id"))
2048
2049        class E(Base):
2050            __tablename__ = "e"
2051            id = Column(
2052                Integer, primary_key=True, test_needs_autoincrement=True
2053            )
2054            b_id = Column(Integer, ForeignKey("b.id"))
2055
2056    @testing.fails_on(
2057        ["oracle", "mssql"],
2058        "Oracle / SQL server engines can't handle this, "
2059        "not clear if there's an expression-level bug on our "
2060        "end though",
2061    )
2062    def test_join_w_eager_w_any(self):
2063        B, C, D = (self.classes.B, self.classes.C, self.classes.D)
2064        s = Session(testing.db)
2065
2066        b = B(ds=[D()])
2067        s.add_all([C(b=b)])
2068
2069        s.commit()
2070
2071        q = s.query(B, B.ds.any(D.id == 1)).options(joinedload("es"))
2072        q = q.join(C, C.b_id == B.id)
2073        q = q.limit(5)
2074        eq_(q.all(), [(b, True)])
2075
2076
2077class ColSubclassTest(
2078    fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL
2079):
2080    """Test [ticket:2918]'s test case."""
2081
2082    run_create_tables = run_deletes = None
2083    __dialect__ = "default"
2084
2085    @classmethod
2086    def setup_classes(cls):
2087        from sqlalchemy.schema import Column
2088
2089        Base = cls.DeclarativeBasic
2090
2091        class A(Base):
2092            __tablename__ = "a"
2093
2094            id = Column(Integer, primary_key=True)
2095
2096        class MySpecialColumn(Column):
2097            pass
2098
2099        class B(A):
2100            __tablename__ = "b"
2101
2102            id = Column(ForeignKey("a.id"), primary_key=True)
2103            x = MySpecialColumn(String)
2104
2105    def test_polymorphic_adaptation(self):
2106        A, B = self.classes.A, self.classes.B
2107
2108        s = Session()
2109        self.assert_compile(
2110            s.query(A).join(B).filter(B.x == "test"),
2111            "SELECT a.id AS a_id FROM a JOIN "
2112            "(a AS a_1 JOIN b AS b_1 ON a_1.id = b_1.id) "
2113            "ON a.id = b_1.id WHERE b_1.x = :x_1",
2114        )
2115
2116
2117class CorrelateExceptWPolyAdaptTest(
2118    fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL
2119):
2120    # test [ticket:4537]'s test case.
2121
2122    run_create_tables = run_deletes = None
2123    run_setup_classes = run_setup_mappers = run_define_tables = "each"
2124    __dialect__ = "default"
2125
2126    def _fixture(self, use_correlate_except):
2127
2128        Base = self.DeclarativeBasic
2129
2130        class Superclass(Base):
2131            __tablename__ = "s1"
2132            id = Column(Integer, primary_key=True)
2133            common_id = Column(ForeignKey("c.id"))
2134            common_relationship = relationship(
2135                "Common", uselist=False, innerjoin=True, lazy="noload"
2136            )
2137            discriminator_field = Column(String)
2138            __mapper_args__ = {
2139                "polymorphic_identity": "superclass",
2140                "polymorphic_on": discriminator_field,
2141            }
2142
2143        class Subclass(Superclass):
2144            __tablename__ = "s2"
2145            id = Column(ForeignKey("s1.id"), primary_key=True)
2146            __mapper_args__ = {"polymorphic_identity": "subclass"}
2147
2148        class Common(Base):
2149            __tablename__ = "c"
2150            id = Column(Integer, primary_key=True)
2151
2152            if use_correlate_except:
2153                num_superclass = column_property(
2154                    select([func.count(Superclass.id)])
2155                    .where(Superclass.common_id == id)
2156                    .correlate_except(Superclass)
2157                )
2158
2159        if not use_correlate_except:
2160            Common.num_superclass = column_property(
2161                select([func.count(Superclass.id)])
2162                .where(Superclass.common_id == Common.id)
2163                .correlate(Common)
2164            )
2165
2166        return Common, Superclass
2167
2168    def test_poly_query_on_correlate(self):
2169        Common, Superclass = self._fixture(False)
2170
2171        poly = with_polymorphic(Superclass, "*")
2172
2173        s = Session()
2174        q = (
2175            s.query(poly)
2176            .options(contains_eager(poly.common_relationship))
2177            .join(poly.common_relationship)
2178            .filter(Common.id == 1)
2179        )
2180
2181        # note the order of c.id, subquery changes based on if we
2182        # used correlate or correlate_except; this is only with the
2183        # patch in place.   Not sure why this happens.
2184        self.assert_compile(
2185            q,
2186            "SELECT c.id AS c_id, (SELECT count(s1.id) AS count_1 "
2187            "FROM s1 LEFT OUTER JOIN s2 ON s1.id = s2.id "
2188            "WHERE s1.common_id = c.id) AS anon_1, "
2189            "s1.id AS s1_id, "
2190            "s1.common_id AS s1_common_id, "
2191            "s1.discriminator_field AS s1_discriminator_field, "
2192            "s2.id AS s2_id FROM s1 "
2193            "LEFT OUTER JOIN s2 ON s1.id = s2.id "
2194            "JOIN c ON c.id = s1.common_id WHERE c.id = :id_1",
2195        )
2196
2197    def test_poly_query_on_correlate_except(self):
2198        Common, Superclass = self._fixture(True)
2199
2200        poly = with_polymorphic(Superclass, "*")
2201
2202        s = Session()
2203        q = (
2204            s.query(poly)
2205            .options(contains_eager(poly.common_relationship))
2206            .join(poly.common_relationship)
2207            .filter(Common.id == 1)
2208        )
2209
2210        # c.id, subquery are reversed.
2211        self.assert_compile(
2212            q,
2213            "SELECT (SELECT count(s1.id) AS count_1 "
2214            "FROM s1 LEFT OUTER JOIN s2 ON s1.id = s2.id "
2215            "WHERE s1.common_id = c.id) AS anon_1, "
2216            "c.id AS c_id, s1.id AS s1_id, "
2217            "s1.common_id AS s1_common_id, "
2218            "s1.discriminator_field AS s1_discriminator_field, "
2219            "s2.id AS s2_id FROM s1 "
2220            "LEFT OUTER JOIN s2 ON s1.id = s2.id "
2221            "JOIN c ON c.id = s1.common_id WHERE c.id = :id_1",
2222        )
2223