1import sqlalchemy as sa
2from sqlalchemy import bindparam
3from sqlalchemy import ForeignKey
4from sqlalchemy import inspect
5from sqlalchemy import Integer
6from sqlalchemy import String
7from sqlalchemy import testing
8from sqlalchemy.orm import aliased
9from sqlalchemy.orm import clear_mappers
10from sqlalchemy.orm import create_session
11from sqlalchemy.orm import deferred
12from sqlalchemy.orm import joinedload
13from sqlalchemy.orm import mapper
14from sqlalchemy.orm import relationship
15from sqlalchemy.orm import Session
16from sqlalchemy.orm import subqueryload
17from sqlalchemy.orm import subqueryload_all
18from sqlalchemy.orm import undefer
19from sqlalchemy.orm import with_polymorphic
20from sqlalchemy.testing import assert_raises
21from sqlalchemy.testing import assert_raises_message
22from sqlalchemy.testing import eq_
23from sqlalchemy.testing import fixtures
24from sqlalchemy.testing import is_
25from sqlalchemy.testing import is_not_
26from sqlalchemy.testing import is_true
27from sqlalchemy.testing.assertsql import CompiledSQL
28from sqlalchemy.testing.entities import ComparableEntity
29from sqlalchemy.testing.schema import Column
30from sqlalchemy.testing.schema import Table
31from test.orm import _fixtures
32from .inheritance._poly_fixtures import _Polymorphic
33from .inheritance._poly_fixtures import Company
34from .inheritance._poly_fixtures import Engineer
35from .inheritance._poly_fixtures import Machine
36from .inheritance._poly_fixtures import MachineType
37from .inheritance._poly_fixtures import Page
38from .inheritance._poly_fixtures import Paperwork
39from .inheritance._poly_fixtures import Person
40
41
42class EagerTest(_fixtures.FixtureTest, testing.AssertsCompiledSQL):
43    run_inserts = "once"
44    run_deletes = None
45
46    def test_basic(self):
47        users, Address, addresses, User = (
48            self.tables.users,
49            self.classes.Address,
50            self.tables.addresses,
51            self.classes.User,
52        )
53
54        mapper(
55            User,
56            users,
57            properties={
58                "addresses": relationship(
59                    mapper(Address, addresses), order_by=Address.id
60                )
61            },
62        )
63        sess = create_session()
64
65        q = sess.query(User).options(subqueryload(User.addresses))
66
67        def go():
68            eq_(
69                [
70                    User(
71                        id=7,
72                        addresses=[
73                            Address(id=1, email_address="jack@bean.com")
74                        ],
75                    )
76                ],
77                q.filter(User.id == 7).all(),
78            )
79
80        self.assert_sql_count(testing.db, go, 2)
81
82        def go():
83            eq_(self.static.user_address_result, q.order_by(User.id).all())
84
85        self.assert_sql_count(testing.db, go, 2)
86
87    def test_from_aliased(self):
88        users, Dingaling, User, dingalings, Address, addresses = (
89            self.tables.users,
90            self.classes.Dingaling,
91            self.classes.User,
92            self.tables.dingalings,
93            self.classes.Address,
94            self.tables.addresses,
95        )
96
97        mapper(Dingaling, dingalings)
98        mapper(
99            Address,
100            addresses,
101            properties={
102                "dingalings": relationship(Dingaling, order_by=Dingaling.id)
103            },
104        )
105        mapper(
106            User,
107            users,
108            properties={
109                "addresses": relationship(Address, order_by=Address.id)
110            },
111        )
112        sess = create_session()
113
114        u = aliased(User)
115
116        q = sess.query(u).options(subqueryload(u.addresses))
117
118        def go():
119            eq_(
120                [
121                    User(
122                        id=7,
123                        addresses=[
124                            Address(id=1, email_address="jack@bean.com")
125                        ],
126                    )
127                ],
128                q.filter(u.id == 7).all(),
129            )
130
131        self.assert_sql_count(testing.db, go, 2)
132
133        def go():
134            eq_(self.static.user_address_result, q.order_by(u.id).all())
135
136        self.assert_sql_count(testing.db, go, 2)
137
138        q = sess.query(u).options(
139            subqueryload_all(u.addresses, Address.dingalings)
140        )
141
142        def go():
143            eq_(
144                [
145                    User(
146                        id=8,
147                        addresses=[
148                            Address(
149                                id=2,
150                                email_address="ed@wood.com",
151                                dingalings=[Dingaling()],
152                            ),
153                            Address(id=3, email_address="ed@bettyboop.com"),
154                            Address(id=4, email_address="ed@lala.com"),
155                        ],
156                    ),
157                    User(
158                        id=9,
159                        addresses=[Address(id=5, dingalings=[Dingaling()])],
160                    ),
161                ],
162                q.filter(u.id.in_([8, 9])).all(),
163            )
164
165        self.assert_sql_count(testing.db, go, 3)
166
167    def test_from_get(self):
168        users, Address, addresses, User = (
169            self.tables.users,
170            self.classes.Address,
171            self.tables.addresses,
172            self.classes.User,
173        )
174
175        mapper(
176            User,
177            users,
178            properties={
179                "addresses": relationship(
180                    mapper(Address, addresses), order_by=Address.id
181                )
182            },
183        )
184        sess = create_session()
185
186        q = sess.query(User).options(subqueryload(User.addresses))
187
188        def go():
189            eq_(
190                User(
191                    id=7,
192                    addresses=[Address(id=1, email_address="jack@bean.com")],
193                ),
194                q.get(7),
195            )
196
197        self.assert_sql_count(testing.db, go, 2)
198
199    def test_from_params(self):
200        users, Address, addresses, User = (
201            self.tables.users,
202            self.classes.Address,
203            self.tables.addresses,
204            self.classes.User,
205        )
206
207        mapper(
208            User,
209            users,
210            properties={
211                "addresses": relationship(
212                    mapper(Address, addresses), order_by=Address.id
213                )
214            },
215        )
216        sess = create_session()
217
218        q = sess.query(User).options(subqueryload(User.addresses))
219
220        def go():
221            eq_(
222                User(
223                    id=7,
224                    addresses=[Address(id=1, email_address="jack@bean.com")],
225                ),
226                q.filter(User.id == bindparam("foo")).params(foo=7).one(),
227            )
228
229        self.assert_sql_count(testing.db, go, 2)
230
231    def test_disable_dynamic(self):
232        """test no subquery option on a dynamic."""
233
234        users, Address, addresses, User = (
235            self.tables.users,
236            self.classes.Address,
237            self.tables.addresses,
238            self.classes.User,
239        )
240
241        mapper(
242            User,
243            users,
244            properties={"addresses": relationship(Address, lazy="dynamic")},
245        )
246        mapper(Address, addresses)
247        sess = create_session()
248
249        # previously this would not raise, but would emit
250        # the query needlessly and put the result nowhere.
251        assert_raises_message(
252            sa.exc.InvalidRequestError,
253            "User.addresses' does not support object population - eager "
254            "loading cannot be applied.",
255            sess.query(User).options(subqueryload(User.addresses)).first,
256        )
257
258    def test_many_to_many_plain(self):
259        keywords, items, item_keywords, Keyword, Item = (
260            self.tables.keywords,
261            self.tables.items,
262            self.tables.item_keywords,
263            self.classes.Keyword,
264            self.classes.Item,
265        )
266
267        mapper(Keyword, keywords)
268        mapper(
269            Item,
270            items,
271            properties=dict(
272                keywords=relationship(
273                    Keyword,
274                    secondary=item_keywords,
275                    lazy="subquery",
276                    order_by=keywords.c.id,
277                )
278            ),
279        )
280
281        q = create_session().query(Item).order_by(Item.id)
282
283        def go():
284            eq_(self.static.item_keyword_result, q.all())
285
286        self.assert_sql_count(testing.db, go, 2)
287
288    def test_many_to_many_with_join(self):
289        keywords, items, item_keywords, Keyword, Item = (
290            self.tables.keywords,
291            self.tables.items,
292            self.tables.item_keywords,
293            self.classes.Keyword,
294            self.classes.Item,
295        )
296
297        mapper(Keyword, keywords)
298        mapper(
299            Item,
300            items,
301            properties=dict(
302                keywords=relationship(
303                    Keyword,
304                    secondary=item_keywords,
305                    lazy="subquery",
306                    order_by=keywords.c.id,
307                )
308            ),
309        )
310
311        q = create_session().query(Item).order_by(Item.id)
312
313        def go():
314            eq_(
315                self.static.item_keyword_result[0:2],
316                q.join("keywords").filter(Keyword.name == "red").all(),
317            )
318
319        self.assert_sql_count(testing.db, go, 2)
320
321    def test_many_to_many_with_join_alias(self):
322        keywords, items, item_keywords, Keyword, Item = (
323            self.tables.keywords,
324            self.tables.items,
325            self.tables.item_keywords,
326            self.classes.Keyword,
327            self.classes.Item,
328        )
329
330        mapper(Keyword, keywords)
331        mapper(
332            Item,
333            items,
334            properties=dict(
335                keywords=relationship(
336                    Keyword,
337                    secondary=item_keywords,
338                    lazy="subquery",
339                    order_by=keywords.c.id,
340                )
341            ),
342        )
343
344        q = create_session().query(Item).order_by(Item.id)
345
346        def go():
347            eq_(
348                self.static.item_keyword_result[0:2],
349                (
350                    q.join("keywords", aliased=True).filter(
351                        Keyword.name == "red"
352                    )
353                ).all(),
354            )
355
356        self.assert_sql_count(testing.db, go, 2)
357
358    def test_orderby(self):
359        users, Address, addresses, User = (
360            self.tables.users,
361            self.classes.Address,
362            self.tables.addresses,
363            self.classes.User,
364        )
365
366        mapper(
367            User,
368            users,
369            properties={
370                "addresses": relationship(
371                    mapper(Address, addresses),
372                    lazy="subquery",
373                    order_by=addresses.c.email_address,
374                )
375            },
376        )
377        q = create_session().query(User)
378        eq_(
379            [
380                User(id=7, addresses=[Address(id=1)]),
381                User(
382                    id=8,
383                    addresses=[
384                        Address(id=3, email_address="ed@bettyboop.com"),
385                        Address(id=4, email_address="ed@lala.com"),
386                        Address(id=2, email_address="ed@wood.com"),
387                    ],
388                ),
389                User(id=9, addresses=[Address(id=5)]),
390                User(id=10, addresses=[]),
391            ],
392            q.order_by(User.id).all(),
393        )
394
395    def test_orderby_multi(self):
396        users, Address, addresses, User = (
397            self.tables.users,
398            self.classes.Address,
399            self.tables.addresses,
400            self.classes.User,
401        )
402
403        mapper(
404            User,
405            users,
406            properties={
407                "addresses": relationship(
408                    mapper(Address, addresses),
409                    lazy="subquery",
410                    order_by=[addresses.c.email_address, addresses.c.id],
411                )
412            },
413        )
414        q = create_session().query(User)
415        eq_(
416            [
417                User(id=7, addresses=[Address(id=1)]),
418                User(
419                    id=8,
420                    addresses=[
421                        Address(id=3, email_address="ed@bettyboop.com"),
422                        Address(id=4, email_address="ed@lala.com"),
423                        Address(id=2, email_address="ed@wood.com"),
424                    ],
425                ),
426                User(id=9, addresses=[Address(id=5)]),
427                User(id=10, addresses=[]),
428            ],
429            q.order_by(User.id).all(),
430        )
431
432    def test_orderby_related(self):
433        """A regular mapper select on a single table can
434            order by a relationship to a second table"""
435
436        Address, addresses, users, User = (
437            self.classes.Address,
438            self.tables.addresses,
439            self.tables.users,
440            self.classes.User,
441        )
442
443        mapper(Address, addresses)
444        mapper(
445            User,
446            users,
447            properties=dict(
448                addresses=relationship(
449                    Address, lazy="subquery", order_by=addresses.c.id
450                )
451            ),
452        )
453
454        q = create_session().query(User)
455        result = (
456            q.filter(User.id == Address.user_id)
457            .order_by(Address.email_address)
458            .all()
459        )
460
461        eq_(
462            [
463                User(
464                    id=8,
465                    addresses=[
466                        Address(id=2, email_address="ed@wood.com"),
467                        Address(id=3, email_address="ed@bettyboop.com"),
468                        Address(id=4, email_address="ed@lala.com"),
469                    ],
470                ),
471                User(id=9, addresses=[Address(id=5)]),
472                User(id=7, addresses=[Address(id=1)]),
473            ],
474            result,
475        )
476
477    def test_orderby_desc(self):
478        Address, addresses, users, User = (
479            self.classes.Address,
480            self.tables.addresses,
481            self.tables.users,
482            self.classes.User,
483        )
484
485        mapper(Address, addresses)
486        mapper(
487            User,
488            users,
489            properties=dict(
490                addresses=relationship(
491                    Address,
492                    lazy="subquery",
493                    order_by=[sa.desc(addresses.c.email_address)],
494                )
495            ),
496        )
497        sess = create_session()
498        eq_(
499            [
500                User(id=7, addresses=[Address(id=1)]),
501                User(
502                    id=8,
503                    addresses=[
504                        Address(id=2, email_address="ed@wood.com"),
505                        Address(id=4, email_address="ed@lala.com"),
506                        Address(id=3, email_address="ed@bettyboop.com"),
507                    ],
508                ),
509                User(id=9, addresses=[Address(id=5)]),
510                User(id=10, addresses=[]),
511            ],
512            sess.query(User).order_by(User.id).all(),
513        )
514
    # Loader configurations shared by test_options_pathing and
    # test_mapper_pathing.  Each entry is
    # (orders style, items style, keywords style, count): the three names
    # pick the loading style used for User.orders, Order.items and
    # Item.keywords respectively, and count is the value handed to
    # assert_sql_count() for the full, ordered User query.
    _pathing_runs = [
        ("lazyload", "lazyload", "lazyload", 15),
        ("subqueryload", "lazyload", "lazyload", 12),
        ("subqueryload", "subqueryload", "lazyload", 8),
        ("joinedload", "subqueryload", "lazyload", 7),
        ("lazyload", "lazyload", "subqueryload", 12),
        ("subqueryload", "subqueryload", "subqueryload", 4),
        ("subqueryload", "subqueryload", "joinedload", 3),
    ]
524
525    def test_options_pathing(self):
526        self._do_options_test(self._pathing_runs)
527
528    def test_mapper_pathing(self):
529        self._do_mapper_test(self._pathing_runs)
530
531    def _do_options_test(self, configs):
532        (
533            users,
534            Keyword,
535            orders,
536            items,
537            order_items,
538            Order,
539            Item,
540            User,
541            keywords,
542            item_keywords,
543        ) = (
544            self.tables.users,
545            self.classes.Keyword,
546            self.tables.orders,
547            self.tables.items,
548            self.tables.order_items,
549            self.classes.Order,
550            self.classes.Item,
551            self.classes.User,
552            self.tables.keywords,
553            self.tables.item_keywords,
554        )
555
556        mapper(
557            User,
558            users,
559            properties={
560                "orders": relationship(Order, order_by=orders.c.id)  # o2m, m2o
561            },
562        )
563        mapper(
564            Order,
565            orders,
566            properties={
567                "items": relationship(
568                    Item, secondary=order_items, order_by=items.c.id
569                )  # m2m
570            },
571        )
572        mapper(
573            Item,
574            items,
575            properties={
576                "keywords": relationship(
577                    Keyword, secondary=item_keywords, order_by=keywords.c.id
578                )  # m2m
579            },
580        )
581        mapper(Keyword, keywords)
582
583        callables = {"joinedload": joinedload, "subqueryload": subqueryload}
584
585        for o, i, k, count in configs:
586            options = []
587            if o in callables:
588                options.append(callables[o](User.orders))
589            if i in callables:
590                options.append(callables[i](User.orders, Order.items))
591            if k in callables:
592                options.append(
593                    callables[k](User.orders, Order.items, Item.keywords)
594                )
595
596            self._do_query_tests(options, count)
597
598    def _do_mapper_test(self, configs):
599        (
600            users,
601            Keyword,
602            orders,
603            items,
604            order_items,
605            Order,
606            Item,
607            User,
608            keywords,
609            item_keywords,
610        ) = (
611            self.tables.users,
612            self.classes.Keyword,
613            self.tables.orders,
614            self.tables.items,
615            self.tables.order_items,
616            self.classes.Order,
617            self.classes.Item,
618            self.classes.User,
619            self.tables.keywords,
620            self.tables.item_keywords,
621        )
622
623        opts = {
624            "lazyload": "select",
625            "joinedload": "joined",
626            "subqueryload": "subquery",
627        }
628
629        for o, i, k, count in configs:
630            mapper(
631                User,
632                users,
633                properties={
634                    "orders": relationship(
635                        Order, lazy=opts[o], order_by=orders.c.id
636                    )
637                },
638            )
639            mapper(
640                Order,
641                orders,
642                properties={
643                    "items": relationship(
644                        Item,
645                        secondary=order_items,
646                        lazy=opts[i],
647                        order_by=items.c.id,
648                    )
649                },
650            )
651            mapper(
652                Item,
653                items,
654                properties={
655                    "keywords": relationship(
656                        Keyword,
657                        lazy=opts[k],
658                        secondary=item_keywords,
659                        order_by=keywords.c.id,
660                    )
661                },
662            )
663            mapper(Keyword, keywords)
664
665            try:
666                self._do_query_tests([], count)
667            finally:
668                clear_mappers()
669
670    def _do_query_tests(self, opts, count):
671        Order, User = self.classes.Order, self.classes.User
672
673        sess = create_session()
674
675        def go():
676            eq_(
677                sess.query(User).options(*opts).order_by(User.id).all(),
678                self.static.user_item_keyword_result,
679            )
680
681        self.assert_sql_count(testing.db, go, count)
682
683        eq_(
684            sess.query(User)
685            .options(*opts)
686            .filter(User.name == "fred")
687            .order_by(User.id)
688            .all(),
689            self.static.user_item_keyword_result[2:3],
690        )
691
692        sess = create_session()
693        eq_(
694            sess.query(User)
695            .options(*opts)
696            .join(User.orders)
697            .filter(Order.id == 3)
698            .order_by(User.id)
699            .all(),
700            self.static.user_item_keyword_result[0:1],
701        )
702
703    def test_cyclical(self):
704        """A circular eager relationship breaks the cycle with a lazy loader"""
705
706        Address, addresses, users, User = (
707            self.classes.Address,
708            self.tables.addresses,
709            self.tables.users,
710            self.classes.User,
711        )
712
713        mapper(Address, addresses)
714        mapper(
715            User,
716            users,
717            properties=dict(
718                addresses=relationship(
719                    Address,
720                    lazy="subquery",
721                    backref=sa.orm.backref("user", lazy="subquery"),
722                    order_by=Address.id,
723                )
724            ),
725        )
726        is_(
727            sa.orm.class_mapper(User).get_property("addresses").lazy,
728            "subquery",
729        )
730        is_(sa.orm.class_mapper(Address).get_property("user").lazy, "subquery")
731
732        sess = create_session()
733        eq_(
734            self.static.user_address_result,
735            sess.query(User).order_by(User.id).all(),
736        )
737
738    def test_cyclical_explicit_join_depth(self):
739        """A circular eager relationship breaks the cycle with a lazy loader"""
740
741        Address, addresses, users, User = (
742            self.classes.Address,
743            self.tables.addresses,
744            self.tables.users,
745            self.classes.User,
746        )
747
748        mapper(Address, addresses)
749        mapper(
750            User,
751            users,
752            properties=dict(
753                addresses=relationship(
754                    Address,
755                    lazy="subquery",
756                    join_depth=1,
757                    backref=sa.orm.backref(
758                        "user", lazy="subquery", join_depth=1
759                    ),
760                    order_by=Address.id,
761                )
762            ),
763        )
764        is_(
765            sa.orm.class_mapper(User).get_property("addresses").lazy,
766            "subquery",
767        )
768        is_(sa.orm.class_mapper(Address).get_property("user").lazy, "subquery")
769
770        sess = create_session()
771        eq_(
772            self.static.user_address_result,
773            sess.query(User).order_by(User.id).all(),
774        )
775
776    def test_add_arbitrary_exprs(self):
777        Address, addresses, users, User = (
778            self.classes.Address,
779            self.tables.addresses,
780            self.tables.users,
781            self.classes.User,
782        )
783
784        mapper(Address, addresses)
785        mapper(
786            User,
787            users,
788            properties=dict(addresses=relationship(Address, lazy="subquery")),
789        )
790
791        sess = create_session()
792
793        self.assert_compile(
794            sess.query(User, "1"),
795            "SELECT users.id AS users_id, users.name AS users_name, "
796            "1 FROM users",
797        )
798
799    def test_double(self):
800        """Eager loading with two relationships simultaneously,
801            from the same table, using aliases."""
802
803        users, orders, User, Address, Order, addresses = (
804            self.tables.users,
805            self.tables.orders,
806            self.classes.User,
807            self.classes.Address,
808            self.classes.Order,
809            self.tables.addresses,
810        )
811
812        openorders = sa.alias(orders, "openorders")
813        closedorders = sa.alias(orders, "closedorders")
814
815        mapper(Address, addresses)
816        mapper(Order, orders)
817
818        open_mapper = mapper(Order, openorders, non_primary=True)
819        closed_mapper = mapper(Order, closedorders, non_primary=True)
820
821        mapper(
822            User,
823            users,
824            properties=dict(
825                addresses=relationship(
826                    Address, lazy="subquery", order_by=addresses.c.id
827                ),
828                open_orders=relationship(
829                    open_mapper,
830                    primaryjoin=sa.and_(
831                        openorders.c.isopen == 1,
832                        users.c.id == openorders.c.user_id,
833                    ),
834                    lazy="subquery",
835                    order_by=openorders.c.id,
836                ),
837                closed_orders=relationship(
838                    closed_mapper,
839                    primaryjoin=sa.and_(
840                        closedorders.c.isopen == 0,
841                        users.c.id == closedorders.c.user_id,
842                    ),
843                    lazy="subquery",
844                    order_by=closedorders.c.id,
845                ),
846            ),
847        )
848
849        q = create_session().query(User).order_by(User.id)
850
851        def go():
852            eq_(
853                [
854                    User(
855                        id=7,
856                        addresses=[Address(id=1)],
857                        open_orders=[Order(id=3)],
858                        closed_orders=[Order(id=1), Order(id=5)],
859                    ),
860                    User(
861                        id=8,
862                        addresses=[
863                            Address(id=2),
864                            Address(id=3),
865                            Address(id=4),
866                        ],
867                        open_orders=[],
868                        closed_orders=[],
869                    ),
870                    User(
871                        id=9,
872                        addresses=[Address(id=5)],
873                        open_orders=[Order(id=4)],
874                        closed_orders=[Order(id=2)],
875                    ),
876                    User(id=10),
877                ],
878                q.all(),
879            )
880
881        self.assert_sql_count(testing.db, go, 4)
882
883    def test_double_same_mappers(self):
884        """Eager loading with two relationships simultaneously,
885        from the same table, using aliases."""
886
887        (
888            addresses,
889            items,
890            order_items,
891            orders,
892            Item,
893            User,
894            Address,
895            Order,
896            users,
897        ) = (
898            self.tables.addresses,
899            self.tables.items,
900            self.tables.order_items,
901            self.tables.orders,
902            self.classes.Item,
903            self.classes.User,
904            self.classes.Address,
905            self.classes.Order,
906            self.tables.users,
907        )
908
909        mapper(Address, addresses)
910        mapper(
911            Order,
912            orders,
913            properties={
914                "items": relationship(
915                    Item,
916                    secondary=order_items,
917                    lazy="subquery",
918                    order_by=items.c.id,
919                )
920            },
921        )
922        mapper(Item, items)
923        mapper(
924            User,
925            users,
926            properties=dict(
927                addresses=relationship(
928                    Address, lazy="subquery", order_by=addresses.c.id
929                ),
930                open_orders=relationship(
931                    Order,
932                    primaryjoin=sa.and_(
933                        orders.c.isopen == 1, users.c.id == orders.c.user_id
934                    ),
935                    lazy="subquery",
936                    order_by=orders.c.id,
937                ),
938                closed_orders=relationship(
939                    Order,
940                    primaryjoin=sa.and_(
941                        orders.c.isopen == 0, users.c.id == orders.c.user_id
942                    ),
943                    lazy="subquery",
944                    order_by=orders.c.id,
945                ),
946            ),
947        )
948        q = create_session().query(User).order_by(User.id)
949
950        def go():
951            eq_(
952                [
953                    User(
954                        id=7,
955                        addresses=[Address(id=1)],
956                        open_orders=[
957                            Order(
958                                id=3,
959                                items=[Item(id=3), Item(id=4), Item(id=5)],
960                            )
961                        ],
962                        closed_orders=[
963                            Order(
964                                id=1,
965                                items=[Item(id=1), Item(id=2), Item(id=3)],
966                            ),
967                            Order(id=5, items=[Item(id=5)]),
968                        ],
969                    ),
970                    User(
971                        id=8,
972                        addresses=[
973                            Address(id=2),
974                            Address(id=3),
975                            Address(id=4),
976                        ],
977                        open_orders=[],
978                        closed_orders=[],
979                    ),
980                    User(
981                        id=9,
982                        addresses=[Address(id=5)],
983                        open_orders=[
984                            Order(id=4, items=[Item(id=1), Item(id=5)])
985                        ],
986                        closed_orders=[
987                            Order(
988                                id=2,
989                                items=[Item(id=1), Item(id=2), Item(id=3)],
990                            )
991                        ],
992                    ),
993                    User(id=10),
994                ],
995                q.all(),
996            )
997
998        self.assert_sql_count(testing.db, go, 6)
999
1000    def test_limit(self):
1001        """Limit operations combined with lazy-load relationships."""
1002
1003        (
1004            users,
1005            items,
1006            order_items,
1007            orders,
1008            Item,
1009            User,
1010            Address,
1011            Order,
1012            addresses,
1013        ) = (
1014            self.tables.users,
1015            self.tables.items,
1016            self.tables.order_items,
1017            self.tables.orders,
1018            self.classes.Item,
1019            self.classes.User,
1020            self.classes.Address,
1021            self.classes.Order,
1022            self.tables.addresses,
1023        )
1024
1025        mapper(Item, items)
1026        mapper(
1027            Order,
1028            orders,
1029            properties={
1030                "items": relationship(
1031                    Item,
1032                    secondary=order_items,
1033                    lazy="subquery",
1034                    order_by=items.c.id,
1035                )
1036            },
1037        )
1038        mapper(
1039            User,
1040            users,
1041            properties={
1042                "addresses": relationship(
1043                    mapper(Address, addresses),
1044                    lazy="subquery",
1045                    order_by=addresses.c.id,
1046                ),
1047                "orders": relationship(
1048                    Order, lazy="select", order_by=orders.c.id
1049                ),
1050            },
1051        )
1052
1053        sess = create_session()
1054        q = sess.query(User)
1055
1056        result = q.order_by(User.id).limit(2).offset(1).all()
1057        eq_(self.static.user_all_result[1:3], result)
1058
1059        result = q.order_by(sa.desc(User.id)).limit(2).offset(2).all()
1060        eq_(list(reversed(self.static.user_all_result[0:2])), result)
1061
1062    @testing.uses_deprecated("Mapper.order_by")
1063    def test_mapper_order_by(self):
1064        users, User, Address, addresses = (
1065            self.tables.users,
1066            self.classes.User,
1067            self.classes.Address,
1068            self.tables.addresses,
1069        )
1070
1071        mapper(Address, addresses)
1072        mapper(
1073            User,
1074            users,
1075            properties={
1076                "addresses": relationship(
1077                    Address, lazy="subquery", order_by=addresses.c.id
1078                )
1079            },
1080            order_by=users.c.id.desc(),
1081        )
1082
1083        sess = create_session()
1084        q = sess.query(User)
1085
1086        result = q.limit(2).all()
1087        eq_(result, list(reversed(self.static.user_address_result[2:4])))
1088
1089    def test_one_to_many_scalar(self):
1090        Address, addresses, users, User = (
1091            self.classes.Address,
1092            self.tables.addresses,
1093            self.tables.users,
1094            self.classes.User,
1095        )
1096
1097        mapper(
1098            User,
1099            users,
1100            properties=dict(
1101                address=relationship(
1102                    mapper(Address, addresses), lazy="subquery", uselist=False
1103                )
1104            ),
1105        )
1106        q = create_session().query(User)
1107
1108        def go():
1109            result = q.filter(users.c.id == 7).all()
1110            eq_([User(id=7, address=Address(id=1))], result)
1111
1112        self.assert_sql_count(testing.db, go, 2)
1113
1114    def test_many_to_one(self):
1115        users, Address, addresses, User = (
1116            self.tables.users,
1117            self.classes.Address,
1118            self.tables.addresses,
1119            self.classes.User,
1120        )
1121
1122        mapper(
1123            Address,
1124            addresses,
1125            properties=dict(
1126                user=relationship(mapper(User, users), lazy="subquery")
1127            ),
1128        )
1129        sess = create_session()
1130        q = sess.query(Address)
1131
1132        def go():
1133            a = q.filter(addresses.c.id == 1).one()
1134            is_not_(a.user, None)
1135            u1 = sess.query(User).get(7)
1136            is_(a.user, u1)
1137
1138        self.assert_sql_count(testing.db, go, 2)
1139
1140    def test_double_with_aggregate(self):
1141        User, users, orders, Order = (
1142            self.classes.User,
1143            self.tables.users,
1144            self.tables.orders,
1145            self.classes.Order,
1146        )
1147
1148        max_orders_by_user = sa.select(
1149            [sa.func.max(orders.c.id).label("order_id")],
1150            group_by=[orders.c.user_id],
1151        ).alias("max_orders_by_user")
1152
1153        max_orders = orders.select(
1154            orders.c.id == max_orders_by_user.c.order_id
1155        ).alias("max_orders")
1156
1157        mapper(Order, orders)
1158        mapper(
1159            User,
1160            users,
1161            properties={
1162                "orders": relationship(
1163                    Order,
1164                    backref="user",
1165                    lazy="subquery",
1166                    order_by=orders.c.id,
1167                ),
1168                "max_order": relationship(
1169                    mapper(Order, max_orders, non_primary=True),
1170                    lazy="subquery",
1171                    uselist=False,
1172                ),
1173            },
1174        )
1175
1176        q = create_session().query(User)
1177
1178        def go():
1179            eq_(
1180                [
1181                    User(
1182                        id=7,
1183                        orders=[Order(id=1), Order(id=3), Order(id=5)],
1184                        max_order=Order(id=5),
1185                    ),
1186                    User(id=8, orders=[]),
1187                    User(
1188                        id=9,
1189                        orders=[Order(id=2), Order(id=4)],
1190                        max_order=Order(id=4),
1191                    ),
1192                    User(id=10),
1193                ],
1194                q.order_by(User.id).all(),
1195            )
1196
1197        self.assert_sql_count(testing.db, go, 3)
1198
1199    def test_uselist_false_warning(self):
1200        """test that multiple rows received by a
1201        uselist=False raises a warning."""
1202
1203        User, users, orders, Order = (
1204            self.classes.User,
1205            self.tables.users,
1206            self.tables.orders,
1207            self.classes.Order,
1208        )
1209
1210        mapper(
1211            User,
1212            users,
1213            properties={"order": relationship(Order, uselist=False)},
1214        )
1215        mapper(Order, orders)
1216        s = create_session()
1217        assert_raises(
1218            sa.exc.SAWarning,
1219            s.query(User).options(subqueryload(User.order)).all,
1220        )
1221
1222
class LoadOnExistingTest(_fixtures.FixtureTest):
    """test that loaders from a base Query fully populate.

    The tests work with objects that are already present in the Session
    and check which attributes end up in (or stay out of) their
    ``__dict__`` after refreshes and further subquery-loading queries.
    """

    run_inserts = "once"
    run_deletes = None

    def _collection_to_scalar_fixture(self):
        """Map User.addresses -> Address.dingaling without a ``lazy``
        setting and return the classes plus a non-autoflushing Session."""
        User, Address, Dingaling = (
            self.classes.User,
            self.classes.Address,
            self.classes.Dingaling,
        )
        mapper(
            User,
            self.tables.users,
            properties={"addresses": relationship(Address)},
        )
        mapper(
            Address,
            self.tables.addresses,
            properties={"dingaling": relationship(Dingaling)},
        )
        mapper(Dingaling, self.tables.dingalings)

        sess = Session(autoflush=False)
        return User, Address, Dingaling, sess

    def _collection_to_collection_fixture(self):
        """Map User.orders -> Order.items (via ``order_items``) without a
        ``lazy`` setting and return the classes plus a non-autoflushing
        Session."""
        User, Order, Item = (
            self.classes.User,
            self.classes.Order,
            self.classes.Item,
        )
        mapper(
            User, self.tables.users, properties={"orders": relationship(Order)}
        )
        mapper(
            Order,
            self.tables.orders,
            properties={
                "items": relationship(Item, secondary=self.tables.order_items)
            },
        )
        mapper(Item, self.tables.items)

        sess = Session(autoflush=False)
        return User, Order, Item, sess

    def _eager_config_fixture(self):
        """Map User.addresses with ``lazy="subquery"`` and return the
        classes plus a non-autoflushing Session."""
        User, Address = self.classes.User, self.classes.Address
        mapper(
            User,
            self.tables.users,
            properties={"addresses": relationship(Address, lazy="subquery")},
        )
        mapper(Address, self.tables.addresses)
        sess = Session(autoflush=False)
        return User, Address, sess

    def _deferred_config_fixture(self):
        """Like ``_eager_config_fixture`` but with ``User.name`` mapped as
        a deferred column."""
        User, Address = self.classes.User, self.classes.Address
        mapper(
            User,
            self.tables.users,
            properties={
                "name": deferred(self.tables.users.c.name),
                "addresses": relationship(Address, lazy="subquery"),
            },
        )
        mapper(Address, self.tables.addresses)
        sess = Session(autoflush=False)
        return User, Address, sess

    def test_no_query_on_refresh(self):
        """Refreshing a fully expired User emits a single statement and
        leaves ``addresses`` unloaded."""
        User, Address, sess = self._eager_config_fixture()

        u1 = sess.query(User).get(8)
        assert "addresses" in u1.__dict__
        sess.expire(u1)

        def go():
            # attribute access on the expired object triggers the refresh
            eq_(u1.id, 8)

        self.assert_sql_count(testing.db, go, 1)
        assert "addresses" not in u1.__dict__

    def test_no_query_on_deferred(self):
        """Loading the deferred ``name`` emits a single statement and does
        not re-load the expired ``addresses`` collection."""
        User, Address, sess = self._deferred_config_fixture()
        u1 = sess.query(User).get(8)
        assert "addresses" in u1.__dict__
        # expire only the collection, not the whole object
        sess.expire(u1, ["addresses"])

        def go():
            eq_(u1.name, "ed")

        self.assert_sql_count(testing.db, go, 1)
        assert "addresses" not in u1.__dict__

    def test_populate_existing_propagate(self):
        """``populate_existing()`` overwrites local changes on both the
        subquery-loaded collection and its members."""
        User, Address, sess = self._eager_config_fixture()
        u1 = sess.query(User).get(8)
        # local, unflushed modifications that the reload should discard
        u1.addresses[2].email_address = "foofoo"
        del u1.addresses[1]
        u1 = sess.query(User).populate_existing().filter_by(id=8).one()
        # collection is reverted
        eq_(len(u1.addresses), 3)

        # attributes on related items reverted
        eq_(u1.addresses[2].email_address, "ed@lala.com")

    def test_loads_second_level_collection_to_scalar(self):
        """A later ``addresses.dingaling`` subquery load populates
        ``dingaling`` on the addresses already in the collection, skips the
        newly appended Address and keeps local attribute changes."""
        User, Address, Dingaling, sess = self._collection_to_scalar_fixture()

        u1 = sess.query(User).get(8)
        # a1: brand-new Address appended to the collection; the fixture's
        # Session has autoflush disabled
        a1 = Address()
        u1.addresses.append(a1)
        # a2: existing Address carrying a local modification
        a2 = u1.addresses[0]
        a2.email_address = "foo"
        sess.query(User).options(
            subqueryload_all("addresses.dingaling")
        ).filter_by(id=8).all()
        # the appended object is still the last member of the collection
        assert u1.addresses[-1] is a1
        for a in u1.addresses:
            if a is not a1:
                assert "dingaling" in a.__dict__
            else:
                assert "dingaling" not in a.__dict__
            if a is a2:
                eq_(a2.email_address, "foo")

    def test_loads_second_level_collection_to_collection(self):
        """A later ``orders.items`` subquery load populates ``items`` on
        the orders already in the collection but not on the newly appended
        Order."""
        User, Order, Item, sess = self._collection_to_collection_fixture()

        u1 = sess.query(User).get(7)
        # bare attribute access, evaluated only for its effect on u1
        u1.orders
        o1 = Order()
        u1.orders.append(o1)
        sess.query(User).options(subqueryload_all("orders.items")).filter_by(
            id=7
        ).all()
        for o in u1.orders:
            if o is not o1:
                assert "items" in o.__dict__
            else:
                assert "items" not in o.__dict__

    def test_load_two_levels_collection_to_scalar(self):
        """After loading only ``addresses``, a second query that adds
        ``addresses.dingaling`` populates ``dingaling`` on the Address
        objects reachable from the first result."""
        User, Address, Dingaling, sess = self._collection_to_scalar_fixture()

        u1 = (
            sess.query(User)
            .filter_by(id=8)
            .options(subqueryload("addresses"))
            .one()
        )
        sess.query(User).filter_by(id=8).options(
            subqueryload_all("addresses.dingaling")
        ).first()
        assert "dingaling" in u1.addresses[0].__dict__

    def test_load_two_levels_collection_to_collection(self):
        """After loading only ``orders``, a second query that adds
        ``orders.items`` populates ``items`` on the Order objects reachable
        from the first result."""
        User, Order, Item, sess = self._collection_to_collection_fixture()

        u1 = (
            sess.query(User)
            .filter_by(id=7)
            .options(subqueryload("orders"))
            .one()
        )
        sess.query(User).filter_by(id=7).options(
            subqueryload_all("orders.items")
        ).first()
        assert "items" in u1.orders[0].__dict__
1396
1397
class OrderBySecondaryTest(fixtures.MappedTest):
    """Subquery load of a many-to-many collection that is ordered by a
    column of the association table."""

    @classmethod
    def define_tables(cls, metadata):
        """Create the ``m2m`` association table and the ``a``/``b``
        tables it points at."""

        def autoincrement_pk():
            # each table needs its own Column object
            return Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            )

        Table(
            "m2m",
            metadata,
            autoincrement_pk(),
            Column("aid", Integer, ForeignKey("a.id")),
            Column("bid", Integer, ForeignKey("b.id")),
        )

        for table_name in ("a", "b"):
            Table(
                table_name,
                metadata,
                autoincrement_pk(),
                Column("data", String(50)),
            )

    @classmethod
    def fixtures(cls):
        """Return the rows per table; the ``m2m`` rows are deliberately
        listed out of id order."""
        return {
            "a": (("id", "data"), (1, "a1"), (2, "a2")),
            "b": (("id", "data"), (1, "b1"), (2, "b2"), (3, "b3"), (4, "b4")),
            "m2m": (
                ("id", "aid", "bid"),
                (2, 1, 1),
                (4, 2, 4),
                (1, 1, 3),
                (6, 2, 2),
                (3, 1, 2),
                (5, 2, 3),
            ),
        }

    def test_ordering(self):
        """``A.bs`` comes back ordered by ``m2m.id`` using two statements."""
        a = self.tables.a
        b = self.tables.b
        m2m = self.tables.m2m

        class A(fixtures.ComparableEntity):
            pass

        class B(fixtures.ComparableEntity):
            pass

        bs_rel = relationship(
            B, secondary=m2m, lazy="subquery", order_by=m2m.c.id
        )
        mapper(A, a, properties={"bs": bs_rel})
        mapper(B, b)

        session = create_session()

        def load_all_a():
            loaded = session.query(A).all()
            eq_(
                loaded,
                [
                    A(
                        data="a1",
                        bs=[B(data="b3"), B(data="b1"), B(data="b2")],
                    ),
                    A(bs=[B(data="b4"), B(data="b3"), B(data="b2")]),
                ],
            )

        self.assert_sql_count(testing.db, load_all_a, 2)
1479
1480
1481class BaseRelationFromJoinedSubclassTest(_Polymorphic):
1482    @classmethod
1483    def define_tables(cls, metadata):
1484        people = Table(
1485            "people",
1486            metadata,
1487            Column(
1488                "person_id",
1489                Integer,
1490                primary_key=True,
1491                test_needs_autoincrement=True,
1492            ),
1493            Column("name", String(50)),
1494            Column("type", String(30)),
1495        )
1496
1497        # to test fully, PK of engineers table must be
1498        # named differently from that of people
1499        engineers = Table(
1500            "engineers",
1501            metadata,
1502            Column(
1503                "engineer_id",
1504                Integer,
1505                ForeignKey("people.person_id"),
1506                primary_key=True,
1507            ),
1508            Column("primary_language", String(50)),
1509        )
1510
1511        paperwork = Table(
1512            "paperwork",
1513            metadata,
1514            Column(
1515                "paperwork_id",
1516                Integer,
1517                primary_key=True,
1518                test_needs_autoincrement=True,
1519            ),
1520            Column("description", String(50)),
1521            Column("person_id", Integer, ForeignKey("people.person_id")),
1522        )
1523
1524        pages = Table(
1525            "pages",
1526            metadata,
1527            Column(
1528                "page_id",
1529                Integer,
1530                primary_key=True,
1531                test_needs_autoincrement=True,
1532            ),
1533            Column("stuff", String(50)),
1534            Column("paperwork_id", ForeignKey("paperwork.paperwork_id")),
1535        )
1536
1537    @classmethod
1538    def setup_mappers(cls):
1539        people = cls.tables.people
1540        engineers = cls.tables.engineers
1541        paperwork = cls.tables.paperwork
1542        pages = cls.tables.pages
1543
1544        mapper(
1545            Person,
1546            people,
1547            polymorphic_on=people.c.type,
1548            polymorphic_identity="person",
1549            properties={
1550                "paperwork": relationship(
1551                    Paperwork, order_by=paperwork.c.paperwork_id
1552                )
1553            },
1554        )
1555
1556        mapper(
1557            Engineer,
1558            engineers,
1559            inherits=Person,
1560            polymorphic_identity="engineer",
1561        )
1562
1563        mapper(
1564            Paperwork,
1565            paperwork,
1566            properties={"pages": relationship(Page, order_by=pages.c.page_id)},
1567        )
1568
1569        mapper(Page, pages)
1570
1571    @classmethod
1572    def insert_data(cls):
1573
1574        e1 = Engineer(primary_language="java")
1575        e2 = Engineer(primary_language="c++")
1576        e1.paperwork = [
1577            Paperwork(
1578                description="tps report #1",
1579                pages=[
1580                    Page(stuff="report1 page1"),
1581                    Page(stuff="report1 page2"),
1582                ],
1583            ),
1584            Paperwork(
1585                description="tps report #2",
1586                pages=[
1587                    Page(stuff="report2 page1"),
1588                    Page(stuff="report2 page2"),
1589                ],
1590            ),
1591        ]
1592        e2.paperwork = [Paperwork(description="tps report #3")]
1593        sess = create_session()
1594        sess.add_all([e1, e2])
1595        sess.flush()
1596
1597    def test_correct_subquery_nofrom(self):
1598        sess = create_session()
1599        # use Person.paperwork here just to give the least
1600        # amount of context
1601        q = (
1602            sess.query(Engineer)
1603            .filter(Engineer.primary_language == "java")
1604            .options(subqueryload(Person.paperwork))
1605        )
1606
1607        def go():
1608            eq_(
1609                q.all()[0].paperwork,
1610                [
1611                    Paperwork(description="tps report #1"),
1612                    Paperwork(description="tps report #2"),
1613                ],
1614            )
1615
1616        self.assert_sql_execution(
1617            testing.db,
1618            go,
1619            CompiledSQL(
1620                "SELECT people.person_id AS people_person_id, "
1621                "people.name AS people_name, people.type AS people_type, "
1622                "engineers.engineer_id AS engineers_engineer_id, "
1623                "engineers.primary_language AS engineers_primary_language "
1624                "FROM people JOIN engineers ON "
1625                "people.person_id = engineers.engineer_id "
1626                "WHERE engineers.primary_language = :primary_language_1",
1627                {"primary_language_1": "java"},
1628            ),
1629            # ensure we get "people JOIN engineer" here, even though
1630            # primary key "people.person_id" is against "Person"
1631            # *and* the path comes out as "Person.paperwork", still
1632            # want to select from "Engineer" entity
1633            CompiledSQL(
1634                "SELECT paperwork.paperwork_id AS paperwork_paperwork_id, "
1635                "paperwork.description AS paperwork_description, "
1636                "paperwork.person_id AS paperwork_person_id, "
1637                "anon_1.people_person_id AS anon_1_people_person_id "
1638                "FROM (SELECT people.person_id AS people_person_id "
1639                "FROM people JOIN engineers "
1640                "ON people.person_id = engineers.engineer_id "
1641                "WHERE engineers.primary_language = "
1642                ":primary_language_1) AS anon_1 "
1643                "JOIN paperwork "
1644                "ON anon_1.people_person_id = paperwork.person_id "
1645                "ORDER BY anon_1.people_person_id, paperwork.paperwork_id",
1646                {"primary_language_1": "java"},
1647            ),
1648        )
1649
1650    def test_correct_subquery_existingfrom(self):
1651        sess = create_session()
1652        # use Person.paperwork here just to give the least
1653        # amount of context
1654        q = (
1655            sess.query(Engineer)
1656            .filter(Engineer.primary_language == "java")
1657            .join(Engineer.paperwork)
1658            .filter(Paperwork.description == "tps report #2")
1659            .options(subqueryload(Person.paperwork))
1660        )
1661
1662        def go():
1663            eq_(
1664                q.one().paperwork,
1665                [
1666                    Paperwork(description="tps report #1"),
1667                    Paperwork(description="tps report #2"),
1668                ],
1669            )
1670
1671        self.assert_sql_execution(
1672            testing.db,
1673            go,
1674            CompiledSQL(
1675                "SELECT people.person_id AS people_person_id, "
1676                "people.name AS people_name, people.type AS people_type, "
1677                "engineers.engineer_id AS engineers_engineer_id, "
1678                "engineers.primary_language AS engineers_primary_language "
1679                "FROM people JOIN engineers "
1680                "ON people.person_id = engineers.engineer_id "
1681                "JOIN paperwork ON people.person_id = paperwork.person_id "
1682                "WHERE engineers.primary_language = :primary_language_1 "
1683                "AND paperwork.description = :description_1",
1684                {
1685                    "primary_language_1": "java",
1686                    "description_1": "tps report #2",
1687                },
1688            ),
1689            CompiledSQL(
1690                "SELECT paperwork.paperwork_id AS paperwork_paperwork_id, "
1691                "paperwork.description AS paperwork_description, "
1692                "paperwork.person_id AS paperwork_person_id, "
1693                "anon_1.people_person_id AS anon_1_people_person_id "
1694                "FROM (SELECT people.person_id AS people_person_id "
1695                "FROM people JOIN engineers ON people.person_id = "
1696                "engineers.engineer_id JOIN paperwork "
1697                "ON people.person_id = paperwork.person_id "
1698                "WHERE engineers.primary_language = :primary_language_1 AND "
1699                "paperwork.description = :description_1) AS anon_1 "
1700                "JOIN paperwork ON anon_1.people_person_id = "
1701                "paperwork.person_id "
1702                "ORDER BY anon_1.people_person_id, paperwork.paperwork_id",
1703                {
1704                    "primary_language_1": "java",
1705                    "description_1": "tps report #2",
1706                },
1707            ),
1708        )
1709
1710    def test_correct_subquery_multilevel(self):
1711        sess = create_session()
1712        # use Person.paperwork here just to give the least
1713        # amount of context
1714        q = (
1715            sess.query(Engineer)
1716            .filter(Engineer.primary_language == "java")
1717            .options(
1718                subqueryload(Engineer.paperwork).subqueryload(Paperwork.pages)
1719            )
1720        )
1721
1722        def go():
1723            eq_(
1724                q.one().paperwork,
1725                [
1726                    Paperwork(
1727                        description="tps report #1",
1728                        pages=[
1729                            Page(stuff="report1 page1"),
1730                            Page(stuff="report1 page2"),
1731                        ],
1732                    ),
1733                    Paperwork(
1734                        description="tps report #2",
1735                        pages=[
1736                            Page(stuff="report2 page1"),
1737                            Page(stuff="report2 page2"),
1738                        ],
1739                    ),
1740                ],
1741            )
1742
1743        self.assert_sql_execution(
1744            testing.db,
1745            go,
1746            CompiledSQL(
1747                "SELECT people.person_id AS people_person_id, "
1748                "people.name AS people_name, people.type AS people_type, "
1749                "engineers.engineer_id AS engineers_engineer_id, "
1750                "engineers.primary_language AS engineers_primary_language "
1751                "FROM people JOIN engineers "
1752                "ON people.person_id = engineers.engineer_id "
1753                "WHERE engineers.primary_language = :primary_language_1",
1754                {"primary_language_1": "java"},
1755            ),
1756            CompiledSQL(
1757                "SELECT paperwork.paperwork_id AS paperwork_paperwork_id, "
1758                "paperwork.description AS paperwork_description, "
1759                "paperwork.person_id AS paperwork_person_id, "
1760                "anon_1.people_person_id AS anon_1_people_person_id "
1761                "FROM (SELECT people.person_id AS people_person_id "
1762                "FROM people JOIN engineers "
1763                "ON people.person_id = engineers.engineer_id "
1764                "WHERE engineers.primary_language = :primary_language_1) "
1765                "AS anon_1 JOIN paperwork "
1766                "ON anon_1.people_person_id = paperwork.person_id "
1767                "ORDER BY anon_1.people_person_id, paperwork.paperwork_id",
1768                {"primary_language_1": "java"},
1769            ),
1770            CompiledSQL(
1771                "SELECT pages.page_id AS pages_page_id, "
1772                "pages.stuff AS pages_stuff, "
1773                "pages.paperwork_id AS pages_paperwork_id, "
1774                "paperwork_1.paperwork_id AS paperwork_1_paperwork_id "
1775                "FROM (SELECT people.person_id AS people_person_id "
1776                "FROM people JOIN engineers ON people.person_id = "
1777                "engineers.engineer_id "
1778                "WHERE engineers.primary_language = :primary_language_1) "
1779                "AS anon_1 JOIN paperwork AS paperwork_1 "
1780                "ON anon_1.people_person_id = paperwork_1.person_id "
1781                "JOIN pages ON paperwork_1.paperwork_id = pages.paperwork_id "
1782                "ORDER BY paperwork_1.paperwork_id, pages.page_id",
1783                {"primary_language_1": "java"},
1784            ),
1785        )
1786
1787    def test_correct_subquery_with_polymorphic_no_alias(self):
1788        # test #3106
1789        sess = create_session()
1790
1791        wp = with_polymorphic(Person, [Engineer])
1792        q = (
1793            sess.query(wp)
1794            .options(subqueryload(wp.paperwork))
1795            .order_by(Engineer.primary_language.desc())
1796        )
1797
1798        def go():
1799            eq_(
1800                q.first(),
1801                Engineer(
1802                    paperwork=[
1803                        Paperwork(description="tps report #1"),
1804                        Paperwork(description="tps report #2"),
1805                    ],
1806                    primary_language="java",
1807                ),
1808            )
1809
1810        self.assert_sql_execution(
1811            testing.db,
1812            go,
1813            CompiledSQL(
1814                "SELECT people.person_id AS people_person_id, "
1815                "people.name AS people_name, people.type AS people_type, "
1816                "engineers.engineer_id AS engineers_engineer_id, "
1817                "engineers.primary_language AS engineers_primary_language "
1818                "FROM people LEFT OUTER JOIN engineers ON people.person_id = "
1819                "engineers.engineer_id ORDER BY engineers.primary_language "
1820                "DESC LIMIT :param_1"
1821            ),
1822            CompiledSQL(
1823                "SELECT paperwork.paperwork_id AS paperwork_paperwork_id, "
1824                "paperwork.description AS paperwork_description, "
1825                "paperwork.person_id AS paperwork_person_id, "
1826                "anon_1.people_person_id AS anon_1_people_person_id FROM "
1827                "(SELECT people.person_id AS people_person_id FROM people "
1828                "LEFT OUTER JOIN engineers ON people.person_id = "
1829                "engineers.engineer_id ORDER BY engineers.primary_language "
1830                "DESC LIMIT :param_1) AS anon_1 JOIN paperwork "
1831                "ON anon_1.people_person_id = paperwork.person_id "
1832                "ORDER BY anon_1.people_person_id, paperwork.paperwork_id"
1833            ),
1834        )
1835
1836    def test_correct_subquery_with_polymorphic_alias(self):
1837        # test #3106
1838        sess = create_session()
1839
1840        wp = with_polymorphic(Person, [Engineer], aliased=True)
1841        q = (
1842            sess.query(wp)
1843            .options(subqueryload(wp.paperwork))
1844            .order_by(wp.Engineer.primary_language.desc())
1845        )
1846
1847        def go():
1848            eq_(
1849                q.first(),
1850                Engineer(
1851                    paperwork=[
1852                        Paperwork(description="tps report #1"),
1853                        Paperwork(description="tps report #2"),
1854                    ],
1855                    primary_language="java",
1856                ),
1857            )
1858
1859        self.assert_sql_execution(
1860            testing.db,
1861            go,
1862            CompiledSQL(
1863                "SELECT anon_1.people_person_id AS anon_1_people_person_id, "
1864                "anon_1.people_name AS anon_1_people_name, "
1865                "anon_1.people_type AS anon_1_people_type, "
1866                "anon_1.engineers_engineer_id AS "
1867                "anon_1_engineers_engineer_id, "
1868                "anon_1.engineers_primary_language "
1869                "AS anon_1_engineers_primary_language FROM "
1870                "(SELECT people.person_id AS people_person_id, "
1871                "people.name AS people_name, people.type AS people_type, "
1872                "engineers.engineer_id AS engineers_engineer_id, "
1873                "engineers.primary_language AS engineers_primary_language "
1874                "FROM people LEFT OUTER JOIN engineers ON people.person_id = "
1875                "engineers.engineer_id) AS anon_1 "
1876                "ORDER BY anon_1.engineers_primary_language DESC "
1877                "LIMIT :param_1"
1878            ),
1879            CompiledSQL(
1880                "SELECT paperwork.paperwork_id AS paperwork_paperwork_id, "
1881                "paperwork.description AS paperwork_description, "
1882                "paperwork.person_id AS paperwork_person_id, "
1883                "anon_1.anon_2_people_person_id AS "
1884                "anon_1_anon_2_people_person_id FROM "
1885                "(SELECT DISTINCT anon_2.people_person_id AS "
1886                "anon_2_people_person_id, "
1887                "anon_2.engineers_primary_language AS "
1888                "anon_2_engineers_primary_language FROM "
1889                "(SELECT people.person_id AS people_person_id, "
1890                "people.name AS people_name, people.type AS people_type, "
1891                "engineers.engineer_id AS engineers_engineer_id, "
1892                "engineers.primary_language AS engineers_primary_language "
1893                "FROM people LEFT OUTER JOIN engineers ON people.person_id = "
1894                "engineers.engineer_id) AS anon_2 "
1895                "ORDER BY anon_2.engineers_primary_language "
1896                "DESC LIMIT :param_1) AS anon_1 "
1897                "JOIN paperwork "
1898                "ON anon_1.anon_2_people_person_id = paperwork.person_id "
1899                "ORDER BY anon_1.anon_2_people_person_id, "
1900                "paperwork.paperwork_id"
1901            ),
1902        )
1903
1904    def test_correct_subquery_with_polymorphic_flat_alias(self):
1905        # test #3106
1906        sess = create_session()
1907
1908        wp = with_polymorphic(Person, [Engineer], aliased=True, flat=True)
1909        q = (
1910            sess.query(wp)
1911            .options(subqueryload(wp.paperwork))
1912            .order_by(wp.Engineer.primary_language.desc())
1913        )
1914
1915        def go():
1916            eq_(
1917                q.first(),
1918                Engineer(
1919                    paperwork=[
1920                        Paperwork(description="tps report #1"),
1921                        Paperwork(description="tps report #2"),
1922                    ],
1923                    primary_language="java",
1924                ),
1925            )
1926
1927        self.assert_sql_execution(
1928            testing.db,
1929            go,
1930            CompiledSQL(
1931                "SELECT people_1.person_id AS people_1_person_id, "
1932                "people_1.name AS people_1_name, "
1933                "people_1.type AS people_1_type, "
1934                "engineers_1.engineer_id AS engineers_1_engineer_id, "
1935                "engineers_1.primary_language AS engineers_1_primary_language "
1936                "FROM people AS people_1 "
1937                "LEFT OUTER JOIN engineers AS engineers_1 "
1938                "ON people_1.person_id = engineers_1.engineer_id "
1939                "ORDER BY engineers_1.primary_language DESC LIMIT :param_1"
1940            ),
1941            CompiledSQL(
1942                "SELECT paperwork.paperwork_id AS paperwork_paperwork_id, "
1943                "paperwork.description AS paperwork_description, "
1944                "paperwork.person_id AS paperwork_person_id, "
1945                "anon_1.people_1_person_id AS anon_1_people_1_person_id "
1946                "FROM (SELECT people_1.person_id AS people_1_person_id "
1947                "FROM people AS people_1 "
1948                "LEFT OUTER JOIN engineers AS engineers_1 "
1949                "ON people_1.person_id = engineers_1.engineer_id "
1950                "ORDER BY engineers_1.primary_language DESC LIMIT :param_1) "
1951                "AS anon_1 JOIN paperwork ON anon_1.people_1_person_id = "
1952                "paperwork.person_id ORDER BY anon_1.people_1_person_id, "
1953                "paperwork.paperwork_id"
1954            ),
1955        )
1956
1957
class SubRelationFromJoinedSubclassMultiLevelTest(_Polymorphic):
    """Chain subquery eager loads across a joined-inheritance subclass.

    The fixture is one Company whose employees are Engineers; each
    Engineer owns Machines, and each Machine refers to a MachineType.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Declare companies, people, engineers, machines, machine_type."""

        def autoincrement_pk(name):
            # Integer surrogate key used by every table except engineers,
            # whose primary key is a foreign key to people.
            return Column(
                name,
                Integer,
                primary_key=True,
                test_needs_autoincrement=True,
            )

        Table(
            "companies",
            metadata,
            autoincrement_pk("company_id"),
            Column("name", String(50)),
        )

        Table(
            "people",
            metadata,
            autoincrement_pk("person_id"),
            Column("company_id", ForeignKey("companies.company_id")),
            Column("name", String(50)),
            Column("type", String(30)),
        )

        Table(
            "engineers",
            metadata,
            Column(
                "engineer_id", ForeignKey("people.person_id"), primary_key=True
            ),
            Column("primary_language", String(50)),
        )

        Table(
            "machines",
            metadata,
            autoincrement_pk("machine_id"),
            Column("name", String(50)),
            Column("engineer_id", ForeignKey("engineers.engineer_id")),
            Column(
                "machine_type_id", ForeignKey("machine_type.machine_type_id")
            ),
        )

        Table(
            "machine_type",
            metadata,
            autoincrement_pk("machine_type_id"),
            Column("name", String(50)),
        )

    @classmethod
    def setup_mappers(cls):
        """Map Company, Person/Engineer, Machine and MachineType."""
        tables = cls.tables
        company_table = tables.companies
        people_table = tables.people
        engineer_table = tables.engineers
        machine_table = tables.machines
        machine_type_table = tables.machine_type

        employees = relationship(Person, order_by=people_table.c.person_id)
        mapper(Company, company_table, properties={"employees": employees})

        # Person is the polymorphic base, discriminated on people.type.
        mapper(
            Person,
            people_table,
            polymorphic_on=people_table.c.type,
            polymorphic_identity="person",
            with_polymorphic="*",
        )

        owned_machines = relationship(
            Machine, order_by=machine_table.c.machine_id
        )
        mapper(
            Engineer,
            engineer_table,
            inherits=Person,
            polymorphic_identity="engineer",
            properties={"machines": owned_machines},
        )

        machine_kind = relationship(MachineType)
        mapper(Machine, machine_table, properties={"type": machine_kind})
        mapper(MachineType, machine_type_table)

    @classmethod
    def insert_data(cls):
        """Persist one copy of the fixture company."""
        company = cls._fixture()
        session = create_session()
        session.add(company)
        session.flush()

    @classmethod
    def _fixture(cls):
        """Build a Company with two Engineers owning two Machines each."""
        type_one = MachineType(name="mt1")
        type_two = MachineType(name="mt2")

        first_engineer = Engineer(
            name="e1",
            machines=[
                Machine(name="m1", type=type_one),
                Machine(name="m2", type=type_two),
            ],
        )
        second_engineer = Engineer(
            name="e2",
            machines=[
                Machine(name="m3", type=type_one),
                Machine(name="m4", type=type_one),
            ],
        )
        return Company(employees=[first_engineer, second_engineer])

    def test_chained_subq_subclass(self):
        """Company -> Engineer -> Machine -> MachineType in 4 statements."""
        session = Session()
        companies = session.query(Company)
        loader_chain = (
            subqueryload(Company.employees.of_type(Engineer))
            .subqueryload(Engineer.machines)
            .subqueryload(Machine.type)
        )
        query = companies.options(loader_chain)

        def load_and_compare():
            loaded = query.all()
            eq_(loaded, [self._fixture()])

        # one statement for companies plus one per chained relationship
        self.assert_sql_count(testing.db, load_and_compare, 4)
2106
2107
class SelfReferentialTest(fixtures.MappedTest):
    """Subquery eager loading over a self-referential ``nodes`` table.

    Every test maps its own local ``Node`` class against ``nodes`` with a
    ``children`` relationship, stores a small tree, and asserts how many
    SQL statements a subsequent load emits.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``nodes``; ``parent_id`` refers back to ``nodes.id``."""
        Table(
            "nodes",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("nodes.id")),
            Column("data", String(30)),
        )

    def test_basic(self):
        """``lazy="subquery"`` with ``join_depth=3`` loads two trees."""
        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={
                "children": relationship(
                    Node, lazy="subquery", join_depth=3, order_by=nodes.c.id
                )
            },
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        n1.append(Node(data="n13"))
        n1.children[1].append(Node(data="n121"))
        n1.children[1].append(Node(data="n122"))
        n1.children[1].append(Node(data="n123"))
        n2 = Node(data="n2")
        n2.append(Node(data="n21"))
        n2.children[0].append(Node(data="n211"))
        n2.children[0].append(Node(data="n212"))

        sess.add(n1)
        sess.add(n2)
        sess.flush()
        # start from an empty session so everything is loaded from the DB
        sess.expunge_all()

        def go():
            d = (
                sess.query(Node)
                .filter(Node.data.in_(["n1", "n2"]))
                .order_by(Node.data)
                .all()
            )
            eq_(
                [
                    Node(
                        data="n1",
                        children=[
                            Node(data="n11"),
                            Node(
                                data="n12",
                                children=[
                                    Node(data="n121"),
                                    Node(data="n122"),
                                    Node(data="n123"),
                                ],
                            ),
                            Node(data="n13"),
                        ],
                    ),
                    Node(
                        data="n2",
                        children=[
                            Node(
                                data="n21",
                                children=[
                                    Node(data="n211"),
                                    Node(data="n212"),
                                ],
                            )
                        ],
                    ),
                ],
                d,
            )

        self.assert_sql_count(testing.db, go, 4)

    def test_lazy_fallback_doesnt_affect_eager(self):
        """With ``join_depth=1``, grandchildren are still readable.

        All nodes are loaded in one query; the children of ``n11`` and
        ``n12`` must then be available within a total of 2 statements.
        """
        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={
                "children": relationship(
                    Node, lazy="subquery", join_depth=1, order_by=nodes.c.id
                )
            },
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        n1.append(Node(data="n13"))
        n1.children[0].append(Node(data="n111"))
        n1.children[0].append(Node(data="n112"))
        n1.children[1].append(Node(data="n121"))
        n1.children[1].append(Node(data="n122"))
        n1.children[1].append(Node(data="n123"))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()

        def go():
            # ordered by data: n1, n11, n111, n112, n12, n121, ...
            allnodes = sess.query(Node).order_by(Node.data).all()

            n11 = allnodes[1]
            eq_(n11.data, "n11")
            eq_([Node(data="n111"), Node(data="n112")], list(n11.children))

            n12 = allnodes[4]
            eq_(n12.data, "n12")
            eq_(
                [Node(data="n121"), Node(data="n122"), Node(data="n123")],
                list(n12.children),
            )

        self.assert_sql_count(testing.db, go, 2)

    def test_with_deferred(self):
        """Combine subquery loading with a deferred ``data`` column.

        The same comparison is run three times; the expected statement
        count drops from 6 to 5 to 3 as ``undefer()`` options are added
        for the parent and then also for the children.
        """
        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={
                "children": relationship(
                    Node, lazy="subquery", join_depth=3, order_by=nodes.c.id
                ),
                "data": deferred(nodes.c.data),
            },
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()

        # no undefer options at all
        def go():
            eq_(
                Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
                sess.query(Node).order_by(Node.id).first(),
            )

        self.assert_sql_count(testing.db, go, 6)

        sess.expunge_all()

        # undefer "data" on the parent only
        def go():
            eq_(
                Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
                sess.query(Node)
                .options(undefer("data"))
                .order_by(Node.id)
                .first(),
            )

        self.assert_sql_count(testing.db, go, 5)

        sess.expunge_all()

        # undefer "data" on the parent and on its children
        def go():
            eq_(
                Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
                sess.query(Node)
                .options(undefer("data"), undefer("children.data"))
                # first() applies a LIMIT; without an ORDER BY the row
                # chosen (and the rows the eager-load subquery re-selects)
                # would be up to the database, so order like the two
                # queries above.
                .order_by(Node.id)
                .first(),
            )

        self.assert_sql_count(testing.db, go, 3)

    def test_options(self):
        """``subqueryload_all("children.children")`` as a query option.

        The relationship itself sets no ``lazy`` strategy; the option is
        expected to load two levels of children in 3 statements.
        """
        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={"children": relationship(Node, order_by=nodes.c.id)},
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        n1.append(Node(data="n13"))
        n1.children[1].append(Node(data="n121"))
        n1.children[1].append(Node(data="n122"))
        n1.children[1].append(Node(data="n123"))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()

        def go():
            d = (
                sess.query(Node)
                .filter_by(data="n1")
                .order_by(Node.id)
                .options(subqueryload_all("children.children"))
                .first()
            )
            eq_(
                Node(
                    data="n1",
                    children=[
                        Node(data="n11"),
                        Node(
                            data="n12",
                            children=[
                                Node(data="n121"),
                                Node(data="n122"),
                                Node(data="n123"),
                            ],
                        ),
                        Node(data="n13"),
                    ],
                ),
                d,
            )

        self.assert_sql_count(testing.db, go, 3)

    def test_no_depth(self):
        """no join depth is set, so no eager loading occurs."""

        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={
                # The assertions below compare ``children`` as ordered
                # lists, so give the collection a deterministic order,
                # as the other ordered-list tests in this class do.
                "children": relationship(
                    Node, lazy="subquery", order_by=nodes.c.id
                )
            },
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        n1.append(Node(data="n13"))
        n1.children[1].append(Node(data="n121"))
        n1.children[1].append(Node(data="n122"))
        n1.children[1].append(Node(data="n123"))
        n2 = Node(data="n2")
        n2.append(Node(data="n21"))
        sess.add(n1)
        sess.add(n2)
        sess.flush()
        sess.expunge_all()

        def go():
            d = (
                sess.query(Node)
                .filter(Node.data.in_(["n1", "n2"]))
                .order_by(Node.data)
                .all()
            )
            eq_(
                [
                    Node(
                        data="n1",
                        children=[
                            Node(data="n11"),
                            Node(
                                data="n12",
                                children=[
                                    Node(data="n121"),
                                    Node(data="n122"),
                                    Node(data="n123"),
                                ],
                            ),
                            Node(data="n13"),
                        ],
                    ),
                    Node(data="n2", children=[Node(data="n21")]),
                ],
                d,
            )

        self.assert_sql_count(testing.db, go, 4)
2413
2414
class InheritanceToRelatedTest(fixtures.MappedTest):
    """Eager-load a relationship declared on a polymorphic base class.

    ``Foo`` is the base for joined subclasses ``Bar`` and ``Baz`` and
    carries a ``related`` relationship to ``Related``.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create foo, its subclass tables bar and baz, and related."""
        Table(
            "foo",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("type", String(50)),
            Column("related_id", Integer, ForeignKey("related.id")),
        )
        # both subclass tables share the same single-column layout
        for subclass_table in ("bar", "baz"):
            Table(
                subclass_table,
                metadata,
                Column("id", Integer, ForeignKey("foo.id"), primary_key=True),
            )
        Table("related", metadata, Column("id", Integer, primary_key=True))

    @classmethod
    def setup_classes(cls):
        """Declare the plain classes that setup_mappers() will map."""

        class Foo(cls.Comparable):
            """Polymorphic base."""

        class Bar(Foo):
            """Subclass with identity ``bar``."""

        class Baz(Foo):
            """Subclass with identity ``baz``."""

        class Related(cls.Comparable):
            """Target of ``Foo.related``."""

    @classmethod
    def fixtures(cls):
        """Rows per table: two bars, two bazzes, two related rows."""
        return {
            "foo": [
                ("id", "type", "related_id"),
                (1, "bar", 1),
                (2, "bar", 2),
                (3, "baz", 1),
                (4, "baz", 2),
            ],
            "bar": [("id",), (1,), (2,)],
            "baz": [("id",), (3,), (4,)],
            "related": [("id",), (1,), (2,)],
        }

    @classmethod
    def setup_mappers(cls):
        """Map Foo (polymorphic on foo.type), Bar, Baz and Related."""
        classes, tables = cls.classes, cls.tables

        mapper(
            classes.Foo,
            tables.foo,
            properties={"related": relationship(classes.Related)},
            polymorphic_on=tables.foo.c.type,
        )
        mapper(
            classes.Bar,
            tables.bar,
            polymorphic_identity="bar",
            inherits=classes.Foo,
        )
        mapper(
            classes.Baz,
            tables.baz,
            polymorphic_identity="baz",
            inherits=classes.Foo,
        )
        mapper(classes.Related, tables.related)

    def _assert_polymorphic_load(self, loader, statement_count):
        """Load every Foo with ``loader(Foo.related)`` and count SQL.

        ``loader`` is the option factory (``subqueryload`` or
        ``joinedload``); it is invoked inside the measured callable.
        """
        classes = self.classes
        Foo = classes.Foo
        Bar = classes.Bar
        Baz = classes.Baz
        Related = classes.Related
        session = Session(testing.db)

        def load_and_compare():
            loaded = (
                session.query(Foo)
                .with_polymorphic([Bar, Baz])
                .order_by(Foo.id)
                .options(loader(Foo.related))
                .all()
            )
            eq_(
                loaded,
                [
                    Bar(id=1, related=Related(id=1)),
                    Bar(id=2, related=Related(id=2)),
                    Baz(id=3, related=Related(id=1)),
                    Baz(id=4, related=Related(id=2)),
                ],
            )

        self.assert_sql_count(testing.db, load_and_compare, statement_count)

    def test_caches_query_per_base_subq(self):
        """subqueryload of Foo.related takes two statements."""
        self._assert_polymorphic_load(subqueryload, 2)

    def test_caches_query_per_base_joined(self):
        """joinedload of Foo.related takes a single statement."""
        # technically this should be in test_eager_relations
        self._assert_polymorphic_load(joinedload, 1)
2540
2541
class CyclicalInheritingEagerTestOne(fixtures.MappedTest):
    """Mutually subquery-loading relationships between two subclasses."""

    @classmethod
    def define_tables(cls, metadata):
        """Create t1 and t2; t2 carries a foreign key to t1.c1."""
        Table(
            "t1",
            metadata,
            Column(
                "c1", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("c2", String(30)),
            Column("type", String(30)),
        )

        Table(
            "t2",
            metadata,
            Column(
                "c1", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("c2", String(30)),
            Column("type", String(30)),
            Column("t1.id", Integer, ForeignKey("t1.c1")),
        )

    def test_basic(self):
        """Querying SubT must complete despite the eager-load cycle."""
        t2 = self.tables.t2
        t1 = self.tables.t1

        class T(object):
            """Base class mapped to t1."""

        class SubT(T):
            """Single-table subclass of T."""

        class T2(object):
            """Base class mapped to t2."""

        class SubT2(T2):
            """Single-table subclass of T2."""

        mapper(T, t1, polymorphic_on=t1.c.type, polymorphic_identity="t1")

        # SubT.t2s and its backref SubT2.subt are both subquery-loaded,
        # so each side eagerly loads the other.
        cyclical = relationship(
            SubT2,
            lazy="subquery",
            backref=sa.orm.backref("subt", lazy="subquery"),
        )
        mapper(
            SubT,
            None,
            inherits=T,
            polymorphic_identity="subt1",
            properties={"t2s": cyclical},
        )
        mapper(T2, t2, polymorphic_on=t2.c.type, polymorphic_identity="t2")
        mapper(SubT2, None, inherits=T2, polymorphic_identity="subt2")

        # testing a particular endless loop condition in eager load setup
        session = create_session()
        session.query(SubT).all()
2600
2601
class CyclicalInheritingEagerTestTwo(
    fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL
):
    """Wildcard subquery loading between siblings of a joined hierarchy.

    ``Movie`` and ``Director`` both inherit from ``PersistentObject``;
    ``Director.movies`` links the two siblings.
    """

    __dialect__ = "default"

    @classmethod
    def setup_classes(cls):
        """Declare PersistentObject and its subclasses Movie, Director."""
        Base = cls.DeclarativeBasic

        class PersistentObject(Base):
            __tablename__ = "persistent"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )

        class Movie(PersistentObject):
            __tablename__ = "movie"
            id = Column(Integer, ForeignKey("persistent.id"), primary_key=True)
            director_id = Column(Integer, ForeignKey("director.id"))
            title = Column(String(50))

        class Director(PersistentObject):
            __tablename__ = "director"
            id = Column(Integer, ForeignKey("persistent.id"), primary_key=True)
            # foreign_keys names movie.director_id explicitly as the
            # column this relationship joins on
            movies = relationship("Movie", foreign_keys=Movie.director_id)
            name = Column(String(50))

    def test_from_subclass(self):
        """Check the SQL of the subquery load set up for Director.movies."""
        Director = self.classes.Director

        s = create_session()

        ctx = s.query(Director).options(subqueryload("*"))._compile_context()

        # the eager-load query is stored in the compile context under a
        # key built from the parent mapper and the relationship property
        q = ctx.attributes[
            ("subquery", (inspect(Director), inspect(Director).attrs.movies))
        ]
        self.assert_compile(
            q,
            "SELECT movie.id AS movie_id, "
            "persistent.id AS persistent_id, "
            "movie.director_id AS movie_director_id, "
            "movie.title AS movie_title, "
            "anon_1.director_id AS anon_1_director_id "
            "FROM (SELECT director.id AS director_id "
            "FROM persistent JOIN director "
            "ON persistent.id = director.id) AS anon_1 "
            "JOIN (persistent JOIN movie "
            "ON persistent.id = movie.id) "
            "ON anon_1.director_id = movie.director_id "
            "ORDER BY anon_1.director_id",
            dialect="default",
        )

    def test_integrate(self):
        """Round trip: a wildcard subqueryload brings in both movies."""
        Director = self.classes.Director
        Movie = self.classes.Movie

        session = Session(testing.db)
        rscott = Director(name="Ridley Scott")
        alien = Movie(title="Alien")
        brunner = Movie(title="Blade Runner")
        rscott.movies.append(brunner)
        rscott.movies.append(alien)
        session.add_all([rscott, alien, brunner])
        session.commit()

        # Only this session has to be emptied before re-querying.
        # ``close_all()`` is a class-level operation that closes every
        # Session in the process (and is deprecated in SQLAlchemy 1.3),
        # so close just the session under test.
        session.close()

        # NOTE(review): ``d`` looks unused but presumably keeps the loaded
        # Director (and, through it, the movies) strongly referenced
        # while the session contents are counted -- confirm before
        # removing.
        d = session.query(Director).options(subqueryload("*")).first()
        # the director plus its two eagerly loaded movies
        assert len(list(session)) == 3
2672
2673
2674class SubqueryloadDistinctTest(
2675    fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL
2676):
2677    __dialect__ = "default"
2678
2679    run_inserts = "once"
2680    run_deletes = None
2681
2682    @classmethod
    def setup_classes(cls):
        """Declare Director, DirectorPhoto, Movie and Credit.

        Each Director has ``photos`` and ``movies`` collections (created
        as backrefs below), and each Movie has ``credits``.
        """
        Base = cls.DeclarativeBasic

        class Director(Base):
            __tablename__ = "director"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            name = Column(String(50))

        class DirectorPhoto(Base):
            __tablename__ = "director_photo"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            path = Column(String(255))
            director_id = Column(Integer, ForeignKey("director.id"))
            # many-to-one; the backref adds Director.photos
            director = relationship(Director, backref="photos")

        class Movie(Base):
            __tablename__ = "movie"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            director_id = Column(Integer, ForeignKey("director.id"))
            # many-to-one; the backref adds Director.movies
            director = relationship(Director, backref="movies")
            title = Column(String(50))
            # one-to-many; the backref adds Credit.movie
            credits = relationship("Credit", backref="movie")

        class Credit(Base):
            __tablename__ = "credit"
            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            movie_id = Column(Integer, ForeignKey("movie.id"))
2718
2719    @classmethod
2720    def insert_data(cls):
2721        Movie = cls.classes.Movie
2722        Director = cls.classes.Director
2723        DirectorPhoto = cls.classes.DirectorPhoto
2724        Credit = cls.classes.Credit
2725
2726        d = Director(name="Woody Allen")
2727        d.photos = [DirectorPhoto(path="/1.jpg"), DirectorPhoto(path="/2.jpg")]
2728        d.movies = [
2729            Movie(title="Manhattan", credits=[Credit(), Credit()]),
2730            Movie(title="Sweet and Lowdown", credits=[Credit()]),
2731        ]
2732        sess = create_session()
2733        sess.add_all([d])
2734        sess.flush()
2735
2736    def test_distinct_strategy_opt_m2o(self):
2737        self._run_test_m2o(True, None)
2738        self._run_test_m2o(False, None)
2739
2740    def test_distinct_unrelated_opt_m2o(self):
2741        self._run_test_m2o(None, True)
2742        self._run_test_m2o(None, False)
2743
2744    def _run_test_m2o(self, director_strategy_level, photo_strategy_level):
2745
2746        # test where the innermost is m2o, e.g.
2747        # Movie->director
2748
2749        Movie = self.classes.Movie
2750        Director = self.classes.Director
2751
2752        Movie.director.property.distinct_target_key = director_strategy_level
2753        Director.photos.property.distinct_target_key = photo_strategy_level
2754
2755        # the DISTINCT is controlled by
2756        # only the Movie->director relationship, *not* the
2757        # Director.photos
2758        expect_distinct = director_strategy_level in (True, None)
2759
2760        s = create_session()
2761
2762        q = s.query(Movie).options(
2763            subqueryload(Movie.director).subqueryload(Director.photos)
2764        )
2765        ctx = q._compile_context()
2766
2767        q2 = ctx.attributes[
2768            ("subquery", (inspect(Movie), inspect(Movie).attrs.director))
2769        ]
2770        self.assert_compile(
2771            q2,
2772            "SELECT director.id AS director_id, "
2773            "director.name AS director_name, "
2774            "anon_1.movie_director_id AS anon_1_movie_director_id "
2775            "FROM (SELECT%s movie.director_id AS movie_director_id "
2776            "FROM movie) AS anon_1 "
2777            "JOIN director ON director.id = anon_1.movie_director_id "
2778            "ORDER BY anon_1.movie_director_id"
2779            % (" DISTINCT" if expect_distinct else ""),
2780        )
2781
2782        ctx2 = q2._compile_context()
2783        result = s.execute(q2)
2784        rows = result.fetchall()
2785
2786        if expect_distinct:
2787            eq_(rows, [(1, "Woody Allen", 1)])
2788        else:
2789            eq_(rows, [(1, "Woody Allen", 1), (1, "Woody Allen", 1)])
2790
2791        q3 = ctx2.attributes[
2792            ("subquery", (inspect(Director), inspect(Director).attrs.photos))
2793        ]
2794
2795        self.assert_compile(
2796            q3,
2797            "SELECT director_photo.id AS director_photo_id, "
2798            "director_photo.path AS director_photo_path, "
2799            "director_photo.director_id AS director_photo_director_id, "
2800            "director_1.id AS director_1_id "
2801            "FROM (SELECT%s movie.director_id AS movie_director_id "
2802            "FROM movie) AS anon_1 "
2803            "JOIN director AS director_1 "
2804            "ON director_1.id = anon_1.movie_director_id "
2805            "JOIN director_photo "
2806            "ON director_1.id = director_photo.director_id "
2807            "ORDER BY director_1.id"
2808            % (" DISTINCT" if expect_distinct else ""),
2809        )
2810        result = s.execute(q3)
2811        rows = result.fetchall()
2812        if expect_distinct:
2813            eq_(
2814                set(tuple(t) for t in rows),
2815                set([(1, "/1.jpg", 1, 1), (2, "/2.jpg", 1, 1)]),
2816            )
2817        else:
2818            # oracle might not order the way we expect here
2819            eq_(
2820                set(tuple(t) for t in rows),
2821                set(
2822                    [
2823                        (1, "/1.jpg", 1, 1),
2824                        (2, "/2.jpg", 1, 1),
2825                        (1, "/1.jpg", 1, 1),
2826                        (2, "/2.jpg", 1, 1),
2827                    ]
2828                ),
2829            )
2830
2831        movies = q.all()
2832
2833        # check number of persistent objects in session
2834        eq_(len(list(s)), 5)
2835
2836    def test_cant_do_distinct_in_joins(self):
2837        """the DISTINCT feature here works when the m2o is in the innermost
2838        mapper, but when we are just joining along relationships outside
2839        of that, we can still have dupes, and there's no solution to that.
2840
2841        """
2842        Movie = self.classes.Movie
2843        Credit = self.classes.Credit
2844
2845        s = create_session()
2846
2847        q = s.query(Credit).options(
2848            subqueryload(Credit.movie).subqueryload(Movie.director)
2849        )
2850
2851        ctx = q._compile_context()
2852
2853        q2 = ctx.attributes[
2854            ("subquery", (inspect(Credit), Credit.movie.property))
2855        ]
2856        ctx2 = q2._compile_context()
2857        q3 = ctx2.attributes[
2858            ("subquery", (inspect(Movie), Movie.director.property))
2859        ]
2860
2861        result = s.execute(q3)
2862        eq_(result.fetchall(), [(1, "Woody Allen", 1), (1, "Woody Allen", 1)])
2863
2864
class JoinedNoLoadConflictTest(fixtures.DeclarativeMappedTest):
    """test for [ticket:2887]

    ``Parent.children`` is mapped with ``lazy="noload"`` and
    ``Child.parent`` with ``lazy="joined"``; the test applies a
    ``subqueryload`` option to ``Parent.children`` on top of those
    mapping-level defaults.
    """

    @classmethod
    def setup_classes(cls):
        """Declare the Parent / Child pair with conflicting lazy settings."""
        Base = cls.DeclarativeBasic

        class Parent(ComparableEntity, Base):
            __tablename__ = "parent"

            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            name = Column(String(20))

            # not loaded at all unless a query option overrides it
            children = relationship(
                "Child", back_populates="parent", lazy="noload"
            )

        class Child(ComparableEntity, Base):
            __tablename__ = "child"

            id = Column(
                Integer, primary_key=True, test_needs_autoincrement=True
            )
            name = Column(String(20))
            parent_id = Column(Integer, ForeignKey("parent.id"))

            # joined-eager back to the parent
            parent = relationship(
                "Parent", back_populates="children", lazy="joined"
            )

    @classmethod
    def insert_data(cls):
        """Persist a single parent that has a single child named "c1"."""
        Parent = cls.classes.Parent
        Child = cls.classes.Child

        s = Session()
        s.add(Parent(name="parent", children=[Child(name="c1")]))
        s.commit()

    def test_subqueryload_on_joined_noload(self):
        """subqueryload of ``children`` populates the collection."""
        Parent = self.classes.Parent
        Child = self.classes.Child

        s = Session()

        # here we have
        # Parent->subqueryload->Child->joinedload->parent->noload->children.
        # the actual subqueryload has to emit *after* we've started populating
        # Parent->subqueryload->child.
        #
        # the option is passed positionally, as Query.options(*args)
        # expects, rather than wrapped in a list
        parent = s.query(Parent).options(subqueryload("children")).first()
        eq_(parent.children, [Child(name="c1")])
2918
2919
class SelfRefInheritanceAliasedTest(
    fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL
):
    """Two-level subqueryload along a self-referential polymorphic class,
    verified against the SQL that is emitted.
    """

    __dialect__ = "default"

    @classmethod
    def setup_classes(cls):
        """Declare ``Foo`` (refers to itself via ``foo_id``) and ``Bar``."""
        Base = cls.DeclarativeBasic

        class Foo(Base):
            __tablename__ = "foo"
            id = Column(Integer, primary_key=True)
            type = Column(String(50))

            foo_id = Column(Integer, ForeignKey("foo.id"))
            foo = relationship(
                lambda: Foo, foreign_keys=foo_id, remote_side=id
            )

            # must follow the ``type`` column it refers to
            __mapper_args__ = {
                "polymorphic_on": type,
                "polymorphic_identity": "foo",
            }

        class Bar(Foo):
            __mapper_args__ = {"polymorphic_identity": "bar"}

    @classmethod
    def insert_data(cls):
        """Persist the chain Bar(id=2) -> Foo(id=3) -> Bar(id=1)."""
        classes = cls.classes
        Foo, Bar = classes.Foo, classes.Bar

        tail = Bar(id=1)
        middle = Foo(id=3, foo=tail)
        head = Bar(id=2, foo=middle)

        sess = Session()
        sess.add(head)
        sess.commit()

    def test_twolevel_subquery_w_polymorphic(self):
        """Three statements are emitted: the parent plus two subqueries."""
        Foo = self.classes.Foo

        poly_foo = with_polymorphic(Foo, "*", aliased=True)
        first_hop = Foo.foo.of_type(poly_foo)
        second_hop = poly_foo.foo

        sess = Session()
        loader = subqueryload(first_hop).subqueryload(second_hop)
        query = sess.query(Foo).filter(Foo.id == 2).options(loader)

        self.assert_sql_execution(
            testing.db,
            query.all,
            # the row for Foo.id == 2 itself
            CompiledSQL(
                "SELECT foo.id AS foo_id_1, foo.type AS foo_type, "
                "foo.foo_id AS foo_foo_id FROM foo WHERE foo.id = :id_1",
                [{"id_1": 2}],
            ),
            # first level of subquery loading
            CompiledSQL(
                "SELECT foo_1.id AS foo_1_id, foo_1.type AS foo_1_type, "
                "foo_1.foo_id AS foo_1_foo_id, "
                "anon_1.foo_foo_id AS anon_1_foo_foo_id "
                "FROM (SELECT DISTINCT foo.foo_id AS foo_foo_id "
                "FROM foo WHERE foo.id = :id_1) AS anon_1 "
                "JOIN foo AS foo_1 ON foo_1.id = anon_1.foo_foo_id "
                "ORDER BY anon_1.foo_foo_id",
                {"id_1": 2},
            ),
            # second level of subquery loading
            CompiledSQL(
                "SELECT foo.id AS foo_id_1, foo.type AS foo_type, "
                "foo.foo_id AS foo_foo_id, foo_1.foo_id AS foo_1_foo_id "
                "FROM (SELECT DISTINCT foo.foo_id AS foo_foo_id FROM foo "
                "WHERE foo.id = :id_1) AS anon_1 "
                "JOIN foo AS foo_1 ON foo_1.id = anon_1.foo_foo_id "
                "JOIN foo ON foo.id = foo_1.foo_id ORDER BY foo_1.foo_id",
                {"id_1": 2},
            ),
        )
2999
3000
class TestExistingRowPopulation(fixtures.DeclarativeMappedTest):
    """Subquery loads attached to a ``B`` that is reached along two
    different joined-eager paths (``A.b`` and ``A.a2.b``).
    """

    @classmethod
    def setup_classes(cls):
        """Declare A, A2, B and the four C classes hanging off of B."""
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))
            a2_id = Column(ForeignKey("a2.id"))
            a2 = relationship("A2")
            b = relationship("B")

        class A2(Base):
            __tablename__ = "a2"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))
            b = relationship("B")

        class B(Base):
            __tablename__ = "b"

            id = Column(Integer, primary_key=True)

            # foreign keys for the two "m2o" relationships below
            c1_m2o_id = Column(ForeignKey("c1_m2o.id"))
            c2_m2o_id = Column(ForeignKey("c2_m2o.id"))

            c1_o2m = relationship("C1o2m")
            c2_o2m = relationship("C2o2m")
            c1_m2o = relationship("C1m2o")
            c2_m2o = relationship("C2m2o")

        class C1o2m(Base):
            __tablename__ = "c1_o2m"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))

        class C2o2m(Base):
            __tablename__ = "c2_o2m"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))

        class C1m2o(Base):
            __tablename__ = "c1_m2o"

            id = Column(Integer, primary_key=True)

        class C2m2o(Base):
            __tablename__ = "c2_m2o"

            id = Column(Integer, primary_key=True)

    @classmethod
    def insert_data(cls):
        """Persist one A whose ``b`` and whose ``a2.b`` are the same B."""
        classes = cls.classes

        sess = Session()

        shared_b = classes.B(
            c1_o2m=[classes.C1o2m()],
            c2_o2m=[classes.C2o2m()],
            c1_m2o=classes.C1m2o(),
            c2_m2o=classes.C2m2o(),
        )
        root = classes.A(b=shared_b, a2=classes.A2(b=shared_b))

        sess.add(root)
        sess.commit()

    def test_o2m(self):
        """Both collections end up in ``a1.b.__dict__`` after the query."""
        A, A2, B = self.classes("A", "A2", "B")

        s = Session()

        # J = joinedload, S = subqueryload, L = no option given
        #
        # A -J-> B -L-> C1
        # A -J-> B -S-> C2

        # A -J-> A2 -J-> B -S-> C1
        # A -J-> A2 -J-> B -L-> C2

        loader_options = (
            joinedload(A.b).subqueryload(B.c2_o2m),
            joinedload(A.a2).joinedload(A2.b).subqueryload(B.c1_o2m),
        )
        a1 = s.query(A).options(*loader_options).all()[0]

        b_state = a1.b.__dict__
        for key in ("c1_o2m", "c2_o2m"):
            is_true(key in b_state)

    def test_m2o(self):
        """Both scalar attributes end up in ``a1.b.__dict__``."""
        A, A2, B = self.classes("A", "A2", "B")

        s = Session()

        # J = joinedload, S = subqueryload, L = no option given
        #
        # A -J-> B -L-> C1
        # A -J-> B -S-> C2

        # A -J-> A2 -J-> B -S-> C1
        # A -J-> A2 -J-> B -L-> C2

        loader_options = (
            joinedload(A.b).subqueryload(B.c2_m2o),
            joinedload(A.a2).joinedload(A2.b).subqueryload(B.c1_m2o),
        )
        a1 = s.query(A).options(*loader_options).all()[0]

        b_state = a1.b.__dict__
        for key in ("c1_m2o", "c2_m2o"):
            is_true(key in b_state)
3112