1"""basic tests of lazy loaded attributes"""
2
3from sqlalchemy.testing import assert_raises
4import datetime
5from sqlalchemy.orm import attributes, exc as orm_exc, configure_mappers
6import sqlalchemy as sa
7from sqlalchemy import testing, and_, bindparam
8from sqlalchemy import Integer, String, ForeignKey, SmallInteger, Boolean
9from sqlalchemy import ForeignKeyConstraint
10from sqlalchemy.types import TypeDecorator
11from sqlalchemy.testing.schema import Table
12from sqlalchemy.testing.schema import Column
13from sqlalchemy import orm
14from sqlalchemy.orm import mapper, relationship, create_session, Session
15from sqlalchemy.testing import eq_, is_true, is_false
16from sqlalchemy.testing import fixtures
17from test.orm import _fixtures
18from sqlalchemy.testing.assertsql import CompiledSQL
19
20
class LazyTest(_fixtures.FixtureTest):
    """Tests of relationships loaded with the ``lazy='select'`` strategy.

    Every test maps the shared ``_fixtures`` tables and classes itself and
    compares what is lazily loaded against an expected object graph.
    Result comparisons go through ``eq_`` so that a failure reports both
    the expected and the actual value.
    """

    # Fixture settings read by the base class.
    # NOTE(review): presumably the fixture rows are inserted once for the
    # whole class and never deleted between tests -- confirm against
    # _fixtures.FixtureTest.
    run_inserts = 'once'
    run_deletes = None

    def test_basic(self):
        """a one-to-many ``lazy='select'`` collection is loaded and
        matches the expected address for user 7."""

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User)

        mapper(User, users, properties={
            'addresses': relationship(
                mapper(Address, addresses), lazy='select')
        })
        sess = create_session()
        q = sess.query(User)
        eq_(
            [User(id=7,
                  addresses=[Address(id=1, email_address='jack@bean.com')])],
            q.filter(users.c.id == 7).all()
        )

    def test_needs_parent(self):
        """test the error raised when parent object is not bound.

        The user is expunged from its session before ``addresses`` has
        been loaded; accessing the attribute must then raise
        ``DetachedInstanceError``.
        """

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User)

        mapper(User, users, properties={
            'addresses': relationship(
                mapper(Address, addresses), lazy='select')
        })
        sess = create_session()
        q = sess.query(User)
        u = q.filter(users.c.id == 7).first()
        sess.expunge(u)
        assert_raises(orm_exc.DetachedInstanceError, getattr, u, 'addresses')

    def test_orderby(self):
        """``order_by`` on the relationship orders each lazily loaded
        collection by email address."""

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User)

        mapper(User, users, properties={
            'addresses': relationship(
                mapper(Address, addresses),
                lazy='select', order_by=addresses.c.email_address),
        })
        q = create_session().query(User)
        eq_(
            [
                User(id=7, addresses=[
                    Address(id=1)
                ]),
                User(id=8, addresses=[
                    Address(id=3, email_address='ed@bettyboop.com'),
                    Address(id=4, email_address='ed@lala.com'),
                    Address(id=2, email_address='ed@wood.com')
                ]),
                User(id=9, addresses=[
                    Address(id=5)
                ]),
                User(id=10, addresses=[])
            ],
            q.all()
        )

    def test_orderby_secondary(self):
        """tests that a regular mapper select on a single table can
        order by a relationship to a second table"""

        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User)

        mapper(Address, addresses)

        mapper(User, users, properties=dict(
            addresses=relationship(Address, lazy='select'),
        ))
        q = create_session().query(User)
        # the query itself joins to addresses and orders by its column;
        # the relationship carries no order_by of its own
        result = q.filter(users.c.id == addresses.c.user_id).\
            order_by(addresses.c.email_address).all()
        eq_(
            [
                User(id=8, addresses=[
                    Address(id=2, email_address='ed@wood.com'),
                    Address(id=3, email_address='ed@bettyboop.com'),
                    Address(id=4, email_address='ed@lala.com'),
                ]),
                User(id=9, addresses=[
                    Address(id=5)
                ]),
                User(id=7, addresses=[
                    Address(id=1)
                ]),
            ],
            result
        )

    def test_orderby_desc(self):
        """a descending ``order_by`` on the relationship reverses the
        order of each lazily loaded collection."""

        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User)

        mapper(Address, addresses)

        mapper(User, users, properties=dict(
            addresses=relationship(
                Address, lazy='select',
                order_by=[sa.desc(addresses.c.email_address)]),
        ))
        sess = create_session()
        eq_(
            [
                User(id=7, addresses=[
                    Address(id=1)
                ]),
                User(id=8, addresses=[
                    Address(id=2, email_address='ed@wood.com'),
                    Address(id=4, email_address='ed@lala.com'),
                    Address(id=3, email_address='ed@bettyboop.com'),
                ]),
                User(id=9, addresses=[
                    Address(id=5)
                ]),
                User(id=10, addresses=[])
            ],
            sess.query(User).all()
        )

    def test_no_orphan(self):
        """test that a lazily loaded child object is not marked as an orphan"""

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User)

        mapper(User, users, properties={
            'addresses': relationship(
                Address, cascade="all,delete-orphan", lazy='select')
        })
        mapper(Address, addresses)

        sess = create_session()
        user = sess.query(User).get(7)
        # the lazily loaded address must report a parent and must not be
        # considered an orphan by its mapper
        assert User.addresses.hasparent(
            attributes.instance_state(user.addresses[0]), optimistic=True)
        assert not sa.orm.class_mapper(Address)._is_orphan(
            attributes.instance_state(user.addresses[0]))

    def test_limit(self):
        """test limit operations combined with lazy-load relationships."""

        users, items, order_items, orders, Item, \
            User, Address, Order, addresses = (
                self.tables.users,
                self.tables.items,
                self.tables.order_items,
                self.tables.orders,
                self.classes.Item,
                self.classes.User,
                self.classes.Address,
                self.classes.Order,
                self.tables.addresses)

        mapper(Item, items)
        mapper(Order, orders, properties={
            'items': relationship(Item, secondary=order_items, lazy='select')
        })
        mapper(User, users, properties={
            'addresses': relationship(
                mapper(Address, addresses), lazy='select'),
            'orders': relationship(Order, lazy='select')
        })

        sess = create_session()
        q = sess.query(User)

        # NOTE(review): the mssql branch leaves out OFFSET, presumably
        # because of a backend limitation -- confirm.
        if testing.against('mssql'):
            result = q.limit(2).all()
            eq_(self.static.user_all_result[:2], result)
        else:
            result = q.limit(2).offset(1).all()
            eq_(self.static.user_all_result[1:3], result)

    def test_distinct(self):
        """DISTINCT against a row-multiplying join still yields each user
        once, with its lazy relationships matching the static result."""

        users, items, order_items, orders, \
            Item, User, Address, Order, addresses = (
                self.tables.users,
                self.tables.items,
                self.tables.order_items,
                self.tables.orders,
                self.classes.Item,
                self.classes.User,
                self.classes.Address,
                self.classes.Order,
                self.tables.addresses)

        mapper(Item, items)
        mapper(Order, orders, properties={
            'items': relationship(Item, secondary=order_items, lazy='select')
        })
        mapper(User, users, properties={
            'addresses': relationship(
                mapper(Address, addresses), lazy='select'),
            'orders': relationship(Order, lazy='select')
        })

        sess = create_session()
        q = sess.query(User)

        # use a union all to get a lot of rows to join against
        u2 = users.alias('u2')
        s = sa.union_all(
            u2.select(use_labels=True),
            u2.select(use_labels=True), u2.select(use_labels=True)).alias('u')
        result = q.filter(s.c.u2_id == User.id).order_by(User.id).distinct() \
            .all()
        eq_(self.static.user_all_result, result)

    def test_uselist_false_warning(self):
        """test that multiple rows received by a
        uselist=False raises a warning."""

        User, users, orders, Order = (
            self.classes.User,
            self.tables.users,
            self.tables.orders,
            self.classes.Order)

        mapper(User, users, properties={
            'order': relationship(Order, uselist=False)
        })
        mapper(Order, orders)
        s = create_session()
        u1 = s.query(User).filter(User.id == 7).one()
        # NOTE(review): catching SAWarning with assert_raises assumes the
        # test run turns warnings into exceptions -- confirm.
        assert_raises(sa.exc.SAWarning, getattr, u1, 'order')

    def test_callable_bind(self):
        """a ``bindparam`` with ``callable_`` inside the primaryjoin is
        evaluated when the lazy load runs."""

        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User)

        mapper(User, users, properties=dict(
            addresses=relationship(
                mapper(Address, addresses),
                lazy='select',
                primaryjoin=and_(
                    users.c.id == addresses.c.user_id,
                    users.c.name == bindparam("name", callable_=lambda: "ed")
                )
            )
        ))

        s = Session()
        ed = s.query(User).filter_by(name='ed').one()
        eq_(ed.addresses, [
            Address(id=2, user_id=8),
            Address(id=3, user_id=8),
            Address(id=4, user_id=8)
        ])

        fred = s.query(User).filter_by(name='fred').one()
        # the join additionally requires users.name to equal the bound
        # value "ed", so nothing is expected to load for fred
        eq_(fred.addresses, [])

    def test_one_to_many_scalar(self):
        """a one-to-many relationship with ``uselist=False`` lazily loads
        a single scalar object."""

        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User)

        mapper(User, users, properties=dict(
            address=relationship(
                mapper(Address, addresses), lazy='select', uselist=False)
        ))
        q = create_session().query(User)
        result = q.filter(users.c.id == 7).all()
        eq_([User(id=7, address=Address(id=1))], result)

    def test_many_to_one_binds(self):
        """a scalar relationship whose primaryjoin compares a column to a
        literal loads only the matching row, else ``None``."""

        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User)

        mapper(Address, addresses,
               primary_key=[addresses.c.user_id, addresses.c.email_address])

        mapper(User, users, properties=dict(
            address=relationship(
                Address, uselist=False,
                primaryjoin=sa.and_(
                    users.c.id == addresses.c.user_id,
                    addresses.c.email_address == 'ed@bettyboop.com'))
        ))
        q = create_session().query(User)
        eq_(
            [
                User(id=7, address=None),
                User(id=8, address=Address(id=3)),
                User(id=9, address=None),
                User(id=10, address=None),
            ],
            list(q)
        )

    def test_double(self):
        """tests lazy loading with two relationships simultaneously,
        from the same table, using aliases."""

        users, orders, User, Address, Order, addresses = (
            self.tables.users,
            self.tables.orders,
            self.classes.User,
            self.classes.Address,
            self.classes.Order,
            self.tables.addresses)

        openorders = sa.alias(orders, 'openorders')
        closedorders = sa.alias(orders, 'closedorders')

        mapper(Address, addresses)

        mapper(Order, orders)

        # non-primary mappers of Order against each alias, used as the
        # targets of the two filtered relationships
        open_mapper = mapper(Order, openorders, non_primary=True)
        closed_mapper = mapper(Order, closedorders, non_primary=True)
        mapper(User, users, properties=dict(
            addresses=relationship(Address, lazy=True),
            open_orders=relationship(
                open_mapper,
                primaryjoin=sa.and_(
                    openorders.c.isopen == 1,
                    users.c.id == openorders.c.user_id), lazy='select'),
            closed_orders=relationship(
                closed_mapper,
                primaryjoin=sa.and_(
                    closedorders.c.isopen == 0,
                    users.c.id == closedorders.c.user_id), lazy='select')
        ))
        q = create_session().query(User)

        eq_(
            [
                User(
                    id=7,
                    addresses=[Address(id=1)],
                    open_orders=[Order(id=3)],
                    closed_orders=[Order(id=1), Order(id=5)]
                ),
                User(
                    id=8,
                    addresses=[Address(id=2), Address(id=3), Address(id=4)],
                    open_orders=[],
                    closed_orders=[]
                ),
                User(
                    id=9,
                    addresses=[Address(id=5)],
                    open_orders=[Order(id=4)],
                    closed_orders=[Order(id=2)]
                ),
                User(id=10)
            ],
            q.all()
        )

        # the same criteria must apply when querying through with_parent()
        sess = create_session()
        user = sess.query(User).get(7)
        eq_(
            [Order(id=1), Order(id=5)],
            create_session().query(closed_mapper).with_parent(
                user, property='closed_orders').all()
        )
        eq_(
            [Order(id=3)],
            create_session().query(open_mapper).
            with_parent(user, property='open_orders').all()
        )

    def test_many_to_many(self):
        """a many-to-many ``lazy='select'`` collection loads through the
        ``secondary`` table, also when the query joins along it."""

        keywords, items, item_keywords, Keyword, Item = (
            self.tables.keywords,
            self.tables.items,
            self.tables.item_keywords,
            self.classes.Keyword,
            self.classes.Item)

        mapper(Keyword, keywords)
        mapper(Item, items, properties=dict(
            keywords=relationship(
                Keyword, secondary=item_keywords, lazy='select'),
        ))

        q = create_session().query(Item)
        eq_(self.static.item_keyword_result, q.all())

        eq_(
            self.static.item_keyword_result[0:2],
            q.join('keywords').filter(keywords.c.name == 'red').all()
        )

    def test_uses_get(self):
        """test that a simple many-to-one lazyload optimizes
        to use query.get()."""

        Address, addresses, users, User = (
            self.classes.Address,
            self.tables.addresses,
            self.tables.users,
            self.classes.User)

        # no explicit primaryjoin, then the same join condition written
        # in both operand orders
        for pj in (
            None,
            users.c.id == addresses.c.user_id,
            addresses.c.user_id == users.c.id
        ):
            mapper(Address, addresses, properties=dict(
                user=relationship(
                    mapper(User, users), lazy='select', primaryjoin=pj)
            ))

            sess = create_session()

            # load address
            a1 = sess.query(Address).\
                filter_by(email_address="ed@wood.com").one()

            # load user that is attached to the address
            u1 = sess.query(User).get(8)

            def go():
                # lazy load of a1.user should get it from the session
                assert a1.user is u1
            self.assert_sql_count(testing.db, go, 0)
            # the classes are mapped again on the next iteration
            sa.orm.clear_mappers()

    def test_uses_get_compatible_types(self):
        """test the use_get optimization with compatible
        but non-identical types"""

        User, Address = self.classes.User, self.classes.Address

        class IntDecorator(TypeDecorator):
            impl = Integer

        class SmallintDecorator(TypeDecorator):
            impl = SmallInteger

        class SomeDBInteger(sa.Integer):
            pass

        # each type is used for addresses.user_id while users.id stays a
        # plain Integer
        for tt in [
            Integer,
            SmallInteger,
            IntDecorator,
            SmallintDecorator,
            SomeDBInteger,
        ]:
            m = sa.MetaData()
            users = Table(
                'users', m,
                Column(
                    'id', Integer, primary_key=True,
                    test_needs_autoincrement=True),
                Column('name', String(30), nullable=False),
            )
            addresses = Table(
                'addresses', m,
                Column(
                    'id', Integer, primary_key=True,
                    test_needs_autoincrement=True),
                Column('user_id', tt, ForeignKey('users.id')),
                Column('email_address', String(50), nullable=False),
            )

            mapper(Address, addresses, properties=dict(
                user=relationship(mapper(User, users))
            ))

            # the throwaway MetaData above is not bound, so the session
            # is given the test engine explicitly
            sess = create_session(bind=testing.db)

            # load address
            a1 = sess.query(Address).\
                filter_by(email_address="ed@wood.com").one()

            # load user that is attached to the address
            u1 = sess.query(User).get(8)

            def go():
                # lazy load of a1.user should get it from the session
                assert a1.user is u1
            self.assert_sql_count(testing.db, go, 0)
            # the classes are mapped again on the next iteration
            sa.orm.clear_mappers()

    def test_many_to_one(self):
        """a many-to-one lazy load returns the same object that a
        subsequent ``get()`` on the session returns."""

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User)

        mapper(Address, addresses, properties=dict(
            user=relationship(mapper(User, users), lazy='select')
        ))
        sess = create_session()
        q = sess.query(Address)
        a = q.filter(addresses.c.id == 1).one()

        assert a.user is not None

        u1 = sess.query(User).get(7)

        assert a.user is u1

    def test_backrefs_dont_lazyload(self):
        """assigning the many-to-one side of a backref emits no SQL; the
        other side's collection loads with one statement on access."""

        users, Address, addresses, User = (
            self.tables.users,
            self.classes.Address,
            self.tables.addresses,
            self.classes.User)

        mapper(User, users, properties={
            'addresses': relationship(Address, backref='user')
        })
        mapper(Address, addresses)
        sess = create_session()
        ad = sess.query(Address).filter_by(id=1).one()
        assert ad.user.id == 7

        # setting the scalar side must not load anything
        def go():
            ad.user = None
            assert ad.user is None
        self.assert_sql_count(testing.db, go, 0)

        u1 = sess.query(User).filter_by(id=7).one()

        # first access of the collection emits exactly one statement
        def go():
            assert ad not in u1.addresses
        self.assert_sql_count(testing.db, go, 1)

        sess.expire(u1, ['addresses'])

        # after expiring the collection it loads again with one statement
        def go():
            assert ad in u1.addresses
        self.assert_sql_count(testing.db, go, 1)

        sess.expire(u1, ['addresses'])
        ad2 = Address()

        # assigning the parent on a new Address emits no SQL
        def go():
            ad2.user = u1
            assert ad2.user is u1
        self.assert_sql_count(testing.db, go, 0)

        # the expired collection loads once and contains the new Address
        def go():
            assert ad2 in u1.addresses
        self.assert_sql_count(testing.db, go, 1)
585
586
class GetterStateTest(_fixtures.FixtureTest):

    """exercise the lazy loader's ``get()`` / ``get_history()`` on
    attributes that have nothing loaded, checking both the symbol that
    comes back and that the instance ``__dict__`` is left as expected"""

    run_inserts = None

    def _unhashable_fixture(self, metadata, load_on_pending=False):
        """Build Category/Article mappings joined on a dict-valued column.

        Returns ``(Category, Article, sess, a1, c1)``.  When
        ``load_on_pending`` is true only the category is flushed and the
        article is added to the session afterwards; otherwise both are
        added before the flush.
        """

        class MyHashType(sa.TypeDecorator):
            impl = sa.String(100)

            def process_bind_param(self, value, dialect):
                # dict -> "k=v;k=v" string, ordered by key
                pairs = sorted(value.items(), key=lambda item: item[0])
                return ";".join("%s=%s" % (k, v) for k, v in pairs)

            def process_result_value(self, value, dialect):
                # "k=v;k=v" string -> dict
                return dict(
                    chunk.split("=", 1) for chunk in value.split(";"))

        category = Table(
            'category', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', MyHashType()),
        )
        article = Table(
            'article', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', MyHashType()),
        )

        class Category(fixtures.ComparableEntity):
            pass

        class Article(fixtures.ComparableEntity):
            pass

        mapper(Category, category)
        # the two rows are related purely through the dict-valued column
        data_join = orm.foreign(article.c.data) == category.c.data
        mapper(Article, article, properties=dict(
            category=relationship(
                Category,
                primaryjoin=data_join,
                load_on_pending=load_on_pending)
        ))

        metadata.create_all()
        sess = Session(autoflush=False)

        shared = {"im": "unhashable"}
        a1 = Article(id=1, data=shared)
        c1 = Category(id=1, data=shared)

        to_flush = [c1] if load_on_pending else [c1, a1]
        sess.add_all(to_flush)
        sess.flush()
        if load_on_pending:
            # added only after the flush
            sess.add(a1)
        return Category, Article, sess, a1, c1

    def _u_ad_fixture(self, populate_user, dont_use_get=False):
        """Map User/Address both ways and flush a new Address.

        Returns ``(User, Address, sess, a1)``.  With ``populate_user`` the
        address is given a new User before the flush and the session is
        expired afterwards.  With ``dont_use_get`` the many-to-one join
        carries an additional ``users.id != 27`` criterion.
        """
        User, Address = self.classes.User, self.classes.Address
        users, addresses = self.tables.users, self.tables.addresses

        mapper(User, users, properties=dict(
            addresses=relationship(Address, back_populates='user')
        ))

        if dont_use_get:
            user_join = and_(
                users.c.id == addresses.c.user_id, users.c.id != 27)
        else:
            user_join = None
        mapper(Address, addresses, properties=dict(
            user=relationship(
                User, primaryjoin=user_join, back_populates='addresses')
        ))

        sess = create_session()
        address = Address(email_address='a1')
        sess.add(address)
        if populate_user:
            address.user = User(name='ed')
        sess.flush()
        if populate_user:
            sess.expire_all()
        return User, Address, sess, address

    def test_no_use_get_params_missing(self):
        """with the extra join criterion and no user set, reading
        ``a1.user`` gives None and emits no SQL"""
        User, Address, sess, a1 = self._u_ad_fixture(
            populate_user=False, dont_use_get=True)

        def read_user():
            eq_(a1.user, None)

        # doesn't emit SQL
        self.assert_sql_count(testing.db, read_user, 0)

    @testing.provide_metadata
    def test_no_use_get_params_not_hashable(self):
        """loading across the dict-valued join emits one statement"""
        fixture = self._unhashable_fixture(self.metadata)
        Category, Article, sess, a1, c1 = fixture

        def read_category():
            eq_(a1.category, c1)

        self.assert_sql_count(testing.db, read_category, 1)

    @testing.provide_metadata
    def test_no_use_get_params_not_hashable_on_pending(self):
        """same as above with ``load_on_pending``: the article is added
        after the flush and still loads with one statement"""
        fixture = self._unhashable_fixture(
            self.metadata, load_on_pending=True)
        Category, Article, sess, a1, c1 = fixture

        def read_category():
            eq_(a1.category, c1)

        self.assert_sql_count(testing.db, read_category, 1)

    def test_get_empty_passive_return_never_set(self):
        """no user set: ``get()`` with PASSIVE_RETURN_NEVER_SET returns
        NEVER_SET and populates nothing"""
        User, Address, sess, a1 = self._u_ad_fixture(populate_user=False)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        got = Address.user.impl.get(
            state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET)

        eq_(got, attributes.NEVER_SET)
        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_history_empty_passive_return_never_set(self):
        """no user set: ``get_history()`` with PASSIVE_RETURN_NEVER_SET
        returns three empty tuples and populates nothing"""
        User, Address, sess, a1 = self._u_ad_fixture(populate_user=False)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        history = Address.user.impl.get_history(
            state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET)

        eq_(history, ((), (), ()))
        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_get_empty_passive_no_initialize(self):
        """no user set: ``get()`` with PASSIVE_NO_INITIALIZE returns
        PASSIVE_NO_RESULT and populates nothing"""
        User, Address, sess, a1 = self._u_ad_fixture(populate_user=False)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        got = Address.user.impl.get(
            state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE)

        eq_(got, attributes.PASSIVE_NO_RESULT)
        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_history_empty_passive_no_initialize(self):
        """no user set: ``get_history()`` with PASSIVE_NO_INITIALIZE
        returns HISTORY_BLANK and populates nothing"""
        User, Address, sess, a1 = self._u_ad_fixture(populate_user=False)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        history = Address.user.impl.get_history(
            state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE)

        eq_(history, attributes.HISTORY_BLANK)
        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_get_populated_passive_no_initialize(self):
        """user set then expired: ``get()`` with PASSIVE_NO_INITIALIZE
        returns PASSIVE_NO_RESULT and populates nothing"""
        User, Address, sess, a1 = self._u_ad_fixture(populate_user=True)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        got = Address.user.impl.get(
            state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE)

        eq_(got, attributes.PASSIVE_NO_RESULT)
        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_history_populated_passive_no_initialize(self):
        """user set then expired: ``get_history()`` with
        PASSIVE_NO_INITIALIZE returns HISTORY_BLANK and populates nothing"""
        User, Address, sess, a1 = self._u_ad_fixture(populate_user=True)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        history = Address.user.impl.get_history(
            state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE)

        eq_(history, attributes.HISTORY_BLANK)
        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_get_populated_passive_return_never_set(self):
        """user set then expired: ``get()`` with PASSIVE_RETURN_NEVER_SET
        returns the user"""
        User, Address, sess, a1 = self._u_ad_fixture(populate_user=True)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        got = Address.user.impl.get(
            state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET)

        eq_(got, User(name='ed'))

    def test_history_populated_passive_return_never_set(self):
        """user set then expired: ``get_history()`` with
        PASSIVE_RETURN_NEVER_SET reports the user as unchanged"""
        User, Address, sess, a1 = self._u_ad_fixture(populate_user=True)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        history = Address.user.impl.get_history(
            state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET)

        eq_(history, ((), [User(name='ed'), ], ()))
809
810
class M2OGetTest(_fixtures.FixtureTest):
    """Many-to-one lazy loading when the foreign key may be NULL."""

    run_inserts = 'once'
    run_deletes = None

    def test_m2o_noload(self):
        """test that a NULL foreign key doesn't trigger a lazy load"""

        User, Address = self.classes.User, self.classes.Address
        users, addresses = self.tables.users, self.tables.addresses

        mapper(User, users)
        mapper(Address, addresses, properties=dict(
            user=relationship(User)
        ))

        sess = create_session()

        # persist an address that has no user, then empty the session so
        # that both addresses below are loaded fresh
        userless = Address(email_address='somenewaddress', id=12)
        sess.add(userless)
        sess.flush()
        sess.expunge_all()

        address_query = sess.query(Address)
        with_user = address_query.get(1)
        without_user = sess.query(Address).get(userless.id)

        def read_users():
            # one lazy load
            assert with_user.user.name == 'jack'
            # no lazy load
            assert without_user.user is None

        self.assert_sql_count(testing.db, read_users, 1)
845
846
class CorrelatedTest(fixtures.MappedTest):
    """Lazy loading through a primaryjoin that embeds a correlated
    scalar subquery."""

    @classmethod
    def define_tables(cls, meta):
        """Define ``user_t`` and ``stuff`` (dated rows that reference a
        user) on the given MetaData."""
        Table('user_t', meta,
              Column('id', Integer, primary_key=True),
              Column('name', String(50)))

        Table('stuff', meta,
              Column('id', Integer, primary_key=True),
              Column('date', sa.Date),
              Column('user_id', Integer, ForeignKey('user_t.id')))

    @classmethod
    def insert_data(cls):
        """Insert three users; user 1 gets three stuff rows with different
        dates, users 2 and 3 get one each."""
        stuff, user_t = cls.tables.stuff, cls.tables.user_t

        user_t.insert().execute(
            {'id': 1, 'name': 'user1'},
            {'id': 2, 'name': 'user2'},
            {'id': 3, 'name': 'user3'})

        stuff.insert().execute(
            {'id': 1, 'user_id': 1, 'date': datetime.date(2007, 10, 15)},
            {'id': 2, 'user_id': 1, 'date': datetime.date(2007, 12, 15)},
            {'id': 3, 'user_id': 1, 'date': datetime.date(2007, 11, 15)},
            {'id': 4, 'user_id': 2, 'date': datetime.date(2008, 1, 15)},
            {'id': 5, 'user_id': 3, 'date': datetime.date(2007, 6, 15)})

    def test_correlated_lazyload(self):
        """each user's ``stuff`` collection holds only the row with the
        latest date for that user."""
        stuff, user_t = self.tables.stuff, self.tables.user_t

        class User(fixtures.ComparableEntity):
            pass

        class Stuff(fixtures.ComparableEntity):
            pass

        mapper(Stuff, stuff)

        # id of the newest stuff row of the enclosing user: correlated to
        # user_t, ordered by date descending, limited to one row
        stuff_view = sa.select([stuff.c.id]).\
            where(stuff.c.user_id == user_t.c.id).correlate(user_t).\
            order_by(sa.desc(stuff.c.date)).limit(1)

        mapper(User, user_t, properties={
            'stuff': relationship(
                Stuff,
                primaryjoin=sa.and_(
                    user_t.c.id == stuff.c.user_id,
                    stuff.c.id == (stuff_view.as_scalar())))
        })

        sess = create_session()

        eq_(
            sess.query(User).all(),
            [
                User(
                    name='user1',
                    stuff=[Stuff(date=datetime.date(2007, 12, 15), id=2)]),
                User(
                    name='user2',
                    stuff=[Stuff(id=4, date=datetime.date(2008, 1, 15))]),
                User(
                    name='user3',
                    stuff=[Stuff(id=5, date=datetime.date(2007, 6, 15))])
            ]
        )
915
916
class O2MWOSideFixedTest(fixtures.MappedTest):
    """Lazy load of a one-to-many backref whose join has extra criteria.

    test #2948 - o2m backref with a "m2o does/does not count"
    criteria doesn't scan the "o" table
    """

    @classmethod
    def define_tables(cls, meta):
        """Create ``city`` (with a ``deleted`` flag) and ``person`` tables."""
        Table('city', meta,
              Column('id', Integer, primary_key=True),
              Column('deleted', Boolean),
              )
        Table('person', meta,
              Column('id', Integer, primary_key=True),
              Column('city_id', ForeignKey('city.id'))
              )

    @classmethod
    def setup_classes(cls):
        """Declare the ``Person`` and ``City`` classes to be mapped."""
        class Person(cls.Basic):
            pass

        class City(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map ``Person.city`` with a ``people`` backref on ``City``.

        The primaryjoin requires ``city.deleted == False`` in addition to
        the foreign key match.
        """
        Person, City = cls.classes.Person, cls.classes.City
        city, person = cls.tables.city, cls.tables.person

        mapper(Person, person, properties={
            'city': relationship(City,
                                 primaryjoin=and_(
                                     person.c.city_id == city.c.id,
                                     # "== False" builds a SQL expression
                                     city.c.deleted == False),  # noqa
                                 backref='people')
        })
        mapper(City, city)

    def _fixture(self, include_other):
        """Insert the rows used by the tests.

        City 2 (``deleted=True``) with persons 3 and 4 is always inserted.
        When ``include_other`` is true, city 1 (``deleted=False``) with
        persons 1 and 2 is inserted first.
        """
        city, person = self.tables.city, self.tables.person

        if include_other:
            city.insert().execute(
                {"id": 1, "deleted": False},
            )

            person.insert().execute(
                {"id": 1, "city_id": 1},
                {"id": 2, "city_id": 1},
            )

        city.insert().execute(
            {"id": 2, "deleted": True},
        )

        person.insert().execute(
            {"id": 3, "city_id": 2},
            {"id": 4, "city_id": 2},
        )

    def test_lazyload_assert_expected_sql(self):
        """Loading ``people`` of the deleted city selects only from person.

        The asserted statement has ``person`` as its sole FROM entry; the
        ``deleted`` criterion appears as a bound parameter compared to 0.
        """
        self._fixture(True)
        City = self.classes.City
        sess = Session(testing.db)
        # only the second (deleted) city is needed here
        _, c2 = sess.query(City).order_by(City.id).all()

        def go():
            eq_(
                [p.id for p in c2.people],
                []
            )

        self.assert_sql_execution(
            testing.db,
            go,
            CompiledSQL(
                "SELECT person.id AS person_id, person.city_id AS "
                "person_city_id FROM person "
                "WHERE person.city_id = :param_1 AND :param_2 = 0",
                {"param_1": 2, "param_2": 1}
            )
        )

    def test_lazyload_people_other_exists(self):
        """With both cities present, only the non-deleted one has people."""
        self._fixture(True)
        City = self.classes.City
        sess = Session(testing.db)
        c1, c2 = sess.query(City).order_by(City.id).all()
        eq_(
            [p.id for p in c1.people],
            [1, 2]
        )

        eq_(
            [p.id for p in c2.people],
            []
        )

    def test_lazyload_people_no_other_exists(self):
        """With only the deleted city present, its ``people`` is empty."""
        # note that if we revert #2948, *this still passes!*
        # e.g. due to the scan of the "o" table, whether or not *another*
        # row exists determines if this works.

        self._fixture(False)
        City = self.classes.City
        sess = Session(testing.db)
        c2, = sess.query(City).order_by(City.id).all()

        eq_(
            [p.id for p in c2.people],
            []
        )
1028
1029
class RefersToSelfLazyLoadInterferenceTest(fixtures.MappedTest):
    """Test [issue:3145].

    Covers an object that refers to itself, which isn't entirely a
    supported use case.  It can be fixed here, though long term it's not
    clear whether future needs will affect this.  The use case is not
    super-critical.

    """

    @classmethod
    def define_tables(cls, metadata):
        """Create ``a``, ``b`` and ``c``.

        ``a`` and ``c`` each carry a foreign key to ``b``; ``b`` carries a
        foreign key to itself through ``parent_id``.
        """
        Table('a', metadata,
              Column('a_id', Integer, primary_key=True),
              Column('b_id', ForeignKey('b.b_id')))

        Table('b', metadata,
              Column('b_id', Integer, primary_key=True),
              Column('parent_id', ForeignKey('b.b_id')))

        Table('c', metadata,
              Column('c_id', Integer, primary_key=True),
              Column('b_id', ForeignKey('b.b_id')))

    @classmethod
    def setup_classes(cls):
        """Declare the plain classes ``A``, ``B`` and ``C``."""
        class A(cls.Basic):
            pass

        class B(cls.Basic):
            pass

        class C(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map A -> B, B -> B (``parent``) and B -> C (``zc``)."""
        classes, tables = cls.classes, cls.tables

        mapper(classes.A, tables.a, properties={
            "b": relationship(classes.B)
        })

        b_relationships = {
            "parent": relationship(
                classes.B, remote_side=tables.b.c.b_id),
            "zc": relationship(classes.C)
        }
        b_mapper = mapper(classes.B, tables.b, properties=b_relationships)

        mapper(classes.C, tables.c)

        b_props = b_mapper._props
        configure_mappers()
        # Bug is order-dependent, must sort the "zc" property to the end
        b_props.sort()

    def test_lazy_doesnt_interfere(self):
        """Chained joinedload over a self-parented ``B`` must not raise."""
        A, B, C = self.classes("A", "B", "C")

        sess = Session()
        node = B()
        sess.add(node)
        sess.flush()

        # make the row its own parent, using the key assigned by the flush
        node.parent_id = node.b_id

        for _n in range(2):
            node.zc.append(C())
        sess.commit()

        # If the bug is here, the next line throws an exception
        eager = sa.orm.joinedload('parent').joinedload('zc')
        sess.query(B).options(eager).all()
1105
1106
class TypeCoerceTest(fixtures.MappedTest, testing.AssertsExecutionResults):
    """ORM-level test for [ticket:3531]"""

    # mysql is having a recursion issue in the bind_expression
    __only_on__ = ('sqlite', 'postgresql')

    class StringAsInt(TypeDecorator):
        """String(50)-backed type that wraps its expressions in CASTs."""

        impl = String(50)

        def column_expression(self, col):
            """Return ``col`` cast to Integer."""
            as_integer = sa.cast(col, Integer)
            return as_integer

        def bind_expression(self, col):
            """Return ``col`` cast to String."""
            as_string = sa.cast(col, String)
            return as_string

    @classmethod
    def define_tables(cls, metadata):
        """Create ``person`` (StringAsInt key) and ``pets`` (integer ref)."""
        Table('person', metadata,
              Column("id", cls.StringAsInt, primary_key=True))

        Table("pets", metadata,
              Column("id", Integer, primary_key=True),
              Column("person_id", Integer))

    @classmethod
    def setup_classes(cls):
        """Declare the plain classes ``Person`` and ``Pet``."""
        class Person(cls.Basic):
            pass

        class Pet(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        """Map ``Person.pets`` through a cast / type_coerce primaryjoin."""
        classes = cls.classes
        person, pets = cls.tables.person, cls.tables.pets

        # pets.person_id is marked as the foreign side; person.id is
        # type-coerced to Integer and then wrapped in a single CAST
        foreign_side = orm.foreign(pets.c.person_id)
        person_id_as_int = sa.cast(
            sa.type_coerce(person.c.id, Integer), Integer)

        mapper(classes.Person, person, properties={
            'pets': relationship(
                classes.Pet,
                primaryjoin=(foreign_side == person_id_as_int))
        })

        mapper(classes.Pet, pets)

    def test_lazyload_singlecast(self):
        """The lazy load statement contains exactly one CAST of the bind."""
        Person, Pet = self.classes.Person, self.classes.Pet

        sess = Session()
        sess.add_all([Person(id=5), Pet(id=1, person_id=5)])
        sess.commit()

        person = sess.query(Person).first()

        with self.sql_execution_asserter() as asserter:
            # touching the attribute is what emits the statement under test
            person.pets

        expected_sql = CompiledSQL(
            "SELECT pets.id AS pets_id, pets.person_id "
            "AS pets_person_id FROM pets "
            "WHERE pets.person_id = CAST(:param_1 AS INTEGER)",
            [{'param_1': 5}]
        )
        asserter.assert_(expected_sql)
1181
1182
class CompositeSimpleM2OTest(fixtures.MappedTest):
    """ORM-level test for [ticket:3788]"""

    @classmethod
    def define_tables(cls, metadata):
        """Create ``a`` (composite key) and two tables referencing it.

        ``b_sameorder`` declares its referencing columns in the same order
        as the primary key columns of ``a``; ``b_differentorder`` declares
        them in the opposite order.
        """
        Table(
            'a', metadata,
            Column("id1", Integer, primary_key=True),
            Column("id2", Integer, primary_key=True),
        )

        Table(
            "b_sameorder", metadata,
            Column("id", Integer, primary_key=True),
            Column('a_id1', Integer),
            Column('a_id2', Integer),
            ForeignKeyConstraint(['a_id1', 'a_id2'], ['a.id1', 'a.id2'])
        )

        # Referencing columns (and the constraint) are listed in the
        # opposite order from the primary key columns of "a", so this table
        # is not a copy of "b_sameorder".  Each column is still paired with
        # its matching key column.
        Table(
            "b_differentorder", metadata,
            Column("id", Integer, primary_key=True),
            Column('a_id2', Integer),
            Column('a_id1', Integer),
            ForeignKeyConstraint(['a_id2', 'a_id1'], ['a.id2', 'a.id1'])
        )

    @classmethod
    def setup_classes(cls):
        """Declare the plain classes ``A`` and ``B``."""
        class A(cls.Basic):
            pass

        class B(cls.Basic):
            pass

    def test_use_get_sameorder(self):
        """``B.a`` over ``b_sameorder`` reports ``use_get`` on its strategy."""
        mapper(self.classes.A, self.tables.a)
        m_b = mapper(self.classes.B, self.tables.b_sameorder, properties={
            'a': relationship(self.classes.A)
        })

        configure_mappers()
        is_true(m_b.relationships.a.strategy.use_get)

    def test_use_get_reverseorder(self):
        """``B.a`` over ``b_differentorder`` also reports ``use_get``."""
        mapper(self.classes.A, self.tables.a)
        m_b = mapper(self.classes.B, self.tables.b_differentorder, properties={
            'a': relationship(self.classes.A)
        })

        configure_mappers()
        is_true(m_b.relationships.a.strategy.use_get)

    def test_dont_use_get_pj_is_different(self):
        """A primaryjoin comparing ``a.id2`` to a constant disables use_get."""
        mapper(self.classes.A, self.tables.a)
        m_b = mapper(self.classes.B, self.tables.b_sameorder, properties={
            'a': relationship(self.classes.A, primaryjoin=and_(
                self.tables.a.c.id1 == self.tables.b_sameorder.c.a_id1,
                # id2 is matched against a literal, not a column of b
                self.tables.a.c.id2 == 12
            ))
        })

        configure_mappers()
        is_false(m_b.relationships.a.strategy.use_get)
1247