1import sqlalchemy as sa
2from sqlalchemy import bindparam
3from sqlalchemy import ForeignKey
4from sqlalchemy import ForeignKeyConstraint
5from sqlalchemy import Integer
6from sqlalchemy import select
7from sqlalchemy import String
8from sqlalchemy import testing
9from sqlalchemy.orm import aliased
10from sqlalchemy.orm import clear_mappers
11from sqlalchemy.orm import create_session
12from sqlalchemy.orm import defaultload
13from sqlalchemy.orm import defer
14from sqlalchemy.orm import deferred
15from sqlalchemy.orm import joinedload
16from sqlalchemy.orm import mapper
17from sqlalchemy.orm import relationship
18from sqlalchemy.orm import selectinload
19from sqlalchemy.orm import Session
20from sqlalchemy.orm import subqueryload
21from sqlalchemy.orm import undefer
22from sqlalchemy.orm import with_polymorphic
23from sqlalchemy.testing import assert_raises
24from sqlalchemy.testing import assert_raises_message
25from sqlalchemy.testing import eq_
26from sqlalchemy.testing import fixtures
27from sqlalchemy.testing import is_
28from sqlalchemy.testing import is_not
29from sqlalchemy.testing import is_true
30from sqlalchemy.testing import mock
31from sqlalchemy.testing.assertsql import AllOf
32from sqlalchemy.testing.assertsql import assert_engine
33from sqlalchemy.testing.assertsql import CompiledSQL
34from sqlalchemy.testing.schema import Column
35from sqlalchemy.testing.schema import Table
36from test.orm import _fixtures
37from .inheritance._poly_fixtures import _Polymorphic
38from .inheritance._poly_fixtures import Company
39from .inheritance._poly_fixtures import Engineer
40from .inheritance._poly_fixtures import Machine
41from .inheritance._poly_fixtures import MachineType
42from .inheritance._poly_fixtures import Paperwork
43from .inheritance._poly_fixtures import Person
44
45
46class EagerTest(_fixtures.FixtureTest, testing.AssertsCompiledSQL):
47    run_inserts = "once"
48    run_deletes = None
49
50    def test_basic(self):
51        users, Address, addresses, User = (
52            self.tables.users,
53            self.classes.Address,
54            self.tables.addresses,
55            self.classes.User,
56        )
57
58        mapper(
59            User,
60            users,
61            properties={
62                "addresses": relationship(
63                    mapper(Address, addresses), order_by=Address.id
64                )
65            },
66        )
67        sess = create_session()
68
69        q = sess.query(User).options(selectinload(User.addresses))
70
71        def go():
72            eq_(
73                [
74                    User(
75                        id=7,
76                        addresses=[
77                            Address(id=1, email_address="jack@bean.com")
78                        ],
79                    )
80                ],
81                q.filter(User.id == 7).all(),
82            )
83
84        self.assert_sql_count(testing.db, go, 2)
85
86        def go():
87            eq_(self.static.user_address_result, q.order_by(User.id).all())
88
89        self.assert_sql_count(testing.db, go, 2)
90
91    def test_from_aliased(self):
92        users, Dingaling, User, dingalings, Address, addresses = (
93            self.tables.users,
94            self.classes.Dingaling,
95            self.classes.User,
96            self.tables.dingalings,
97            self.classes.Address,
98            self.tables.addresses,
99        )
100
101        mapper(Dingaling, dingalings)
102        mapper(
103            Address,
104            addresses,
105            properties={
106                "dingalings": relationship(Dingaling, order_by=Dingaling.id)
107            },
108        )
109        mapper(
110            User,
111            users,
112            properties={
113                "addresses": relationship(Address, order_by=Address.id)
114            },
115        )
116        sess = create_session()
117
118        u = aliased(User)
119
120        q = sess.query(u).options(selectinload(u.addresses))
121
122        def go():
123            eq_(
124                [
125                    User(
126                        id=7,
127                        addresses=[
128                            Address(id=1, email_address="jack@bean.com")
129                        ],
130                    )
131                ],
132                q.filter(u.id == 7).all(),
133            )
134
135        self.assert_sql_count(testing.db, go, 2)
136
137        def go():
138            eq_(self.static.user_address_result, q.order_by(u.id).all())
139
140        self.assert_sql_count(testing.db, go, 2)
141
142        q = sess.query(u).options(
143            selectinload(u.addresses).selectinload(Address.dingalings)
144        )
145
146        def go():
147            eq_(
148                [
149                    User(
150                        id=8,
151                        addresses=[
152                            Address(
153                                id=2,
154                                email_address="ed@wood.com",
155                                dingalings=[Dingaling()],
156                            ),
157                            Address(id=3, email_address="ed@bettyboop.com"),
158                            Address(id=4, email_address="ed@lala.com"),
159                        ],
160                    ),
161                    User(
162                        id=9,
163                        addresses=[Address(id=5, dingalings=[Dingaling()])],
164                    ),
165                ],
166                q.filter(u.id.in_([8, 9])).all(),
167            )
168
169        self.assert_sql_count(testing.db, go, 3)
170
171    def test_from_get(self):
172        users, Address, addresses, User = (
173            self.tables.users,
174            self.classes.Address,
175            self.tables.addresses,
176            self.classes.User,
177        )
178
179        mapper(
180            User,
181            users,
182            properties={
183                "addresses": relationship(
184                    mapper(Address, addresses), order_by=Address.id
185                )
186            },
187        )
188        sess = create_session()
189
190        q = sess.query(User).options(selectinload(User.addresses))
191
192        def go():
193            eq_(
194                User(
195                    id=7,
196                    addresses=[Address(id=1, email_address="jack@bean.com")],
197                ),
198                q.get(7),
199            )
200
201        self.assert_sql_count(testing.db, go, 2)
202
203    def test_from_params(self):
204        users, Address, addresses, User = (
205            self.tables.users,
206            self.classes.Address,
207            self.tables.addresses,
208            self.classes.User,
209        )
210
211        mapper(
212            User,
213            users,
214            properties={
215                "addresses": relationship(
216                    mapper(Address, addresses), order_by=Address.id
217                )
218            },
219        )
220        sess = create_session()
221
222        q = sess.query(User).options(selectinload(User.addresses))
223
224        def go():
225            eq_(
226                User(
227                    id=7,
228                    addresses=[Address(id=1, email_address="jack@bean.com")],
229                ),
230                q.filter(User.id == bindparam("foo")).params(foo=7).one(),
231            )
232
233        self.assert_sql_count(testing.db, go, 2)
234
235    def test_disable_dynamic(self):
236        """test no selectin option on a dynamic."""
237
238        users, Address, addresses, User = (
239            self.tables.users,
240            self.classes.Address,
241            self.tables.addresses,
242            self.classes.User,
243        )
244
245        mapper(
246            User,
247            users,
248            properties={"addresses": relationship(Address, lazy="dynamic")},
249        )
250        mapper(Address, addresses)
251        sess = create_session()
252
253        # previously this would not raise, but would emit
254        # the query needlessly and put the result nowhere.
255        assert_raises_message(
256            sa.exc.InvalidRequestError,
257            "User.addresses' does not support object population - eager "
258            "loading cannot be applied.",
259            sess.query(User).options(selectinload(User.addresses)).first,
260        )
261
262    def test_many_to_many_plain(self):
263        keywords, items, item_keywords, Keyword, Item = (
264            self.tables.keywords,
265            self.tables.items,
266            self.tables.item_keywords,
267            self.classes.Keyword,
268            self.classes.Item,
269        )
270
271        mapper(Keyword, keywords)
272        mapper(
273            Item,
274            items,
275            properties=dict(
276                keywords=relationship(
277                    Keyword,
278                    secondary=item_keywords,
279                    lazy="selectin",
280                    order_by=keywords.c.id,
281                )
282            ),
283        )
284
285        q = create_session().query(Item).order_by(Item.id)
286
287        def go():
288            eq_(self.static.item_keyword_result, q.all())
289
290        self.assert_sql_count(testing.db, go, 2)
291
292    def test_many_to_many_with_join(self):
293        keywords, items, item_keywords, Keyword, Item = (
294            self.tables.keywords,
295            self.tables.items,
296            self.tables.item_keywords,
297            self.classes.Keyword,
298            self.classes.Item,
299        )
300
301        mapper(Keyword, keywords)
302        mapper(
303            Item,
304            items,
305            properties=dict(
306                keywords=relationship(
307                    Keyword,
308                    secondary=item_keywords,
309                    lazy="selectin",
310                    order_by=keywords.c.id,
311                )
312            ),
313        )
314
315        q = create_session().query(Item).order_by(Item.id)
316
317        def go():
318            eq_(
319                self.static.item_keyword_result[0:2],
320                q.join("keywords").filter(Keyword.name == "red").all(),
321            )
322
323        self.assert_sql_count(testing.db, go, 2)
324
325    def test_many_to_many_with_join_alias(self):
326        keywords, items, item_keywords, Keyword, Item = (
327            self.tables.keywords,
328            self.tables.items,
329            self.tables.item_keywords,
330            self.classes.Keyword,
331            self.classes.Item,
332        )
333
334        mapper(Keyword, keywords)
335        mapper(
336            Item,
337            items,
338            properties=dict(
339                keywords=relationship(
340                    Keyword,
341                    secondary=item_keywords,
342                    lazy="selectin",
343                    order_by=keywords.c.id,
344                )
345            ),
346        )
347
348        q = create_session().query(Item).order_by(Item.id)
349
350        def go():
351            eq_(
352                self.static.item_keyword_result[0:2],
353                (
354                    q.join("keywords", aliased=True).filter(
355                        Keyword.name == "red"
356                    )
357                ).all(),
358            )
359
360        self.assert_sql_count(testing.db, go, 2)
361
362    def test_orderby(self):
363        users, Address, addresses, User = (
364            self.tables.users,
365            self.classes.Address,
366            self.tables.addresses,
367            self.classes.User,
368        )
369
370        mapper(
371            User,
372            users,
373            properties={
374                "addresses": relationship(
375                    mapper(Address, addresses),
376                    lazy="selectin",
377                    order_by=addresses.c.email_address,
378                )
379            },
380        )
381        q = create_session().query(User)
382        eq_(
383            [
384                User(id=7, addresses=[Address(id=1)]),
385                User(
386                    id=8,
387                    addresses=[
388                        Address(id=3, email_address="ed@bettyboop.com"),
389                        Address(id=4, email_address="ed@lala.com"),
390                        Address(id=2, email_address="ed@wood.com"),
391                    ],
392                ),
393                User(id=9, addresses=[Address(id=5)]),
394                User(id=10, addresses=[]),
395            ],
396            q.order_by(User.id).all(),
397        )
398
399    def test_orderby_multi(self):
400        users, Address, addresses, User = (
401            self.tables.users,
402            self.classes.Address,
403            self.tables.addresses,
404            self.classes.User,
405        )
406
407        mapper(
408            User,
409            users,
410            properties={
411                "addresses": relationship(
412                    mapper(Address, addresses),
413                    lazy="selectin",
414                    order_by=[addresses.c.email_address, addresses.c.id],
415                )
416            },
417        )
418        q = create_session().query(User)
419        eq_(
420            [
421                User(id=7, addresses=[Address(id=1)]),
422                User(
423                    id=8,
424                    addresses=[
425                        Address(id=3, email_address="ed@bettyboop.com"),
426                        Address(id=4, email_address="ed@lala.com"),
427                        Address(id=2, email_address="ed@wood.com"),
428                    ],
429                ),
430                User(id=9, addresses=[Address(id=5)]),
431                User(id=10, addresses=[]),
432            ],
433            q.order_by(User.id).all(),
434        )
435
436    def test_orderby_related(self):
437        """A regular mapper select on a single table can
438        order by a relationship to a second table"""
439
440        Address, addresses, users, User = (
441            self.classes.Address,
442            self.tables.addresses,
443            self.tables.users,
444            self.classes.User,
445        )
446
447        mapper(Address, addresses)
448        mapper(
449            User,
450            users,
451            properties=dict(
452                addresses=relationship(
453                    Address, lazy="selectin", order_by=addresses.c.id
454                )
455            ),
456        )
457
458        q = create_session().query(User)
459        result = (
460            q.filter(User.id == Address.user_id)
461            .order_by(Address.email_address)
462            .all()
463        )
464
465        eq_(
466            [
467                User(
468                    id=8,
469                    addresses=[
470                        Address(id=2, email_address="ed@wood.com"),
471                        Address(id=3, email_address="ed@bettyboop.com"),
472                        Address(id=4, email_address="ed@lala.com"),
473                    ],
474                ),
475                User(id=9, addresses=[Address(id=5)]),
476                User(id=7, addresses=[Address(id=1)]),
477            ],
478            result,
479        )
480
481    def test_orderby_desc(self):
482        Address, addresses, users, User = (
483            self.classes.Address,
484            self.tables.addresses,
485            self.tables.users,
486            self.classes.User,
487        )
488
489        mapper(Address, addresses)
490        mapper(
491            User,
492            users,
493            properties=dict(
494                addresses=relationship(
495                    Address,
496                    lazy="selectin",
497                    order_by=[sa.desc(addresses.c.email_address)],
498                )
499            ),
500        )
501        sess = create_session()
502        eq_(
503            [
504                User(id=7, addresses=[Address(id=1)]),
505                User(
506                    id=8,
507                    addresses=[
508                        Address(id=2, email_address="ed@wood.com"),
509                        Address(id=4, email_address="ed@lala.com"),
510                        Address(id=3, email_address="ed@bettyboop.com"),
511                    ],
512                ),
513                User(id=9, addresses=[Address(id=5)]),
514                User(id=10, addresses=[]),
515            ],
516            sess.query(User).order_by(User.id).all(),
517        )
518
    # Configurations shared by test_options_pathing and test_mapper_pathing.
    # Each tuple is unpacked as (o, i, k, count): the loader names applied to
    # User.orders, Order.items and Item.keywords respectively, followed by the
    # number of SQL statements asserted for the first query of
    # _do_query_tests.
    _pathing_runs = [
        ("lazyload", "lazyload", "lazyload", 15),
        ("selectinload", "lazyload", "lazyload", 12),
        ("selectinload", "selectinload", "lazyload", 8),
        ("joinedload", "selectinload", "lazyload", 7),
        ("lazyload", "lazyload", "selectinload", 12),
        ("selectinload", "selectinload", "selectinload", 4),
        ("selectinload", "selectinload", "joinedload", 3),
    ]
528
529    def test_options_pathing(self):
530        self._do_options_test(self._pathing_runs)
531
532    def test_mapper_pathing(self):
533        self._do_mapper_test(self._pathing_runs)
534
535    def _do_options_test(self, configs):
536        (
537            users,
538            Keyword,
539            orders,
540            items,
541            order_items,
542            Order,
543            Item,
544            User,
545            keywords,
546            item_keywords,
547        ) = (
548            self.tables.users,
549            self.classes.Keyword,
550            self.tables.orders,
551            self.tables.items,
552            self.tables.order_items,
553            self.classes.Order,
554            self.classes.Item,
555            self.classes.User,
556            self.tables.keywords,
557            self.tables.item_keywords,
558        )
559
560        mapper(
561            User,
562            users,
563            properties={
564                "orders": relationship(Order, order_by=orders.c.id)  # o2m, m2o
565            },
566        )
567        mapper(
568            Order,
569            orders,
570            properties={
571                "items": relationship(
572                    Item, secondary=order_items, order_by=items.c.id
573                )  # m2m
574            },
575        )
576        mapper(
577            Item,
578            items,
579            properties={
580                "keywords": relationship(
581                    Keyword, secondary=item_keywords, order_by=keywords.c.id
582                )  # m2m
583            },
584        )
585        mapper(Keyword, keywords)
586
587        callables = {
588            "joinedload": joinedload,
589            "selectinload": selectinload,
590            "subqueryload": subqueryload,
591        }
592
593        for o, i, k, count in configs:
594            options = []
595            if o in callables:
596                options.append(callables[o](User.orders))
597            if i in callables:
598                options.append(callables[i](User.orders, Order.items))
599            if k in callables:
600                options.append(
601                    callables[k](User.orders, Order.items, Item.keywords)
602                )
603
604            self._do_query_tests(options, count)
605
606    def _do_mapper_test(self, configs):
607        (
608            users,
609            Keyword,
610            orders,
611            items,
612            order_items,
613            Order,
614            Item,
615            User,
616            keywords,
617            item_keywords,
618        ) = (
619            self.tables.users,
620            self.classes.Keyword,
621            self.tables.orders,
622            self.tables.items,
623            self.tables.order_items,
624            self.classes.Order,
625            self.classes.Item,
626            self.classes.User,
627            self.tables.keywords,
628            self.tables.item_keywords,
629        )
630
631        opts = {
632            "lazyload": "select",
633            "joinedload": "joined",
634            "selectinload": "selectin",
635        }
636
637        for o, i, k, count in configs:
638            mapper(
639                User,
640                users,
641                properties={
642                    "orders": relationship(
643                        Order, lazy=opts[o], order_by=orders.c.id
644                    )
645                },
646            )
647            mapper(
648                Order,
649                orders,
650                properties={
651                    "items": relationship(
652                        Item,
653                        secondary=order_items,
654                        lazy=opts[i],
655                        order_by=items.c.id,
656                    )
657                },
658            )
659            mapper(
660                Item,
661                items,
662                properties={
663                    "keywords": relationship(
664                        Keyword,
665                        lazy=opts[k],
666                        secondary=item_keywords,
667                        order_by=keywords.c.id,
668                    )
669                },
670            )
671            mapper(Keyword, keywords)
672
673            try:
674                self._do_query_tests([], count)
675            finally:
676                clear_mappers()
677
678    def _do_query_tests(self, opts, count):
679        Order, User = self.classes.Order, self.classes.User
680
681        sess = create_session()
682
683        def go():
684            eq_(
685                sess.query(User).options(*opts).order_by(User.id).all(),
686                self.static.user_item_keyword_result,
687            )
688
689        self.assert_sql_count(testing.db, go, count)
690
691        eq_(
692            sess.query(User)
693            .options(*opts)
694            .filter(User.name == "fred")
695            .order_by(User.id)
696            .all(),
697            self.static.user_item_keyword_result[2:3],
698        )
699
700        sess = create_session()
701        eq_(
702            sess.query(User)
703            .options(*opts)
704            .join(User.orders)
705            .filter(Order.id == 3)
706            .order_by(User.id)
707            .all(),
708            self.static.user_item_keyword_result[0:1],
709        )
710
711    def test_cyclical(self):
712        """A circular eager relationship breaks the cycle with a lazy loader"""
713
714        Address, addresses, users, User = (
715            self.classes.Address,
716            self.tables.addresses,
717            self.tables.users,
718            self.classes.User,
719        )
720
721        mapper(Address, addresses)
722        mapper(
723            User,
724            users,
725            properties=dict(
726                addresses=relationship(
727                    Address,
728                    lazy="selectin",
729                    backref=sa.orm.backref("user", lazy="selectin"),
730                    order_by=Address.id,
731                )
732            ),
733        )
734        is_(
735            sa.orm.class_mapper(User).get_property("addresses").lazy,
736            "selectin",
737        )
738        is_(sa.orm.class_mapper(Address).get_property("user").lazy, "selectin")
739
740        sess = create_session()
741        eq_(
742            self.static.user_address_result,
743            sess.query(User).order_by(User.id).all(),
744        )
745
746    def test_cyclical_explicit_join_depth(self):
747        """A circular eager relationship breaks the cycle with a lazy loader"""
748
749        Address, addresses, users, User = (
750            self.classes.Address,
751            self.tables.addresses,
752            self.tables.users,
753            self.classes.User,
754        )
755
756        mapper(Address, addresses)
757        mapper(
758            User,
759            users,
760            properties=dict(
761                addresses=relationship(
762                    Address,
763                    lazy="selectin",
764                    join_depth=1,
765                    backref=sa.orm.backref(
766                        "user", lazy="selectin", join_depth=1
767                    ),
768                    order_by=Address.id,
769                )
770            ),
771        )
772        is_(
773            sa.orm.class_mapper(User).get_property("addresses").lazy,
774            "selectin",
775        )
776        is_(sa.orm.class_mapper(Address).get_property("user").lazy, "selectin")
777
778        sess = create_session()
779        eq_(
780            self.static.user_address_result,
781            sess.query(User).order_by(User.id).all(),
782        )
783
784    def test_double_w_ac_against_subquery(self):
785
786        (
787            users,
788            orders,
789            User,
790            Address,
791            Order,
792            addresses,
793            Item,
794            items,
795            order_items,
796        ) = (
797            self.tables.users,
798            self.tables.orders,
799            self.classes.User,
800            self.classes.Address,
801            self.classes.Order,
802            self.tables.addresses,
803            self.classes.Item,
804            self.tables.items,
805            self.tables.order_items,
806        )
807
808        mapper(Address, addresses)
809        mapper(
810            Order,
811            orders,
812            properties={
813                "items": relationship(
814                    Item,
815                    secondary=order_items,
816                    lazy="selectin",
817                    order_by=items.c.id,
818                )
819            },
820        )
821        mapper(Item, items)
822
823        open_mapper = aliased(
824            Order, select([orders]).where(orders.c.isopen == 1).alias()
825        )
826        closed_mapper = aliased(
827            Order, select([orders]).where(orders.c.isopen == 0).alias()
828        )
829
830        mapper(
831            User,
832            users,
833            properties=dict(
834                addresses=relationship(
835                    Address, lazy="selectin", order_by=addresses.c.id
836                ),
837                open_orders=relationship(
838                    open_mapper, lazy="selectin", order_by=open_mapper.id
839                ),
840                closed_orders=relationship(
841                    closed_mapper, lazy="selectin", order_by=closed_mapper.id
842                ),
843            ),
844        )
845
846        self._run_double_test()
847
848    def test_double_w_ac(self):
849
850        (
851            users,
852            orders,
853            User,
854            Address,
855            Order,
856            addresses,
857            Item,
858            items,
859            order_items,
860        ) = (
861            self.tables.users,
862            self.tables.orders,
863            self.classes.User,
864            self.classes.Address,
865            self.classes.Order,
866            self.tables.addresses,
867            self.classes.Item,
868            self.tables.items,
869            self.tables.order_items,
870        )
871
872        mapper(Address, addresses)
873        mapper(
874            Order,
875            orders,
876            properties={
877                "items": relationship(
878                    Item,
879                    secondary=order_items,
880                    lazy="selectin",
881                    order_by=items.c.id,
882                )
883            },
884        )
885        mapper(Item, items)
886
887        open_mapper = aliased(Order, orders)
888        closed_mapper = aliased(Order, orders)
889
890        mapper(
891            User,
892            users,
893            properties=dict(
894                addresses=relationship(
895                    Address, lazy="selectin", order_by=addresses.c.id
896                ),
897                open_orders=relationship(
898                    open_mapper,
899                    primaryjoin=sa.and_(
900                        open_mapper.isopen == 1,
901                        users.c.id == open_mapper.user_id,
902                    ),
903                    lazy="selectin",
904                    order_by=open_mapper.id,
905                ),
906                closed_orders=relationship(
907                    closed_mapper,
908                    primaryjoin=sa.and_(
909                        closed_mapper.isopen == 0,
910                        users.c.id == closed_mapper.user_id,
911                    ),
912                    lazy="selectin",
913                    order_by=closed_mapper.id,
914                ),
915            ),
916        )
917
918        self._run_double_test()
919
920    def test_double_same_mappers(self):
921        """Eager loading with two relationships simultaneously,
922        from the same table, using aliases."""
923
924        (
925            addresses,
926            items,
927            order_items,
928            orders,
929            Item,
930            User,
931            Address,
932            Order,
933            users,
934        ) = (
935            self.tables.addresses,
936            self.tables.items,
937            self.tables.order_items,
938            self.tables.orders,
939            self.classes.Item,
940            self.classes.User,
941            self.classes.Address,
942            self.classes.Order,
943            self.tables.users,
944        )
945
946        mapper(Address, addresses)
947        mapper(
948            Order,
949            orders,
950            properties={
951                "items": relationship(
952                    Item,
953                    secondary=order_items,
954                    lazy="selectin",
955                    order_by=items.c.id,
956                )
957            },
958        )
959        mapper(Item, items)
960        mapper(
961            User,
962            users,
963            properties=dict(
964                addresses=relationship(
965                    Address, lazy="selectin", order_by=addresses.c.id
966                ),
967                open_orders=relationship(
968                    Order,
969                    primaryjoin=sa.and_(
970                        orders.c.isopen == 1, users.c.id == orders.c.user_id
971                    ),
972                    lazy="selectin",
973                    order_by=orders.c.id,
974                ),
975                closed_orders=relationship(
976                    Order,
977                    primaryjoin=sa.and_(
978                        orders.c.isopen == 0, users.c.id == orders.c.user_id
979                    ),
980                    lazy="selectin",
981                    order_by=orders.c.id,
982                ),
983            ),
984        )
985
986        self._run_double_test()
987
988    def _run_double_test(self, no_items=False):
989        User, Address, Order, Item = self.classes(
990            "User", "Address", "Order", "Item"
991        )
992        q = create_session().query(User).order_by(User.id)
993
994        def items(*ids):
995            if no_items:
996                return {}
997            else:
998                return {"items": [Item(id=id_) for id_ in ids]}
999
1000        def go():
1001            eq_(
1002                [
1003                    User(
1004                        id=7,
1005                        addresses=[Address(id=1)],
1006                        open_orders=[Order(id=3, **items(3, 4, 5))],
1007                        closed_orders=[
1008                            Order(id=1, **items(1, 2, 3)),
1009                            Order(id=5, **items(5)),
1010                        ],
1011                    ),
1012                    User(
1013                        id=8,
1014                        addresses=[
1015                            Address(id=2),
1016                            Address(id=3),
1017                            Address(id=4),
1018                        ],
1019                        open_orders=[],
1020                        closed_orders=[],
1021                    ),
1022                    User(
1023                        id=9,
1024                        addresses=[Address(id=5)],
1025                        open_orders=[Order(id=4, **items(1, 5))],
1026                        closed_orders=[Order(id=2, **items(1, 2, 3))],
1027                    ),
1028                    User(id=10),
1029                ],
1030                q.all(),
1031            )
1032
1033        if no_items:
1034            self.assert_sql_count(testing.db, go, 4)
1035        else:
1036            self.assert_sql_count(testing.db, go, 6)
1037
1038    def test_limit(self):
1039        """Limit operations combined with lazy-load relationships."""
1040
1041        (
1042            users,
1043            items,
1044            order_items,
1045            orders,
1046            Item,
1047            User,
1048            Address,
1049            Order,
1050            addresses,
1051        ) = (
1052            self.tables.users,
1053            self.tables.items,
1054            self.tables.order_items,
1055            self.tables.orders,
1056            self.classes.Item,
1057            self.classes.User,
1058            self.classes.Address,
1059            self.classes.Order,
1060            self.tables.addresses,
1061        )
1062
1063        mapper(Item, items)
1064        mapper(
1065            Order,
1066            orders,
1067            properties={
1068                "items": relationship(
1069                    Item,
1070                    secondary=order_items,
1071                    lazy="selectin",
1072                    order_by=items.c.id,
1073                )
1074            },
1075        )
1076        mapper(
1077            User,
1078            users,
1079            properties={
1080                "addresses": relationship(
1081                    mapper(Address, addresses),
1082                    lazy="selectin",
1083                    order_by=addresses.c.id,
1084                ),
1085                "orders": relationship(
1086                    Order, lazy="select", order_by=orders.c.id
1087                ),
1088            },
1089        )
1090
1091        sess = create_session()
1092        q = sess.query(User)
1093
1094        result = q.order_by(User.id).limit(2).offset(1).all()
1095        eq_(self.static.user_all_result[1:3], result)
1096
1097        result = q.order_by(sa.desc(User.id)).limit(2).offset(2).all()
1098        eq_(list(reversed(self.static.user_all_result[0:2])), result)
1099
1100    def test_one_to_many_scalar(self):
1101        Address, addresses, users, User = (
1102            self.classes.Address,
1103            self.tables.addresses,
1104            self.tables.users,
1105            self.classes.User,
1106        )
1107
1108        mapper(
1109            User,
1110            users,
1111            properties=dict(
1112                address=relationship(
1113                    mapper(Address, addresses), lazy="selectin", uselist=False
1114                )
1115            ),
1116        )
1117        q = create_session().query(User)
1118
1119        def go():
1120            result = q.filter(users.c.id == 7).all()
1121            eq_([User(id=7, address=Address(id=1))], result)
1122
1123        self.assert_sql_count(testing.db, go, 2)
1124
1125    def test_one_to_many_scalar_none(self):
1126        Address, addresses, users, User = (
1127            self.classes.Address,
1128            self.tables.addresses,
1129            self.tables.users,
1130            self.classes.User,
1131        )
1132
1133        mapper(
1134            User,
1135            users,
1136            properties=dict(
1137                address=relationship(
1138                    mapper(Address, addresses), lazy="selectin", uselist=False
1139                )
1140            ),
1141        )
1142        q = create_session().query(User)
1143
1144        def go():
1145            result = q.filter(users.c.id == 10).all()
1146            eq_([User(id=10, address=None)], result)
1147
1148        self.assert_sql_count(testing.db, go, 2)
1149
1150    def test_many_to_one(self):
1151        users, Address, addresses, User = (
1152            self.tables.users,
1153            self.classes.Address,
1154            self.tables.addresses,
1155            self.classes.User,
1156        )
1157
1158        mapper(
1159            Address,
1160            addresses,
1161            properties=dict(
1162                user=relationship(mapper(User, users), lazy="selectin")
1163            ),
1164        )
1165        sess = create_session()
1166        q = sess.query(Address)
1167
1168        def go():
1169            a = q.filter(addresses.c.id == 1).one()
1170            is_not(a.user, None)
1171            u1 = sess.query(User).get(7)
1172            is_(a.user, u1)
1173
1174        self.assert_sql_count(testing.db, go, 2)
1175
1176    def test_m2o_none_value_present(self):
1177        orders, Order, addresses, Address = (
1178            self.tables.orders,
1179            self.classes.Order,
1180            self.tables.addresses,
1181            self.classes.Address,
1182        )
1183
1184        mapper(
1185            Order,
1186            orders,
1187            properties={"address": relationship(Address, lazy="selectin")},
1188        )
1189        mapper(Address, addresses)
1190
1191        sess = create_session()
1192        q = sess.query(Order).filter(Order.id.in_([4, 5])).order_by(Order.id)
1193
1194        o4, o5 = q.all()
1195        assert o4.__dict__["address"] is not None
1196        assert o5.__dict__["address"] is None
1197
1198        # test overwrite
1199
1200        o5.address = Address()
1201        sess.query(Order).filter(Order.id.in_([4, 5])).order_by(Order.id).all()
1202        assert o5.__dict__["address"] is not None
1203
1204        o5.address = Address()
1205        sess.query(Order).populate_existing().filter(
1206            Order.id.in_([4, 5])
1207        ).order_by(Order.id).all()
1208        assert o5.__dict__["address"] is None
1209
1210    def test_m2o_uselist_none_value_present(self):
1211        orders, Order, addresses, Address = (
1212            self.tables.orders,
1213            self.classes.Order,
1214            self.tables.addresses,
1215            self.classes.Address,
1216        )
1217
1218        mapper(
1219            Order,
1220            orders,
1221            properties={
1222                "address": relationship(Address, lazy="selectin", uselist=True)
1223            },
1224        )
1225        mapper(Address, addresses)
1226
1227        sess = create_session()
1228        q = sess.query(Order).filter(Order.id.in_([4, 5])).order_by(Order.id)
1229
1230        o4, o5 = q.all()
1231        assert len(o4.__dict__["address"])
1232        eq_(o5.__dict__["address"], [])
1233
1234    def test_o2m_empty_list_present(self):
1235        Address, addresses, users, User = (
1236            self.classes.Address,
1237            self.tables.addresses,
1238            self.tables.users,
1239            self.classes.User,
1240        )
1241
1242        mapper(
1243            User,
1244            users,
1245            properties=dict(
1246                addresses=relationship(
1247                    mapper(Address, addresses), lazy="selectin"
1248                )
1249            ),
1250        )
1251        q = create_session().query(User)
1252        result = q.filter(users.c.id == 10).all()
1253        u1 = result[0]
1254
1255        eq_(u1.__dict__["addresses"], [])
1256
1257    def test_double_with_aggregate(self):
1258        User, users, orders, Order = (
1259            self.classes.User,
1260            self.tables.users,
1261            self.tables.orders,
1262            self.classes.Order,
1263        )
1264
1265        max_orders_by_user = sa.select(
1266            [sa.func.max(orders.c.id).label("order_id")],
1267            group_by=[orders.c.user_id],
1268        ).alias("max_orders_by_user")
1269
1270        max_orders = orders.select(
1271            orders.c.id == max_orders_by_user.c.order_id
1272        ).alias("max_orders")
1273
1274        mapper(Order, orders)
1275        mapper(
1276            User,
1277            users,
1278            properties={
1279                "orders": relationship(
1280                    Order,
1281                    backref="user",
1282                    lazy="selectin",
1283                    order_by=orders.c.id,
1284                ),
1285                "max_order": relationship(
1286                    aliased(Order, max_orders), lazy="selectin", uselist=False
1287                ),
1288            },
1289        )
1290
1291        q = create_session().query(User)
1292
1293        def go():
1294            eq_(
1295                [
1296                    User(
1297                        id=7,
1298                        orders=[Order(id=1), Order(id=3), Order(id=5)],
1299                        max_order=Order(id=5),
1300                    ),
1301                    User(id=8, orders=[]),
1302                    User(
1303                        id=9,
1304                        orders=[Order(id=2), Order(id=4)],
1305                        max_order=Order(id=4),
1306                    ),
1307                    User(id=10),
1308                ],
1309                q.order_by(User.id).all(),
1310            )
1311
1312        self.assert_sql_count(testing.db, go, 3)
1313
1314    def test_uselist_false_warning(self):
1315        """test that multiple rows received by a
1316        uselist=False raises a warning."""
1317
1318        User, users, orders, Order = (
1319            self.classes.User,
1320            self.tables.users,
1321            self.tables.orders,
1322            self.classes.Order,
1323        )
1324
1325        mapper(
1326            User,
1327            users,
1328            properties={"order": relationship(Order, uselist=False)},
1329        )
1330        mapper(Order, orders)
1331        s = create_session()
1332        assert_raises(
1333            sa.exc.SAWarning,
1334            s.query(User).options(selectinload(User.order)).all,
1335        )
1336
1337
class LoadOnExistingTest(_fixtures.FixtureTest):
    """Selectin loaders invoked from a Query fully populate objects that
    are already present in the Session."""

    run_inserts = "once"
    run_deletes = None

    def _collection_to_scalar_fixture(self):
        """Map ``User.addresses`` and ``Address.dingaling`` with default
        loading; return the three classes and a non-autoflushing Session."""
        classes, tables = self.classes, self.tables
        User = classes.User
        Address = classes.Address
        Dingaling = classes.Dingaling

        mapper(
            User,
            tables.users,
            properties={"addresses": relationship(Address)},
        )
        mapper(
            Address,
            tables.addresses,
            properties={"dingaling": relationship(Dingaling)},
        )
        mapper(Dingaling, tables.dingalings)

        return User, Address, Dingaling, Session(autoflush=False)

    def _collection_to_collection_fixture(self):
        """Map ``User.orders`` and the many-to-many ``Order.items`` with
        default loading; return the three classes and a non-autoflushing
        Session."""
        classes, tables = self.classes, self.tables
        User = classes.User
        Order = classes.Order
        Item = classes.Item

        mapper(User, tables.users, properties={"orders": relationship(Order)})
        items_rel = relationship(Item, secondary=tables.order_items)
        mapper(Order, tables.orders, properties={"items": items_rel})
        mapper(Item, tables.items)

        return User, Order, Item, Session(autoflush=False)

    def _eager_config_fixture(self):
        """Map ``User.addresses`` as a selectin relationship; return the
        two classes and a non-autoflushing Session."""
        User = self.classes.User
        Address = self.classes.Address

        addresses_rel = relationship(Address, lazy="selectin")
        mapper(User, self.tables.users, properties={"addresses": addresses_rel})
        mapper(Address, self.tables.addresses)

        return User, Address, Session(autoflush=False)

    def _deferred_config_fixture(self):
        """Like the eager fixture, with ``User.name`` additionally mapped
        as a deferred column."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users

        mapper(
            User,
            users,
            properties={
                "name": deferred(users.c.name),
                "addresses": relationship(Address, lazy="selectin"),
            },
        )
        mapper(Address, self.tables.addresses)

        return User, Address, Session(autoflush=False)

    def test_no_query_on_refresh(self):
        """Refreshing an expired object does not reload the collection."""
        User, Address, session = self._eager_config_fixture()

        user = session.query(User).get(8)
        assert "addresses" in user.__dict__
        session.expire(user)

        def go():
            eq_(user.id, 8)

        # exactly one statement, and the collection stays unloaded
        self.assert_sql_count(testing.db, go, 1)
        assert "addresses" not in user.__dict__

    def test_no_query_on_deferred(self):
        """Loading a deferred column does not reload the collection."""
        User, Address, session = self._deferred_config_fixture()

        user = session.query(User).get(8)
        assert "addresses" in user.__dict__
        session.expire(user, ["addresses"])

        def go():
            eq_(user.name, "ed")

        # exactly one statement, and the collection stays unloaded
        self.assert_sql_count(testing.db, go, 1)
        assert "addresses" not in user.__dict__

    def test_populate_existing_propagate(self):
        """``populate_existing()`` reaches the selectin-loaded collection
        and its members."""
        User, Address, session = self._eager_config_fixture()

        user = session.query(User).get(8)
        user.addresses[2].email_address = "foofoo"
        del user.addresses[1]

        user = session.query(User).populate_existing().filter_by(id=8).one()

        # collection is reverted
        eq_(len(user.addresses), 3)

        # attributes on related items reverted
        eq_(user.addresses[2].email_address, "ed@lala.com")

    def test_loads_second_level_collection_to_scalar(self):
        """The second-level scalar is loaded onto the existing members of
        an already-loaded collection; a pending member is left alone."""
        User, Address, Dingaling, session = (
            self._collection_to_scalar_fixture()
        )

        user = session.query(User).get(8)
        pending = Address()
        user.addresses.append(pending)
        modified = user.addresses[0]
        modified.email_address = "foo"

        loader = selectinload("addresses").selectinload("dingaling")
        session.query(User).options(loader).filter_by(id=8).all()

        assert user.addresses[-1] is pending
        for address in user.addresses:
            loaded = "dingaling" in address.__dict__
            if address is pending:
                assert not loaded
            else:
                assert loaded
            if address is modified:
                # the local change was not overwritten
                eq_(modified.email_address, "foo")

    def test_loads_second_level_collection_to_collection(self):
        """The second-level collection is loaded onto the existing members
        of an already-loaded collection; a pending member is left alone."""
        User, Order, Item, session = self._collection_to_collection_fixture()

        user = session.query(User).get(7)
        # touch the attribute before appending to it
        user.orders
        pending = Order()
        user.orders.append(pending)

        loader = selectinload("orders").selectinload("items")
        session.query(User).options(loader).filter_by(id=7).all()

        for order in user.orders:
            loaded = "items" in order.__dict__
            if order is pending:
                assert not loaded
            else:
                assert loaded

    def test_load_two_levels_collection_to_scalar(self):
        """A later query with a deeper option path populates the scalar on
        members that an earlier query loaded."""
        User, Address, Dingaling, session = (
            self._collection_to_scalar_fixture()
        )

        one_level = selectinload("addresses")
        user = session.query(User).filter_by(id=8).options(one_level).one()

        two_levels = selectinload("addresses").selectinload("dingaling")
        session.query(User).filter_by(id=8).options(two_levels).first()

        assert "dingaling" in user.addresses[0].__dict__

    def test_load_two_levels_collection_to_collection(self):
        """A later query with a deeper option path populates the nested
        collection on members that an earlier query loaded."""
        User, Order, Item, session = self._collection_to_collection_fixture()

        one_level = selectinload("orders")
        user = session.query(User).filter_by(id=7).options(one_level).one()

        two_levels = selectinload("orders").selectinload("items")
        session.query(User).filter_by(id=7).options(two_levels).first()

        assert "items" in user.orders[0].__dict__
1511
1512
class OrderBySecondaryTest(fixtures.MappedTest):
    """Selectin loading of a many-to-many collection that is ordered by a
    column of the secondary table."""

    @classmethod
    def define_tables(cls, metadata):
        def autoinc_pk():
            # a fresh Column per call; each table needs its own object
            return Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            )

        Table(
            "m2m",
            metadata,
            autoinc_pk(),
            Column("aid", Integer, ForeignKey("a.id")),
            Column("bid", Integer, ForeignKey("b.id")),
        )

        Table("a", metadata, autoinc_pk(), Column("data", String(50)))
        Table("b", metadata, autoinc_pk(), Column("data", String(50)))

    @classmethod
    def fixtures(cls):
        a_rows = (("id", "data"), (1, "a1"), (2, "a2"))
        b_rows = (("id", "data"), (1, "b1"), (2, "b2"), (3, "b3"), (4, "b4"))
        # the association rows are listed out of id order
        m2m_rows = (
            ("id", "aid", "bid"),
            (2, 1, 1),
            (4, 2, 4),
            (1, 1, 3),
            (6, 2, 2),
            (3, 1, 2),
            (5, 2, 3),
        )
        return {"a": a_rows, "b": b_rows, "m2m": m2m_rows}

    def test_ordering(self):
        """``bs`` comes back in the order of the ``m2m.id`` column."""
        tables = self.tables
        a, b, m2m = tables.a, tables.b, tables.m2m

        class A(fixtures.ComparableEntity):
            pass

        class B(fixtures.ComparableEntity):
            pass

        bs_rel = relationship(
            B, secondary=m2m, lazy="selectin", order_by=m2m.c.id
        )
        mapper(A, a, properties={"bs": bs_rel})
        mapper(B, b)

        session = create_session()

        def go():
            loaded = session.query(A).all()
            eq_(
                loaded,
                [
                    A(
                        data="a1",
                        bs=[B(data=name) for name in ("b3", "b1", "b2")],
                    ),
                    A(bs=[B(data=name) for name in ("b4", "b3", "b2")]),
                ],
            )

        self.assert_sql_count(testing.db, go, 2)
1594
1595
class BaseRelationFromJoinedSubclassTest(_Polymorphic):
    """Selectin loading of a base-class relationship from a joined subclass.

    Adapted, like most tests here, from subquery_relations as part of
    general inheritance testing.

    The subquery version had to make the subquery load imitate the original
    query very closely so that filter criteria, ordering etc. were kept
    with the original query embedded.  Selectin loading needs none of that,
    so the secondary statement asserted by every test here is the same
    plain SELECT from "paperwork" with an IN on the parent keys.

    """

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "people",
            metadata,
            Column(
                "person_id",
                Integer,
                primary_key=True,
                test_needs_autoincrement=True,
            ),
            Column("name", String(50)),
            Column("type", String(30)),
        )

        # to test fully, PK of engineers table must be
        # named differently from that of people
        Table(
            "engineers",
            metadata,
            Column(
                "engineer_id",
                Integer,
                ForeignKey("people.person_id"),
                primary_key=True,
            ),
            Column("primary_language", String(50)),
        )

        Table(
            "paperwork",
            metadata,
            Column(
                "paperwork_id",
                Integer,
                primary_key=True,
                test_needs_autoincrement=True,
            ),
            Column("description", String(50)),
            Column("person_id", Integer, ForeignKey("people.person_id")),
        )

    @classmethod
    def setup_mappers(cls):
        tables = cls.tables
        people = tables.people
        engineers = tables.engineers
        paperwork = tables.paperwork

        paperwork_rel = relationship(
            Paperwork, order_by=paperwork.c.paperwork_id
        )
        mapper(
            Person,
            people,
            polymorphic_on=people.c.type,
            polymorphic_identity="person",
            properties={"paperwork": paperwork_rel},
        )

        mapper(
            Engineer,
            engineers,
            inherits=Person,
            polymorphic_identity="engineer",
        )

        mapper(Paperwork, paperwork)

    @classmethod
    def insert_data(cls, connection):
        java_engineer = Engineer(
            primary_language="java",
            paperwork=[
                Paperwork(description="tps report #1"),
                Paperwork(description="tps report #2"),
            ],
        )
        cpp_engineer = Engineer(
            primary_language="c++",
            paperwork=[Paperwork(description="tps report #3")],
        )

        session = create_session(connection)
        session.add_all([java_engineer, cpp_engineer])
        session.flush()

    @staticmethod
    def _java_paperwork():
        """Paperwork expected on the "java" engineer."""
        return [
            Paperwork(description="tps report #1"),
            Paperwork(description="tps report #2"),
        ]

    @classmethod
    def _java_engineer(cls):
        """The "java" engineer together with its expected paperwork."""
        return Engineer(
            paperwork=cls._java_paperwork(), primary_language="java"
        )

    @staticmethod
    def _paperwork_load_sql():
        """Selectin statement expected as the second query of each test."""
        return CompiledSQL(
            "SELECT paperwork.person_id AS paperwork_person_id, "
            "paperwork.paperwork_id AS paperwork_paperwork_id, "
            "paperwork.description AS paperwork_description "
            "FROM paperwork WHERE paperwork.person_id "
            "IN ([EXPANDING_primary_keys]) "
            "ORDER BY paperwork.paperwork_id",
            [{"primary_keys": [1]}],
        )

    def test_correct_select_nofrom(self):
        session = create_session()
        # use Person.paperwork here just to give the least
        # amount of context
        query = (
            session.query(Engineer)
            .filter(Engineer.primary_language == "java")
            .options(selectinload(Person.paperwork))
        )

        def go():
            first_row = query.all()[0]
            eq_(first_row.paperwork, self._java_paperwork())

        engineer_sql = CompiledSQL(
            "SELECT people.person_id AS people_person_id, "
            "people.name AS people_name, people.type AS people_type, "
            "engineers.engineer_id AS engineers_engineer_id, "
            "engineers.primary_language AS engineers_primary_language "
            "FROM people JOIN engineers ON "
            "people.person_id = engineers.engineer_id "
            "WHERE engineers.primary_language = :primary_language_1",
            {"primary_language_1": "java"},
        )
        self.assert_sql_execution(
            testing.db, go, engineer_sql, self._paperwork_load_sql()
        )

    def test_correct_select_existingfrom(self):
        session = create_session()
        # use Person.paperwork here just to give the least
        # amount of context
        query = (
            session.query(Engineer)
            .filter(Engineer.primary_language == "java")
            .join(Engineer.paperwork)
            .filter(Paperwork.description == "tps report #2")
            .options(selectinload(Person.paperwork))
        )

        def go():
            engineer = query.one()
            # the loaded collection is complete even though the query
            # itself filters on a single paperwork description
            eq_(engineer.paperwork, self._java_paperwork())

        engineer_sql = CompiledSQL(
            "SELECT people.person_id AS people_person_id, "
            "people.name AS people_name, people.type AS people_type, "
            "engineers.engineer_id AS engineers_engineer_id, "
            "engineers.primary_language AS engineers_primary_language "
            "FROM people JOIN engineers "
            "ON people.person_id = engineers.engineer_id "
            "JOIN paperwork ON people.person_id = paperwork.person_id "
            "WHERE engineers.primary_language = :primary_language_1 "
            "AND paperwork.description = :description_1",
            {
                "primary_language_1": "java",
                "description_1": "tps report #2",
            },
        )
        self.assert_sql_execution(
            testing.db, go, engineer_sql, self._paperwork_load_sql()
        )

    def test_correct_select_with_polymorphic_no_alias(self):
        # test #3106
        session = create_session()

        wp = with_polymorphic(Person, [Engineer])
        query = (
            session.query(wp)
            .options(selectinload(wp.paperwork))
            .order_by(Engineer.primary_language.desc())
        )

        def go():
            found = query.first()
            eq_(found, self._java_engineer())

        people_sql = CompiledSQL(
            "SELECT people.person_id AS people_person_id, "
            "people.name AS people_name, people.type AS people_type, "
            "engineers.engineer_id AS engineers_engineer_id, "
            "engineers.primary_language AS engineers_primary_language "
            "FROM people LEFT OUTER JOIN engineers ON people.person_id = "
            "engineers.engineer_id ORDER BY engineers.primary_language "
            "DESC LIMIT :param_1"
        )
        self.assert_sql_execution(
            testing.db, go, people_sql, self._paperwork_load_sql()
        )

    def test_correct_select_with_polymorphic_alias(self):
        # test #3106
        session = create_session()

        wp = with_polymorphic(Person, [Engineer], aliased=True)
        query = (
            session.query(wp)
            .options(selectinload(wp.paperwork))
            .order_by(wp.Engineer.primary_language.desc())
        )

        def go():
            found = query.first()
            eq_(found, self._java_engineer())

        people_sql = CompiledSQL(
            "SELECT anon_1.people_person_id AS anon_1_people_person_id, "
            "anon_1.people_name AS anon_1_people_name, "
            "anon_1.people_type AS anon_1_people_type, "
            "anon_1.engineers_engineer_id AS "
            "anon_1_engineers_engineer_id, "
            "anon_1.engineers_primary_language "
            "AS anon_1_engineers_primary_language FROM "
            "(SELECT people.person_id AS people_person_id, "
            "people.name AS people_name, people.type AS people_type, "
            "engineers.engineer_id AS engineers_engineer_id, "
            "engineers.primary_language AS engineers_primary_language "
            "FROM people LEFT OUTER JOIN engineers ON people.person_id = "
            "engineers.engineer_id) AS anon_1 "
            "ORDER BY anon_1.engineers_primary_language DESC "
            "LIMIT :param_1"
        )
        self.assert_sql_execution(
            testing.db, go, people_sql, self._paperwork_load_sql()
        )

    def test_correct_select_with_polymorphic_flat_alias(self):
        # test #3106
        session = create_session()

        wp = with_polymorphic(Person, [Engineer], aliased=True, flat=True)
        query = (
            session.query(wp)
            .options(selectinload(wp.paperwork))
            .order_by(wp.Engineer.primary_language.desc())
        )

        def go():
            found = query.first()
            eq_(found, self._java_engineer())

        people_sql = CompiledSQL(
            "SELECT people_1.person_id AS people_1_person_id, "
            "people_1.name AS people_1_name, "
            "people_1.type AS people_1_type, "
            "engineers_1.engineer_id AS engineers_1_engineer_id, "
            "engineers_1.primary_language AS engineers_1_primary_language "
            "FROM people AS people_1 "
            "LEFT OUTER JOIN engineers AS engineers_1 "
            "ON people_1.person_id = engineers_1.engineer_id "
            "ORDER BY engineers_1.primary_language DESC LIMIT :param_1"
        )
        self.assert_sql_execution(
            testing.db, go, people_sql, self._paperwork_load_sql()
        )
1931
1932
class HeterogeneousSubtypesTest(fixtures.DeclarativeMappedTest):
    """Chained selectinload() through ``of_type()`` on a polymorphic
    collection whose subclasses carry different relationships.

    ``Company.employees`` holds ``Employee`` objects; ``Programmer`` has a
    one-to-many ``languages`` collection while ``Manager`` has a many-to-one
    ``golf_swing``.
    """

    @classmethod
    def setup_classes(cls):
        """Declare the Company / Employee hierarchy and its related classes."""
        Base = cls.DeclarativeBasic

        class Company(Base):
            __tablename__ = "company"
            id = Column(Integer, primary_key=True)
            name = Column(String(50))
            employees = relationship("Employee", order_by="Employee.id")

        class Employee(Base):
            __tablename__ = "employee"
            id = Column(Integer, primary_key=True)
            type = Column(String(50))
            name = Column(String(50))
            company_id = Column(ForeignKey("company.id"))

            __mapper_args__ = {
                "polymorphic_on": "type",
                "with_polymorphic": "*",
            }

        class Programmer(Employee):
            __tablename__ = "programmer"
            id = Column(ForeignKey("employee.id"), primary_key=True)
            languages = relationship("Language")

            __mapper_args__ = {"polymorphic_identity": "programmer"}

        class Manager(Employee):
            __tablename__ = "manager"
            id = Column(ForeignKey("employee.id"), primary_key=True)
            golf_swing_id = Column(ForeignKey("golf_swing.id"))
            golf_swing = relationship("GolfSwing")

            __mapper_args__ = {"polymorphic_identity": "manager"}

        class Language(Base):
            __tablename__ = "language"
            id = Column(Integer, primary_key=True)
            programmer_id = Column(
                Integer, ForeignKey("programmer.id"), nullable=False
            )
            name = Column(String(50))

        class GolfSwing(Base):
            __tablename__ = "golf_swing"
            id = Column(Integer, primary_key=True)
            name = Column(String(50))

    @classmethod
    def insert_data(cls, connection):
        """Persist two companies, each with one Programmer and one Manager.

        Company 1 lists its Programmer first; company 2 lists its Manager
        first, so ``employees[0]`` differs in type between the two.
        """
        Company, Programmer, Manager, GolfSwing, Language = cls.classes(
            "Company", "Programmer", "Manager", "GolfSwing", "Language"
        )
        c1 = Company(
            id=1,
            name="Foobar Corp",
            employees=[
                Programmer(
                    id=1, name="p1", languages=[Language(id=1, name="Python")]
                ),
                Manager(id=2, name="m1", golf_swing=GolfSwing(name="fore")),
            ],
        )
        c2 = Company(
            id=2,
            name="bat Corp",
            employees=[
                Manager(id=3, name="m2", golf_swing=GolfSwing(name="clubs")),
                Programmer(
                    id=4, name="p2", languages=[Language(id=2, name="Java")]
                ),
            ],
        )
        sess = Session(connection)
        sess.add_all([c1, c2])
        sess.commit()

    def test_one_to_many(self):
        """Programmer.languages is loaded via the chained selectinload."""
        Company, Programmer = self.classes("Company", "Programmer")
        sess = Session()
        company = (
            sess.query(Company)
            .filter(Company.id == 1)
            .options(
                selectinload(
                    Company.employees.of_type(Programmer)
                ).selectinload(Programmer.languages)
            )
            .one()
        )

        def go():
            eq_(company.employees[0].languages[0].name, "Python")

        # the query above already ran; traversing the loaded attributes
        # must not emit any further SQL
        self.assert_sql_count(testing.db, go, 0)

    def test_many_to_one(self):
        """Manager.golf_swing is loaded via the chained selectinload."""
        Company, Manager = self.classes("Company", "Manager")
        sess = Session()
        company = (
            sess.query(Company)
            .filter(Company.id == 2)
            .options(
                selectinload(Company.employees.of_type(Manager)).selectinload(
                    Manager.golf_swing
                )
            )
            .one()
        )

        # NOTE: we *MUST* do a SQL compare on this one because the adaption
        # is very sensitive
        # NOTE(review): as written, only the statement count is asserted
        # below, not the SQL text -- confirm whether that is sufficient.
        def go():
            eq_(company.employees[0].golf_swing.name, "clubs")

        self.assert_sql_count(testing.db, go, 0)

    def test_both(self):
        """Both subclass-specific relationships are loaded in one query."""
        Company, Programmer, Manager = self.classes(
            "Company", "Programmer", "Manager"
        )
        sess = Session()
        rows = (
            sess.query(Company)
            .options(
                selectinload(Company.employees.of_type(Manager)).selectinload(
                    Manager.golf_swing
                ),
                defaultload(
                    Company.employees.of_type(Programmer)
                ).selectinload(Programmer.languages),
            )
            .order_by(Company.id)
            .all()
        )

        def go():
            # company 1 lists its Programmer first, company 2 its Manager
            eq_(rows[0].employees[0].languages[0].name, "Python")
            eq_(rows[1].employees[0].golf_swing.name, "clubs")

        self.assert_sql_count(testing.db, go, 0)
2082
2083
class TupleTest(fixtures.DeclarativeMappedTest):
    """Selectin loading across a two-column key, which is rendered as a
    tuple IN expression."""

    __requires__ = ("tuple_in",)

    @classmethod
    def setup_classes(cls):
        """Declare A with primary key (id1, id2) and B pointing at it
        through a composite foreign key."""
        Base = cls.DeclarativeBasic

        class A(fixtures.ComparableEntity, Base):
            __tablename__ = "a"
            id1 = Column(Integer, primary_key=True)
            id2 = Column(Integer, primary_key=True)

            bs = relationship("B", order_by="B.id", back_populates="a")

        class B(fixtures.ComparableEntity, Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            # declared without an explicit type; both columns take part in
            # the ForeignKeyConstraint below
            a_id1 = Column()
            a_id2 = Column()

            a = relationship("A", back_populates="bs")

            __table_args__ = (
                ForeignKeyConstraint(["a_id1", "a_id2"], ["a.id1", "a.id2"]),
            )

    @classmethod
    def insert_data(cls, connection):
        """Insert 19 A rows keyed (i, i + 2), each owning six B rows."""
        A, B = cls.classes("A", "B")

        parents = [
            A(id1=i, id2=i + 2, bs=[B(id=(i * 6) + j) for j in range(6)])
            for i in range(1, 20)
        ]
        sess = Session(connection)
        sess.add_all(parents)
        sess.commit()

    def test_load_o2m(self):
        """A.bs is selectin-loaded using a tuple IN on (a_id1, a_id2)."""
        A, B = self.classes("A", "B")

        sess = Session()

        def load_parents():
            return (
                sess.query(A)
                .options(selectinload(A.bs))
                .order_by(A.id1, A.id2)
                .all()
            )

        composite_keys = [(i, i + 2) for i in range(1, 20)]
        loaded = self.assert_sql_execution(
            testing.db,
            load_parents,
            CompiledSQL(
                "SELECT a.id1 AS a_id1, a.id2 AS a_id2 "
                "FROM a ORDER BY a.id1, a.id2",
                {},
            ),
            CompiledSQL(
                "SELECT b.a_id1 AS b_a_id1, b.a_id2 AS b_a_id2, b.id AS b_id "
                "FROM b WHERE (b.a_id1, b.a_id2) IN "
                "([EXPANDING_primary_keys]) ORDER BY b.id",
                [{"primary_keys": composite_keys}],
            ),
        )

        expected = [
            A(id1=i, id2=i + 2, bs=[B(id=(i * 6) + j) for j in range(6)])
            for i in range(1, 20)
        ]
        eq_(loaded, expected)

    def test_load_m2o(self):
        """B.a is selectin-loaded using a tuple IN on (id1, id2)."""
        A, B = self.classes("A", "B")

        sess = Session()

        def load_children():
            query = sess.query(B).options(selectinload(B.a))
            return query.order_by(B.id).all()

        composite_keys = [(i, i + 2) for i in range(1, 20)]
        loaded = self.assert_sql_execution(
            testing.db,
            load_children,
            CompiledSQL(
                "SELECT b.id AS b_id, b.a_id1 AS b_a_id1, b.a_id2 AS b_a_id2 "
                "FROM b ORDER BY b.id",
                {},
            ),
            CompiledSQL(
                "SELECT a.id1 AS a_id1, a.id2 AS a_id2 FROM a "
                "WHERE (a.id1, a.id2) IN ([EXPANDING_primary_keys])",
                [{"primary_keys": composite_keys}],
            ),
        )

        # one expected parent per key; index (i - 1) corresponds to id1 == i
        expected_parents = [A(id1=i, id2=i + 2) for i in range(1, 20)]
        expected = [
            B(id=(i * 6) + j, a=expected_parents[i - 1])
            for i in range(1, 20)
            for j in range(6)
        ]
        eq_(loaded, expected)
2192
2193
class ChunkingTest(fixtures.DeclarativeMappedTest):
    """Exercise chunking of the IN list used by selectin loading.

    Some databases cap the number of entries in an IN (on Oracle it's
    1000), and a single statement with hundreds of thousands of IN entries
    is undesirable in any case, so large results are loaded in chunks.

    """

    @classmethod
    def setup_classes(cls):
        """Declare a plain A -> B one-to-many with a bidirectional link."""
        Base = cls.DeclarativeBasic

        class A(fixtures.ComparableEntity, Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            bs = relationship("B", order_by="B.id", back_populates="a")

        class B(fixtures.ComparableEntity, Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            a_id = Column(ForeignKey("a.id"))
            a = relationship("A", back_populates="bs")

    @classmethod
    def insert_data(cls, connection):
        """Insert A rows with ids 1..100, each owning five B rows."""
        A, B = cls.classes("A", "B")

        parents = [
            A(id=i, bs=[B(id=(i * 6) + j) for j in range(1, 6)])
            for i in range(1, 101)
        ]
        sess = Session(connection)
        sess.add_all(parents)
        sess.commit()

    def test_odd_number_chunks(self):
        """With a chunk size of 47, 100 parents load in chunks of
        47 / 47 / 6."""
        A, B = self.classes("A", "B")

        sess = Session()

        def iterate_with_small_chunks():
            with mock.patch(
                "sqlalchemy.orm.strategies.SelectInLoader._chunksize", 47
            ):
                query = sess.query(A).options(selectinload(A.bs))

                for parent in query.order_by(A.id):
                    parent.bs

        # the same statement text is expected for every chunk; only the
        # bound primary keys differ
        chunk_sql = (
            "SELECT b.a_id AS b_a_id, b.id AS b_id "
            "FROM b WHERE b.a_id IN "
            "([EXPANDING_primary_keys]) ORDER BY b.id"
        )

        self.assert_sql_execution(
            testing.db,
            iterate_with_small_chunks,
            CompiledSQL("SELECT a.id AS a_id FROM a ORDER BY a.id", {}),
            CompiledSQL(chunk_sql, {"primary_keys": list(range(1, 48))}),
            CompiledSQL(chunk_sql, {"primary_keys": list(range(48, 95))}),
            CompiledSQL(chunk_sql, {"primary_keys": list(range(95, 101))}),
        )

    @testing.requires.independent_cursors
    def test_yield_per(self):
        """One selectin statement is emitted per yield_per batch."""
        # the docs make a lot of guarantees about yield_per
        # so test that it works
        A, B = self.classes("A", "B")

        import random

        sess = Session()

        yield_per = random.randint(8, 105)
        offset = random.randint(0, 19)
        total_rows = 100 - offset
        # one statement for the A rows, one per full batch, plus one more
        # for a trailing partial batch if there is one
        full_batches, leftover = divmod(total_rows, yield_per)
        total_expected_statements = 1 + full_batches + (1 if leftover else 0)

        def iterate_in_batches():
            query = (
                sess.query(A)
                .yield_per(yield_per)
                .offset(offset)
                .options(selectinload(A.bs))
            )
            for parent in query:

                # this part fails with joined eager loading
                # (if you enable joined eager w/ yield_per)
                expected_bs = [B(id=(parent.id * 6) + j) for j in range(1, 6)]
                eq_(parent.bs, expected_bs)

        # this part fails with subquery eager loading
        # (if you enable subquery eager w/ yield_per)
        self.assert_sql_count(
            testing.db, iterate_in_batches, total_expected_statements
        )

    def test_dont_emit_for_redundant_m2o(self):
        """Many-to-one loads are chunked over the parent keys, each key
        appearing in only one chunk."""
        A, B = self.classes("A", "B")

        sess = Session()

        def iterate_with_small_chunks():
            with mock.patch(
                "sqlalchemy.orm.strategies.SelectInLoader._chunksize", 47
            ):
                query = sess.query(B).options(selectinload(B.a))

                for child in query.order_by(B.id):
                    child.a

        chunk_sql = (
            "SELECT a.id AS a_id FROM a WHERE a.id IN "
            "([EXPANDING_primary_keys])"
        )

        self.assert_sql_execution(
            testing.db,
            iterate_with_small_chunks,
            CompiledSQL(
                "SELECT b.id AS b_id, b.a_id AS b_a_id FROM b ORDER BY b.id",
                {},
            ),
            # chunk size is 47.  so first chunk are a 1->47...
            CompiledSQL(chunk_sql, {"primary_keys": list(range(1, 48))}),
            # second chunk is a 48-94
            CompiledSQL(chunk_sql, {"primary_keys": list(range(48, 95))}),
            # third and final chunk 95-100.
            CompiledSQL(chunk_sql, {"primary_keys": list(range(95, 101))}),
        )
2344
2345
class SubRelationFromJoinedSubclassMultiLevelTest(_Polymorphic):
    """Three-level selectinload chain that starts from a joined-inheritance
    subclass: Company -> Engineer -> Machine -> MachineType."""

    @classmethod
    def define_tables(cls, metadata):
        """Create the companies / people / engineers / machines /
        machine_type tables."""

        def autoincrement_pk(name):
            # every surrogate key in this fixture is declared the same way
            return Column(
                name,
                Integer,
                primary_key=True,
                test_needs_autoincrement=True,
            )

        Table(
            "companies",
            metadata,
            autoincrement_pk("company_id"),
            Column("name", String(50)),
        )

        Table(
            "people",
            metadata,
            autoincrement_pk("person_id"),
            Column("company_id", ForeignKey("companies.company_id")),
            Column("name", String(50)),
            Column("type", String(30)),
        )

        Table(
            "engineers",
            metadata,
            Column(
                "engineer_id", ForeignKey("people.person_id"), primary_key=True
            ),
            Column("primary_language", String(50)),
        )

        Table(
            "machines",
            metadata,
            autoincrement_pk("machine_id"),
            Column("name", String(50)),
            Column("engineer_id", ForeignKey("engineers.engineer_id")),
            Column(
                "machine_type_id", ForeignKey("machine_type.machine_type_id")
            ),
        )

        Table(
            "machine_type",
            metadata,
            autoincrement_pk("machine_type_id"),
            Column("name", String(50)),
        )

    @classmethod
    def setup_mappers(cls):
        """Map the fixture classes classically onto the tables above."""
        tables = cls.tables
        people = tables.people
        machines = tables.machines

        mapper(
            Company,
            tables.companies,
            properties={
                "employees": relationship(Person, order_by=people.c.person_id)
            },
        )
        mapper(
            Person,
            people,
            polymorphic_on=people.c.type,
            polymorphic_identity="person",
            with_polymorphic="*",
        )

        engineer_machines = relationship(
            Machine, order_by=machines.c.machine_id
        )
        mapper(
            Engineer,
            tables.engineers,
            inherits=Person,
            polymorphic_identity="engineer",
            properties={"machines": engineer_machines},
        )

        mapper(
            Machine, machines, properties={"type": relationship(MachineType)}
        )
        mapper(MachineType, tables.machine_type)

    @classmethod
    def insert_data(cls, connection):
        """Flush one fixture Company to the database."""
        company = cls._fixture()
        session = create_session(connection)
        session.add(company)
        session.flush()

    @classmethod
    def _fixture(cls):
        """Build a Company with two Engineers that own two Machines each."""
        mt1 = MachineType(name="mt1")
        mt2 = MachineType(name="mt2")

        e1 = Engineer(
            name="e1",
            machines=[
                Machine(name="m1", type=mt1),
                Machine(name="m2", type=mt2),
            ],
        )
        e2 = Engineer(
            name="e2",
            machines=[
                Machine(name="m3", type=mt1),
                Machine(name="m4", type=mt1),
            ],
        )
        return Company(employees=[e1, e2])

    def test_chained_selectin_subclass(self):
        """The whole chain loads in four statements."""
        session = Session()
        load_chain = (
            selectinload(Company.employees.of_type(Engineer))
            .selectinload(Engineer.machines)
            .selectinload(Machine.type)
        )
        query = session.query(Company).options(load_chain)

        def load_and_compare():
            eq_(query.all(), [self._fixture()])

        self.assert_sql_count(testing.db, load_and_compare, 4)
2494
2495
class SelfReferentialTest(fixtures.MappedTest):
    """Selectin loading of a self-referential ``Node.children``
    relationship, with and without ``join_depth``."""

    @classmethod
    def define_tables(cls, metadata):
        """Create the adjacency-list ``nodes`` table."""
        Table(
            "nodes",
            metadata,
            Column(
                "id", Integer, primary_key=True, test_needs_autoincrement=True
            ),
            Column("parent_id", Integer, ForeignKey("nodes.id")),
            Column("data", String(30)),
        )

    def test_basic(self):
        """lazy="selectin" with join_depth=3 loads a two-level tree in
        four statements."""
        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={
                "children": relationship(
                    Node, lazy="selectin", join_depth=3, order_by=nodes.c.id
                )
            },
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        n1.append(Node(data="n13"))
        n1.children[1].append(Node(data="n121"))
        n1.children[1].append(Node(data="n122"))
        n1.children[1].append(Node(data="n123"))
        n2 = Node(data="n2")
        n2.append(Node(data="n21"))
        n2.children[0].append(Node(data="n211"))
        n2.children[0].append(Node(data="n212"))

        sess.add(n1)
        sess.add(n2)
        sess.flush()
        # detach everything so the query below loads from the database
        sess.expunge_all()

        def go():
            d = (
                sess.query(Node)
                .filter(Node.data.in_(["n1", "n2"]))
                .order_by(Node.data)
                .all()
            )
            eq_(
                [
                    Node(
                        data="n1",
                        children=[
                            Node(data="n11"),
                            Node(
                                data="n12",
                                children=[
                                    Node(data="n121"),
                                    Node(data="n122"),
                                    Node(data="n123"),
                                ],
                            ),
                            Node(data="n13"),
                        ],
                    ),
                    Node(
                        data="n2",
                        children=[
                            Node(
                                data="n21",
                                children=[
                                    Node(data="n211"),
                                    Node(data="n212"),
                                ],
                            )
                        ],
                    ),
                ],
                d,
            )

        self.assert_sql_count(testing.db, go, 4)

    def test_lazy_fallback_doesnt_affect_eager(self):
        """With join_depth=1, querying every node still yields populated
        ``children`` on second-level nodes within two statements."""
        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={
                "children": relationship(
                    Node, lazy="selectin", join_depth=1, order_by=nodes.c.id
                )
            },
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        n1.append(Node(data="n13"))
        n1.children[0].append(Node(data="n111"))
        n1.children[0].append(Node(data="n112"))
        n1.children[1].append(Node(data="n121"))
        n1.children[1].append(Node(data="n122"))
        n1.children[1].append(Node(data="n123"))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()

        def go():
            # ordered by data: n1, n11, n111, n112, n12, ...
            allnodes = sess.query(Node).order_by(Node.data).all()

            n11 = allnodes[1]
            eq_(n11.data, "n11")
            eq_([Node(data="n111"), Node(data="n112")], list(n11.children))

            n12 = allnodes[4]
            eq_(n12.data, "n12")
            eq_(
                [Node(data="n121"), Node(data="n122"), Node(data="n123")],
                list(n12.children),
            )

        self.assert_sql_count(testing.db, go, 2)

    def test_with_deferred(self):
        """Statement counts shrink as the deferred ``data`` column is
        undeferred on more levels of the tree."""
        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={
                "children": relationship(
                    Node, lazy="selectin", join_depth=3, order_by=nodes.c.id
                ),
                "data": deferred(nodes.c.data),
            },
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()

        # no undefer options: "data" stays deferred on every node
        def go():
            eq_(
                Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
                sess.query(Node).order_by(Node.id).first(),
            )

        self.assert_sql_count(testing.db, go, 6)

        sess.expunge_all()

        # "data" undeferred on the queried entity only
        def go():
            eq_(
                Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
                sess.query(Node)
                .options(undefer("data"))
                .order_by(Node.id)
                .first(),
            )

        self.assert_sql_count(testing.db, go, 5)

        sess.expunge_all()

        # "data" undeferred on the queried entity and on its children
        def go():
            eq_(
                Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
                sess.query(Node)
                .options(undefer("data"), undefer("children.data"))
                # explicit ordering, as in the two queries above, so that
                # first() deterministically returns the root node
                .order_by(Node.id)
                .first(),
            )

        self.assert_sql_count(testing.db, go, 3)

    def test_options(self):
        """A lazy relationship is selectin-loaded two levels deep via
        chained query options."""
        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={"children": relationship(Node, order_by=nodes.c.id)},
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        n1.append(Node(data="n13"))
        n1.children[1].append(Node(data="n121"))
        n1.children[1].append(Node(data="n122"))
        n1.children[1].append(Node(data="n123"))
        sess.add(n1)
        sess.flush()
        sess.expunge_all()

        def go():
            d = (
                sess.query(Node)
                .filter_by(data="n1")
                .order_by(Node.id)
                .options(selectinload("children").selectinload("children"))
                .first()
            )
            eq_(
                Node(
                    data="n1",
                    children=[
                        Node(data="n11"),
                        Node(
                            data="n12",
                            children=[
                                Node(data="n121"),
                                Node(data="n122"),
                                Node(data="n123"),
                            ],
                        ),
                        Node(data="n13"),
                    ],
                ),
                d,
            )

        self.assert_sql_count(testing.db, go, 3)

    def test_no_depth(self):
        """no join depth is set, so no eager loading occurs."""

        nodes = self.tables.nodes

        class Node(fixtures.ComparableEntity):
            def append(self, node):
                self.children.append(node)

        mapper(
            Node,
            nodes,
            properties={"children": relationship(Node, lazy="selectin")},
        )
        sess = create_session()
        n1 = Node(data="n1")
        n1.append(Node(data="n11"))
        n1.append(Node(data="n12"))
        n1.append(Node(data="n13"))
        n1.children[1].append(Node(data="n121"))
        n1.children[1].append(Node(data="n122"))
        n1.children[1].append(Node(data="n123"))
        n2 = Node(data="n2")
        n2.append(Node(data="n21"))
        sess.add(n1)
        sess.add(n2)
        sess.flush()
        sess.expunge_all()

        def go():
            d = (
                sess.query(Node)
                .filter(Node.data.in_(["n1", "n2"]))
                .order_by(Node.data)
                .all()
            )
            eq_(
                [
                    Node(
                        data="n1",
                        children=[
                            Node(data="n11"),
                            Node(
                                data="n12",
                                children=[
                                    Node(data="n121"),
                                    Node(data="n122"),
                                    Node(data="n123"),
                                ],
                            ),
                            Node(data="n13"),
                        ],
                    ),
                    Node(data="n2", children=[Node(data="n21")]),
                ],
                d,
            )

        self.assert_sql_count(testing.db, go, 4)
2801
2802
class SelfRefInheritanceAliasedTest(
    fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL
):
    """Two-level selectin load over a self-referential polymorphic class,
    where the first level targets an aliased with_polymorphic entity."""

    __dialect__ = "default"

    @classmethod
    def setup_classes(cls):
        """Declare Foo (self-referential many-to-one ``foo``) and its
        single-table subclass Bar."""
        Base = cls.DeclarativeBasic

        class Foo(fixtures.ComparableEntity, Base):
            __tablename__ = "foo"
            id = Column(Integer, primary_key=True)
            type = Column(String(50))

            foo_id = Column(Integer, ForeignKey("foo.id"))
            foo = relationship(
                lambda: Foo, foreign_keys=foo_id, remote_side=id
            )

            __mapper_args__ = {
                "polymorphic_on": type,
                "polymorphic_identity": "foo",
            }

        class Bar(Foo):
            __mapper_args__ = {"polymorphic_identity": "bar"}

    @classmethod
    def insert_data(cls, connection):
        """Insert the chain Bar(2) -> Foo(3) -> Bar(1)."""
        Foo, Bar = cls.classes("Foo", "Bar")

        sess = Session(connection)
        innermost = Bar(id=1)
        outermost = Bar(id=2, foo=Foo(id=3, foo=innermost))
        sess.add(outermost)
        sess.commit()

    def test_twolevel_selectin_w_polymorphic(self):
        """The same three statements are emitted on each of three runs."""
        Foo, Bar = self.classes("Foo", "Bar")

        for _ in range(3):
            poly = with_polymorphic(Foo, "*", aliased=True)
            first_hop = Foo.foo.of_type(poly)
            second_hop = poly.foo

            sess = Session()
            loader = selectinload(first_hop).selectinload(second_hop)
            query = sess.query(Foo).filter(Foo.id == 2).options(loader)

            # the rule objects are built anew on every iteration
            loaded = self.assert_sql_execution(
                testing.db,
                query.all,
                CompiledSQL(
                    "SELECT foo.id AS foo_id_1, foo.type AS foo_type, "
                    "foo.foo_id AS foo_foo_id FROM foo WHERE foo.id = :id_1",
                    [{"id_1": 2}],
                ),
                # first hop goes through the aliased entity (foo_1)
                CompiledSQL(
                    "SELECT foo_1.id AS foo_1_id, "
                    "foo_1.type AS foo_1_type, foo_1.foo_id AS foo_1_foo_id "
                    "FROM foo AS foo_1 "
                    "WHERE foo_1.id IN ([EXPANDING_primary_keys])",
                    {"primary_keys": [3]},
                ),
                # second hop selects from the plain, un-aliased table
                CompiledSQL(
                    "SELECT foo.id AS foo_id_1, foo.type AS foo_type, "
                    "foo.foo_id AS foo_foo_id FROM foo "
                    "WHERE foo.id IN ([EXPANDING_primary_keys])",
                    {"primary_keys": [1]},
                ),
            )
            eq_(loaded, [Bar(id=2, foo=Foo(id=3, foo=Bar(id=1)))])
2877
2878
class TestExistingRowPopulation(fixtures.DeclarativeMappedTest):
    """Selectin options that reach the same ``B`` row along two different
    joined-load paths must each populate their attribute on that row.
    """

    @classmethod
    def setup_classes(cls):
        """Declare ``A`` -> ``B`` directly and ``A`` -> ``A2`` -> ``B``,
        with ``B`` carrying two one-to-many and two many-to-one
        relationships.
        """
        Base = cls.DeclarativeBasic

        class A(Base):
            __tablename__ = "a"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))
            a2_id = Column(ForeignKey("a2.id"))
            a2 = relationship("A2")
            b = relationship("B")

        class A2(Base):
            __tablename__ = "a2"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))
            b = relationship("B")

        class B(Base):
            __tablename__ = "b"

            id = Column(Integer, primary_key=True)

            c1_m2o_id = Column(ForeignKey("c1_m2o.id"))
            c2_m2o_id = Column(ForeignKey("c2_m2o.id"))

            c1_o2m = relationship("C1o2m")
            c2_o2m = relationship("C2o2m")
            c1_m2o = relationship("C1m2o")
            c2_m2o = relationship("C2m2o")

        class C1o2m(Base):
            __tablename__ = "c1_o2m"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))

        class C2o2m(Base):
            __tablename__ = "c2_o2m"

            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))

        class C1m2o(Base):
            __tablename__ = "c1_m2o"

            id = Column(Integer, primary_key=True)

        class C2m2o(Base):
            __tablename__ = "c2_m2o"

            id = Column(Integer, primary_key=True)

    @classmethod
    def insert_data(cls, connection):
        """Insert one ``A`` whose ``b`` and ``a2.b`` are the same ``B``."""
        A, A2, B, C1o2m, C2o2m, C1m2o, C2m2o = cls.classes(
            "A", "A2", "B", "C1o2m", "C2o2m", "C1m2o", "C2m2o"
        )

        sess = Session(connection)

        shared_b = B(
            c1_o2m=[C1o2m()],
            c2_o2m=[C2o2m()],
            c1_m2o=C1m2o(),
            c2_m2o=C2m2o(),
        )
        sess.add(A(b=shared_b, a2=A2(b=shared_b)))
        sess.commit()

    def test_o2m(self):
        """Both one-to-many collections end up in ``B.__dict__``."""
        A, A2, B, C1o2m, C2o2m = self.classes("A", "A2", "B", "C1o2m", "C2o2m")

        sess = Session()

        # Each option path names a selectinload ("S") for only one of the
        # two collections and gives no option ("L") for the other; "J" is
        # joinedload.  The fixture data makes both paths end at one B row.
        #
        #   A -J-> B -L-> C1
        #   A -J-> B -S-> C2
        #
        #   A -J-> A2 -J-> B -S-> C1
        #   A -J-> A2 -J-> B -L-> C2
        query = sess.query(A)
        query = query.options(
            joinedload(A.b).selectinload(B.c2_o2m),
            joinedload(A.a2).joinedload(A2.b).selectinload(B.c1_o2m),
        )

        first_a = query.all()[0]
        b_state = first_a.b.__dict__

        is_true("c1_o2m" in b_state)
        is_true("c2_o2m" in b_state)

    def test_m2o(self):
        """Both many-to-one attributes end up in ``B.__dict__``."""
        A, A2, B, C1m2o, C2m2o = self.classes("A", "A2", "B", "C1m2o", "C2m2o")

        sess = Session()

        # Same layout as the one-to-many case: each path selectin-loads
        # ("S") one attribute and gives no option ("L") for the other.
        #
        #   A -J-> B -L-> C1
        #   A -J-> B -S-> C2
        #
        #   A -J-> A2 -J-> B -S-> C1
        #   A -J-> A2 -J-> B -L-> C2
        query = sess.query(A)
        query = query.options(
            joinedload(A.b).selectinload(B.c2_m2o),
            joinedload(A.a2).joinedload(A2.b).selectinload(B.c1_m2o),
        )

        first_a = query.all()[0]
        b_state = first_a.b.__dict__

        is_true("c1_m2o" in b_state)
        is_true("c2_m2o" in b_state)
2990
2991
class SingleInhSubclassTest(
    fixtures.DeclarativeMappedTest, testing.AssertsExecutionResults
):
    """``lazy="selectin"`` relationship declared on a subclass that shares
    its parent's ``user`` table.
    """

    @classmethod
    def setup_classes(cls):
        """Declare ``User``, its ``EmployerUser`` subclass and ``Role``."""
        Base = cls.DeclarativeBasic

        class User(Base):
            __tablename__ = "user"

            id = Column(Integer, primary_key=True)
            type = Column(String(10))

            __mapper_args__ = dict(polymorphic_on=type)

        class EmployerUser(User):
            # the relationship exists on the subclass only
            roles = relationship("Role", lazy="selectin")
            __mapper_args__ = dict(polymorphic_identity="employer")

        class Role(Base):
            __tablename__ = "role"

            id = Column(Integer, primary_key=True)
            user_id = Column(Integer, ForeignKey("user.id"))

    @classmethod
    def insert_data(cls, connection):
        """Insert a single ``EmployerUser`` holding three roles."""
        EmployerUser, Role = cls.classes("EmployerUser", "Role")

        sess = Session(connection)
        three_roles = [Role() for _ in range(3)]
        sess.add(EmployerUser(roles=three_roles))
        sess.commit()

    def test_load(self):
        """Querying the subclass emits the type-restricted SELECT on
        ``user`` followed by one IN query against ``role``.
        """
        [EmployerUser] = self.classes("EmployerUser")
        sess = Session()

        query = sess.query(EmployerUser)

        user_stmt = CompiledSQL(
            'SELECT "user".id AS user_id, "user".type AS user_type '
            'FROM "user" WHERE "user".type IN (:type_1)',
            {"type_1": "employer"},
        )
        roles_stmt = CompiledSQL(
            "SELECT role.user_id AS role_user_id, role.id AS role_id "
            "FROM role WHERE role.user_id "
            "IN ([EXPANDING_primary_keys])",
            {"primary_keys": [1]},
        )

        self.assert_sql_execution(
            testing.db, query.all, user_stmt, roles_stmt
        )
3046
3047
class MissingForeignTest(
    fixtures.DeclarativeMappedTest, testing.AssertsExecutionResults
):
    """Selectin loading of a many-to-one whose ``b_id`` column carries no
    ``ForeignKey`` and may hold a value that has no matching ``B`` row.
    """

    @classmethod
    def setup_classes(cls):
        """Declare ``A`` and ``B``, joined only via ``primaryjoin``."""
        Base = cls.DeclarativeBasic

        class A(fixtures.ComparableEntity, Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            # plain integer column; the foreign side is marked in the
            # primaryjoin expression rather than by a constraint
            b_id = Column(Integer)
            b = relationship("B", primaryjoin="foreign(A.b_id) == B.id")
            q = Column(Integer)

        class B(fixtures.ComparableEntity, Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            x = Column(Integer)
            y = Column(Integer)

    @classmethod
    def insert_data(cls, connection):
        """Insert four ``A`` rows and two ``B`` rows.

        ``A(id=2)`` stores ``b_id=5``, for which no ``B`` is inserted, and
        ``A(id=4)`` has no ``b`` at all.
        """
        A, B = cls.classes("A", "B")

        sess = Session(connection)
        b1 = B(id=1, x=5, y=9)
        b2 = B(id=2, x=10, y=8)

        pending = [
            A(id=1, b_id=1),
            A(id=2, b_id=5),
            A(id=3, b_id=2),
            A(id=4, b=None),
            b1,
            b2,
        ]
        sess.add_all(pending)
        sess.commit()

    def test_missing_rec(self):
        """An ``A`` whose ``b_id`` matches nothing loads with ``b=None``."""
        A, B = self.classes("A", "B")

        sess = Session()
        query = sess.query(A).options(selectinload(A.b)).order_by(A.id)
        loaded = query.all()

        expected = [
            A(id=1, b=B(id=1)),
            A(id=2, b=None, b_id=5),
            A(id=3, b=B(id=2)),
            A(id=4, b=None, b_id=None),
        ]
        eq_(loaded, expected)
3099
3100
class M2OWDegradeTest(
    fixtures.DeclarativeMappedTest, testing.AssertsExecutionResults
):
    """SQL emitted when selectin-loading a many-to-one.

    The tests expect either a direct query on ``b`` by primary key, or a
    JOIN from an aliased ``a`` when ``omit_join=False`` is set on the
    relationship or the parent's ``b_id`` column is deferred.
    """

    @classmethod
    def setup_classes(cls):
        """Declare ``A`` with two many-to-ones to ``B``; one of them
        passes ``omit_join=False``.
        """
        Base = cls.DeclarativeBasic

        class A(fixtures.ComparableEntity, Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            b_id = Column(ForeignKey("b.id"))
            b = relationship("B")
            b_no_omit_join = relationship("B", omit_join=False)
            q = Column(Integer)

        class B(fixtures.ComparableEntity, Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            x = Column(Integer)
            y = Column(Integer)

    @classmethod
    def insert_data(cls, connection):
        """Insert five ``A`` rows referring to two ``B`` rows; ``A(id=4)``
        has no ``b``.
        """
        A, B = cls.classes("A", "B")

        sess = Session(connection)
        b1 = B(id=1, x=5, y=9)
        b2 = B(id=2, x=10, y=8)

        a_id_to_b = [(1, b1), (2, b2), (3, b2), (4, None), (5, b1)]
        sess.add_all([A(id=a_id, b=b) for a_id, b in a_id_to_b])
        sess.commit()

    def test_omit_join_warn_on_true(self):
        """Passing ``omit_join=True`` to ``relationship()`` warns."""
        expected_warning = (
            "setting omit_join to True is not supported; selectin "
            "loading of this relationship"
        )
        with testing.expect_warnings(expected_warning):
            relationship("B", omit_join=True)

    def test_use_join_parent_criteria(self):
        """With criteria on the parent query, ``b`` is still queried
        directly by primary key.
        """
        A, B = self.classes("A", "B")
        sess = Session()

        query = sess.query(A).filter(A.id.in_([1, 3]))
        query = query.options(selectinload(A.b)).order_by(A.id)

        parent_stmt = CompiledSQL(
            "SELECT a.id AS a_id, a.b_id AS a_b_id, a.q AS a_q "
            "FROM a WHERE a.id IN (:id_1, :id_2) ORDER BY a.id",
            [{"id_1": 1, "id_2": 3}],
        )
        related_stmt = CompiledSQL(
            "SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
            "FROM b WHERE b.id IN ([EXPANDING_primary_keys])",
            [{"primary_keys": [1, 2]}],
        )
        loaded = self.assert_sql_execution(
            testing.db, query.all, parent_stmt, related_stmt
        )

        expected = [
            A(id=1, b=B(id=1, x=5, y=9)),
            A(id=3, b=B(id=2, x=10, y=8)),
        ]
        eq_(loaded, expected)

    def test_use_join_parent_criteria_degrade_on_defer(self):
        """With criteria on the parent query and ``A.b_id`` deferred, the
        second statement is the JOIN form.
        """
        A, B = self.classes("A", "B")
        sess = Session()

        query = sess.query(A).filter(A.id.in_([1, 3]))
        query = query.options(defer(A.b_id), selectinload(A.b))
        query = query.order_by(A.id)

        parent_stmt = CompiledSQL(
            "SELECT a.id AS a_id, a.q AS a_q "
            "FROM a WHERE a.id IN (:id_1, :id_2) ORDER BY a.id",
            [{"id_1": 1, "id_2": 3}],
        )
        # Deferring the FK column on the parent is a very unlikely setup;
        # when it happens the load degrades to the JOIN version, so that
        # nothing has to be emitted for each parent object individually
        # or as a second query for them.
        related_stmt = CompiledSQL(
            "SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
            "b.y AS b_y "
            "FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
            "WHERE a_1.id IN ([EXPANDING_primary_keys])",
            [{"primary_keys": [1, 3]}],
        )
        loaded = self.assert_sql_execution(
            testing.db, query.all, parent_stmt, related_stmt
        )

        expected = [
            A(id=1, b=B(id=1, x=5, y=9)),
            A(id=3, b=B(id=2, x=10, y=8)),
        ]
        eq_(loaded, expected)

    def test_use_join(self):
        """Loading every ``A`` queries ``b`` directly by primary key."""
        A, B = self.classes("A", "B")
        sess = Session()

        query = sess.query(A).options(selectinload(A.b)).order_by(A.id)

        parent_stmt = CompiledSQL(
            "SELECT a.id AS a_id, a.b_id AS a_b_id, a.q AS a_q "
            "FROM a ORDER BY a.id",
            [{}],
        )
        related_stmt = CompiledSQL(
            "SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
            "FROM b WHERE b.id IN ([EXPANDING_primary_keys])",
            [{"primary_keys": [1, 2]}],
        )
        loaded = self.assert_sql_execution(
            testing.db, query.all, parent_stmt, related_stmt
        )

        b1 = B(id=1, x=5, y=9)
        b2 = B(id=2, x=10, y=8)
        a_id_to_b = [(1, b1), (2, b2), (3, b2), (4, None), (5, b1)]
        eq_(loaded, [A(id=a_id, b=b) for a_id, b in a_id_to_b])

    def test_use_join_omit_join_false(self):
        """The ``omit_join=False`` relationship uses the JOIN form."""
        A, B = self.classes("A", "B")
        sess = Session()

        query = sess.query(A).options(selectinload(A.b_no_omit_join))
        query = query.order_by(A.id)

        parent_stmt = CompiledSQL(
            "SELECT a.id AS a_id, a.b_id AS a_b_id, a.q AS a_q "
            "FROM a ORDER BY a.id",
            [{}],
        )
        related_stmt = CompiledSQL(
            "SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
            "b.y AS b_y FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
            "WHERE a_1.id IN ([EXPANDING_primary_keys])",
            [{"primary_keys": [1, 2, 3, 4, 5]}],
        )
        loaded = self.assert_sql_execution(
            testing.db, query.all, parent_stmt, related_stmt
        )

        b1 = B(id=1, x=5, y=9)
        b2 = B(id=2, x=10, y=8)
        a_id_to_b = [(1, b1), (2, b2), (3, b2), (4, None), (5, b1)]
        eq_(
            loaded,
            [A(id=a_id, b_no_omit_join=b) for a_id, b in a_id_to_b],
        )

    def test_use_join_parent_degrade_on_defer(self):
        """Loading every ``A`` with ``A.b_id`` deferred uses the JOIN
        form.
        """
        A, B = self.classes("A", "B")
        sess = Session()

        query = sess.query(A).options(defer(A.b_id), selectinload(A.b))
        query = query.order_by(A.id)

        parent_stmt = CompiledSQL(
            "SELECT a.id AS a_id, a.q AS a_q FROM a ORDER BY a.id",
            [{}],
        )
        # Deferring the FK column on the parent is a very unlikely setup;
        # when it happens the load degrades to the JOIN version, so that
        # nothing has to be emitted for each parent object individually
        # or as a second query for them.
        related_stmt = CompiledSQL(
            "SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
            "b.y AS b_y "
            "FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
            "WHERE a_1.id IN ([EXPANDING_primary_keys])",
            [{"primary_keys": [1, 2, 3, 4, 5]}],
        )
        loaded = self.assert_sql_execution(
            testing.db, query.all, parent_stmt, related_stmt
        )

        b1 = B(id=1, x=5, y=9)
        b2 = B(id=2, x=10, y=8)
        a_id_to_b = [(1, b1), (2, b2), (3, b2), (4, None), (5, b1)]
        eq_(loaded, [A(id=a_id, b=b) for a_id, b in a_id_to_b])
3307
3308
class SameNamePolymorphicTest(fixtures.DeclarativeMappedTest):
    """Two sibling subclasses each declare a relationship called
    ``children``, pointing at different child classes; both are
    selectin-loaded in one ``with_polymorphic()`` query.
    """

    @classmethod
    def setup_classes(cls):
        """Declare ``GenericParent`` with ``ParentA`` / ``ParentB``
        subclasses on their own tables, plus ``ChildA`` / ``ChildB``.
        """
        Base = cls.DeclarativeBasic

        class GenericParent(Base):
            __tablename__ = "generic_parent"
            id = Column(Integer, primary_key=True)
            type = Column(String(50), nullable=False)

            __mapper_args__ = dict(
                polymorphic_on=type, polymorphic_identity="generic_parent"
            )

        class ParentA(GenericParent):
            __tablename__ = "parent_a"

            id = Column(
                Integer, ForeignKey("generic_parent.id"), primary_key=True
            )
            children = relationship("ChildA", back_populates="parent")

            __mapper_args__ = dict(polymorphic_identity="parent_a")

        class ParentB(GenericParent):
            __tablename__ = "parent_b"

            id = Column(
                Integer, ForeignKey("generic_parent.id"), primary_key=True
            )
            children = relationship("ChildB", back_populates="parent")

            __mapper_args__ = dict(polymorphic_identity="parent_b")

        class ChildA(Base):
            __tablename__ = "child_a"
            id = Column(Integer, primary_key=True)
            parent_id = Column(
                Integer, ForeignKey("parent_a.id"), nullable=False
            )
            parent = relationship("ParentA", back_populates="children")

        class ChildB(Base):
            __tablename__ = "child_b"

            id = Column(Integer, primary_key=True)
            parent_id = Column(
                Integer, ForeignKey("parent_b.id"), nullable=False
            )
            parent = relationship("ParentB", back_populates="children")

    @classmethod
    def insert_data(cls, connection):
        """Insert ``ParentA(id=1)`` and ``ParentB(id=2)``, each with ten
        children of its own child class.
        """
        ParentA, ParentB, ChildA, ChildB = cls.classes(
            "ParentA", "ParentB", "ChildA", "ChildB"
        )
        sess = Session(connection)

        a_parent = ParentA(id=1)
        b_parent = ParentB(id=2)
        for _ in range(10):
            a_parent.children.append(ChildA())
            b_parent.children.append(ChildB())

        sess.add_all([a_parent, b_parent])
        sess.commit()

    def test_load_both_wpoly(self):
        """Each subclass's ``children`` gets its own IN query, limited to
        that subclass's parent id.
        """
        GenericParent, ParentA, ParentB, ChildA, ChildB = self.classes(
            "GenericParent", "ParentA", "ParentB", "ChildA", "ChildB"
        )
        sess = Session()

        wpoly = with_polymorphic(GenericParent, [ParentA, ParentB])

        with assert_engine(testing.db) as asserter:
            query = sess.query(wpoly)
            query = query.options(
                selectinload(wpoly.ParentA.children),
                selectinload(wpoly.ParentB.children),
            )
            query.all()

        parents_stmt = CompiledSQL(
            "SELECT generic_parent.id AS generic_parent_id, "
            "generic_parent.type AS generic_parent_type, "
            "parent_a.id AS parent_a_id, parent_b.id AS parent_b_id "
            "FROM generic_parent LEFT OUTER JOIN parent_a "
            "ON generic_parent.id = parent_a.id LEFT OUTER JOIN parent_b "
            "ON generic_parent.id = parent_b.id"
        )
        child_a_stmt = CompiledSQL(
            "SELECT child_a.parent_id AS child_a_parent_id, "
            "child_a.id AS child_a_id FROM child_a "
            "WHERE child_a.parent_id IN ([EXPANDING_primary_keys])",
            [{"primary_keys": [1]}],
        )
        child_b_stmt = CompiledSQL(
            "SELECT child_b.parent_id AS child_b_parent_id, "
            "child_b.id AS child_b_id FROM child_b "
            "WHERE child_b.parent_id IN ([EXPANDING_primary_keys])",
            [{"primary_keys": [2]}],
        )

        # the two child statements are grouped with AllOf rather than
        # listed as a fixed sequence
        asserter.assert_(parents_stmt, AllOf(child_a_stmt, child_b_stmt))
3414
3415
class TestBakedCancelsCorrectly(fixtures.DeclarativeMappedTest):
    """Regression test for issue #5303: a selectinload / joinedload chain
    built on a per-call ``with_polymorphic()`` must take effect on every
    run, not only the first.
    """

    @classmethod
    def setup_classes(cls):
        """Declare ``User``, the ``Foo`` / ``SubFoo`` hierarchy and
        ``Bar``, which refers to ``Foo``.
        """
        Base = cls.DeclarativeBasic

        class User(Base):
            __tablename__ = "users"

            id = Column(Integer, primary_key=True)

        class Foo(Base):
            __tablename__ = "foos"
            __mapper_args__ = dict(polymorphic_on="type")

            id = Column(Integer, primary_key=True)
            type = Column(String(50), nullable=False)

        class SubFoo(Foo):
            __tablename__ = "foos_sub"
            __mapper_args__ = dict(polymorphic_identity="USER")

            id = Column(Integer, ForeignKey("foos.id"), primary_key=True)
            user_id = Column(Integer, ForeignKey("users.id"))
            user = relationship("User")

        class Bar(Base):
            __tablename__ = "bars"

            id = Column(Integer, primary_key=True)
            foo_id = Column(Integer, ForeignKey("foos.id"))
            foo = relationship("Foo", cascade="all", uselist=False)

    @classmethod
    def insert_data(cls, connection):
        """Insert one ``Bar`` -> ``SubFoo`` -> ``User`` chain."""
        User, Bar, SubFoo = cls.classes("User", "Bar", "SubFoo")

        sess = Session(connection)

        the_user = User()
        the_foo = SubFoo(user=the_user)
        the_bar = Bar(foo=the_foo)

        sess.add_all([the_user, the_foo, the_bar])
        sess.commit()

    def test_option_accepted_each_time(self):
        """Three consecutive runs each emit exactly two statements."""
        Foo, User, Bar, SubFoo = self.classes("Foo", "User", "Bar", "SubFoo")

        def go():
            # The loader options here cancel caching, since the
            # with_polymorphic() can't be cached; were the query cached
            # anyway, this would fail because the option would not match
            # up to the with_polymorphic used in the query.  The cache
            # spoil, however, did not use full=True, which kept the lead
            # entities around.

            sess = Session()
            poly_foo = with_polymorphic(Foo, [SubFoo], aliased=True)

            foo_option = selectinload(Bar.foo.of_type(poly_foo))
            user_option = foo_option.joinedload(poly_foo.SubFoo.user)

            first_bar = sess.query(Bar).options(user_option).first()

            # touch the attributes while statements are being counted
            first_bar.foo.user

        for _ in range(3):
            self.assert_sql_count(testing.db, go, 2)
3488        self.assert_sql_count(testing.db, go, 2)
3489