1from sqlalchemy import and_
2from sqlalchemy import or_
3from sqlalchemy import testing
4from sqlalchemy.orm import create_session
5from sqlalchemy.orm import with_polymorphic
6from sqlalchemy.testing import eq_
7from ._poly_fixtures import _Polymorphic
8from ._poly_fixtures import _PolymorphicAliasedJoins
9from ._poly_fixtures import _PolymorphicFixtureBase
10from ._poly_fixtures import _PolymorphicJoins
11from ._poly_fixtures import _PolymorphicPolymorphic
12from ._poly_fixtures import _PolymorphicUnions
13from ._poly_fixtures import Boss
14from ._poly_fixtures import Engineer
15from ._poly_fixtures import Manager
16from ._poly_fixtures import Person
17
18
class _WithPolymorphicBase(_PolymorphicFixtureBase):
    """Shared ``with_polymorphic()`` query tests.

    This class is meant to be combined with a fixture class; it relies on
    ``self._emps_wo_relationships_fixture()`` and ``self.assert_sql_count()``
    being supplied by its bases.
    """

    @staticmethod
    def _language_match_onclause(pa, pa_alias):
        """Return the ON clause used to join ``pa`` to ``pa_alias``.

        Two rows are paired when their ``primary_language`` values are equal,
        or when both values are NULL and the ``person_id`` on the ``pa`` side
        is greater than the one on the ``pa_alias`` side.

        :param pa: polymorphic entity exposing ``Engineer`` and ``person_id``.
        :param pa_alias: second polymorphic entity with the same attributes.
        :return: a SQL expression suitable as the ON clause of a join.
        """
        left_language = pa.Engineer.primary_language
        right_language = pa_alias.Engineer.primary_language

        return or_(
            left_language == right_language,
            and_(
                # is_(None) spells out the IS NULL test explicitly, so no
                # lint suppression for "== None" is needed here.
                left_language.is_(None),
                right_language.is_(None),
                pa.person_id > pa_alias.person_id,
            ),
        )

    def test_join_base_to_sub(self):
        """Filter the polymorphic entity on a subclass column.

        The query and the comparison both run inside ``go()`` so that
        ``assert_sql_count`` can check the statement count against 1.
        """
        sess = create_session()
        pa = with_polymorphic(Person, [Engineer])

        def go():
            eq_(
                sess.query(pa)
                .filter(pa.Engineer.primary_language == "java")
                .all(),
                self._emps_wo_relationships_fixture()[0:1],
            )

        self.assert_sql_count(testing.db, go, 1)

    def test_col_expression_base_plus_two_subs(self):
        """Select base and subclass columns from one polymorphic entity."""
        sess = create_session()
        pa = with_polymorphic(Person, [Engineer, Manager])

        java_or_dogbert = or_(
            pa.Engineer.primary_language == "java",
            pa.Manager.manager_name == "dogbert",
        )
        query = (
            sess.query(
                pa.name, pa.Engineer.primary_language, pa.Manager.manager_name
            )
            .filter(java_or_dogbert)
            .order_by(pa.Engineer.type)
        )

        eq_(
            query.all(),
            [("dilbert", "java", None), ("dogbert", None, "dogbert")],
        )

    def test_join_to_join_entities(self):
        """Join a polymorphic entity to an aliased one and load entity pairs."""
        sess = create_session()
        pa = with_polymorphic(Person, [Engineer])
        pa_alias = with_polymorphic(Person, [Engineer], aliased=True)

        query = (
            sess.query(pa, pa_alias)
            .join(pa_alias, self._language_match_onclause(pa, pa_alias))
            .order_by(pa.name, pa_alias.name)
        )

        eq_(
            [(p1.name, type(p1), p2.name, type(p2)) for (p1, p2) in query],
            [
                ("dilbert", Engineer, "dilbert", Engineer),
                ("dogbert", Manager, "pointy haired boss", Boss),
                ("vlad", Engineer, "vlad", Engineer),
                ("wally", Engineer, "wally", Engineer),
            ],
        )

    def test_join_to_join_columns(self):
        """Join a polymorphic entity to an aliased one and load columns."""
        sess = create_session()
        pa = with_polymorphic(Person, [Engineer])
        pa_alias = with_polymorphic(Person, [Engineer], aliased=True)

        query = (
            sess.query(
                pa.name,
                pa.Engineer.primary_language,
                pa_alias.name,
                pa_alias.Engineer.primary_language,
            )
            .join(pa_alias, self._language_match_onclause(pa, pa_alias))
            .order_by(pa.name, pa_alias.name)
        )

        eq_(
            query.all(),
            [
                ("dilbert", "java", "dilbert", "java"),
                ("dogbert", None, "pointy haired boss", None),
                ("vlad", "cobol", "vlad", "cobol"),
                ("wally", "c++", "wally", "c++"),
            ],
        )
119
120
class PolymorphicTest(_WithPolymorphicBase, _Polymorphic):
    """Run the ``_WithPolymorphicBase`` tests with ``_Polymorphic``."""
123
124
class PolymorphicPolymorphicTest(
    _WithPolymorphicBase, _PolymorphicPolymorphic
):
    """Run the ``_WithPolymorphicBase`` tests with
    ``_PolymorphicPolymorphic``."""
129
130
class PolymorphicUnionsTest(_WithPolymorphicBase, _PolymorphicUnions):
    """Run the ``_WithPolymorphicBase`` tests with ``_PolymorphicUnions``."""
133
134
class PolymorphicAliasedJoinsTest(
    _WithPolymorphicBase, _PolymorphicAliasedJoins
):
    """Run the ``_WithPolymorphicBase`` tests with
    ``_PolymorphicAliasedJoins``."""
139
140
class PolymorphicJoinsTest(_WithPolymorphicBase, _PolymorphicJoins):
    """Run the ``_WithPolymorphicBase`` tests with ``_PolymorphicJoins``."""
143