1import sqlalchemy as sa
2from sqlalchemy import bindparam
3from sqlalchemy import ForeignKey
4from sqlalchemy import ForeignKeyConstraint
5from sqlalchemy import Integer
6from sqlalchemy import select
7from sqlalchemy import String
8from sqlalchemy import testing
9from sqlalchemy.orm import aliased
10from sqlalchemy.orm import clear_mappers
11from sqlalchemy.orm import defaultload
12from sqlalchemy.orm import defer
13from sqlalchemy.orm import deferred
14from sqlalchemy.orm import joinedload
15from sqlalchemy.orm import relationship
16from sqlalchemy.orm import selectinload
17from sqlalchemy.orm import Session
18from sqlalchemy.orm import subqueryload
19from sqlalchemy.orm import undefer
20from sqlalchemy.orm import with_polymorphic
21from sqlalchemy.testing import assert_raises
22from sqlalchemy.testing import assert_raises_message
23from sqlalchemy.testing import eq_
24from sqlalchemy.testing import fixtures
25from sqlalchemy.testing import is_
26from sqlalchemy.testing import is_not
27from sqlalchemy.testing import is_true
28from sqlalchemy.testing import mock
29from sqlalchemy.testing.assertsql import AllOf
30from sqlalchemy.testing.assertsql import assert_engine
31from sqlalchemy.testing.assertsql import CompiledSQL
32from sqlalchemy.testing.fixtures import ComparableEntity
33from sqlalchemy.testing.fixtures import fixture_session
34from sqlalchemy.testing.schema import Column
35from sqlalchemy.testing.schema import Table
36from test.orm import _fixtures
37from .inheritance._poly_fixtures import _Polymorphic
38from .inheritance._poly_fixtures import Company
39from .inheritance._poly_fixtures import Engineer
40from .inheritance._poly_fixtures import Machine
41from .inheritance._poly_fixtures import MachineType
42from .inheritance._poly_fixtures import Paperwork
43from .inheritance._poly_fixtures import Person
44
45
46class EagerTest(_fixtures.FixtureTest, testing.AssertsCompiledSQL):
47    run_inserts = "once"
48    run_deletes = None
49
50    def test_basic(self):
51        users, Address, addresses, User = (
52            self.tables.users,
53            self.classes.Address,
54            self.tables.addresses,
55            self.classes.User,
56        )
57
58        self.mapper_registry.map_imperatively(
59            User,
60            users,
61            properties={
62                "addresses": relationship(
63                    self.mapper_registry.map_imperatively(Address, addresses),
64                    order_by=Address.id,
65                )
66            },
67        )
68        sess = fixture_session()
69
70        q = sess.query(User).options(selectinload(User.addresses))
71
72        def go():
73            eq_(
74                [
75                    User(
76                        id=7,
77                        addresses=[
78                            Address(id=1, email_address="jack@bean.com")
79                        ],
80                    )
81                ],
82                q.filter(User.id == 7).all(),
83            )
84
85        self.assert_sql_count(testing.db, go, 2)
86
87        def go():
88            eq_(self.static.user_address_result, q.order_by(User.id).all())
89
90        self.assert_sql_count(testing.db, go, 2)
91
92    def user_dingaling_fixture(self):
93        users, Dingaling, User, dingalings, Address, addresses = (
94            self.tables.users,
95            self.classes.Dingaling,
96            self.classes.User,
97            self.tables.dingalings,
98            self.classes.Address,
99            self.tables.addresses,
100        )
101
102        self.mapper_registry.map_imperatively(Dingaling, dingalings)
103        self.mapper_registry.map_imperatively(
104            Address,
105            addresses,
106            properties={
107                "dingalings": relationship(Dingaling, order_by=Dingaling.id)
108            },
109        )
110        self.mapper_registry.map_imperatively(
111            User,
112            users,
113            properties={
114                "addresses": relationship(Address, order_by=Address.id)
115            },
116        )
117        return User, Dingaling, Address
118
119    def test_from_aliased_w_cache_one(self):
120        User, Dingaling, Address = self.user_dingaling_fixture()
121
122        for i in range(3):
123
124            def go():
125
126                sess = fixture_session()
127
128                u = aliased(User)
129
130                q = sess.query(u).options(selectinload(u.addresses))
131
132                eq_(
133                    [
134                        User(
135                            id=7,
136                            addresses=[
137                                Address(id=1, email_address="jack@bean.com")
138                            ],
139                        )
140                    ],
141                    q.filter(u.id == 7).all(),
142                )
143
144            self.assert_sql_count(testing.db, go, 2)
145
146    def test_from_aliased_w_cache_two(self):
147        User, Dingaling, Address = self.user_dingaling_fixture()
148
149        for i in range(3):
150
151            def go():
152                sess = fixture_session()
153
154                u = aliased(User)
155
156                q = sess.query(u).options(selectinload(u.addresses))
157
158                eq_(self.static.user_address_result, q.order_by(u.id).all())
159
160            self.assert_sql_count(testing.db, go, 2)
161
162    def test_from_aliased_w_cache_three(self):
163
164        User, Dingaling, Address = self.user_dingaling_fixture()
165
166        for i in range(3):
167
168            def go():
169                sess = fixture_session()
170
171                u = aliased(User)
172
173                q = sess.query(u).options(
174                    selectinload(u.addresses).selectinload(Address.dingalings)
175                )
176                eq_(
177                    [
178                        User(
179                            id=8,
180                            addresses=[
181                                Address(
182                                    id=2,
183                                    email_address="ed@wood.com",
184                                    dingalings=[Dingaling()],
185                                ),
186                                Address(
187                                    id=3, email_address="ed@bettyboop.com"
188                                ),
189                                Address(id=4, email_address="ed@lala.com"),
190                            ],
191                        ),
192                        User(
193                            id=9,
194                            addresses=[
195                                Address(id=5, dingalings=[Dingaling()])
196                            ],
197                        ),
198                    ],
199                    q.filter(u.id.in_([8, 9])).all(),
200                )
201
202            self.assert_sql_count(testing.db, go, 3)
203
204    def test_from_get(self):
205        users, Address, addresses, User = (
206            self.tables.users,
207            self.classes.Address,
208            self.tables.addresses,
209            self.classes.User,
210        )
211
212        self.mapper_registry.map_imperatively(
213            User,
214            users,
215            properties={
216                "addresses": relationship(
217                    self.mapper_registry.map_imperatively(Address, addresses),
218                    order_by=Address.id,
219                )
220            },
221        )
222        sess = fixture_session()
223
224        def go():
225            eq_(
226                User(
227                    id=7,
228                    addresses=[Address(id=1, email_address="jack@bean.com")],
229                ),
230                sess.get(User, 7, options=[selectinload(User.addresses)]),
231            )
232
233        self.assert_sql_count(testing.db, go, 2)
234
235    def test_from_params(self):
236        users, Address, addresses, User = (
237            self.tables.users,
238            self.classes.Address,
239            self.tables.addresses,
240            self.classes.User,
241        )
242
243        self.mapper_registry.map_imperatively(
244            User,
245            users,
246            properties={
247                "addresses": relationship(
248                    self.mapper_registry.map_imperatively(Address, addresses),
249                    order_by=Address.id,
250                )
251            },
252        )
253        sess = fixture_session()
254
255        q = sess.query(User).options(selectinload(User.addresses))
256
257        def go():
258            eq_(
259                User(
260                    id=7,
261                    addresses=[Address(id=1, email_address="jack@bean.com")],
262                ),
263                q.filter(User.id == bindparam("foo")).params(foo=7).one(),
264            )
265
266        self.assert_sql_count(testing.db, go, 2)
267
268    def test_disable_dynamic(self):
269        """test no selectin option on a dynamic."""
270
271        users, Address, addresses, User = (
272            self.tables.users,
273            self.classes.Address,
274            self.tables.addresses,
275            self.classes.User,
276        )
277
278        self.mapper_registry.map_imperatively(
279            User,
280            users,
281            properties={"addresses": relationship(Address, lazy="dynamic")},
282        )
283        self.mapper_registry.map_imperatively(Address, addresses)
284        sess = fixture_session()
285
286        # previously this would not raise, but would emit
287        # the query needlessly and put the result nowhere.
288        assert_raises_message(
289            sa.exc.InvalidRequestError,
290            "User.addresses' does not support object population - eager "
291            "loading cannot be applied.",
292            sess.query(User).options(selectinload(User.addresses)).first,
293        )
294
295    def test_many_to_many_plain(self):
296        keywords, items, item_keywords, Keyword, Item = (
297            self.tables.keywords,
298            self.tables.items,
299            self.tables.item_keywords,
300            self.classes.Keyword,
301            self.classes.Item,
302        )
303
304        self.mapper_registry.map_imperatively(Keyword, keywords)
305        self.mapper_registry.map_imperatively(
306            Item,
307            items,
308            properties=dict(
309                keywords=relationship(
310                    Keyword,
311                    secondary=item_keywords,
312                    lazy="selectin",
313                    order_by=keywords.c.id,
314                )
315            ),
316        )
317
318        q = fixture_session().query(Item).order_by(Item.id)
319
320        def go():
321            eq_(self.static.item_keyword_result, q.all())
322
323        self.assert_sql_count(testing.db, go, 2)
324
325    def test_many_to_many_with_join(self):
326        keywords, items, item_keywords, Keyword, Item = (
327            self.tables.keywords,
328            self.tables.items,
329            self.tables.item_keywords,
330            self.classes.Keyword,
331            self.classes.Item,
332        )
333
334        self.mapper_registry.map_imperatively(Keyword, keywords)
335        self.mapper_registry.map_imperatively(
336            Item,
337            items,
338            properties=dict(
339                keywords=relationship(
340                    Keyword,
341                    secondary=item_keywords,
342                    lazy="selectin",
343                    order_by=keywords.c.id,
344                )
345            ),
346        )
347
348        q = fixture_session().query(Item).order_by(Item.id)
349
350        def go():
351            eq_(
352                self.static.item_keyword_result[0:2],
353                q.join(Item.keywords).filter(Keyword.name == "red").all(),
354            )
355
356        self.assert_sql_count(testing.db, go, 2)
357
358    def test_many_to_many_with_join_alias(self):
359        keywords, items, item_keywords, Keyword, Item = (
360            self.tables.keywords,
361            self.tables.items,
362            self.tables.item_keywords,
363            self.classes.Keyword,
364            self.classes.Item,
365        )
366
367        self.mapper_registry.map_imperatively(Keyword, keywords)
368        self.mapper_registry.map_imperatively(
369            Item,
370            items,
371            properties=dict(
372                keywords=relationship(
373                    Keyword,
374                    secondary=item_keywords,
375                    lazy="selectin",
376                    order_by=keywords.c.id,
377                )
378            ),
379        )
380
381        q = fixture_session().query(Item).order_by(Item.id)
382
383        def go():
384            ka = aliased(Keyword)
385            eq_(
386                self.static.item_keyword_result[0:2],
387                (q.join(ka, Item.keywords).filter(ka.name == "red")).all(),
388            )
389
390        self.assert_sql_count(testing.db, go, 2)
391
392    def test_orderby(self):
393        users, Address, addresses, User = (
394            self.tables.users,
395            self.classes.Address,
396            self.tables.addresses,
397            self.classes.User,
398        )
399
400        self.mapper_registry.map_imperatively(
401            User,
402            users,
403            properties={
404                "addresses": relationship(
405                    self.mapper_registry.map_imperatively(Address, addresses),
406                    lazy="selectin",
407                    order_by=addresses.c.email_address,
408                )
409            },
410        )
411        q = fixture_session().query(User)
412        eq_(
413            [
414                User(id=7, addresses=[Address(id=1)]),
415                User(
416                    id=8,
417                    addresses=[
418                        Address(id=3, email_address="ed@bettyboop.com"),
419                        Address(id=4, email_address="ed@lala.com"),
420                        Address(id=2, email_address="ed@wood.com"),
421                    ],
422                ),
423                User(id=9, addresses=[Address(id=5)]),
424                User(id=10, addresses=[]),
425            ],
426            q.order_by(User.id).all(),
427        )
428
429    def test_orderby_multi(self):
430        users, Address, addresses, User = (
431            self.tables.users,
432            self.classes.Address,
433            self.tables.addresses,
434            self.classes.User,
435        )
436
437        self.mapper_registry.map_imperatively(
438            User,
439            users,
440            properties={
441                "addresses": relationship(
442                    self.mapper_registry.map_imperatively(Address, addresses),
443                    lazy="selectin",
444                    order_by=[addresses.c.email_address, addresses.c.id],
445                )
446            },
447        )
448        q = fixture_session().query(User)
449        eq_(
450            [
451                User(id=7, addresses=[Address(id=1)]),
452                User(
453                    id=8,
454                    addresses=[
455                        Address(id=3, email_address="ed@bettyboop.com"),
456                        Address(id=4, email_address="ed@lala.com"),
457                        Address(id=2, email_address="ed@wood.com"),
458                    ],
459                ),
460                User(id=9, addresses=[Address(id=5)]),
461                User(id=10, addresses=[]),
462            ],
463            q.order_by(User.id).all(),
464        )
465
466    def test_orderby_related(self):
467        """A regular mapper select on a single table can
468        order by a relationship to a second table"""
469
470        Address, addresses, users, User = (
471            self.classes.Address,
472            self.tables.addresses,
473            self.tables.users,
474            self.classes.User,
475        )
476
477        self.mapper_registry.map_imperatively(Address, addresses)
478        self.mapper_registry.map_imperatively(
479            User,
480            users,
481            properties=dict(
482                addresses=relationship(
483                    Address, lazy="selectin", order_by=addresses.c.id
484                )
485            ),
486        )
487
488        q = fixture_session().query(User)
489        result = (
490            q.filter(User.id == Address.user_id)
491            .order_by(Address.email_address)
492            .all()
493        )
494
495        eq_(
496            [
497                User(
498                    id=8,
499                    addresses=[
500                        Address(id=2, email_address="ed@wood.com"),
501                        Address(id=3, email_address="ed@bettyboop.com"),
502                        Address(id=4, email_address="ed@lala.com"),
503                    ],
504                ),
505                User(id=9, addresses=[Address(id=5)]),
506                User(id=7, addresses=[Address(id=1)]),
507            ],
508            result,
509        )
510
511    def test_orderby_desc(self):
512        Address, addresses, users, User = (
513            self.classes.Address,
514            self.tables.addresses,
515            self.tables.users,
516            self.classes.User,
517        )
518
519        self.mapper_registry.map_imperatively(Address, addresses)
520        self.mapper_registry.map_imperatively(
521            User,
522            users,
523            properties=dict(
524                addresses=relationship(
525                    Address,
526                    lazy="selectin",
527                    order_by=[sa.desc(addresses.c.email_address)],
528                )
529            ),
530        )
531        sess = fixture_session()
532        eq_(
533            [
534                User(id=7, addresses=[Address(id=1)]),
535                User(
536                    id=8,
537                    addresses=[
538                        Address(id=2, email_address="ed@wood.com"),
539                        Address(id=4, email_address="ed@lala.com"),
540                        Address(id=3, email_address="ed@bettyboop.com"),
541                    ],
542                ),
543                User(id=9, addresses=[Address(id=5)]),
544                User(id=10, addresses=[]),
545            ],
546            sess.query(User).order_by(User.id).all(),
547        )
548
549    _pathing_runs = [
550        ("lazyload", "lazyload", "lazyload", 15),
551        ("selectinload", "lazyload", "lazyload", 12),
552        ("selectinload", "selectinload", "lazyload", 8),
553        ("joinedload", "selectinload", "lazyload", 7),
554        ("lazyload", "lazyload", "selectinload", 12),
555        ("selectinload", "selectinload", "selectinload", 4),
556        ("selectinload", "selectinload", "joinedload", 3),
557    ]
558
559    def test_options_pathing(self):
560        self._do_options_test(self._pathing_runs)
561
562    def test_mapper_pathing(self):
563        self._do_mapper_test(self._pathing_runs)
564
565    def _do_options_test(self, configs):
566        (
567            users,
568            Keyword,
569            orders,
570            items,
571            order_items,
572            Order,
573            Item,
574            User,
575            keywords,
576            item_keywords,
577        ) = (
578            self.tables.users,
579            self.classes.Keyword,
580            self.tables.orders,
581            self.tables.items,
582            self.tables.order_items,
583            self.classes.Order,
584            self.classes.Item,
585            self.classes.User,
586            self.tables.keywords,
587            self.tables.item_keywords,
588        )
589
590        self.mapper_registry.map_imperatively(
591            User,
592            users,
593            properties={
594                "orders": relationship(Order, order_by=orders.c.id)  # o2m, m2o
595            },
596        )
597        self.mapper_registry.map_imperatively(
598            Order,
599            orders,
600            properties={
601                "items": relationship(
602                    Item, secondary=order_items, order_by=items.c.id
603                )  # m2m
604            },
605        )
606        self.mapper_registry.map_imperatively(
607            Item,
608            items,
609            properties={
610                "keywords": relationship(
611                    Keyword, secondary=item_keywords, order_by=keywords.c.id
612                )  # m2m
613            },
614        )
615        self.mapper_registry.map_imperatively(Keyword, keywords)
616
617        callables = {
618            "joinedload": joinedload,
619            "selectinload": selectinload,
620            "subqueryload": subqueryload,
621        }
622
623        # NOTE: make sure this test continues to run many different
624        # combinations for the *same* mappers above; that is, don't tear the
625        # mappers down and build them up for every "config".  This allows
626        # testing of the LRUCache that's associated with LazyLoader
627        # and SelectInLoader and how they interact with the lambda query
628        # API, which stores AnalyzedFunction objects in this cache.
629
630        for o, i, k, count in configs:
631            options = []
632            if o in callables:
633                options.append(callables[o](User.orders))
634            if i in callables:
635                options.append(callables[i](User.orders, Order.items))
636            if k in callables:
637                options.append(
638                    callables[k](User.orders, Order.items, Item.keywords)
639                )
640            self._do_query_tests(options, count)
641
642    def _do_mapper_test(self, configs):
643        (
644            users,
645            Keyword,
646            orders,
647            items,
648            order_items,
649            Order,
650            Item,
651            User,
652            keywords,
653            item_keywords,
654        ) = (
655            self.tables.users,
656            self.classes.Keyword,
657            self.tables.orders,
658            self.tables.items,
659            self.tables.order_items,
660            self.classes.Order,
661            self.classes.Item,
662            self.classes.User,
663            self.tables.keywords,
664            self.tables.item_keywords,
665        )
666
667        opts = {
668            "lazyload": "select",
669            "joinedload": "joined",
670            "selectinload": "selectin",
671        }
672
673        for o, i, k, count in configs:
674            self.mapper_registry.map_imperatively(
675                User,
676                users,
677                properties={
678                    "orders": relationship(
679                        Order, lazy=opts[o], order_by=orders.c.id
680                    )
681                },
682            )
683            self.mapper_registry.map_imperatively(
684                Order,
685                orders,
686                properties={
687                    "items": relationship(
688                        Item,
689                        secondary=order_items,
690                        lazy=opts[i],
691                        order_by=items.c.id,
692                    )
693                },
694            )
695            self.mapper_registry.map_imperatively(
696                Item,
697                items,
698                properties={
699                    "keywords": relationship(
700                        Keyword,
701                        lazy=opts[k],
702                        secondary=item_keywords,
703                        order_by=keywords.c.id,
704                    )
705                },
706            )
707            self.mapper_registry.map_imperatively(Keyword, keywords)
708
709            try:
710                self._do_query_tests([], count)
711            finally:
712                clear_mappers()
713
714    def _do_query_tests(self, opts, count):
715        Order, User = self.classes.Order, self.classes.User
716
717        with fixture_session() as sess:
718
719            def go():
720                eq_(
721                    sess.query(User).options(*opts).order_by(User.id).all(),
722                    self.static.user_item_keyword_result,
723                )
724
725            self.assert_sql_count(testing.db, go, count)
726
727            eq_(
728                sess.query(User)
729                .options(*opts)
730                .filter(User.name == "fred")
731                .order_by(User.id)
732                .all(),
733                self.static.user_item_keyword_result[2:3],
734            )
735
736        with fixture_session() as sess:
737            eq_(
738                sess.query(User)
739                .options(*opts)
740                .join(User.orders)
741                .filter(Order.id == 3)
742                .order_by(User.id)
743                .all(),
744                self.static.user_item_keyword_result[0:1],
745            )
746
747    def test_cyclical(self):
748        """A circular eager relationship breaks the cycle with a lazy loader"""
749
750        Address, addresses, users, User = (
751            self.classes.Address,
752            self.tables.addresses,
753            self.tables.users,
754            self.classes.User,
755        )
756
757        self.mapper_registry.map_imperatively(Address, addresses)
758        self.mapper_registry.map_imperatively(
759            User,
760            users,
761            properties=dict(
762                addresses=relationship(
763                    Address,
764                    lazy="selectin",
765                    backref=sa.orm.backref("user", lazy="selectin"),
766                    order_by=Address.id,
767                )
768            ),
769        )
770        is_(
771            sa.orm.class_mapper(User).get_property("addresses").lazy,
772            "selectin",
773        )
774        is_(sa.orm.class_mapper(Address).get_property("user").lazy, "selectin")
775
776        sess = fixture_session()
777        eq_(
778            self.static.user_address_result,
779            sess.query(User).order_by(User.id).all(),
780        )
781
782    def test_cyclical_explicit_join_depth(self):
783        """A circular eager relationship breaks the cycle with a lazy loader"""
784
785        Address, addresses, users, User = (
786            self.classes.Address,
787            self.tables.addresses,
788            self.tables.users,
789            self.classes.User,
790        )
791
792        self.mapper_registry.map_imperatively(Address, addresses)
793        self.mapper_registry.map_imperatively(
794            User,
795            users,
796            properties=dict(
797                addresses=relationship(
798                    Address,
799                    lazy="selectin",
800                    join_depth=1,
801                    backref=sa.orm.backref(
802                        "user", lazy="selectin", join_depth=1
803                    ),
804                    order_by=Address.id,
805                )
806            ),
807        )
808        is_(
809            sa.orm.class_mapper(User).get_property("addresses").lazy,
810            "selectin",
811        )
812        is_(sa.orm.class_mapper(Address).get_property("user").lazy, "selectin")
813
814        sess = fixture_session()
815        eq_(
816            self.static.user_address_result,
817            sess.query(User).order_by(User.id).all(),
818        )
819
820    def test_double_w_ac_against_subquery(self):
821
822        (
823            users,
824            orders,
825            User,
826            Address,
827            Order,
828            addresses,
829            Item,
830            items,
831            order_items,
832        ) = (
833            self.tables.users,
834            self.tables.orders,
835            self.classes.User,
836            self.classes.Address,
837            self.classes.Order,
838            self.tables.addresses,
839            self.classes.Item,
840            self.tables.items,
841            self.tables.order_items,
842        )
843
844        self.mapper_registry.map_imperatively(Address, addresses)
845        self.mapper_registry.map_imperatively(
846            Order,
847            orders,
848            properties={
849                "items": relationship(
850                    Item,
851                    secondary=order_items,
852                    lazy="selectin",
853                    order_by=items.c.id,
854                )
855            },
856        )
857        self.mapper_registry.map_imperatively(Item, items)
858
859        open_mapper = aliased(
860            Order, select(orders).where(orders.c.isopen == 1).alias()
861        )
862        closed_mapper = aliased(
863            Order, select(orders).where(orders.c.isopen == 0).alias()
864        )
865
866        self.mapper_registry.map_imperatively(
867            User,
868            users,
869            properties=dict(
870                addresses=relationship(
871                    Address, lazy="selectin", order_by=addresses.c.id
872                ),
873                open_orders=relationship(
874                    open_mapper,
875                    lazy="selectin",
876                    order_by=open_mapper.id,
877                    overlaps="closed_orders",
878                ),
879                closed_orders=relationship(
880                    closed_mapper,
881                    lazy="selectin",
882                    order_by=closed_mapper.id,
883                    overlaps="open_orders",
884                ),
885            ),
886        )
887
888        self._run_double_test()
889
890    def test_double_w_ac(self):
891
892        (
893            users,
894            orders,
895            User,
896            Address,
897            Order,
898            addresses,
899            Item,
900            items,
901            order_items,
902        ) = (
903            self.tables.users,
904            self.tables.orders,
905            self.classes.User,
906            self.classes.Address,
907            self.classes.Order,
908            self.tables.addresses,
909            self.classes.Item,
910            self.tables.items,
911            self.tables.order_items,
912        )
913
914        self.mapper_registry.map_imperatively(Address, addresses)
915        self.mapper_registry.map_imperatively(
916            Order,
917            orders,
918            properties={
919                "items": relationship(
920                    Item,
921                    secondary=order_items,
922                    lazy="selectin",
923                    order_by=items.c.id,
924                )
925            },
926        )
927        self.mapper_registry.map_imperatively(Item, items)
928
929        open_mapper = aliased(Order, orders)
930        closed_mapper = aliased(Order, orders)
931
932        self.mapper_registry.map_imperatively(
933            User,
934            users,
935            properties=dict(
936                addresses=relationship(
937                    Address, lazy="selectin", order_by=addresses.c.id
938                ),
939                open_orders=relationship(
940                    open_mapper,
941                    primaryjoin=sa.and_(
942                        open_mapper.isopen == 1,
943                        users.c.id == open_mapper.user_id,
944                    ),
945                    lazy="selectin",
946                    order_by=open_mapper.id,
947                    viewonly=True,
948                ),
949                closed_orders=relationship(
950                    closed_mapper,
951                    primaryjoin=sa.and_(
952                        closed_mapper.isopen == 0,
953                        users.c.id == closed_mapper.user_id,
954                    ),
955                    lazy="selectin",
956                    order_by=closed_mapper.id,
957                    viewonly=True,
958                ),
959            ),
960        )
961
962        self._run_double_test()
963
964    def test_double_same_mappers(self):
965        """Eager loading with two relationships simultaneously,
966        from the same table, using aliases."""
967
968        (
969            addresses,
970            items,
971            order_items,
972            orders,
973            Item,
974            User,
975            Address,
976            Order,
977            users,
978        ) = (
979            self.tables.addresses,
980            self.tables.items,
981            self.tables.order_items,
982            self.tables.orders,
983            self.classes.Item,
984            self.classes.User,
985            self.classes.Address,
986            self.classes.Order,
987            self.tables.users,
988        )
989
990        self.mapper_registry.map_imperatively(Address, addresses)
991        self.mapper_registry.map_imperatively(
992            Order,
993            orders,
994            properties={
995                "items": relationship(
996                    Item,
997                    secondary=order_items,
998                    lazy="selectin",
999                    order_by=items.c.id,
1000                )
1001            },
1002        )
1003        self.mapper_registry.map_imperatively(Item, items)
1004        self.mapper_registry.map_imperatively(
1005            User,
1006            users,
1007            properties=dict(
1008                addresses=relationship(
1009                    Address, lazy="selectin", order_by=addresses.c.id
1010                ),
1011                open_orders=relationship(
1012                    Order,
1013                    primaryjoin=sa.and_(
1014                        orders.c.isopen == 1, users.c.id == orders.c.user_id
1015                    ),
1016                    lazy="selectin",
1017                    order_by=orders.c.id,
1018                    overlaps="closed_orders",
1019                ),
1020                closed_orders=relationship(
1021                    Order,
1022                    primaryjoin=sa.and_(
1023                        orders.c.isopen == 0, users.c.id == orders.c.user_id
1024                    ),
1025                    lazy="selectin",
1026                    order_by=orders.c.id,
1027                    overlaps="open_orders",
1028                ),
1029            ),
1030        )
1031
1032        self._run_double_test()
1033
1034    def _run_double_test(self, no_items=False):
1035        User, Address, Order, Item = self.classes(
1036            "User", "Address", "Order", "Item"
1037        )
1038        q = fixture_session().query(User).order_by(User.id)
1039
1040        def items(*ids):
1041            if no_items:
1042                return {}
1043            else:
1044                return {"items": [Item(id=id_) for id_ in ids]}
1045
1046        def go():
1047            eq_(
1048                [
1049                    User(
1050                        id=7,
1051                        addresses=[Address(id=1)],
1052                        open_orders=[Order(id=3, **items(3, 4, 5))],
1053                        closed_orders=[
1054                            Order(id=1, **items(1, 2, 3)),
1055                            Order(id=5, **items(5)),
1056                        ],
1057                    ),
1058                    User(
1059                        id=8,
1060                        addresses=[
1061                            Address(id=2),
1062                            Address(id=3),
1063                            Address(id=4),
1064                        ],
1065                        open_orders=[],
1066                        closed_orders=[],
1067                    ),
1068                    User(
1069                        id=9,
1070                        addresses=[Address(id=5)],
1071                        open_orders=[Order(id=4, **items(1, 5))],
1072                        closed_orders=[Order(id=2, **items(1, 2, 3))],
1073                    ),
1074                    User(id=10),
1075                ],
1076                q.all(),
1077            )
1078
1079        if no_items:
1080            self.assert_sql_count(testing.db, go, 4)
1081        else:
1082            self.assert_sql_count(testing.db, go, 6)
1083
1084    @testing.combinations(
1085        ("plain",), ("cte", testing.requires.ctes), ("subquery",), id_="s"
1086    )
1087    def test_map_to_cte_subq(self, type_):
1088        User, Address = self.classes("User", "Address")
1089        users, addresses = self.tables("users", "addresses")
1090
1091        if type_ == "plain":
1092            target = users
1093        elif type_ == "cte":
1094            target = select(users).cte()
1095        elif type_ == "subquery":
1096            target = select(users).subquery()
1097
1098        self.mapper_registry.map_imperatively(
1099            User,
1100            target,
1101            properties={"addresses": relationship(Address, backref="user")},
1102        )
1103        self.mapper_registry.map_imperatively(Address, addresses)
1104
1105        sess = fixture_session()
1106
1107        q = (
1108            sess.query(Address)
1109            .options(selectinload(Address.user))
1110            .order_by(Address.id)
1111        )
1112        eq_(q.all(), self.static.address_user_result)
1113
1114    def test_limit(self):
1115        """Limit operations combined with lazy-load relationships."""
1116
1117        (
1118            users,
1119            items,
1120            order_items,
1121            orders,
1122            Item,
1123            User,
1124            Address,
1125            Order,
1126            addresses,
1127        ) = (
1128            self.tables.users,
1129            self.tables.items,
1130            self.tables.order_items,
1131            self.tables.orders,
1132            self.classes.Item,
1133            self.classes.User,
1134            self.classes.Address,
1135            self.classes.Order,
1136            self.tables.addresses,
1137        )
1138
1139        self.mapper_registry.map_imperatively(Item, items)
1140        self.mapper_registry.map_imperatively(
1141            Order,
1142            orders,
1143            properties={
1144                "items": relationship(
1145                    Item,
1146                    secondary=order_items,
1147                    lazy="selectin",
1148                    order_by=items.c.id,
1149                )
1150            },
1151        )
1152        self.mapper_registry.map_imperatively(
1153            User,
1154            users,
1155            properties={
1156                "addresses": relationship(
1157                    self.mapper_registry.map_imperatively(Address, addresses),
1158                    lazy="selectin",
1159                    order_by=addresses.c.id,
1160                ),
1161                "orders": relationship(
1162                    Order, lazy="select", order_by=orders.c.id
1163                ),
1164            },
1165        )
1166
1167        sess = fixture_session()
1168        q = sess.query(User)
1169
1170        result = q.order_by(User.id).limit(2).offset(1).all()
1171        eq_(self.static.user_all_result[1:3], result)
1172
1173        result = q.order_by(sa.desc(User.id)).limit(2).offset(2).all()
1174        eq_(list(reversed(self.static.user_all_result[0:2])), result)
1175
1176    def test_one_to_many_scalar(self):
1177        Address, addresses, users, User = (
1178            self.classes.Address,
1179            self.tables.addresses,
1180            self.tables.users,
1181            self.classes.User,
1182        )
1183
1184        self.mapper_registry.map_imperatively(
1185            User,
1186            users,
1187            properties=dict(
1188                address=relationship(
1189                    self.mapper_registry.map_imperatively(Address, addresses),
1190                    lazy="selectin",
1191                    uselist=False,
1192                )
1193            ),
1194        )
1195        q = fixture_session().query(User)
1196
1197        def go():
1198            result = q.filter(users.c.id == 7).all()
1199            eq_([User(id=7, address=Address(id=1))], result)
1200
1201        self.assert_sql_count(testing.db, go, 2)
1202
1203    def test_one_to_many_scalar_none(self):
1204        Address, addresses, users, User = (
1205            self.classes.Address,
1206            self.tables.addresses,
1207            self.tables.users,
1208            self.classes.User,
1209        )
1210
1211        self.mapper_registry.map_imperatively(
1212            User,
1213            users,
1214            properties=dict(
1215                address=relationship(
1216                    self.mapper_registry.map_imperatively(Address, addresses),
1217                    lazy="selectin",
1218                    uselist=False,
1219                )
1220            ),
1221        )
1222        q = fixture_session().query(User)
1223
1224        def go():
1225            result = q.filter(users.c.id == 10).all()
1226            eq_([User(id=10, address=None)], result)
1227
1228        self.assert_sql_count(testing.db, go, 2)
1229
1230    def test_many_to_one(self):
1231        users, Address, addresses, User = (
1232            self.tables.users,
1233            self.classes.Address,
1234            self.tables.addresses,
1235            self.classes.User,
1236        )
1237
1238        self.mapper_registry.map_imperatively(
1239            Address,
1240            addresses,
1241            properties=dict(
1242                user=relationship(
1243                    self.mapper_registry.map_imperatively(User, users),
1244                    lazy="selectin",
1245                )
1246            ),
1247        )
1248        sess = fixture_session()
1249        q = sess.query(Address)
1250
1251        def go():
1252            a = q.filter(addresses.c.id == 1).one()
1253            is_not(a.user, None)
1254            u1 = sess.get(User, 7)
1255            is_(a.user, u1)
1256
1257        self.assert_sql_count(testing.db, go, 2)
1258
1259    def test_m2o_none_value_present(self):
1260        orders, Order, addresses, Address = (
1261            self.tables.orders,
1262            self.classes.Order,
1263            self.tables.addresses,
1264            self.classes.Address,
1265        )
1266
1267        self.mapper_registry.map_imperatively(
1268            Order,
1269            orders,
1270            properties={"address": relationship(Address, lazy="selectin")},
1271        )
1272        self.mapper_registry.map_imperatively(Address, addresses)
1273
1274        sess = fixture_session(autoflush=False)
1275        q = sess.query(Order).filter(Order.id.in_([4, 5])).order_by(Order.id)
1276
1277        o4, o5 = q.all()
1278        assert o4.__dict__["address"] is not None
1279        assert o5.__dict__["address"] is None
1280
1281        # test overwrite
1282
1283        o5.address = Address()
1284        sess.query(Order).filter(Order.id.in_([4, 5])).order_by(Order.id).all()
1285        assert o5.__dict__["address"] is not None
1286
1287        o5.address = Address()
1288        sess.query(Order).populate_existing().filter(
1289            Order.id.in_([4, 5])
1290        ).order_by(Order.id).all()
1291        assert o5.__dict__["address"] is None
1292
1293    def test_m2o_uselist_none_value_present(self):
1294        orders, Order, addresses, Address = (
1295            self.tables.orders,
1296            self.classes.Order,
1297            self.tables.addresses,
1298            self.classes.Address,
1299        )
1300
1301        self.mapper_registry.map_imperatively(
1302            Order,
1303            orders,
1304            properties={
1305                "address": relationship(Address, lazy="selectin", uselist=True)
1306            },
1307        )
1308        self.mapper_registry.map_imperatively(Address, addresses)
1309
1310        sess = fixture_session()
1311        q = sess.query(Order).filter(Order.id.in_([4, 5])).order_by(Order.id)
1312
1313        o4, o5 = q.all()
1314        assert len(o4.__dict__["address"])
1315        eq_(o5.__dict__["address"], [])
1316
1317    def test_o2m_empty_list_present(self):
1318        Address, addresses, users, User = (
1319            self.classes.Address,
1320            self.tables.addresses,
1321            self.tables.users,
1322            self.classes.User,
1323        )
1324
1325        self.mapper_registry.map_imperatively(
1326            User,
1327            users,
1328            properties=dict(
1329                addresses=relationship(
1330                    self.mapper_registry.map_imperatively(Address, addresses),
1331                    lazy="selectin",
1332                )
1333            ),
1334        )
1335        q = fixture_session().query(User)
1336        result = q.filter(users.c.id == 10).all()
1337        u1 = result[0]
1338
1339        eq_(u1.__dict__["addresses"], [])
1340
1341    def test_double_with_aggregate(self):
1342        User, users, orders, Order = (
1343            self.classes.User,
1344            self.tables.users,
1345            self.tables.orders,
1346            self.classes.Order,
1347        )
1348
1349        max_orders_by_user = (
1350            sa.select(sa.func.max(orders.c.id).label("order_id"))
1351            .group_by(orders.c.user_id)
1352            .alias("max_orders_by_user")
1353        )
1354
1355        max_orders = (
1356            orders.select()
1357            .where(orders.c.id == max_orders_by_user.c.order_id)
1358            .alias("max_orders")
1359        )
1360
1361        self.mapper_registry.map_imperatively(Order, orders)
1362        self.mapper_registry.map_imperatively(
1363            User,
1364            users,
1365            properties={
1366                "orders": relationship(
1367                    Order,
1368                    backref="user",
1369                    lazy="selectin",
1370                    order_by=orders.c.id,
1371                ),
1372                "max_order": relationship(
1373                    aliased(Order, max_orders), lazy="selectin", uselist=False
1374                ),
1375            },
1376        )
1377
1378        q = fixture_session().query(User)
1379
1380        def go():
1381            eq_(
1382                [
1383                    User(
1384                        id=7,
1385                        orders=[Order(id=1), Order(id=3), Order(id=5)],
1386                        max_order=Order(id=5),
1387                    ),
1388                    User(id=8, orders=[]),
1389                    User(
1390                        id=9,
1391                        orders=[Order(id=2), Order(id=4)],
1392                        max_order=Order(id=4),
1393                    ),
1394                    User(id=10),
1395                ],
1396                q.order_by(User.id).all(),
1397            )
1398
1399        self.assert_sql_count(testing.db, go, 3)
1400
1401    def test_uselist_false_warning(self):
1402        """test that multiple rows received by a
1403        uselist=False raises a warning."""
1404
1405        User, users, orders, Order = (
1406            self.classes.User,
1407            self.tables.users,
1408            self.tables.orders,
1409            self.classes.Order,
1410        )
1411
1412        self.mapper_registry.map_imperatively(
1413            User,
1414            users,
1415            properties={"order": relationship(Order, uselist=False)},
1416        )
1417        self.mapper_registry.map_imperatively(Order, orders)
1418        s = fixture_session()
1419        assert_raises(
1420            sa.exc.SAWarning,
1421            s.query(User).options(selectinload(User.order)).all,
1422        )
1423
1424
1425class LoadOnExistingTest(_fixtures.FixtureTest):
1426    """test that loaders from a base Query fully populate."""
1427
1428    run_inserts = "once"
1429    run_deletes = None
1430
1431    def _collection_to_scalar_fixture(self):
1432        User, Address, Dingaling = (
1433            self.classes.User,
1434            self.classes.Address,
1435            self.classes.Dingaling,
1436        )
1437        self.mapper_registry.map_imperatively(
1438            User,
1439            self.tables.users,
1440            properties={"addresses": relationship(Address)},
1441        )
1442        self.mapper_registry.map_imperatively(
1443            Address,
1444            self.tables.addresses,
1445            properties={"dingaling": relationship(Dingaling)},
1446        )
1447        self.mapper_registry.map_imperatively(
1448            Dingaling, self.tables.dingalings
1449        )
1450
1451        sess = fixture_session(autoflush=False)
1452        return User, Address, Dingaling, sess
1453
1454    def _collection_to_collection_fixture(self):
1455        User, Order, Item = (
1456            self.classes.User,
1457            self.classes.Order,
1458            self.classes.Item,
1459        )
1460        self.mapper_registry.map_imperatively(
1461            User, self.tables.users, properties={"orders": relationship(Order)}
1462        )
1463        self.mapper_registry.map_imperatively(
1464            Order,
1465            self.tables.orders,
1466            properties={
1467                "items": relationship(Item, secondary=self.tables.order_items)
1468            },
1469        )
1470        self.mapper_registry.map_imperatively(Item, self.tables.items)
1471
1472        sess = fixture_session(autoflush=False)
1473        return User, Order, Item, sess
1474
1475    def _eager_config_fixture(self, default_lazy="selectin"):
1476        User, Address = self.classes.User, self.classes.Address
1477        self.mapper_registry.map_imperatively(
1478            User,
1479            self.tables.users,
1480            properties={"addresses": relationship(Address, lazy=default_lazy)},
1481        )
1482        self.mapper_registry.map_imperatively(Address, self.tables.addresses)
1483        sess = fixture_session(autoflush=False)
1484        return User, Address, sess
1485
1486    def _deferred_config_fixture(self):
1487        User, Address = self.classes.User, self.classes.Address
1488        self.mapper_registry.map_imperatively(
1489            User,
1490            self.tables.users,
1491            properties={
1492                "name": deferred(self.tables.users.c.name),
1493                "addresses": relationship(Address, lazy="selectin"),
1494            },
1495        )
1496        self.mapper_registry.map_imperatively(Address, self.tables.addresses)
1497        sess = fixture_session(autoflush=False)
1498        return User, Address, sess
1499
1500    def test_runs_query_on_refresh(self):
1501        User, Address, sess = self._eager_config_fixture()
1502
1503        u1 = sess.get(User, 8)
1504        assert "addresses" in u1.__dict__
1505        sess.expire(u1)
1506
1507        def go():
1508            eq_(u1.id, 8)
1509
1510        self.assert_sql_count(testing.db, go, 2)
1511        assert "addresses" in u1.__dict__
1512
1513    @testing.combinations(
1514        ("raise",),
1515        ("raise_on_sql",),
1516        ("select",),
1517        ("immediate"),
1518    )
1519    def test_runs_query_on_option_refresh(self, default_lazy):
1520        User, Address, sess = self._eager_config_fixture(
1521            default_lazy=default_lazy
1522        )
1523
1524        u1 = (
1525            sess.query(User)
1526            .options(selectinload(User.addresses))
1527            .filter_by(id=8)
1528            .first()
1529        )
1530        assert "addresses" in u1.__dict__
1531        sess.expire(u1)
1532
1533        def go():
1534            eq_(u1.id, 8)
1535
1536        self.assert_sql_count(testing.db, go, 2)
1537        assert "addresses" in u1.__dict__
1538
1539    def test_no_query_on_deferred(self):
1540        User, Address, sess = self._deferred_config_fixture()
1541        u1 = sess.get(User, 8)
1542        assert "addresses" in u1.__dict__
1543        sess.expire(u1, ["addresses"])
1544
1545        def go():
1546            eq_(u1.name, "ed")
1547
1548        self.assert_sql_count(testing.db, go, 1)
1549        assert "addresses" not in u1.__dict__
1550
1551    def test_populate_existing_propagate(self):
1552        User, Address, sess = self._eager_config_fixture()
1553        u1 = sess.get(User, 8)
1554        u1.addresses[2].email_address = "foofoo"
1555        del u1.addresses[1]
1556        u1 = sess.query(User).populate_existing().filter_by(id=8).one()
1557        # collection is reverted
1558        eq_(len(u1.addresses), 3)
1559
1560        # attributes on related items reverted
1561        eq_(u1.addresses[2].email_address, "ed@lala.com")
1562
1563    def test_loads_second_level_collection_to_scalar(self):
1564        User, Address, Dingaling, sess = self._collection_to_scalar_fixture()
1565
1566        u1 = sess.get(User, 8)
1567        a1 = Address()
1568        u1.addresses.append(a1)
1569        a2 = u1.addresses[0]
1570        a2.email_address = "foo"
1571        sess.query(User).options(
1572            selectinload(User.addresses).selectinload(Address.dingaling)
1573        ).filter_by(id=8).all()
1574        assert u1.addresses[-1] is a1
1575        for a in u1.addresses:
1576            if a is not a1:
1577                assert "dingaling" in a.__dict__
1578            else:
1579                assert "dingaling" not in a.__dict__
1580            if a is a2:
1581                eq_(a2.email_address, "foo")
1582
1583    def test_loads_second_level_collection_to_collection(self):
1584        User, Order, Item, sess = self._collection_to_collection_fixture()
1585
1586        u1 = sess.get(User, 7)
1587        u1.orders
1588        o1 = Order()
1589        u1.orders.append(o1)
1590        sess.query(User).options(
1591            selectinload(User.orders).selectinload(Order.items)
1592        ).filter_by(id=7).all()
1593        for o in u1.orders:
1594            if o is not o1:
1595                assert "items" in o.__dict__
1596            else:
1597                assert "items" not in o.__dict__
1598
1599    def test_load_two_levels_collection_to_scalar(self):
1600        User, Address, Dingaling, sess = self._collection_to_scalar_fixture()
1601
1602        u1 = (
1603            sess.query(User)
1604            .filter_by(id=8)
1605            .options(selectinload(User.addresses))
1606            .one()
1607        )
1608        sess.query(User).filter_by(id=8).options(
1609            selectinload(User.addresses).selectinload(Address.dingaling)
1610        ).first()
1611        assert "dingaling" in u1.addresses[0].__dict__
1612
1613    def test_load_two_levels_collection_to_collection(self):
1614        User, Order, Item, sess = self._collection_to_collection_fixture()
1615
1616        u1 = (
1617            sess.query(User)
1618            .filter_by(id=7)
1619            .options(selectinload(User.orders))
1620            .one()
1621        )
1622        sess.query(User).filter_by(id=7).options(
1623            selectinload(User.orders).selectinload(Order.items)
1624        ).first()
1625        assert "items" in u1.orders[0].__dict__
1626
1627
1628class OrderBySecondaryTest(fixtures.MappedTest):
1629    @classmethod
1630    def define_tables(cls, metadata):
1631        Table(
1632            "m2m",
1633            metadata,
1634            Column(
1635                "id", Integer, primary_key=True, test_needs_autoincrement=True
1636            ),
1637            Column("aid", Integer, ForeignKey("a.id")),
1638            Column("bid", Integer, ForeignKey("b.id")),
1639        )
1640
1641        Table(
1642            "a",
1643            metadata,
1644            Column(
1645                "id", Integer, primary_key=True, test_needs_autoincrement=True
1646            ),
1647            Column("data", String(50)),
1648        )
1649        Table(
1650            "b",
1651            metadata,
1652            Column(
1653                "id", Integer, primary_key=True, test_needs_autoincrement=True
1654            ),
1655            Column("data", String(50)),
1656        )
1657
1658    @classmethod
1659    def fixtures(cls):
1660        return dict(
1661            a=(("id", "data"), (1, "a1"), (2, "a2")),
1662            b=(("id", "data"), (1, "b1"), (2, "b2"), (3, "b3"), (4, "b4")),
1663            m2m=(
1664                ("id", "aid", "bid"),
1665                (2, 1, 1),
1666                (4, 2, 4),
1667                (1, 1, 3),
1668                (6, 2, 2),
1669                (3, 1, 2),
1670                (5, 2, 3),
1671            ),
1672        )
1673
1674    def test_ordering(self):
1675        a, m2m, b = (self.tables.a, self.tables.m2m, self.tables.b)
1676
1677        class A(fixtures.ComparableEntity):
1678            pass
1679
1680        class B(fixtures.ComparableEntity):
1681            pass
1682
1683        self.mapper_registry.map_imperatively(
1684            A,
1685            a,
1686            properties={
1687                "bs": relationship(
1688                    B, secondary=m2m, lazy="selectin", order_by=m2m.c.id
1689                )
1690            },
1691        )
1692        self.mapper_registry.map_imperatively(B, b)
1693
1694        sess = fixture_session()
1695
1696        def go():
1697            eq_(
1698                sess.query(A).all(),
1699                [
1700                    A(
1701                        data="a1",
1702                        bs=[B(data="b3"), B(data="b1"), B(data="b2")],
1703                    ),
1704                    A(bs=[B(data="b4"), B(data="b3"), B(data="b2")]),
1705                ],
1706            )
1707
1708        self.assert_sql_count(testing.db, go, 2)
1709
1710
1711class BaseRelationFromJoinedSubclassTest(_Polymorphic):
1712    """Like most tests here, this is adapted from subquery_relations
1713    as part of general inheritance testing.
1714
1715    The subquery test exercised the issue that the subquery load must
1716    imitate the original query very closely so that filter criteria, ordering
1717    etc. can be maintained with the original query embedded.   However,
1718    for selectin loading, none of that is really needed, so here the secondary
1719    queries are all just a simple "people JOIN paperwork".
1720
1721    """
1722
1723    @classmethod
1724    def define_tables(cls, metadata):
1725        Table(
1726            "people",
1727            metadata,
1728            Column(
1729                "person_id",
1730                Integer,
1731                primary_key=True,
1732                test_needs_autoincrement=True,
1733            ),
1734            Column("name", String(50)),
1735            Column("type", String(30)),
1736        )
1737
1738        # to test fully, PK of engineers table must be
1739        # named differently from that of people
1740        Table(
1741            "engineers",
1742            metadata,
1743            Column(
1744                "engineer_id",
1745                Integer,
1746                ForeignKey("people.person_id"),
1747                primary_key=True,
1748            ),
1749            Column("primary_language", String(50)),
1750        )
1751
1752        Table(
1753            "paperwork",
1754            metadata,
1755            Column(
1756                "paperwork_id",
1757                Integer,
1758                primary_key=True,
1759                test_needs_autoincrement=True,
1760            ),
1761            Column("description", String(50)),
1762            Column("person_id", Integer, ForeignKey("people.person_id")),
1763        )
1764
1765    @classmethod
1766    def setup_mappers(cls):
1767        people = cls.tables.people
1768        engineers = cls.tables.engineers
1769        paperwork = cls.tables.paperwork
1770
1771        cls.mapper_registry.map_imperatively(
1772            Person,
1773            people,
1774            polymorphic_on=people.c.type,
1775            polymorphic_identity="person",
1776            properties={
1777                "paperwork": relationship(
1778                    Paperwork, order_by=paperwork.c.paperwork_id
1779                )
1780            },
1781        )
1782
1783        cls.mapper_registry.map_imperatively(
1784            Engineer,
1785            engineers,
1786            inherits=Person,
1787            polymorphic_identity="engineer",
1788        )
1789
1790        cls.mapper_registry.map_imperatively(Paperwork, paperwork)
1791
1792    @classmethod
1793    def insert_data(cls, connection):
1794
1795        e1 = Engineer(primary_language="java")
1796        e2 = Engineer(primary_language="c++")
1797        e1.paperwork = [
1798            Paperwork(description="tps report #1"),
1799            Paperwork(description="tps report #2"),
1800        ]
1801        e2.paperwork = [Paperwork(description="tps report #3")]
1802        sess = Session(connection)
1803        sess.add_all([e1, e2])
1804        sess.flush()
1805
1806    def test_correct_select_nofrom(self):
1807        sess = fixture_session()
1808        # use Person.paperwork here just to give the least
1809        # amount of context
1810        q = (
1811            sess.query(Engineer)
1812            .filter(Engineer.primary_language == "java")
1813            .options(selectinload(Person.paperwork))
1814        )
1815
1816        def go():
1817            eq_(
1818                q.all()[0].paperwork,
1819                [
1820                    Paperwork(description="tps report #1"),
1821                    Paperwork(description="tps report #2"),
1822                ],
1823            )
1824
1825        self.assert_sql_execution(
1826            testing.db,
1827            go,
1828            CompiledSQL(
1829                "SELECT people.person_id AS people_person_id, "
1830                "people.name AS people_name, people.type AS people_type, "
1831                "engineers.engineer_id AS engineers_engineer_id, "
1832                "engineers.primary_language AS engineers_primary_language "
1833                "FROM people JOIN engineers ON "
1834                "people.person_id = engineers.engineer_id "
1835                "WHERE engineers.primary_language = :primary_language_1",
1836                {"primary_language_1": "java"},
1837            ),
1838            CompiledSQL(
1839                "SELECT paperwork.person_id AS paperwork_person_id, "
1840                "paperwork.paperwork_id AS paperwork_paperwork_id, "
1841                "paperwork.description AS paperwork_description "
1842                "FROM paperwork WHERE paperwork.person_id "
1843                "IN (__[POSTCOMPILE_primary_keys]) "
1844                "ORDER BY paperwork.paperwork_id",
1845                [{"primary_keys": [1]}],
1846            ),
1847        )
1848
1849    def test_correct_select_existingfrom(self):
1850        sess = fixture_session()
1851        # use Person.paperwork here just to give the least
1852        # amount of context
1853        q = (
1854            sess.query(Engineer)
1855            .filter(Engineer.primary_language == "java")
1856            .join(Engineer.paperwork)
1857            .filter(Paperwork.description == "tps report #2")
1858            .options(selectinload(Person.paperwork))
1859        )
1860
1861        def go():
1862            eq_(
1863                q.one().paperwork,
1864                [
1865                    Paperwork(description="tps report #1"),
1866                    Paperwork(description="tps report #2"),
1867                ],
1868            )
1869
1870        self.assert_sql_execution(
1871            testing.db,
1872            go,
1873            CompiledSQL(
1874                "SELECT people.person_id AS people_person_id, "
1875                "people.name AS people_name, people.type AS people_type, "
1876                "engineers.engineer_id AS engineers_engineer_id, "
1877                "engineers.primary_language AS engineers_primary_language "
1878                "FROM people JOIN engineers "
1879                "ON people.person_id = engineers.engineer_id "
1880                "JOIN paperwork ON people.person_id = paperwork.person_id "
1881                "WHERE engineers.primary_language = :primary_language_1 "
1882                "AND paperwork.description = :description_1",
1883                {
1884                    "primary_language_1": "java",
1885                    "description_1": "tps report #2",
1886                },
1887            ),
1888            CompiledSQL(
1889                "SELECT paperwork.person_id AS paperwork_person_id, "
1890                "paperwork.paperwork_id AS paperwork_paperwork_id, "
1891                "paperwork.description AS paperwork_description "
1892                "FROM paperwork WHERE paperwork.person_id "
1893                "IN (__[POSTCOMPILE_primary_keys]) "
1894                "ORDER BY paperwork.paperwork_id",
1895                [{"primary_keys": [1]}],
1896            ),
1897        )
1898
1899    def test_correct_select_with_polymorphic_no_alias(self):
1900        # test #3106
1901        sess = fixture_session()
1902
1903        wp = with_polymorphic(Person, [Engineer])
1904        q = (
1905            sess.query(wp)
1906            .options(selectinload(wp.paperwork))
1907            .order_by(Engineer.primary_language.desc())
1908        )
1909
1910        def go():
1911            eq_(
1912                q.first(),
1913                Engineer(
1914                    paperwork=[
1915                        Paperwork(description="tps report #1"),
1916                        Paperwork(description="tps report #2"),
1917                    ],
1918                    primary_language="java",
1919                ),
1920            )
1921
1922        self.assert_sql_execution(
1923            testing.db,
1924            go,
1925            CompiledSQL(
1926                "SELECT people.person_id AS people_person_id, "
1927                "people.name AS people_name, people.type AS people_type, "
1928                "engineers.engineer_id AS engineers_engineer_id, "
1929                "engineers.primary_language AS engineers_primary_language "
1930                "FROM people LEFT OUTER JOIN engineers ON people.person_id = "
1931                "engineers.engineer_id ORDER BY engineers.primary_language "
1932                "DESC LIMIT :param_1"
1933            ),
1934            CompiledSQL(
1935                "SELECT paperwork.person_id AS paperwork_person_id, "
1936                "paperwork.paperwork_id AS paperwork_paperwork_id, "
1937                "paperwork.description AS paperwork_description "
1938                "FROM paperwork WHERE paperwork.person_id "
1939                "IN (__[POSTCOMPILE_primary_keys]) "
1940                "ORDER BY paperwork.paperwork_id",
1941                [{"primary_keys": [1]}],
1942            ),
1943        )
1944
1945    def test_correct_select_with_polymorphic_alias(self):
1946        # test #3106
1947        sess = fixture_session()
1948
1949        wp = with_polymorphic(Person, [Engineer], aliased=True)
1950        q = (
1951            sess.query(wp)
1952            .options(selectinload(wp.paperwork))
1953            .order_by(wp.Engineer.primary_language.desc())
1954        )
1955
1956        def go():
1957            eq_(
1958                q.first(),
1959                Engineer(
1960                    paperwork=[
1961                        Paperwork(description="tps report #1"),
1962                        Paperwork(description="tps report #2"),
1963                    ],
1964                    primary_language="java",
1965                ),
1966            )
1967
1968        self.assert_sql_execution(
1969            testing.db,
1970            go,
1971            CompiledSQL(
1972                "SELECT anon_1.people_person_id AS anon_1_people_person_id, "
1973                "anon_1.people_name AS anon_1_people_name, "
1974                "anon_1.people_type AS anon_1_people_type, "
1975                "anon_1.engineers_engineer_id AS "
1976                "anon_1_engineers_engineer_id, "
1977                "anon_1.engineers_primary_language "
1978                "AS anon_1_engineers_primary_language FROM "
1979                "(SELECT people.person_id AS people_person_id, "
1980                "people.name AS people_name, people.type AS people_type, "
1981                "engineers.engineer_id AS engineers_engineer_id, "
1982                "engineers.primary_language AS engineers_primary_language "
1983                "FROM people LEFT OUTER JOIN engineers ON people.person_id = "
1984                "engineers.engineer_id) AS anon_1 "
1985                "ORDER BY anon_1.engineers_primary_language DESC "
1986                "LIMIT :param_1"
1987            ),
1988            CompiledSQL(
1989                "SELECT paperwork.person_id AS paperwork_person_id, "
1990                "paperwork.paperwork_id AS paperwork_paperwork_id, "
1991                "paperwork.description AS paperwork_description "
1992                "FROM paperwork WHERE paperwork.person_id "
1993                "IN (__[POSTCOMPILE_primary_keys]) "
1994                "ORDER BY paperwork.paperwork_id",
1995                [{"primary_keys": [1]}],
1996            ),
1997        )
1998
1999    def test_correct_select_with_polymorphic_flat_alias(self):
2000        # test #3106
2001        sess = fixture_session()
2002
2003        wp = with_polymorphic(Person, [Engineer], aliased=True, flat=True)
2004        q = (
2005            sess.query(wp)
2006            .options(selectinload(wp.paperwork))
2007            .order_by(wp.Engineer.primary_language.desc())
2008        )
2009
2010        def go():
2011            eq_(
2012                q.first(),
2013                Engineer(
2014                    paperwork=[
2015                        Paperwork(description="tps report #1"),
2016                        Paperwork(description="tps report #2"),
2017                    ],
2018                    primary_language="java",
2019                ),
2020            )
2021
2022        self.assert_sql_execution(
2023            testing.db,
2024            go,
2025            CompiledSQL(
2026                "SELECT people_1.person_id AS people_1_person_id, "
2027                "people_1.name AS people_1_name, "
2028                "people_1.type AS people_1_type, "
2029                "engineers_1.engineer_id AS engineers_1_engineer_id, "
2030                "engineers_1.primary_language AS engineers_1_primary_language "
2031                "FROM people AS people_1 "
2032                "LEFT OUTER JOIN engineers AS engineers_1 "
2033                "ON people_1.person_id = engineers_1.engineer_id "
2034                "ORDER BY engineers_1.primary_language DESC LIMIT :param_1"
2035            ),
2036            CompiledSQL(
2037                "SELECT paperwork.person_id AS paperwork_person_id, "
2038                "paperwork.paperwork_id AS paperwork_paperwork_id, "
2039                "paperwork.description AS paperwork_description "
2040                "FROM paperwork WHERE paperwork.person_id "
2041                "IN (__[POSTCOMPILE_primary_keys]) "
2042                "ORDER BY paperwork.paperwork_id",
2043                [{"primary_keys": [1]}],
2044            ),
2045        )
2046
2047
2048class HeterogeneousSubtypesTest(fixtures.DeclarativeMappedTest):
2049    @classmethod
2050    def setup_classes(cls):
2051        Base = cls.DeclarativeBasic
2052
2053        class Company(Base):
2054            __tablename__ = "company"
2055            id = Column(Integer, primary_key=True)
2056            name = Column(String(50))
2057            employees = relationship("Employee", order_by="Employee.id")
2058
2059        class Employee(Base):
2060            __tablename__ = "employee"
2061            id = Column(Integer, primary_key=True)
2062            type = Column(String(50))
2063            name = Column(String(50))
2064            company_id = Column(ForeignKey("company.id"))
2065
2066            __mapper_args__ = {
2067                "polymorphic_on": "type",
2068                "with_polymorphic": "*",
2069            }
2070
2071        class Programmer(Employee):
2072            __tablename__ = "programmer"
2073            id = Column(ForeignKey("employee.id"), primary_key=True)
2074            languages = relationship("Language")
2075
2076            __mapper_args__ = {"polymorphic_identity": "programmer"}
2077
2078        class Manager(Employee):
2079            __tablename__ = "manager"
2080            id = Column(ForeignKey("employee.id"), primary_key=True)
2081            golf_swing_id = Column(ForeignKey("golf_swing.id"))
2082            golf_swing = relationship("GolfSwing")
2083
2084            __mapper_args__ = {"polymorphic_identity": "manager"}
2085
2086        class Language(Base):
2087            __tablename__ = "language"
2088            id = Column(Integer, primary_key=True)
2089            programmer_id = Column(
2090                Integer, ForeignKey("programmer.id"), nullable=False
2091            )
2092            name = Column(String(50))
2093
2094        class GolfSwing(Base):
2095            __tablename__ = "golf_swing"
2096            id = Column(Integer, primary_key=True)
2097            name = Column(String(50))
2098
2099    @classmethod
2100    def insert_data(cls, connection):
2101        Company, Programmer, Manager, GolfSwing, Language = cls.classes(
2102            "Company", "Programmer", "Manager", "GolfSwing", "Language"
2103        )
2104        c1 = Company(
2105            id=1,
2106            name="Foobar Corp",
2107            employees=[
2108                Programmer(
2109                    id=1, name="p1", languages=[Language(id=1, name="Python")]
2110                ),
2111                Manager(id=2, name="m1", golf_swing=GolfSwing(name="fore")),
2112            ],
2113        )
2114        c2 = Company(
2115            id=2,
2116            name="bat Corp",
2117            employees=[
2118                Manager(id=3, name="m2", golf_swing=GolfSwing(name="clubs")),
2119                Programmer(
2120                    id=4, name="p2", languages=[Language(id=2, name="Java")]
2121                ),
2122            ],
2123        )
2124        sess = Session(connection)
2125        sess.add_all([c1, c2])
2126        sess.commit()
2127
2128    def test_one_to_many(self):
2129
2130        Company, Programmer, Manager, GolfSwing, Language = self.classes(
2131            "Company", "Programmer", "Manager", "GolfSwing", "Language"
2132        )
2133        sess = fixture_session()
2134        company = (
2135            sess.query(Company)
2136            .filter(Company.id == 1)
2137            .options(
2138                selectinload(
2139                    Company.employees.of_type(Programmer)
2140                ).selectinload(Programmer.languages)
2141            )
2142            .one()
2143        )
2144
2145        def go():
2146            eq_(company.employees[0].languages[0].name, "Python")
2147
2148        self.assert_sql_count(testing.db, go, 0)
2149
2150    def test_many_to_one(self):
2151        Company, Programmer, Manager, GolfSwing, Language = self.classes(
2152            "Company", "Programmer", "Manager", "GolfSwing", "Language"
2153        )
2154        sess = fixture_session()
2155        company = (
2156            sess.query(Company)
2157            .filter(Company.id == 2)
2158            .options(
2159                selectinload(Company.employees.of_type(Manager)).selectinload(
2160                    Manager.golf_swing
2161                )
2162            )
2163            .one()
2164        )
2165
2166        # NOTE: we *MUST* do a SQL compare on this one because the adaption
2167        # is very sensitive
2168        def go():
2169            eq_(company.employees[0].golf_swing.name, "clubs")
2170
2171        self.assert_sql_count(testing.db, go, 0)
2172
2173    def test_both(self):
2174        Company, Programmer, Manager, GolfSwing, Language = self.classes(
2175            "Company", "Programmer", "Manager", "GolfSwing", "Language"
2176        )
2177        sess = fixture_session()
2178        rows = (
2179            sess.query(Company)
2180            .options(
2181                selectinload(Company.employees.of_type(Manager)).selectinload(
2182                    Manager.golf_swing
2183                ),
2184                defaultload(
2185                    Company.employees.of_type(Programmer)
2186                ).selectinload(Programmer.languages),
2187            )
2188            .order_by(Company.id)
2189            .all()
2190        )
2191
2192        def go():
2193            eq_(rows[0].employees[0].languages[0].name, "Python")
2194            eq_(rows[1].employees[0].golf_swing.name, "clubs")
2195
2196        self.assert_sql_count(testing.db, go, 0)
2197
2198
2199class TupleTest(fixtures.DeclarativeMappedTest):
2200    __requires__ = ("tuple_in",)
2201
2202    @classmethod
2203    def setup_classes(cls):
2204        Base = cls.DeclarativeBasic
2205
2206        class A(fixtures.ComparableEntity, Base):
2207            __tablename__ = "a"
2208            id1 = Column(Integer, primary_key=True)
2209            id2 = Column(Integer, primary_key=True)
2210
2211            bs = relationship("B", order_by="B.id", back_populates="a")
2212
2213        class B(fixtures.ComparableEntity, Base):
2214            __tablename__ = "b"
2215            id = Column(Integer, primary_key=True)
2216            a_id1 = Column()
2217            a_id2 = Column()
2218
2219            a = relationship("A", back_populates="bs")
2220
2221            __table_args__ = (
2222                ForeignKeyConstraint(["a_id1", "a_id2"], ["a.id1", "a.id2"]),
2223            )
2224
2225    @classmethod
2226    def insert_data(cls, connection):
2227        A, B = cls.classes("A", "B")
2228
2229        session = Session(connection)
2230        session.add_all(
2231            [
2232                A(id1=i, id2=i + 2, bs=[B(id=(i * 6) + j) for j in range(6)])
2233                for i in range(1, 20)
2234            ]
2235        )
2236        session.commit()
2237
2238    def test_load_o2m(self):
2239        A, B = self.classes("A", "B")
2240
2241        session = fixture_session()
2242
2243        def go():
2244            q = (
2245                session.query(A)
2246                .options(selectinload(A.bs))
2247                .order_by(A.id1, A.id2)
2248            )
2249            return q.all()
2250
2251        result = self.assert_sql_execution(
2252            testing.db,
2253            go,
2254            CompiledSQL(
2255                "SELECT a.id1 AS a_id1, a.id2 AS a_id2 "
2256                "FROM a ORDER BY a.id1, a.id2",
2257                {},
2258            ),
2259            CompiledSQL(
2260                "SELECT b.a_id1 AS b_a_id1, b.a_id2 AS b_a_id2, b.id AS b_id "
2261                "FROM b WHERE (b.a_id1, b.a_id2) IN "
2262                "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
2263                [{"primary_keys": [(i, i + 2) for i in range(1, 20)]}],
2264            ),
2265        )
2266        eq_(
2267            result,
2268            [
2269                A(id1=i, id2=i + 2, bs=[B(id=(i * 6) + j) for j in range(6)])
2270                for i in range(1, 20)
2271            ],
2272        )
2273
2274    def test_load_m2o(self):
2275        A, B = self.classes("A", "B")
2276
2277        session = fixture_session()
2278
2279        def go():
2280            q = session.query(B).options(selectinload(B.a)).order_by(B.id)
2281            return q.all()
2282
2283        result = self.assert_sql_execution(
2284            testing.db,
2285            go,
2286            CompiledSQL(
2287                "SELECT b.id AS b_id, b.a_id1 AS b_a_id1, b.a_id2 AS b_a_id2 "
2288                "FROM b ORDER BY b.id",
2289                {},
2290            ),
2291            CompiledSQL(
2292                "SELECT a.id1 AS a_id1, a.id2 AS a_id2 FROM a "
2293                "WHERE (a.id1, a.id2) IN (__[POSTCOMPILE_primary_keys])",
2294                [{"primary_keys": [(i, i + 2) for i in range(1, 20)]}],
2295            ),
2296        )
2297        as_ = [A(id1=i, id2=i + 2) for i in range(1, 20)]
2298
2299        eq_(
2300            result,
2301            [
2302                B(id=(i * 6) + j, a=as_[i - 1])
2303                for i in range(1, 20)
2304                for j in range(6)
2305            ],
2306        )
2307
2308
2309class ChunkingTest(fixtures.DeclarativeMappedTest):
2310    """test IN chunking.
2311
2312    the length of IN has a limit on at least some databases.
2313    On Oracle it's 1000.  In any case, you don't want a SQL statement with
2314    500K entries in an IN, so larger results need to chunk.
2315
2316    """
2317
2318    @classmethod
2319    def setup_classes(cls):
2320        Base = cls.DeclarativeBasic
2321
2322        class A(fixtures.ComparableEntity, Base):
2323            __tablename__ = "a"
2324            id = Column(Integer, primary_key=True)
2325            bs = relationship("B", order_by="B.id", back_populates="a")
2326
2327        class B(fixtures.ComparableEntity, Base):
2328            __tablename__ = "b"
2329            id = Column(Integer, primary_key=True)
2330            a_id = Column(ForeignKey("a.id"))
2331            a = relationship("A", back_populates="bs")
2332
2333    @classmethod
2334    def insert_data(cls, connection):
2335        A, B = cls.classes("A", "B")
2336
2337        session = Session(connection)
2338        session.add_all(
2339            [
2340                A(id=i, bs=[B(id=(i * 6) + j) for j in range(1, 6)])
2341                for i in range(1, 101)
2342            ]
2343        )
2344        session.commit()
2345
2346    def test_odd_number_chunks(self):
2347        A, B = self.classes("A", "B")
2348
2349        session = fixture_session()
2350
2351        def go():
2352            with mock.patch(
2353                "sqlalchemy.orm.strategies.SelectInLoader._chunksize", 47
2354            ):
2355                q = session.query(A).options(selectinload(A.bs)).order_by(A.id)
2356
2357                for a in q:
2358                    a.bs
2359
2360        self.assert_sql_execution(
2361            testing.db,
2362            go,
2363            CompiledSQL("SELECT a.id AS a_id FROM a ORDER BY a.id", {}),
2364            CompiledSQL(
2365                "SELECT b.a_id AS b_a_id, b.id AS b_id "
2366                "FROM b WHERE b.a_id IN "
2367                "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
2368                {"primary_keys": list(range(1, 48))},
2369            ),
2370            CompiledSQL(
2371                "SELECT b.a_id AS b_a_id, b.id AS b_id "
2372                "FROM b WHERE b.a_id IN "
2373                "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
2374                {"primary_keys": list(range(48, 95))},
2375            ),
2376            CompiledSQL(
2377                "SELECT b.a_id AS b_a_id, b.id AS b_id "
2378                "FROM b WHERE b.a_id IN "
2379                "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
2380                {"primary_keys": list(range(95, 101))},
2381            ),
2382        )
2383
2384    @testing.requires.independent_cursors
2385    def test_yield_per(self):
2386        # the docs make a lot of guarantees about yield_per
2387        # so test that it works
2388        A, B = self.classes("A", "B")
2389
2390        import random
2391
2392        session = fixture_session()
2393
2394        yield_per = random.randint(8, 105)
2395        offset = random.randint(0, 19)
2396        total_rows = 100 - offset
2397        total_expected_statements = (
2398            1
2399            + int(total_rows / yield_per)
2400            + (1 if total_rows % yield_per else 0)
2401        )
2402
2403        def go():
2404            for a in (
2405                session.query(A)
2406                .yield_per(yield_per)
2407                .offset(offset)
2408                .options(selectinload(A.bs))
2409            ):
2410
2411                # this part fails with joined eager loading
2412                # (if you enable joined eager w/ yield_per)
2413                eq_(a.bs, [B(id=(a.id * 6) + j) for j in range(1, 6)])
2414
2415        # this part fails with subquery eager loading
2416        # (if you enable subquery eager w/ yield_per)
2417        self.assert_sql_count(testing.db, go, total_expected_statements)
2418
2419    def test_dont_emit_for_redundant_m2o(self):
2420        A, B = self.classes("A", "B")
2421
2422        session = fixture_session()
2423
2424        def go():
2425            with mock.patch(
2426                "sqlalchemy.orm.strategies.SelectInLoader._chunksize", 47
2427            ):
2428                q = session.query(B).options(selectinload(B.a)).order_by(B.id)
2429
2430                for b in q:
2431                    b.a
2432
2433        self.assert_sql_execution(
2434            testing.db,
2435            go,
2436            CompiledSQL(
2437                "SELECT b.id AS b_id, b.a_id AS b_a_id FROM b ORDER BY b.id",
2438                {},
2439            ),
2440            # chunk size is 47.  so first chunk are a 1->47...
2441            CompiledSQL(
2442                "SELECT a.id AS a_id FROM a WHERE a.id IN "
2443                "(__[POSTCOMPILE_primary_keys])",
2444                {"primary_keys": list(range(1, 48))},
2445            ),
2446            # second chunk is a 48-94
2447            CompiledSQL(
2448                "SELECT a.id AS a_id FROM a WHERE a.id IN "
2449                "(__[POSTCOMPILE_primary_keys])",
2450                {"primary_keys": list(range(48, 95))},
2451            ),
2452            # third and final chunk 95-100.
2453            CompiledSQL(
2454                "SELECT a.id AS a_id FROM a WHERE a.id IN "
2455                "(__[POSTCOMPILE_primary_keys])",
2456                {"primary_keys": list(range(95, 101))},
2457            ),
2458        )
2459
2460
2461class SubRelationFromJoinedSubclassMultiLevelTest(_Polymorphic):
2462    @classmethod
2463    def define_tables(cls, metadata):
2464        Table(
2465            "companies",
2466            metadata,
2467            Column(
2468                "company_id",
2469                Integer,
2470                primary_key=True,
2471                test_needs_autoincrement=True,
2472            ),
2473            Column("name", String(50)),
2474        )
2475
2476        Table(
2477            "people",
2478            metadata,
2479            Column(
2480                "person_id",
2481                Integer,
2482                primary_key=True,
2483                test_needs_autoincrement=True,
2484            ),
2485            Column("company_id", ForeignKey("companies.company_id")),
2486            Column("name", String(50)),
2487            Column("type", String(30)),
2488        )
2489
2490        Table(
2491            "engineers",
2492            metadata,
2493            Column(
2494                "engineer_id", ForeignKey("people.person_id"), primary_key=True
2495            ),
2496            Column("primary_language", String(50)),
2497        )
2498
2499        Table(
2500            "machines",
2501            metadata,
2502            Column(
2503                "machine_id",
2504                Integer,
2505                primary_key=True,
2506                test_needs_autoincrement=True,
2507            ),
2508            Column("name", String(50)),
2509            Column("engineer_id", ForeignKey("engineers.engineer_id")),
2510            Column(
2511                "machine_type_id", ForeignKey("machine_type.machine_type_id")
2512            ),
2513        )
2514
2515        Table(
2516            "machine_type",
2517            metadata,
2518            Column(
2519                "machine_type_id",
2520                Integer,
2521                primary_key=True,
2522                test_needs_autoincrement=True,
2523            ),
2524            Column("name", String(50)),
2525        )
2526
2527    @classmethod
2528    def setup_mappers(cls):
2529        companies = cls.tables.companies
2530        people = cls.tables.people
2531        engineers = cls.tables.engineers
2532        machines = cls.tables.machines
2533        machine_type = cls.tables.machine_type
2534
2535        cls.mapper_registry.map_imperatively(
2536            Company,
2537            companies,
2538            properties={
2539                "employees": relationship(Person, order_by=people.c.person_id)
2540            },
2541        )
2542        cls.mapper_registry.map_imperatively(
2543            Person,
2544            people,
2545            polymorphic_on=people.c.type,
2546            polymorphic_identity="person",
2547            with_polymorphic="*",
2548        )
2549
2550        cls.mapper_registry.map_imperatively(
2551            Engineer,
2552            engineers,
2553            inherits=Person,
2554            polymorphic_identity="engineer",
2555            properties={
2556                "machines": relationship(
2557                    Machine, order_by=machines.c.machine_id
2558                )
2559            },
2560        )
2561
2562        cls.mapper_registry.map_imperatively(
2563            Machine, machines, properties={"type": relationship(MachineType)}
2564        )
2565        cls.mapper_registry.map_imperatively(MachineType, machine_type)
2566
2567    @classmethod
2568    def insert_data(cls, connection):
2569        c1 = cls._fixture()
2570        sess = Session(connection)
2571        sess.add(c1)
2572        sess.flush()
2573
2574    @classmethod
2575    def _fixture(cls):
2576        mt1 = MachineType(name="mt1")
2577        mt2 = MachineType(name="mt2")
2578        return Company(
2579            employees=[
2580                Engineer(
2581                    name="e1",
2582                    machines=[
2583                        Machine(name="m1", type=mt1),
2584                        Machine(name="m2", type=mt2),
2585                    ],
2586                ),
2587                Engineer(
2588                    name="e2",
2589                    machines=[
2590                        Machine(name="m3", type=mt1),
2591                        Machine(name="m4", type=mt1),
2592                    ],
2593                ),
2594            ]
2595        )
2596
2597    def test_chained_selectin_subclass(self):
2598        s = fixture_session()
2599        q = s.query(Company).options(
2600            selectinload(Company.employees.of_type(Engineer))
2601            .selectinload(Engineer.machines)
2602            .selectinload(Machine.type)
2603        )
2604
2605        def go():
2606            eq_(q.all(), [self._fixture()])
2607
2608        self.assert_sql_count(testing.db, go, 4)
2609
2610
2611class SelfReferentialTest(fixtures.MappedTest):
2612    @classmethod
2613    def define_tables(cls, metadata):
2614        Table(
2615            "nodes",
2616            metadata,
2617            Column(
2618                "id", Integer, primary_key=True, test_needs_autoincrement=True
2619            ),
2620            Column("parent_id", Integer, ForeignKey("nodes.id")),
2621            Column("data", String(30)),
2622        )
2623
2624    def test_basic(self):
2625        nodes = self.tables.nodes
2626
2627        class Node(fixtures.ComparableEntity):
2628            def append(self, node):
2629                self.children.append(node)
2630
2631        self.mapper_registry.map_imperatively(
2632            Node,
2633            nodes,
2634            properties={
2635                "children": relationship(
2636                    Node, lazy="selectin", join_depth=3, order_by=nodes.c.id
2637                )
2638            },
2639        )
2640        sess = fixture_session()
2641        n1 = Node(data="n1")
2642        n1.append(Node(data="n11"))
2643        n1.append(Node(data="n12"))
2644        n1.append(Node(data="n13"))
2645        n1.children[1].append(Node(data="n121"))
2646        n1.children[1].append(Node(data="n122"))
2647        n1.children[1].append(Node(data="n123"))
2648        n2 = Node(data="n2")
2649        n2.append(Node(data="n21"))
2650        n2.children[0].append(Node(data="n211"))
2651        n2.children[0].append(Node(data="n212"))
2652
2653        sess.add(n1)
2654        sess.add(n2)
2655        sess.flush()
2656        sess.expunge_all()
2657
2658        def go():
2659            d = (
2660                sess.query(Node)
2661                .filter(Node.data.in_(["n1", "n2"]))
2662                .order_by(Node.data)
2663                .all()
2664            )
2665            eq_(
2666                [
2667                    Node(
2668                        data="n1",
2669                        children=[
2670                            Node(data="n11"),
2671                            Node(
2672                                data="n12",
2673                                children=[
2674                                    Node(data="n121"),
2675                                    Node(data="n122"),
2676                                    Node(data="n123"),
2677                                ],
2678                            ),
2679                            Node(data="n13"),
2680                        ],
2681                    ),
2682                    Node(
2683                        data="n2",
2684                        children=[
2685                            Node(
2686                                data="n21",
2687                                children=[
2688                                    Node(data="n211"),
2689                                    Node(data="n212"),
2690                                ],
2691                            )
2692                        ],
2693                    ),
2694                ],
2695                d,
2696            )
2697
2698        self.assert_sql_count(testing.db, go, 4)
2699
2700    def test_lazy_fallback_doesnt_affect_eager(self):
2701        nodes = self.tables.nodes
2702
2703        class Node(fixtures.ComparableEntity):
2704            def append(self, node):
2705                self.children.append(node)
2706
2707        self.mapper_registry.map_imperatively(
2708            Node,
2709            nodes,
2710            properties={
2711                "children": relationship(
2712                    Node, lazy="selectin", join_depth=1, order_by=nodes.c.id
2713                )
2714            },
2715        )
2716        sess = fixture_session()
2717        n1 = Node(data="n1")
2718        n1.append(Node(data="n11"))
2719        n1.append(Node(data="n12"))
2720        n1.append(Node(data="n13"))
2721        n1.children[0].append(Node(data="n111"))
2722        n1.children[0].append(Node(data="n112"))
2723        n1.children[1].append(Node(data="n121"))
2724        n1.children[1].append(Node(data="n122"))
2725        n1.children[1].append(Node(data="n123"))
2726        sess.add(n1)
2727        sess.flush()
2728        sess.expunge_all()
2729
2730        def go():
2731            allnodes = sess.query(Node).order_by(Node.data).all()
2732
2733            n11 = allnodes[1]
2734            eq_(n11.data, "n11")
2735            eq_([Node(data="n111"), Node(data="n112")], list(n11.children))
2736
2737            n12 = allnodes[4]
2738            eq_(n12.data, "n12")
2739            eq_(
2740                [Node(data="n121"), Node(data="n122"), Node(data="n123")],
2741                list(n12.children),
2742            )
2743
2744        self.assert_sql_count(testing.db, go, 2)
2745
2746    def test_with_deferred(self):
2747        nodes = self.tables.nodes
2748
2749        class Node(fixtures.ComparableEntity):
2750            def append(self, node):
2751                self.children.append(node)
2752
2753        self.mapper_registry.map_imperatively(
2754            Node,
2755            nodes,
2756            properties={
2757                "children": relationship(
2758                    Node, lazy="selectin", join_depth=3, order_by=nodes.c.id
2759                ),
2760                "data": deferred(nodes.c.data),
2761            },
2762        )
2763        sess = fixture_session()
2764        n1 = Node(data="n1")
2765        n1.append(Node(data="n11"))
2766        n1.append(Node(data="n12"))
2767        sess.add(n1)
2768        sess.flush()
2769        sess.expunge_all()
2770
2771        def go():
2772            eq_(
2773                Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
2774                sess.query(Node).order_by(Node.id).first(),
2775            )
2776
2777        self.assert_sql_count(testing.db, go, 6)
2778
2779        sess.expunge_all()
2780
2781        def go():
2782            eq_(
2783                Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
2784                sess.query(Node)
2785                .options(undefer(Node.data))
2786                .order_by(Node.id)
2787                .first(),
2788            )
2789
2790        self.assert_sql_count(testing.db, go, 5)
2791
2792        sess.expunge_all()
2793
2794        def go():
2795            eq_(
2796                Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
2797                sess.query(Node)
2798                .options(
2799                    undefer(Node.data),
2800                    defaultload(Node.children).undefer(Node.data),
2801                )
2802                .first(),
2803            )
2804
2805        self.assert_sql_count(testing.db, go, 3)
2806
2807    def test_options(self):
2808        nodes = self.tables.nodes
2809
2810        class Node(fixtures.ComparableEntity):
2811            def append(self, node):
2812                self.children.append(node)
2813
2814        self.mapper_registry.map_imperatively(
2815            Node,
2816            nodes,
2817            properties={"children": relationship(Node, order_by=nodes.c.id)},
2818        )
2819        sess = fixture_session()
2820        n1 = Node(data="n1")
2821        n1.append(Node(data="n11"))
2822        n1.append(Node(data="n12"))
2823        n1.append(Node(data="n13"))
2824        n1.children[1].append(Node(data="n121"))
2825        n1.children[1].append(Node(data="n122"))
2826        n1.children[1].append(Node(data="n123"))
2827        sess.add(n1)
2828        sess.flush()
2829        sess.expunge_all()
2830
2831        def go():
2832            d = (
2833                sess.query(Node)
2834                .filter_by(data="n1")
2835                .order_by(Node.id)
2836                .options(
2837                    selectinload(Node.children).selectinload(Node.children)
2838                )
2839                .first()
2840            )
2841            eq_(
2842                Node(
2843                    data="n1",
2844                    children=[
2845                        Node(data="n11"),
2846                        Node(
2847                            data="n12",
2848                            children=[
2849                                Node(data="n121"),
2850                                Node(data="n122"),
2851                                Node(data="n123"),
2852                            ],
2853                        ),
2854                        Node(data="n13"),
2855                    ],
2856                ),
2857                d,
2858            )
2859
2860        self.assert_sql_count(testing.db, go, 3)
2861
2862    def test_no_depth(self):
2863        """no join depth is set, so no eager loading occurs."""
2864
2865        nodes = self.tables.nodes
2866
2867        class Node(fixtures.ComparableEntity):
2868            def append(self, node):
2869                self.children.append(node)
2870
2871        self.mapper_registry.map_imperatively(
2872            Node,
2873            nodes,
2874            properties={"children": relationship(Node, lazy="selectin")},
2875        )
2876        sess = fixture_session()
2877        n1 = Node(data="n1")
2878        n1.append(Node(data="n11"))
2879        n1.append(Node(data="n12"))
2880        n1.append(Node(data="n13"))
2881        n1.children[1].append(Node(data="n121"))
2882        n1.children[1].append(Node(data="n122"))
2883        n1.children[1].append(Node(data="n123"))
2884        n2 = Node(data="n2")
2885        n2.append(Node(data="n21"))
2886        sess.add(n1)
2887        sess.add(n2)
2888        sess.flush()
2889        sess.expunge_all()
2890
2891        def go():
2892            d = (
2893                sess.query(Node)
2894                .filter(Node.data.in_(["n1", "n2"]))
2895                .order_by(Node.data)
2896                .all()
2897            )
2898            eq_(
2899                [
2900                    Node(
2901                        data="n1",
2902                        children=[
2903                            Node(data="n11"),
2904                            Node(
2905                                data="n12",
2906                                children=[
2907                                    Node(data="n121"),
2908                                    Node(data="n122"),
2909                                    Node(data="n123"),
2910                                ],
2911                            ),
2912                            Node(data="n13"),
2913                        ],
2914                    ),
2915                    Node(data="n2", children=[Node(data="n21")]),
2916                ],
2917                d,
2918            )
2919
2920        self.assert_sql_count(testing.db, go, 4)
2921
2922
2923class SelfRefInheritanceAliasedTest(
2924    fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL
2925):
2926    __dialect__ = "default"
2927
2928    @classmethod
2929    def setup_classes(cls):
2930        Base = cls.DeclarativeBasic
2931
2932        class Foo(fixtures.ComparableEntity, Base):
2933            __tablename__ = "foo"
2934            id = Column(Integer, primary_key=True)
2935            type = Column(String(50))
2936
2937            foo_id = Column(Integer, ForeignKey("foo.id"))
2938            foo = relationship(
2939                lambda: Foo, foreign_keys=foo_id, remote_side=id
2940            )
2941
2942            __mapper_args__ = {
2943                "polymorphic_on": type,
2944                "polymorphic_identity": "foo",
2945            }
2946
2947        class Bar(Foo):
2948            __mapper_args__ = {"polymorphic_identity": "bar"}
2949
2950    @classmethod
2951    def insert_data(cls, connection):
2952        Foo, Bar = cls.classes("Foo", "Bar")
2953
2954        session = Session(connection)
2955        target = Bar(id=1)
2956        b1 = Bar(id=2, foo=Foo(id=3, foo=target))
2957        session.add(b1)
2958        session.commit()
2959
2960    def test_twolevel_selectin_w_polymorphic(self):
2961        Foo, Bar = self.classes("Foo", "Bar")
2962
2963        for count in range(3):
2964            r = with_polymorphic(Foo, "*", aliased=True)
2965            attr1 = Foo.foo.of_type(r)
2966            attr2 = r.foo
2967
2968            s = fixture_session()
2969            q = (
2970                s.query(Foo)
2971                .filter(Foo.id == 2)
2972                .options(selectinload(attr1).selectinload(attr2))
2973            )
2974            results = self.assert_sql_execution(
2975                testing.db,
2976                q.all,
2977                CompiledSQL(
2978                    "SELECT foo.id AS foo_id_1, foo.type AS foo_type, "
2979                    "foo.foo_id AS foo_foo_id FROM foo WHERE foo.id = :id_1",
2980                    [{"id_1": 2}],
2981                ),
2982                CompiledSQL(
2983                    "SELECT foo_1.id AS foo_1_id, "
2984                    "foo_1.type AS foo_1_type, foo_1.foo_id AS foo_1_foo_id "
2985                    "FROM foo AS foo_1 "
2986                    "WHERE foo_1.id IN (__[POSTCOMPILE_primary_keys])",
2987                    {"primary_keys": [3]},
2988                ),
2989                CompiledSQL(
2990                    "SELECT foo.id AS foo_id_1, foo.type AS foo_type, "
2991                    "foo.foo_id AS foo_foo_id FROM foo "
2992                    "WHERE foo.id IN (__[POSTCOMPILE_primary_keys])",
2993                    {"primary_keys": [1]},
2994                ),
2995            )
2996            eq_(results, [Bar(id=2, foo=Foo(id=3, foo=Bar(id=1)))])
2997
2998
2999class TestExistingRowPopulation(fixtures.DeclarativeMappedTest):
3000    @classmethod
3001    def setup_classes(cls):
3002        Base = cls.DeclarativeBasic
3003
3004        class A(Base):
3005            __tablename__ = "a"
3006
3007            id = Column(Integer, primary_key=True)
3008            b_id = Column(ForeignKey("b.id"))
3009            a2_id = Column(ForeignKey("a2.id"))
3010            a2 = relationship("A2")
3011            b = relationship("B")
3012
3013        class A2(Base):
3014            __tablename__ = "a2"
3015
3016            id = Column(Integer, primary_key=True)
3017            b_id = Column(ForeignKey("b.id"))
3018            b = relationship("B")
3019
3020        class B(Base):
3021            __tablename__ = "b"
3022
3023            id = Column(Integer, primary_key=True)
3024
3025            c1_m2o_id = Column(ForeignKey("c1_m2o.id"))
3026            c2_m2o_id = Column(ForeignKey("c2_m2o.id"))
3027
3028            c1_o2m = relationship("C1o2m")
3029            c2_o2m = relationship("C2o2m")
3030            c1_m2o = relationship("C1m2o")
3031            c2_m2o = relationship("C2m2o")
3032
3033        class C1o2m(Base):
3034            __tablename__ = "c1_o2m"
3035
3036            id = Column(Integer, primary_key=True)
3037            b_id = Column(ForeignKey("b.id"))
3038
3039        class C2o2m(Base):
3040            __tablename__ = "c2_o2m"
3041
3042            id = Column(Integer, primary_key=True)
3043            b_id = Column(ForeignKey("b.id"))
3044
3045        class C1m2o(Base):
3046            __tablename__ = "c1_m2o"
3047
3048            id = Column(Integer, primary_key=True)
3049
3050        class C2m2o(Base):
3051            __tablename__ = "c2_m2o"
3052
3053            id = Column(Integer, primary_key=True)
3054
3055    @classmethod
3056    def insert_data(cls, connection):
3057        A, A2, B, C1o2m, C2o2m, C1m2o, C2m2o = cls.classes(
3058            "A", "A2", "B", "C1o2m", "C2o2m", "C1m2o", "C2m2o"
3059        )
3060
3061        s = Session(connection)
3062
3063        b = B(
3064            c1_o2m=[C1o2m()], c2_o2m=[C2o2m()], c1_m2o=C1m2o(), c2_m2o=C2m2o()
3065        )
3066
3067        s.add(A(b=b, a2=A2(b=b)))
3068        s.commit()
3069
3070    def test_o2m(self):
3071        A, A2, B, C1o2m, C2o2m = self.classes("A", "A2", "B", "C1o2m", "C2o2m")
3072
3073        s = fixture_session()
3074
3075        # A -J-> B -L-> C1
3076        # A -J-> B -S-> C2
3077
3078        # A -J-> A2 -J-> B -S-> C1
3079        # A -J-> A2 -J-> B -L-> C2
3080
3081        q = s.query(A).options(
3082            joinedload(A.b).selectinload(B.c2_o2m),
3083            joinedload(A.a2).joinedload(A2.b).selectinload(B.c1_o2m),
3084        )
3085
3086        a1 = q.all()[0]
3087
3088        is_true("c1_o2m" in a1.b.__dict__)
3089        is_true("c2_o2m" in a1.b.__dict__)
3090
3091    def test_m2o(self):
3092        A, A2, B, C1m2o, C2m2o = self.classes("A", "A2", "B", "C1m2o", "C2m2o")
3093
3094        s = fixture_session()
3095
3096        # A -J-> B -L-> C1
3097        # A -J-> B -S-> C2
3098
3099        # A -J-> A2 -J-> B -S-> C1
3100        # A -J-> A2 -J-> B -L-> C2
3101
3102        q = s.query(A).options(
3103            joinedload(A.b).selectinload(B.c2_m2o),
3104            joinedload(A.a2).joinedload(A2.b).selectinload(B.c1_m2o),
3105        )
3106
3107        a1 = q.all()[0]
3108        is_true("c1_m2o" in a1.b.__dict__)
3109        is_true("c2_m2o" in a1.b.__dict__)
3110
3111
3112class SingleInhSubclassTest(
3113    fixtures.DeclarativeMappedTest, testing.AssertsExecutionResults
3114):
3115    @classmethod
3116    def setup_classes(cls):
3117        Base = cls.DeclarativeBasic
3118
3119        class User(Base):
3120            __tablename__ = "user"
3121
3122            id = Column(Integer, primary_key=True)
3123            type = Column(String(10))
3124
3125            __mapper_args__ = {"polymorphic_on": type}
3126
3127        class EmployerUser(User):
3128            roles = relationship("Role", lazy="selectin")
3129            __mapper_args__ = {"polymorphic_identity": "employer"}
3130
3131        class Role(Base):
3132            __tablename__ = "role"
3133
3134            id = Column(Integer, primary_key=True)
3135            user_id = Column(Integer, ForeignKey("user.id"))
3136
3137    @classmethod
3138    def insert_data(cls, connection):
3139        EmployerUser, Role = cls.classes("EmployerUser", "Role")
3140
3141        s = Session(connection)
3142        s.add(EmployerUser(roles=[Role(), Role(), Role()]))
3143        s.commit()
3144
3145    def test_load(self):
3146        (EmployerUser,) = self.classes("EmployerUser")
3147        s = fixture_session()
3148
3149        q = s.query(EmployerUser)
3150
3151        self.assert_sql_execution(
3152            testing.db,
3153            q.all,
3154            CompiledSQL(
3155                'SELECT "user".id AS user_id, "user".type AS user_type '
3156                'FROM "user" WHERE "user".type IN (__[POSTCOMPILE_type_1])',
3157                {"type_1": ["employer"]},
3158            ),
3159            CompiledSQL(
3160                "SELECT role.user_id AS role_user_id, role.id AS role_id "
3161                "FROM role WHERE role.user_id "
3162                "IN (__[POSTCOMPILE_primary_keys])",
3163                {"primary_keys": [1]},
3164            ),
3165        )
3166
3167
3168class MissingForeignTest(
3169    fixtures.DeclarativeMappedTest, testing.AssertsExecutionResults
3170):
3171    @classmethod
3172    def setup_classes(cls):
3173        Base = cls.DeclarativeBasic
3174
3175        class A(fixtures.ComparableEntity, Base):
3176            __tablename__ = "a"
3177            id = Column(Integer, primary_key=True)
3178            b_id = Column(Integer)
3179            b = relationship("B", primaryjoin="foreign(A.b_id) == B.id")
3180            q = Column(Integer)
3181
3182        class B(fixtures.ComparableEntity, Base):
3183            __tablename__ = "b"
3184            id = Column(Integer, primary_key=True)
3185            x = Column(Integer)
3186            y = Column(Integer)
3187
3188    @classmethod
3189    def insert_data(cls, connection):
3190        A, B = cls.classes("A", "B")
3191
3192        s = Session(connection)
3193        b1, b2 = B(id=1, x=5, y=9), B(id=2, x=10, y=8)
3194        s.add_all(
3195            [
3196                A(id=1, b_id=1),
3197                A(id=2, b_id=5),
3198                A(id=3, b_id=2),
3199                A(id=4, b=None),
3200                b1,
3201                b2,
3202            ]
3203        )
3204        s.commit()
3205
3206    def test_missing_rec(self):
3207        A, B = self.classes("A", "B")
3208
3209        s = fixture_session()
3210        eq_(
3211            s.query(A).options(selectinload(A.b)).order_by(A.id).all(),
3212            [
3213                A(id=1, b=B(id=1)),
3214                A(id=2, b=None, b_id=5),
3215                A(id=3, b=B(id=2)),
3216                A(id=4, b=None, b_id=None),
3217            ],
3218        )
3219
3220
3221class M2OWDegradeTest(
3222    fixtures.DeclarativeMappedTest, testing.AssertsExecutionResults
3223):
3224    @classmethod
3225    def setup_classes(cls):
3226        Base = cls.DeclarativeBasic
3227
3228        class A(fixtures.ComparableEntity, Base):
3229            __tablename__ = "a"
3230            id = Column(Integer, primary_key=True)
3231            b_id = Column(ForeignKey("b.id"))
3232            b = relationship("B")
3233            b_no_omit_join = relationship("B", omit_join=False, overlaps="b")
3234            q = Column(Integer)
3235
3236        class B(fixtures.ComparableEntity, Base):
3237            __tablename__ = "b"
3238            id = Column(Integer, primary_key=True)
3239            x = Column(Integer)
3240            y = Column(Integer)
3241
3242    @classmethod
3243    def insert_data(cls, connection):
3244        A, B = cls.classes("A", "B")
3245
3246        s = Session(connection)
3247        b1, b2 = B(id=1, x=5, y=9), B(id=2, x=10, y=8)
3248        s.add_all(
3249            [
3250                A(id=1, b=b1),
3251                A(id=2, b=b2),
3252                A(id=3, b=b2),
3253                A(id=4, b=None),
3254                A(id=5, b=b1),
3255            ]
3256        )
3257        s.commit()
3258
3259    def test_omit_join_warn_on_true(self):
3260        with testing.expect_warnings(
3261            "setting omit_join to True is not supported; selectin "
3262            "loading of this relationship"
3263        ):
3264            relationship("B", omit_join=True)
3265
3266    def test_use_join_parent_criteria(self):
3267        A, B = self.classes("A", "B")
3268        s = fixture_session()
3269        q = (
3270            s.query(A)
3271            .filter(A.id.in_([1, 3]))
3272            .options(selectinload(A.b))
3273            .order_by(A.id)
3274        )
3275        results = self.assert_sql_execution(
3276            testing.db,
3277            q.all,
3278            CompiledSQL(
3279                "SELECT a.id AS a_id, a.b_id AS a_b_id, a.q AS a_q "
3280                "FROM a WHERE a.id IN (__[POSTCOMPILE_id_1]) ORDER BY a.id",
3281                [{"id_1": [1, 3]}],
3282            ),
3283            CompiledSQL(
3284                "SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
3285                "FROM b WHERE b.id IN (__[POSTCOMPILE_primary_keys])",
3286                [{"primary_keys": [1, 2]}],
3287            ),
3288        )
3289
3290        eq_(
3291            results,
3292            [A(id=1, b=B(id=1, x=5, y=9)), A(id=3, b=B(id=2, x=10, y=8))],
3293        )
3294
3295    def test_use_join_parent_criteria_degrade_on_defer(self):
3296        A, B = self.classes("A", "B")
3297        s = fixture_session()
3298        q = (
3299            s.query(A)
3300            .filter(A.id.in_([1, 3]))
3301            .options(defer(A.b_id), selectinload(A.b))
3302            .order_by(A.id)
3303        )
3304        results = self.assert_sql_execution(
3305            testing.db,
3306            q.all,
3307            CompiledSQL(
3308                "SELECT a.id AS a_id, a.q AS a_q "
3309                "FROM a WHERE a.id IN (__[POSTCOMPILE_id_1]) ORDER BY a.id",
3310                [{"id_1": [1, 3]}],
3311            ),
3312            # in the very unlikely case that the the FK col on parent is
3313            # deferred, we degrade to the JOIN version so that we don't need to
3314            # emit either for each parent object individually, or as a second
3315            # query for them.
3316            CompiledSQL(
3317                "SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
3318                "b.y AS b_y "
3319                "FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
3320                "WHERE a_1.id IN (__[POSTCOMPILE_primary_keys])",
3321                [{"primary_keys": [1, 3]}],
3322            ),
3323        )
3324
3325        eq_(
3326            results,
3327            [A(id=1, b=B(id=1, x=5, y=9)), A(id=3, b=B(id=2, x=10, y=8))],
3328        )
3329
3330    def test_use_join(self):
3331        A, B = self.classes("A", "B")
3332        s = fixture_session()
3333        q = s.query(A).options(selectinload(A.b)).order_by(A.id)
3334        results = self.assert_sql_execution(
3335            testing.db,
3336            q.all,
3337            CompiledSQL(
3338                "SELECT a.id AS a_id, a.b_id AS a_b_id, a.q AS a_q "
3339                "FROM a ORDER BY a.id",
3340                [{}],
3341            ),
3342            CompiledSQL(
3343                "SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
3344                "FROM b WHERE b.id IN (__[POSTCOMPILE_primary_keys])",
3345                [{"primary_keys": [1, 2]}],
3346            ),
3347        )
3348
3349        b1, b2 = B(id=1, x=5, y=9), B(id=2, x=10, y=8)
3350        eq_(
3351            results,
3352            [
3353                A(id=1, b=b1),
3354                A(id=2, b=b2),
3355                A(id=3, b=b2),
3356                A(id=4, b=None),
3357                A(id=5, b=b1),
3358            ],
3359        )
3360
3361    def test_use_join_omit_join_false(self):
3362        A, B = self.classes("A", "B")
3363        s = fixture_session()
3364        q = s.query(A).options(selectinload(A.b_no_omit_join)).order_by(A.id)
3365        results = self.assert_sql_execution(
3366            testing.db,
3367            q.all,
3368            CompiledSQL(
3369                "SELECT a.id AS a_id, a.b_id AS a_b_id, a.q AS a_q "
3370                "FROM a ORDER BY a.id",
3371                [{}],
3372            ),
3373            CompiledSQL(
3374                "SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
3375                "b.y AS b_y FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
3376                "WHERE a_1.id IN (__[POSTCOMPILE_primary_keys])",
3377                [{"primary_keys": [1, 2, 3, 4, 5]}],
3378            ),
3379        )
3380
3381        b1, b2 = B(id=1, x=5, y=9), B(id=2, x=10, y=8)
3382        eq_(
3383            results,
3384            [
3385                A(id=1, b_no_omit_join=b1),
3386                A(id=2, b_no_omit_join=b2),
3387                A(id=3, b_no_omit_join=b2),
3388                A(id=4, b_no_omit_join=None),
3389                A(id=5, b_no_omit_join=b1),
3390            ],
3391        )
3392
3393    def test_use_join_parent_degrade_on_defer(self):
3394        A, B = self.classes("A", "B")
3395        s = fixture_session()
3396        q = s.query(A).options(defer(A.b_id), selectinload(A.b)).order_by(A.id)
3397        results = self.assert_sql_execution(
3398            testing.db,
3399            q.all,
3400            CompiledSQL(
3401                "SELECT a.id AS a_id, a.q AS a_q " "FROM a ORDER BY a.id", [{}]
3402            ),
3403            # in the very unlikely case that the the FK col on parent is
3404            # deferred, we degrade to the JOIN version so that we don't need to
3405            # emit either for each parent object individually, or as a second
3406            # query for them.
3407            CompiledSQL(
3408                "SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
3409                "b.y AS b_y "
3410                "FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
3411                "WHERE a_1.id IN (__[POSTCOMPILE_primary_keys])",
3412                [{"primary_keys": [1, 2, 3, 4, 5]}],
3413            ),
3414        )
3415
3416        b1, b2 = B(id=1, x=5, y=9), B(id=2, x=10, y=8)
3417        eq_(
3418            results,
3419            [
3420                A(id=1, b=b1),
3421                A(id=2, b=b2),
3422                A(id=3, b=b2),
3423                A(id=4, b=None),
3424                A(id=5, b=b1),
3425            ],
3426        )
3427
3428
3429class SameNamePolymorphicTest(fixtures.DeclarativeMappedTest):
3430    @classmethod
3431    def setup_classes(cls):
3432        Base = cls.DeclarativeBasic
3433
3434        class GenericParent(Base):
3435            __tablename__ = "generic_parent"
3436            id = Column(Integer, primary_key=True)
3437            type = Column(String(50), nullable=False)
3438
3439            __mapper_args__ = {
3440                "polymorphic_on": type,
3441                "polymorphic_identity": "generic_parent",
3442            }
3443
3444        class ParentA(GenericParent):
3445            __tablename__ = "parent_a"
3446
3447            id = Column(
3448                Integer, ForeignKey("generic_parent.id"), primary_key=True
3449            )
3450            children = relationship("ChildA", back_populates="parent")
3451
3452            __mapper_args__ = {"polymorphic_identity": "parent_a"}
3453
3454        class ParentB(GenericParent):
3455            __tablename__ = "parent_b"
3456
3457            id = Column(
3458                Integer, ForeignKey("generic_parent.id"), primary_key=True
3459            )
3460            children = relationship("ChildB", back_populates="parent")
3461
3462            __mapper_args__ = {"polymorphic_identity": "parent_b"}
3463
3464        class ChildA(Base):
3465            __tablename__ = "child_a"
3466            id = Column(Integer, primary_key=True)
3467            parent_id = Column(
3468                Integer, ForeignKey("parent_a.id"), nullable=False
3469            )
3470            parent = relationship("ParentA", back_populates="children")
3471
3472        class ChildB(Base):
3473            __tablename__ = "child_b"
3474
3475            id = Column(Integer, primary_key=True)
3476            parent_id = Column(
3477                Integer, ForeignKey("parent_b.id"), nullable=False
3478            )
3479            parent = relationship("ParentB", back_populates="children")
3480
3481    @classmethod
3482    def insert_data(cls, connection):
3483        ParentA, ParentB, ChildA, ChildB = cls.classes(
3484            "ParentA", "ParentB", "ChildA", "ChildB"
3485        )
3486        session = Session(connection)
3487        parent_a = ParentA(id=1)
3488        parent_b = ParentB(id=2)
3489        for i in range(10):
3490            parent_a.children.append(ChildA())
3491            parent_b.children.append(ChildB())
3492        session.add_all([parent_a, parent_b])
3493
3494        session.commit()
3495
3496    def test_load_both_wpoly(self):
3497        GenericParent, ParentA, ParentB, ChildA, ChildB = self.classes(
3498            "GenericParent", "ParentA", "ParentB", "ChildA", "ChildB"
3499        )
3500        session = fixture_session()
3501
3502        parent_types = with_polymorphic(GenericParent, [ParentA, ParentB])
3503
3504        with assert_engine(testing.db) as asserter_:
3505            session.query(parent_types).options(
3506                selectinload(parent_types.ParentA.children),
3507                selectinload(parent_types.ParentB.children),
3508            ).all()
3509
3510        asserter_.assert_(
3511            CompiledSQL(
3512                "SELECT generic_parent.id AS generic_parent_id, "
3513                "generic_parent.type AS generic_parent_type, "
3514                "parent_a.id AS parent_a_id, parent_b.id AS parent_b_id "
3515                "FROM generic_parent LEFT OUTER JOIN parent_a "
3516                "ON generic_parent.id = parent_a.id LEFT OUTER JOIN parent_b "
3517                "ON generic_parent.id = parent_b.id"
3518            ),
3519            AllOf(
3520                CompiledSQL(
3521                    "SELECT child_a.parent_id AS child_a_parent_id, "
3522                    "child_a.id AS child_a_id FROM child_a "
3523                    "WHERE child_a.parent_id IN "
3524                    "(__[POSTCOMPILE_primary_keys])",
3525                    [{"primary_keys": [1]}],
3526                ),
3527                CompiledSQL(
3528                    "SELECT child_b.parent_id AS child_b_parent_id, "
3529                    "child_b.id AS child_b_id FROM child_b "
3530                    "WHERE child_b.parent_id IN "
3531                    "(__[POSTCOMPILE_primary_keys])",
3532                    [{"primary_keys": [2]}],
3533                ),
3534            ),
3535        )
3536
3537
3538class TestBakedCancelsCorrectly(fixtures.DeclarativeMappedTest):
3539    # test issue #5303
3540
3541    @classmethod
3542    def setup_classes(cls):
3543        Base = cls.DeclarativeBasic
3544
3545        class User(Base):
3546            __tablename__ = "users"
3547
3548            id = Column(Integer, primary_key=True)
3549
3550        class Foo(Base):
3551            __tablename__ = "foos"
3552            __mapper_args__ = {"polymorphic_on": "type"}
3553
3554            id = Column(Integer, primary_key=True)
3555            type = Column(String(50), nullable=False)
3556
3557        class SubFoo(Foo):
3558            __tablename__ = "foos_sub"
3559            __mapper_args__ = {"polymorphic_identity": "USER"}
3560
3561            id = Column(Integer, ForeignKey("foos.id"), primary_key=True)
3562            user_id = Column(Integer, ForeignKey("users.id"))
3563            user = relationship("User")
3564
3565        class Bar(Base):
3566            __tablename__ = "bars"
3567
3568            id = Column(Integer, primary_key=True)
3569            foo_id = Column(Integer, ForeignKey("foos.id"))
3570            foo = relationship("Foo", cascade="all", uselist=False)
3571
3572    @classmethod
3573    def insert_data(cls, connection):
3574        User, Bar, SubFoo = cls.classes("User", "Bar", "SubFoo")
3575
3576        session = Session(connection)
3577
3578        user = User()
3579        sub_foo = SubFoo(user=user)
3580        sub_sub_bar = Bar(foo=sub_foo)
3581        session.add_all([user, sub_foo, sub_sub_bar])
3582        session.commit()
3583
3584    def test_option_accepted_each_time(self):
3585        Foo, User, Bar, SubFoo = self.classes("Foo", "User", "Bar", "SubFoo")
3586
3587        def go():
3588            # in this test, the loader options cancel caching because
3589            # the with_polymorphic() can't be cached, and this actually
3590            # fails because it won't match up to the with_polymorphic
3591            # used in the query if the query is in fact cached.  however
3592            # the cache spoil did not use full=True which kept the lead
3593            # entities around.
3594
3595            sess = fixture_session()
3596            foo_polymorphic = with_polymorphic(Foo, [SubFoo], aliased=True)
3597
3598            credit_adjustment_load = selectinload(
3599                Bar.foo.of_type(foo_polymorphic)
3600            )
3601            user_load = credit_adjustment_load.joinedload(
3602                foo_polymorphic.SubFoo.user
3603            )
3604            query = sess.query(Bar).options(user_load)
3605            ledger_entry = query.first()
3606            ledger_entry.foo.user
3607
3608        self.assert_sql_count(testing.db, go, 2)
3609        self.assert_sql_count(testing.db, go, 2)
3610        self.assert_sql_count(testing.db, go, 2)
3611
3612
3613class TestCompositePlusNonComposite(fixtures.DeclarativeMappedTest):
3614    __requires__ = ("tuple_in",)
3615
3616    @classmethod
3617    def setup_classes(cls):
3618        Base = cls.DeclarativeBasic
3619
3620        from sqlalchemy.sql import lambdas
3621        from sqlalchemy.orm import configure_mappers
3622
3623        lambdas._closure_per_cache_key.clear()
3624        lambdas.AnalyzedCode._fns.clear()
3625
3626        class A(ComparableEntity, Base):
3627            __tablename__ = "a"
3628
3629            id = Column(Integer, primary_key=True)
3630            bs = relationship("B", lazy="selectin")
3631
3632        class B(ComparableEntity, Base):
3633            __tablename__ = "b"
3634            id = Column(Integer, primary_key=True)
3635            a_id = Column(ForeignKey("a.id"))
3636
3637        class A2(ComparableEntity, Base):
3638            __tablename__ = "a2"
3639
3640            id = Column(Integer, primary_key=True)
3641            id2 = Column(Integer, primary_key=True)
3642            bs = relationship("B2", lazy="selectin")
3643
3644        class B2(ComparableEntity, Base):
3645            __tablename__ = "b2"
3646            id = Column(Integer, primary_key=True)
3647            a_id = Column(Integer)
3648            a_id2 = Column(Integer)
3649            __table_args__ = (
3650                ForeignKeyConstraint(["a_id", "a_id2"], ["a2.id", "a2.id2"]),
3651            )
3652
3653        configure_mappers()
3654
3655    @classmethod
3656    def insert_data(cls, connection):
3657        A, B, A2, B2 = cls.classes("A", "B", "A2", "B2")
3658        s = Session(connection)
3659
3660        s.add(A(bs=[B()]))
3661        s.add(A2(id=1, id2=1, bs=[B2()]))
3662
3663        s.commit()
3664
3665    def test_load_composite_then_non_composite(self):
3666
3667        A, B, A2, B2 = self.classes("A", "B", "A2", "B2")
3668
3669        s = fixture_session()
3670
3671        a2 = s.query(A2).first()
3672        a1 = s.query(A).first()
3673
3674        eq_(a2.bs, [B2()])
3675        eq_(a1.bs, [B()])
3676