1"""basic tests of lazy loaded attributes"""
2
3import datetime
4
5import sqlalchemy as sa
6from sqlalchemy import and_
7from sqlalchemy import bindparam
8from sqlalchemy import Boolean
9from sqlalchemy import ForeignKey
10from sqlalchemy import ForeignKeyConstraint
11from sqlalchemy import Integer
12from sqlalchemy import orm
13from sqlalchemy import SmallInteger
14from sqlalchemy import String
15from sqlalchemy import testing
16from sqlalchemy.orm import attributes
17from sqlalchemy.orm import configure_mappers
18from sqlalchemy.orm import create_session
19from sqlalchemy.orm import exc as orm_exc
20from sqlalchemy.orm import mapper
21from sqlalchemy.orm import relationship
22from sqlalchemy.orm import Session
23from sqlalchemy.orm.interfaces import MapperOption
24from sqlalchemy.testing import assert_raises
25from sqlalchemy.testing import eq_
26from sqlalchemy.testing import fixtures
27from sqlalchemy.testing import is_false
28from sqlalchemy.testing import is_true
29from sqlalchemy.testing import mock
30from sqlalchemy.testing.assertsql import CompiledSQL
31from sqlalchemy.testing.schema import Column
32from sqlalchemy.testing.schema import Table
33from sqlalchemy.types import TypeDecorator
34from test.orm import _fixtures
35
36
37class LazyTest(_fixtures.FixtureTest):
38    run_inserts = "once"
39    run_deletes = None
40
41    def test_basic(self):
42        users, Address, addresses, User = (
43            self.tables.users,
44            self.classes.Address,
45            self.tables.addresses,
46            self.classes.User,
47        )
48
49        mapper(
50            User,
51            users,
52            properties={
53                "addresses": relationship(
54                    mapper(Address, addresses), lazy="select"
55                )
56            },
57        )
58        sess = create_session()
59        q = sess.query(User)
60        eq_(
61            [
62                User(
63                    id=7,
64                    addresses=[Address(id=1, email_address="jack@bean.com")],
65                )
66            ],
67            q.filter(users.c.id == 7).all(),
68        )
69
70    def test_needs_parent(self):
71        """test the error raised when parent object is not bound."""
72
73        users, Address, addresses, User = (
74            self.tables.users,
75            self.classes.Address,
76            self.tables.addresses,
77            self.classes.User,
78        )
79
80        mapper(
81            User,
82            users,
83            properties={
84                "addresses": relationship(
85                    mapper(Address, addresses), lazy="select"
86                )
87            },
88        )
89        sess = create_session()
90        q = sess.query(User)
91        u = q.filter(users.c.id == 7).first()
92        sess.expunge(u)
93        assert_raises(orm_exc.DetachedInstanceError, getattr, u, "addresses")
94
95    def test_orderby(self):
96        users, Address, addresses, User = (
97            self.tables.users,
98            self.classes.Address,
99            self.tables.addresses,
100            self.classes.User,
101        )
102
103        mapper(
104            User,
105            users,
106            properties={
107                "addresses": relationship(
108                    mapper(Address, addresses),
109                    lazy="select",
110                    order_by=addresses.c.email_address,
111                )
112            },
113        )
114        q = create_session().query(User)
115        assert [
116            User(id=7, addresses=[Address(id=1)]),
117            User(
118                id=8,
119                addresses=[
120                    Address(id=3, email_address="ed@bettyboop.com"),
121                    Address(id=4, email_address="ed@lala.com"),
122                    Address(id=2, email_address="ed@wood.com"),
123                ],
124            ),
125            User(id=9, addresses=[Address(id=5)]),
126            User(id=10, addresses=[]),
127        ] == q.all()
128
129    def test_orderby_secondary(self):
130        """tests that a regular mapper select on a single table can
131        order by a relationship to a second table"""
132
133        Address, addresses, users, User = (
134            self.classes.Address,
135            self.tables.addresses,
136            self.tables.users,
137            self.classes.User,
138        )
139
140        mapper(Address, addresses)
141
142        mapper(
143            User,
144            users,
145            properties=dict(addresses=relationship(Address, lazy="select")),
146        )
147        q = create_session().query(User)
148        result = (
149            q.filter(users.c.id == addresses.c.user_id)
150            .order_by(addresses.c.email_address)
151            .all()
152        )
153        assert [
154            User(
155                id=8,
156                addresses=[
157                    Address(id=2, email_address="ed@wood.com"),
158                    Address(id=3, email_address="ed@bettyboop.com"),
159                    Address(id=4, email_address="ed@lala.com"),
160                ],
161            ),
162            User(id=9, addresses=[Address(id=5)]),
163            User(id=7, addresses=[Address(id=1)]),
164        ] == result
165
166    def test_orderby_desc(self):
167        Address, addresses, users, User = (
168            self.classes.Address,
169            self.tables.addresses,
170            self.tables.users,
171            self.classes.User,
172        )
173
174        mapper(Address, addresses)
175
176        mapper(
177            User,
178            users,
179            properties=dict(
180                addresses=relationship(
181                    Address,
182                    lazy="select",
183                    order_by=[sa.desc(addresses.c.email_address)],
184                )
185            ),
186        )
187        sess = create_session()
188        assert [
189            User(id=7, addresses=[Address(id=1)]),
190            User(
191                id=8,
192                addresses=[
193                    Address(id=2, email_address="ed@wood.com"),
194                    Address(id=4, email_address="ed@lala.com"),
195                    Address(id=3, email_address="ed@bettyboop.com"),
196                ],
197            ),
198            User(id=9, addresses=[Address(id=5)]),
199            User(id=10, addresses=[]),
200        ] == sess.query(User).all()
201
202    def test_no_orphan(self):
203        """test that a lazily loaded child object is not marked as an orphan"""
204
205        users, Address, addresses, User = (
206            self.tables.users,
207            self.classes.Address,
208            self.tables.addresses,
209            self.classes.User,
210        )
211
212        mapper(
213            User,
214            users,
215            properties={
216                "addresses": relationship(
217                    Address, cascade="all,delete-orphan", lazy="select"
218                )
219            },
220        )
221        mapper(Address, addresses)
222
223        sess = create_session()
224        user = sess.query(User).get(7)
225        assert getattr(User, "addresses").hasparent(
226            attributes.instance_state(user.addresses[0]), optimistic=True
227        )
228        assert not sa.orm.class_mapper(Address)._is_orphan(
229            attributes.instance_state(user.addresses[0])
230        )
231
232    def test_limit(self):
233        """test limit operations combined with lazy-load relationships."""
234
235        (
236            users,
237            items,
238            order_items,
239            orders,
240            Item,
241            User,
242            Address,
243            Order,
244            addresses,
245        ) = (
246            self.tables.users,
247            self.tables.items,
248            self.tables.order_items,
249            self.tables.orders,
250            self.classes.Item,
251            self.classes.User,
252            self.classes.Address,
253            self.classes.Order,
254            self.tables.addresses,
255        )
256
257        mapper(Item, items)
258        mapper(
259            Order,
260            orders,
261            properties={
262                "items": relationship(
263                    Item, secondary=order_items, lazy="select"
264                )
265            },
266        )
267        mapper(
268            User,
269            users,
270            properties={
271                "addresses": relationship(
272                    mapper(Address, addresses), lazy="select"
273                ),
274                "orders": relationship(Order, lazy="select"),
275            },
276        )
277
278        sess = create_session()
279        q = sess.query(User)
280
281        if testing.against("mssql"):
282            result = q.limit(2).all()
283            assert self.static.user_all_result[:2] == result
284        else:
285            result = q.limit(2).offset(1).all()
286            assert self.static.user_all_result[1:3] == result
287
288    def test_distinct(self):
289        (
290            users,
291            items,
292            order_items,
293            orders,
294            Item,
295            User,
296            Address,
297            Order,
298            addresses,
299        ) = (
300            self.tables.users,
301            self.tables.items,
302            self.tables.order_items,
303            self.tables.orders,
304            self.classes.Item,
305            self.classes.User,
306            self.classes.Address,
307            self.classes.Order,
308            self.tables.addresses,
309        )
310
311        mapper(Item, items)
312        mapper(
313            Order,
314            orders,
315            properties={
316                "items": relationship(
317                    Item, secondary=order_items, lazy="select"
318                )
319            },
320        )
321        mapper(
322            User,
323            users,
324            properties={
325                "addresses": relationship(
326                    mapper(Address, addresses), lazy="select"
327                ),
328                "orders": relationship(Order, lazy="select"),
329            },
330        )
331
332        sess = create_session()
333        q = sess.query(User)
334
335        # use a union all to get a lot of rows to join against
336        u2 = users.alias("u2")
337        s = sa.union_all(
338            u2.select(use_labels=True),
339            u2.select(use_labels=True),
340            u2.select(use_labels=True),
341        ).alias("u")
342        result = (
343            q.filter(s.c.u2_id == User.id).order_by(User.id).distinct().all()
344        )
345        eq_(self.static.user_all_result, result)
346
347    def test_uselist_false_warning(self):
348        """test that multiple rows received by a
349        uselist=False raises a warning."""
350
351        User, users, orders, Order = (
352            self.classes.User,
353            self.tables.users,
354            self.tables.orders,
355            self.classes.Order,
356        )
357
358        mapper(
359            User,
360            users,
361            properties={"order": relationship(Order, uselist=False)},
362        )
363        mapper(Order, orders)
364        s = create_session()
365        u1 = s.query(User).filter(User.id == 7).one()
366        assert_raises(sa.exc.SAWarning, getattr, u1, "order")
367
368    def test_callable_bind(self):
369        Address, addresses, users, User = (
370            self.classes.Address,
371            self.tables.addresses,
372            self.tables.users,
373            self.classes.User,
374        )
375
376        mapper(
377            User,
378            users,
379            properties=dict(
380                addresses=relationship(
381                    mapper(Address, addresses),
382                    lazy="select",
383                    primaryjoin=and_(
384                        users.c.id == addresses.c.user_id,
385                        users.c.name
386                        == bindparam("name", callable_=lambda: "ed"),
387                    ),
388                )
389            ),
390        )
391
392        s = Session()
393        ed = s.query(User).filter_by(name="ed").one()
394        eq_(
395            ed.addresses,
396            [
397                Address(id=2, user_id=8),
398                Address(id=3, user_id=8),
399                Address(id=4, user_id=8),
400            ],
401        )
402
403        fred = s.query(User).filter_by(name="fred").one()
404        eq_(fred.addresses, [])  # fred is missing
405
406    def test_custom_bind(self):
407        Address, addresses, users, User = (
408            self.classes.Address,
409            self.tables.addresses,
410            self.tables.users,
411            self.classes.User,
412        )
413
414        mapper(
415            User,
416            users,
417            properties=dict(
418                addresses=relationship(
419                    mapper(Address, addresses),
420                    lazy="select",
421                    primaryjoin=and_(
422                        users.c.id == addresses.c.user_id,
423                        users.c.name == bindparam("name"),
424                    ),
425                )
426            ),
427        )
428
429        canary = mock.Mock()
430
431        class MyOption(MapperOption):
432            propagate_to_loaders = True
433
434            def __init__(self, crit):
435                self.crit = crit
436
437            def process_query_conditionally(self, query):
438                """process query during a lazyload"""
439                canary()
440                query._params = query._params.union(dict(name=self.crit))
441
442        s = Session()
443        ed = s.query(User).options(MyOption("ed")).filter_by(name="ed").one()
444        eq_(
445            ed.addresses,
446            [
447                Address(id=2, user_id=8),
448                Address(id=3, user_id=8),
449                Address(id=4, user_id=8),
450            ],
451        )
452        eq_(canary.mock_calls, [mock.call()])
453
454        fred = (
455            s.query(User).options(MyOption("ed")).filter_by(name="fred").one()
456        )
457        eq_(fred.addresses, [])  # fred is missing
458        eq_(canary.mock_calls, [mock.call(), mock.call()])
459
460        # the lazy query was not cached; the option is re-applied to the
461        # Fred object due to populate_existing()
462        fred = (
463            s.query(User)
464            .populate_existing()
465            .options(MyOption("fred"))
466            .filter_by(name="fred")
467            .one()
468        )
469        eq_(fred.addresses, [Address(id=5, user_id=9)])  # fred is there
470
471        eq_(canary.mock_calls, [mock.call(), mock.call(), mock.call()])
472
473    def test_one_to_many_scalar(self):
474        Address, addresses, users, User = (
475            self.classes.Address,
476            self.tables.addresses,
477            self.tables.users,
478            self.classes.User,
479        )
480
481        mapper(
482            User,
483            users,
484            properties=dict(
485                address=relationship(
486                    mapper(Address, addresses), lazy="select", uselist=False
487                )
488            ),
489        )
490        q = create_session().query(User)
491        result = q.filter(users.c.id == 7).all()
492        assert [User(id=7, address=Address(id=1))] == result
493
494    def test_many_to_one_binds(self):
495        Address, addresses, users, User = (
496            self.classes.Address,
497            self.tables.addresses,
498            self.tables.users,
499            self.classes.User,
500        )
501
502        mapper(
503            Address,
504            addresses,
505            primary_key=[addresses.c.user_id, addresses.c.email_address],
506        )
507
508        mapper(
509            User,
510            users,
511            properties=dict(
512                address=relationship(
513                    Address,
514                    uselist=False,
515                    primaryjoin=sa.and_(
516                        users.c.id == addresses.c.user_id,
517                        addresses.c.email_address == "ed@bettyboop.com",
518                    ),
519                )
520            ),
521        )
522        q = create_session().query(User)
523        eq_(
524            [
525                User(id=7, address=None),
526                User(id=8, address=Address(id=3)),
527                User(id=9, address=None),
528                User(id=10, address=None),
529            ],
530            list(q),
531        )
532
533    def test_double(self):
534        """tests lazy loading with two relationships simultaneously,
535        from the same table, using aliases.  """
536
537        users, orders, User, Address, Order, addresses = (
538            self.tables.users,
539            self.tables.orders,
540            self.classes.User,
541            self.classes.Address,
542            self.classes.Order,
543            self.tables.addresses,
544        )
545
546        openorders = sa.alias(orders, "openorders")
547        closedorders = sa.alias(orders, "closedorders")
548
549        mapper(Address, addresses)
550
551        mapper(Order, orders)
552
553        open_mapper = mapper(Order, openorders, non_primary=True)
554        closed_mapper = mapper(Order, closedorders, non_primary=True)
555        mapper(
556            User,
557            users,
558            properties=dict(
559                addresses=relationship(Address, lazy=True),
560                open_orders=relationship(
561                    open_mapper,
562                    primaryjoin=sa.and_(
563                        openorders.c.isopen == 1,
564                        users.c.id == openorders.c.user_id,
565                    ),
566                    lazy="select",
567                ),
568                closed_orders=relationship(
569                    closed_mapper,
570                    primaryjoin=sa.and_(
571                        closedorders.c.isopen == 0,
572                        users.c.id == closedorders.c.user_id,
573                    ),
574                    lazy="select",
575                ),
576            ),
577        )
578        q = create_session().query(User)
579
580        assert [
581            User(
582                id=7,
583                addresses=[Address(id=1)],
584                open_orders=[Order(id=3)],
585                closed_orders=[Order(id=1), Order(id=5)],
586            ),
587            User(
588                id=8,
589                addresses=[Address(id=2), Address(id=3), Address(id=4)],
590                open_orders=[],
591                closed_orders=[],
592            ),
593            User(
594                id=9,
595                addresses=[Address(id=5)],
596                open_orders=[Order(id=4)],
597                closed_orders=[Order(id=2)],
598            ),
599            User(id=10),
600        ] == q.all()
601
602        sess = create_session()
603        user = sess.query(User).get(7)
604        eq_(
605            [Order(id=1), Order(id=5)],
606            create_session()
607            .query(closed_mapper)
608            .with_parent(user, property="closed_orders")
609            .all(),
610        )
611        eq_(
612            [Order(id=3)],
613            create_session()
614            .query(open_mapper)
615            .with_parent(user, property="open_orders")
616            .all(),
617        )
618
619    def test_many_to_many(self):
620        keywords, items, item_keywords, Keyword, Item = (
621            self.tables.keywords,
622            self.tables.items,
623            self.tables.item_keywords,
624            self.classes.Keyword,
625            self.classes.Item,
626        )
627
628        mapper(Keyword, keywords)
629        mapper(
630            Item,
631            items,
632            properties=dict(
633                keywords=relationship(
634                    Keyword, secondary=item_keywords, lazy="select"
635                )
636            ),
637        )
638
639        q = create_session().query(Item)
640        assert self.static.item_keyword_result == q.all()
641
642        eq_(
643            self.static.item_keyword_result[0:2],
644            q.join("keywords").filter(keywords.c.name == "red").all(),
645        )
646
647    def test_uses_get(self):
648        """test that a simple many-to-one lazyload optimizes
649        to use query.get()."""
650
651        Address, addresses, users, User = (
652            self.classes.Address,
653            self.tables.addresses,
654            self.tables.users,
655            self.classes.User,
656        )
657
658        for pj in (
659            None,
660            users.c.id == addresses.c.user_id,
661            addresses.c.user_id == users.c.id,
662        ):
663            mapper(
664                Address,
665                addresses,
666                properties=dict(
667                    user=relationship(
668                        mapper(User, users), lazy="select", primaryjoin=pj
669                    )
670                ),
671            )
672
673            sess = create_session()
674
675            # load address
676            a1 = (
677                sess.query(Address)
678                .filter_by(email_address="ed@wood.com")
679                .one()
680            )
681
682            # load user that is attached to the address
683            u1 = sess.query(User).get(8)
684
685            def go():
686                # lazy load of a1.user should get it from the session
687                assert a1.user is u1
688
689            self.assert_sql_count(testing.db, go, 0)
690            sa.orm.clear_mappers()
691
692    def test_uses_get_compatible_types(self):
693        """test the use_get optimization with compatible
694        but non-identical types"""
695
696        User, Address = self.classes.User, self.classes.Address
697
698        class IntDecorator(TypeDecorator):
699            impl = Integer
700
701        class SmallintDecorator(TypeDecorator):
702            impl = SmallInteger
703
704        class SomeDBInteger(sa.Integer):
705            pass
706
707        for tt in [
708            Integer,
709            SmallInteger,
710            IntDecorator,
711            SmallintDecorator,
712            SomeDBInteger,
713        ]:
714            m = sa.MetaData()
715            users = Table(
716                "users",
717                m,
718                Column(
719                    "id",
720                    Integer,
721                    primary_key=True,
722                    test_needs_autoincrement=True,
723                ),
724                Column("name", String(30), nullable=False),
725            )
726            addresses = Table(
727                "addresses",
728                m,
729                Column(
730                    "id",
731                    Integer,
732                    primary_key=True,
733                    test_needs_autoincrement=True,
734                ),
735                Column("user_id", tt, ForeignKey("users.id")),
736                Column("email_address", String(50), nullable=False),
737            )
738
739            mapper(
740                Address,
741                addresses,
742                properties=dict(user=relationship(mapper(User, users))),
743            )
744
745            sess = create_session(bind=testing.db)
746
747            # load address
748            a1 = (
749                sess.query(Address)
750                .filter_by(email_address="ed@wood.com")
751                .one()
752            )
753
754            # load user that is attached to the address
755            u1 = sess.query(User).get(8)
756
757            def go():
758                # lazy load of a1.user should get it from the session
759                assert a1.user is u1
760
761            self.assert_sql_count(testing.db, go, 0)
762            sa.orm.clear_mappers()
763
764    def test_many_to_one(self):
765        users, Address, addresses, User = (
766            self.tables.users,
767            self.classes.Address,
768            self.tables.addresses,
769            self.classes.User,
770        )
771
772        mapper(
773            Address,
774            addresses,
775            properties=dict(
776                user=relationship(mapper(User, users), lazy="select")
777            ),
778        )
779        sess = create_session()
780        q = sess.query(Address)
781        a = q.filter(addresses.c.id == 1).one()
782
783        assert a.user is not None
784
785        u1 = sess.query(User).get(7)
786
787        assert a.user is u1
788
789    def test_backrefs_dont_lazyload(self):
790        users, Address, addresses, User = (
791            self.tables.users,
792            self.classes.Address,
793            self.tables.addresses,
794            self.classes.User,
795        )
796
797        mapper(
798            User,
799            users,
800            properties={"addresses": relationship(Address, backref="user")},
801        )
802        mapper(Address, addresses)
803        sess = create_session()
804        ad = sess.query(Address).filter_by(id=1).one()
805        assert ad.user.id == 7
806
807        def go():
808            ad.user = None
809            assert ad.user is None
810
811        self.assert_sql_count(testing.db, go, 0)
812
813        u1 = sess.query(User).filter_by(id=7).one()
814
815        def go():
816            assert ad not in u1.addresses
817
818        self.assert_sql_count(testing.db, go, 1)
819
820        sess.expire(u1, ["addresses"])
821
822        def go():
823            assert ad in u1.addresses
824
825        self.assert_sql_count(testing.db, go, 1)
826
827        sess.expire(u1, ["addresses"])
828        ad2 = Address()
829
830        def go():
831            ad2.user = u1
832            assert ad2.user is u1
833
834        self.assert_sql_count(testing.db, go, 0)
835
836        def go():
837            assert ad2 in u1.addresses
838
839        self.assert_sql_count(testing.db, go, 1)
840
841
842class GetterStateTest(_fixtures.FixtureTest):
843
844    """test lazyloader on non-existent attribute returns
845    expected attribute symbols, maintain expected state"""
846
847    run_inserts = None
848
849    def _unhashable_fixture(self, metadata, load_on_pending=False):
850        class MyHashType(sa.TypeDecorator):
851            impl = sa.String(100)
852
853            def process_bind_param(self, value, dialect):
854                return ";".join(
855                    "%s=%s" % (k, v)
856                    for k, v in sorted(value.items(), key=lambda key: key[0])
857                )
858
859            def process_result_value(self, value, dialect):
860                return dict(elem.split("=", 1) for elem in value.split(";"))
861
862        category = Table(
863            "category",
864            metadata,
865            Column("id", Integer, primary_key=True),
866            Column("data", MyHashType()),
867        )
868        article = Table(
869            "article",
870            metadata,
871            Column("id", Integer, primary_key=True),
872            Column("data", MyHashType()),
873        )
874
875        class Category(fixtures.ComparableEntity):
876            pass
877
878        class Article(fixtures.ComparableEntity):
879            pass
880
881        mapper(Category, category)
882        mapper(
883            Article,
884            article,
885            properties={
886                "category": relationship(
887                    Category,
888                    primaryjoin=orm.foreign(article.c.data) == category.c.data,
889                    load_on_pending=load_on_pending,
890                )
891            },
892        )
893
894        metadata.create_all()
895        sess = Session(autoflush=False)
896        data = {"im": "unhashable"}
897        a1 = Article(id=1, data=data)
898        c1 = Category(id=1, data=data)
899        if load_on_pending:
900            sess.add(c1)
901        else:
902            sess.add_all([c1, a1])
903        sess.flush()
904        if load_on_pending:
905            sess.add(a1)
906        return Category, Article, sess, a1, c1
907
908    def _u_ad_fixture(self, populate_user, dont_use_get=False):
909        users, Address, addresses, User = (
910            self.tables.users,
911            self.classes.Address,
912            self.tables.addresses,
913            self.classes.User,
914        )
915
916        mapper(
917            User,
918            users,
919            properties={
920                "addresses": relationship(Address, back_populates="user")
921            },
922        )
923        mapper(
924            Address,
925            addresses,
926            properties={
927                "user": relationship(
928                    User,
929                    primaryjoin=and_(
930                        users.c.id == addresses.c.user_id, users.c.id != 27
931                    )
932                    if dont_use_get
933                    else None,
934                    back_populates="addresses",
935                )
936            },
937        )
938
939        sess = create_session()
940        a1 = Address(email_address="a1")
941        sess.add(a1)
942        if populate_user:
943            a1.user = User(name="ed")
944        sess.flush()
945        if populate_user:
946            sess.expire_all()
947        return User, Address, sess, a1
948
949    def test_no_use_get_params_missing(self):
950        User, Address, sess, a1 = self._u_ad_fixture(False, True)
951
952        def go():
953            eq_(a1.user, None)
954
955        # doesn't emit SQL
956        self.assert_sql_count(testing.db, go, 0)
957
958    @testing.provide_metadata
959    def test_no_use_get_params_not_hashable(self):
960        Category, Article, sess, a1, c1 = self._unhashable_fixture(
961            self.metadata
962        )
963
964        def go():
965            eq_(a1.category, c1)
966
967        self.assert_sql_count(testing.db, go, 1)
968
969    @testing.provide_metadata
970    def test_no_use_get_params_not_hashable_on_pending(self):
971        Category, Article, sess, a1, c1 = self._unhashable_fixture(
972            self.metadata, load_on_pending=True
973        )
974
975        def go():
976            eq_(a1.category, c1)
977
978        self.assert_sql_count(testing.db, go, 1)
979
980    def test_get_empty_passive_return_never_set(self):
981        User, Address, sess, a1 = self._u_ad_fixture(False)
982        eq_(
983            Address.user.impl.get(
984                attributes.instance_state(a1),
985                attributes.instance_dict(a1),
986                passive=attributes.PASSIVE_RETURN_NEVER_SET,
987            ),
988            attributes.NEVER_SET,
989        )
990        assert "user_id" not in a1.__dict__
991        assert "user" not in a1.__dict__
992
993    def test_history_empty_passive_return_never_set(self):
994        User, Address, sess, a1 = self._u_ad_fixture(False)
995        eq_(
996            Address.user.impl.get_history(
997                attributes.instance_state(a1),
998                attributes.instance_dict(a1),
999                passive=attributes.PASSIVE_RETURN_NEVER_SET,
1000            ),
1001            ((), (), ()),
1002        )
1003        assert "user_id" not in a1.__dict__
1004        assert "user" not in a1.__dict__
1005
1006    def test_get_empty_passive_no_initialize(self):
1007        User, Address, sess, a1 = self._u_ad_fixture(False)
1008        eq_(
1009            Address.user.impl.get(
1010                attributes.instance_state(a1),
1011                attributes.instance_dict(a1),
1012                passive=attributes.PASSIVE_NO_INITIALIZE,
1013            ),
1014            attributes.PASSIVE_NO_RESULT,
1015        )
1016        assert "user_id" not in a1.__dict__
1017        assert "user" not in a1.__dict__
1018
1019    def test_history_empty_passive_no_initialize(self):
1020        User, Address, sess, a1 = self._u_ad_fixture(False)
1021        eq_(
1022            Address.user.impl.get_history(
1023                attributes.instance_state(a1),
1024                attributes.instance_dict(a1),
1025                passive=attributes.PASSIVE_NO_INITIALIZE,
1026            ),
1027            attributes.HISTORY_BLANK,
1028        )
1029        assert "user_id" not in a1.__dict__
1030        assert "user" not in a1.__dict__
1031
1032    def test_get_populated_passive_no_initialize(self):
1033        User, Address, sess, a1 = self._u_ad_fixture(True)
1034        eq_(
1035            Address.user.impl.get(
1036                attributes.instance_state(a1),
1037                attributes.instance_dict(a1),
1038                passive=attributes.PASSIVE_NO_INITIALIZE,
1039            ),
1040            attributes.PASSIVE_NO_RESULT,
1041        )
1042        assert "user_id" not in a1.__dict__
1043        assert "user" not in a1.__dict__
1044
1045    def test_history_populated_passive_no_initialize(self):
1046        User, Address, sess, a1 = self._u_ad_fixture(True)
1047        eq_(
1048            Address.user.impl.get_history(
1049                attributes.instance_state(a1),
1050                attributes.instance_dict(a1),
1051                passive=attributes.PASSIVE_NO_INITIALIZE,
1052            ),
1053            attributes.HISTORY_BLANK,
1054        )
1055        assert "user_id" not in a1.__dict__
1056        assert "user" not in a1.__dict__
1057
1058    def test_get_populated_passive_return_never_set(self):
1059        User, Address, sess, a1 = self._u_ad_fixture(True)
1060        eq_(
1061            Address.user.impl.get(
1062                attributes.instance_state(a1),
1063                attributes.instance_dict(a1),
1064                passive=attributes.PASSIVE_RETURN_NEVER_SET,
1065            ),
1066            User(name="ed"),
1067        )
1068
1069    def test_history_populated_passive_return_never_set(self):
1070        User, Address, sess, a1 = self._u_ad_fixture(True)
1071        eq_(
1072            Address.user.impl.get_history(
1073                attributes.instance_state(a1),
1074                attributes.instance_dict(a1),
1075                passive=attributes.PASSIVE_RETURN_NEVER_SET,
1076            ),
1077            ((), [User(name="ed")], ()),
1078        )
1079
1080
class M2OGetTest(_fixtures.FixtureTest):
    """Many-to-one lazy loading of ``Address.user`` on the shared fixtures."""

    run_inserts = "once"
    run_deletes = None

    def test_m2o_noload(self):
        """test that a NULL foreign key doesn't trigger a lazy load"""

        User = self.classes.User
        Address = self.classes.Address

        mapper(User, self.tables.users)
        mapper(
            Address,
            self.tables.addresses,
            properties={"user": relationship(User)},
        )

        session = create_session()

        # persist an address that has no user, then clear the session so
        # both addresses below are loaded fresh
        new_address = Address(email_address="somenewaddress", id=12)
        session.add(new_address)
        session.flush()
        session.expunge_all()

        address_query = session.query(Address)
        with_user = address_query.get(1)
        without_user = session.query(Address).get(new_address.id)

        def access_users():
            # one lazy load
            assert with_user.user.name == "jack"
            # no lazy load
            assert without_user.user is None

        # only the address with a user is expected to emit SQL
        self.assert_sql_count(testing.db, access_users, 1)
1115
1116
class CorrelatedTest(fixtures.MappedTest):
    """Lazy load through a ``primaryjoin`` that embeds a correlated
    scalar subquery.
    """

    @classmethod
    def define_tables(cls, meta):
        """Create ``user_t`` and ``stuff``; ``stuff.user_id`` refers to
        ``user_t.id``.
        """
        Table(
            "user_t",
            meta,
            Column("id", Integer, primary_key=True),
            Column("name", String(50)),
        )

        Table(
            "stuff",
            meta,
            Column("id", Integer, primary_key=True),
            Column("date", sa.Date),
            Column("user_id", Integer, ForeignKey("user_t.id")),
        )

    @classmethod
    def insert_data(cls):
        """Insert three users; user 1 gets three dated ``stuff`` rows,
        users 2 and 3 get one each.
        """
        stuff, user_t = cls.tables.stuff, cls.tables.user_t

        user_t.insert().execute(
            {"id": 1, "name": "user1"},
            {"id": 2, "name": "user2"},
            {"id": 3, "name": "user3"},
        )

        stuff.insert().execute(
            {"id": 1, "user_id": 1, "date": datetime.date(2007, 10, 15)},
            {"id": 2, "user_id": 1, "date": datetime.date(2007, 12, 15)},
            {"id": 3, "user_id": 1, "date": datetime.date(2007, 11, 15)},
            {"id": 4, "user_id": 2, "date": datetime.date(2008, 1, 15)},
            {"id": 5, "user_id": 3, "date": datetime.date(2007, 6, 15)},
        )

    def test_correlated_lazyload(self):
        """``User.stuff`` holds only the user's most recently dated row.

        The relationship joins on the foreign key and additionally
        requires ``stuff.id`` to equal a subquery that picks, per user,
        the id of the row with the greatest ``date``.
        """
        stuff, user_t = self.tables.stuff, self.tables.user_t

        class User(fixtures.ComparableEntity):
            pass

        class Stuff(fixtures.ComparableEntity):
            pass

        mapper(Stuff, stuff)

        # id of the newest "stuff" row for the enclosing user_t row
        stuff_view = (
            sa.select([stuff.c.id])
            .where(stuff.c.user_id == user_t.c.id)
            .correlate(user_t)
            .order_by(sa.desc(stuff.c.date))
            .limit(1)
        )

        mapper(
            User,
            user_t,
            properties={
                "stuff": relationship(
                    Stuff,
                    primaryjoin=sa.and_(
                        user_t.c.id == stuff.c.user_id,
                        stuff.c.id == (stuff_view.as_scalar()),
                    ),
                )
            },
        )

        sess = create_session()

        # user1 owns rows 1-3; only row 2 (2007-12-15) is the newest
        eq_(
            sess.query(User).all(),
            [
                User(
                    name="user1",
                    stuff=[Stuff(date=datetime.date(2007, 12, 15), id=2)],
                ),
                User(
                    name="user2",
                    stuff=[Stuff(id=4, date=datetime.date(2008, 1, 15))],
                ),
                User(
                    name="user3",
                    stuff=[Stuff(id=5, date=datetime.date(2007, 6, 15))],
                ),
            ],
        )
1205
1206
class O2MWOSideFixedTest(fixtures.MappedTest):
    """Lazy loading of ``City.people``, the backref of ``Person.city``,
    whose primaryjoin also filters on ``city.deleted``.
    """

    # test #2948 - o2m backref with a "m2o does/does not count"
    # criteria doesn't scan the "o" table

    @classmethod
    def define_tables(cls, meta):
        """Create ``city`` (with a ``deleted`` flag) and ``person``,
        which refers to ``city.id``.
        """
        Table(
            "city",
            meta,
            Column("id", Integer, primary_key=True),
            Column("deleted", Boolean),
        )
        Table(
            "person",
            meta,
            Column("id", Integer, primary_key=True),
            Column("city_id", ForeignKey("city.id")),
        )

    @classmethod
    def setup_classes(cls):
        class Person(cls.Basic):
            pass

        class City(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map ``Person.city`` with a ``people`` backref on ``City``."""
        Person, City = cls.classes.Person, cls.classes.City
        city, person = cls.tables.city, cls.tables.person

        mapper(
            Person,
            person,
            properties={
                "city": relationship(
                    City,
                    primaryjoin=and_(
                        person.c.city_id == city.c.id,
                        # "== False" builds a SQL comparison here, so the
                        # lint suppression belongs on this line
                        city.c.deleted == False,  # noqa
                    ),
                    backref="people",
                )
            },
        )
        mapper(City, city)

    def _fixture(self, include_other):
        """Insert city 2 (``deleted=True``) with persons 3 and 4.

        When ``include_other`` is true, city 1 (``deleted=False``) with
        persons 1 and 2 is inserted first.
        """
        city, person = self.tables.city, self.tables.person

        if include_other:
            city.insert().execute({"id": 1, "deleted": False})

            person.insert().execute(
                {"id": 1, "city_id": 1}, {"id": 2, "city_id": 1}
            )

        city.insert().execute({"id": 2, "deleted": True})

        person.insert().execute(
            {"id": 3, "city_id": 2}, {"id": 4, "city_id": 2}
        )

    def test_lazyload_assert_expected_sql(self):
        """Loading ``c2.people`` selects from ``person`` only, with the
        city's ``deleted`` value sent as a bound parameter.
        """
        self._fixture(True)
        City = self.classes.City
        sess = Session(testing.db)
        c1, c2 = sess.query(City).order_by(City.id).all()

        def go():
            eq_([p.id for p in c2.people], [])

        self.assert_sql_execution(
            testing.db,
            go,
            CompiledSQL(
                "SELECT person.id AS person_id, person.city_id AS "
                "person_city_id FROM person "
                "WHERE person.city_id = :param_1 AND :param_2 = 0",
                {"param_1": 2, "param_2": 1},
            ),
        )

    def test_lazyload_people_other_exists(self):
        """With both cities present, city 1 lists persons 1 and 2 while
        the deleted city 2 lists nobody.
        """
        self._fixture(True)
        City = self.classes.City
        sess = Session(testing.db)
        c1, c2 = sess.query(City).order_by(City.id).all()
        eq_([p.id for p in c1.people], [1, 2])

        eq_([p.id for p in c2.people], [])

    def test_lazyload_people_no_other_exists(self):
        """With only the deleted city present, its ``people`` is empty."""
        # note that if we revert #2948, *this still passes!*
        # e.g. due to the scan of the "o" table, whether or not *another*
        # row exists determines if this works.

        self._fixture(False)
        City = self.classes.City
        sess = Session(testing.db)
        (c2,) = sess.query(City).order_by(City.id).all()

        eq_([p.id for p in c2.people], [])
1310
1311
class RefersToSelfLazyLoadInterferenceTest(fixtures.MappedTest):
    """Test [issue:3145].

    This involves an object that refers to itself, which isn't
    entirely a supported use case.   Here, we're able to fix it,
    but long term it's not clear if future needs will affect this.
    The use case is not super-critical.

    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``a`` and ``c``, which each refer to ``b``, and ``b``,
        which refers to itself through ``parent_id``.
        """
        Table(
            "a",
            metadata,
            Column("a_id", Integer, primary_key=True),
            Column("b_id", ForeignKey("b.b_id")),
        )

        Table(
            "b",
            metadata,
            Column("b_id", Integer, primary_key=True),
            Column("parent_id", ForeignKey("b.b_id")),
        )

        Table(
            "c",
            metadata,
            Column("c_id", Integer, primary_key=True),
            Column("b_id", ForeignKey("b.b_id")),
        )

    @classmethod
    def setup_classes(cls):
        class A(cls.Basic):
            pass

        class B(cls.Basic):
            pass

        class C(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map A, B and C, then re-sort B's property collection."""
        classes, tables = cls.classes, cls.tables
        A, B, C = classes.A, classes.B, classes.C
        b_table = tables.b

        mapper(A, tables.a, properties={"b": relationship(B)})

        b_properties = {
            "parent": relationship(B, remote_side=b_table.c.b_id),
            "zc": relationship(C),
        }
        b_mapper = mapper(B, b_table, properties=b_properties)

        mapper(C, tables.c)

        # grab the collection before configuring, sort it afterwards
        b_props = b_mapper._props
        configure_mappers()
        # Bug is order-dependent, must sort the "zc" property to the end
        b_props.sort()

    def test_lazy_doesnt_interfere(self):
        """A ``B`` that is its own parent can be queried with chained
        joined loads of ``parent`` and ``zc`` without raising.
        """
        A, B, C = self.classes("A", "B", "C")

        session = Session()
        b = B()
        session.add(b)
        session.flush()

        # make the row refer to itself
        b.parent_id = b.b_id

        for _ in range(2):
            b.zc.append(C())
        session.commit()

        # If the bug is here, the next line throws an exception
        eager_option = sa.orm.joinedload("parent").joinedload("zc")
        session.query(B).options(eager_option).all()
1398
1399
class TypeCoerceTest(fixtures.MappedTest, testing.AssertsExecutionResults):
    """ORM-level test for [ticket:3531]"""

    # mysql is having a recursion issue in the bind_expression
    __only_on__ = ("sqlite", "postgresql")

    class StringAsInt(TypeDecorator):
        """String(50)-backed type that casts to Integer in column
        expressions and to String in bind expressions.
        """

        impl = String(50)

        def column_expression(self, col):
            return sa.cast(col, Integer)

        def bind_expression(self, col):
            return sa.cast(col, String)

    @classmethod
    def define_tables(cls, metadata):
        """Create ``person`` with a ``StringAsInt`` primary key and
        ``pets`` with a plain Integer ``person_id``.
        """
        Table(
            "person", metadata, Column("id", cls.StringAsInt, primary_key=True)
        )
        Table(
            "pets",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("person_id", Integer),
        )

    @classmethod
    def setup_classes(cls):
        class Person(cls.Basic):
            pass

        class Pet(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map ``Person.pets`` with an explicit primaryjoin that casts a
        type-coerced ``person.id``.
        """
        Person, Pet = cls.classes.Person, cls.classes.Pet
        person, pets = cls.tables.person, cls.tables.pets

        # pets.person_id is marked as the foreign side explicitly since
        # the table declares no ForeignKey
        pet_side = orm.foreign(pets.c.person_id)
        person_side = sa.cast(sa.type_coerce(person.c.id, Integer), Integer)

        mapper(
            Person,
            person,
            properties={
                "pets": relationship(Pet, primaryjoin=(pet_side == person_side))
            },
        )

        mapper(Pet, pets)

    def test_lazyload_singlecast(self):
        """The lazy load of ``pets`` renders a single CAST around the
        bound parameter.
        """
        Person, Pet = self.classes.Person, self.classes.Pet

        session = Session()
        session.add_all([Person(id=5), Pet(id=1, person_id=5)])
        session.commit()

        owner = session.query(Person).first()

        # only the attribute access below is captured
        with self.sql_execution_asserter() as asserter:
            owner.pets

        expected = CompiledSQL(
            "SELECT pets.id AS pets_id, pets.person_id "
            "AS pets_person_id FROM pets "
            "WHERE pets.person_id = CAST(:param_1 AS INTEGER)",
            [{"param_1": 5}],
        )
        asserter.assert_(expected)
1477
1478
class CompositeSimpleM2OTest(fixtures.MappedTest):
    """ORM-level test for [ticket:3788]

    Checks the ``use_get`` flag on the strategy of a many-to-one
    relationship that targets a table with a two-column primary key.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``a`` with a composite primary key plus two referring
        tables that differ only in the order of their foreign key columns.
        """
        Table(
            "a",
            metadata,
            Column("id1", Integer, primary_key=True),
            Column("id2", Integer, primary_key=True),
        )

        Table(
            "b_sameorder",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("a_id1", Integer),
            Column("a_id2", Integer),
            ForeignKeyConstraint(["a_id1", "a_id2"], ["a.id1", "a.id2"]),
        )

        # Declared in the opposite order from a's primary key; previously
        # this table was identical to "b_sameorder", so the reverse-order
        # test repeated the same-order one.
        # NOTE(review): assumes the derived join condition orders its AND
        # clauses by column creation order -- confirm.
        Table(
            "b_differentorder",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("a_id2", Integer),
            Column("a_id1", Integer),
            ForeignKeyConstraint(["a_id2", "a_id1"], ["a.id2", "a.id1"]),
        )

    @classmethod
    def setup_classes(cls):
        class A(cls.Basic):
            pass

        class B(cls.Basic):
            pass

    def test_use_get_sameorder(self):
        """``use_get`` is true when B maps to ``b_sameorder``."""
        mapper(self.classes.A, self.tables.a)
        m_b = mapper(
            self.classes.B,
            self.tables.b_sameorder,
            properties={"a": relationship(self.classes.A)},
        )

        configure_mappers()
        is_true(m_b.relationships.a.strategy.use_get)

    def test_use_get_reverseorder(self):
        """``use_get`` is still true when B maps to ``b_differentorder``."""
        mapper(self.classes.A, self.tables.a)
        m_b = mapper(
            self.classes.B,
            self.tables.b_differentorder,
            properties={"a": relationship(self.classes.A)},
        )

        configure_mappers()
        is_true(m_b.relationships.a.strategy.use_get)

    def test_dont_use_get_pj_is_different(self):
        """``use_get`` is false when the primaryjoin compares ``a.id2``
        to the literal 12 instead of to a column of B's table.
        """
        mapper(self.classes.A, self.tables.a)
        m_b = mapper(
            self.classes.B,
            self.tables.b_sameorder,
            properties={
                "a": relationship(
                    self.classes.A,
                    primaryjoin=and_(
                        self.tables.a.c.id1 == self.tables.b_sameorder.c.a_id1,
                        self.tables.a.c.id2 == 12,
                    ),
                )
            },
        )

        configure_mappers()
        is_false(m_b.relationships.a.strategy.use_get)
1557