1"""basic tests of lazy loaded attributes"""
2
3from sqlalchemy.testing import assert_raises
4import datetime
5from sqlalchemy.orm import attributes, exc as orm_exc, configure_mappers
6import sqlalchemy as sa
7from sqlalchemy import testing, and_
8from sqlalchemy import Integer, String, ForeignKey, SmallInteger, Boolean
9from sqlalchemy.types import TypeDecorator
10from sqlalchemy.testing.schema import Table
11from sqlalchemy.testing.schema import Column
12from sqlalchemy import orm
13from sqlalchemy.orm import mapper, relationship, create_session, Session
14from sqlalchemy.testing import eq_
15from sqlalchemy.testing import fixtures
16from test.orm import _fixtures
17from sqlalchemy.testing.assertsql import CompiledSQL
18
19
class LazyTest(_fixtures.FixtureTest):
    """Tests of relationships configured for lazy ("select") loading.

    Every test sets up its own mappers on the shared fixture classes and
    tables, then checks query results and relationship attributes.
    """

    run_inserts = 'once'
    run_deletes = None

    def test_basic(self):
        """A queried User compares equal to one carrying its address."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        address_mapper = mapper(Address, addresses)
        mapper(User, users, properties=dict(
            addresses=relationship(address_mapper, lazy='select'),
        ))

        session = create_session()
        query = session.query(User)
        expected = [
            User(id=7,
                 addresses=[Address(id=1, email_address='jack@bean.com')]),
        ]
        eq_(expected, query.filter(users.c.id == 7).all())

    def test_needs_parent(self):
        """test the error raised when parent object is not bound."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        address_mapper = mapper(Address, addresses)
        mapper(User, users, properties=dict(
            addresses=relationship(address_mapper, lazy='select'),
        ))

        session = create_session()
        user = session.query(User).filter(users.c.id == 7).first()

        # once expunged, reading the unloaded attribute must raise
        session.expunge(user)
        assert_raises(
            orm_exc.DetachedInstanceError, getattr, user, 'addresses')

    def test_orderby(self):
        """Collections come back ordered by the relationship's order_by."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        address_mapper = mapper(Address, addresses)
        mapper(User, users, properties=dict(
            addresses=relationship(
                address_mapper,
                lazy='select',
                order_by=addresses.c.email_address),
        ))

        query = create_session().query(User)
        expected = [
            User(id=7, addresses=[Address(id=1)]),
            User(id=8, addresses=[
                Address(id=3, email_address='ed@bettyboop.com'),
                Address(id=4, email_address='ed@lala.com'),
                Address(id=2, email_address='ed@wood.com'),
            ]),
            User(id=9, addresses=[Address(id=5)]),
            User(id=10, addresses=[]),
        ]
        assert expected == query.all()

    def test_orderby_secondary(self):
        """tests that a regular mapper select on a single table can
        order by a relationship to a second table"""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(Address, addresses)
        mapper(User, users, properties={
            'addresses': relationship(Address, lazy='select'),
        })

        query = create_session().query(User)
        rows = (
            query.filter(users.c.id == addresses.c.user_id)
            .order_by(addresses.c.email_address)
            .all()
        )

        expected = [
            User(id=8, addresses=[
                Address(id=2, email_address='ed@wood.com'),
                Address(id=3, email_address='ed@bettyboop.com'),
                Address(id=4, email_address='ed@lala.com'),
            ]),
            User(id=9, addresses=[Address(id=5)]),
            User(id=7, addresses=[Address(id=1)]),
        ]
        assert expected == rows

    def test_orderby_desc(self):
        """A descending order_by on the relationship is applied."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(Address, addresses)
        mapper(User, users, properties={
            'addresses': relationship(
                Address,
                lazy='select',
                order_by=[sa.desc(addresses.c.email_address)]),
        })

        session = create_session()
        expected = [
            User(id=7, addresses=[Address(id=1)]),
            User(id=8, addresses=[
                Address(id=2, email_address='ed@wood.com'),
                Address(id=4, email_address='ed@lala.com'),
                Address(id=3, email_address='ed@bettyboop.com'),
            ]),
            User(id=9, addresses=[Address(id=5)]),
            User(id=10, addresses=[]),
        ]
        assert expected == session.query(User).all()

    def test_no_orphan(self):
        """test that a lazily loaded child object is not marked as an orphan"""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(User, users, properties=dict(
            addresses=relationship(
                Address, cascade="all,delete-orphan", lazy='select'),
        ))
        mapper(Address, addresses)

        session = create_session()
        user = session.query(User).get(7)

        first_state = attributes.instance_state(user.addresses[0])
        assert User.addresses.hasparent(first_state, optimistic=True)

        address_mapper = sa.orm.class_mapper(Address)
        assert not address_mapper._is_orphan(
            attributes.instance_state(user.addresses[0]))

    def test_limit(self):
        """test limit operations combined with lazy-load relationships."""
        User = self.classes.User
        Address = self.classes.Address
        Order = self.classes.Order
        Item = self.classes.Item
        users = self.tables.users
        addresses = self.tables.addresses
        orders = self.tables.orders
        items = self.tables.items
        order_items = self.tables.order_items

        mapper(Item, items)
        mapper(Order, orders, properties=dict(
            items=relationship(Item, secondary=order_items, lazy='select'),
        ))
        address_mapper = mapper(Address, addresses)
        mapper(User, users, properties=dict(
            addresses=relationship(address_mapper, lazy='select'),
            orders=relationship(Order, lazy='select'),
        ))

        session = create_session()
        query = session.query(User)

        # the mssql branch uses LIMIT alone; all others add an OFFSET
        if testing.against('mssql'):
            rows = query.limit(2).all()
            assert self.static.user_all_result[:2] == rows
        else:
            rows = query.limit(2).offset(1).all()
            assert self.static.user_all_result[1:3] == rows

    def test_distinct(self):
        """DISTINCT over a row-multiplying join still yields each User
        once."""
        User = self.classes.User
        Address = self.classes.Address
        Order = self.classes.Order
        Item = self.classes.Item
        users = self.tables.users
        addresses = self.tables.addresses
        orders = self.tables.orders
        items = self.tables.items
        order_items = self.tables.order_items

        mapper(Item, items)
        mapper(Order, orders, properties=dict(
            items=relationship(Item, secondary=order_items, lazy='select'),
        ))
        address_mapper = mapper(Address, addresses)
        mapper(User, users, properties=dict(
            addresses=relationship(address_mapper, lazy='select'),
            orders=relationship(Order, lazy='select'),
        ))

        session = create_session()
        query = session.query(User)

        # use a union all to get a lot of rows to join against
        u2 = users.alias('u2')
        selects = [u2.select(use_labels=True) for _ in range(3)]
        unioned = sa.union_all(*selects).alias('u')

        rows = (
            query.filter(unioned.c.u2_id == User.id)
            .order_by(User.id)
            .distinct()
            .all()
        )
        eq_(self.static.user_all_result, rows)

    def test_uselist_false_warning(self):
        """test that multiple rows received by a
        uselist=False raises a warning."""
        User = self.classes.User
        Order = self.classes.Order
        users = self.tables.users
        orders = self.tables.orders

        mapper(User, users, properties=dict(
            order=relationship(Order, uselist=False),
        ))
        mapper(Order, orders)

        session = create_session()
        user = session.query(User).filter(User.id == 7).one()
        assert_raises(sa.exc.SAWarning, getattr, user, 'order')

    def test_one_to_many_scalar(self):
        """A uselist=False one-to-many loads as a single scalar object."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        address_mapper = mapper(Address, addresses)
        mapper(User, users, properties={
            'address': relationship(
                address_mapper, lazy='select', uselist=False),
        })

        query = create_session().query(User)
        rows = query.filter(users.c.id == 7).all()
        assert [User(id=7, address=Address(id=1))] == rows

    def test_many_to_one_binds(self):
        """A primaryjoin containing a literal comparison loads only the
        matching related row."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(
            Address, addresses,
            primary_key=[addresses.c.user_id, addresses.c.email_address])

        bettyboop_join = sa.and_(
            users.c.id == addresses.c.user_id,
            addresses.c.email_address == 'ed@bettyboop.com')
        mapper(User, users, properties={
            'address': relationship(
                Address, uselist=False, primaryjoin=bettyboop_join),
        })

        query = create_session().query(User)
        expected = [
            User(id=7, address=None),
            User(id=8, address=Address(id=3)),
            User(id=9, address=None),
            User(id=10, address=None),
        ]
        eq_(expected, list(query))

    def test_double(self):
        """tests lazy loading with two relationships simultaneously,
        from the same table, using aliases.  """
        User = self.classes.User
        Address = self.classes.Address
        Order = self.classes.Order
        users = self.tables.users
        addresses = self.tables.addresses
        orders = self.tables.orders

        open_alias = sa.alias(orders, 'openorders')
        closed_alias = sa.alias(orders, 'closedorders')

        mapper(Address, addresses)
        mapper(Order, orders)

        open_mapper = mapper(Order, open_alias, non_primary=True)
        closed_mapper = mapper(Order, closed_alias, non_primary=True)

        open_join = sa.and_(
            open_alias.c.isopen == 1,
            users.c.id == open_alias.c.user_id)
        closed_join = sa.and_(
            closed_alias.c.isopen == 0,
            users.c.id == closed_alias.c.user_id)

        mapper(User, users, properties={
            'addresses': relationship(Address, lazy=True),
            'open_orders': relationship(
                open_mapper, primaryjoin=open_join, lazy='select'),
            'closed_orders': relationship(
                closed_mapper, primaryjoin=closed_join, lazy='select'),
        })
        query = create_session().query(User)

        expected = [
            User(
                id=7,
                addresses=[Address(id=1)],
                open_orders=[Order(id=3)],
                closed_orders=[Order(id=1), Order(id=5)],
            ),
            User(
                id=8,
                addresses=[Address(id=2), Address(id=3), Address(id=4)],
                open_orders=[],
                closed_orders=[],
            ),
            User(
                id=9,
                addresses=[Address(id=5)],
                open_orders=[Order(id=4)],
                closed_orders=[Order(id=2)],
            ),
            User(id=10),
        ]
        assert expected == query.all()

        # the same criteria must also work through with_parent()
        session = create_session()
        user = session.query(User).get(7)
        eq_(
            [Order(id=1), Order(id=5)],
            create_session()
            .query(closed_mapper)
            .with_parent(user, property='closed_orders')
            .all()
        )
        eq_(
            [Order(id=3)],
            create_session()
            .query(open_mapper)
            .with_parent(user, property='open_orders')
            .all()
        )

    def test_many_to_many(self):
        """Lazy loading through a secondary (association) table."""
        Item = self.classes.Item
        Keyword = self.classes.Keyword
        items = self.tables.items
        keywords = self.tables.keywords
        item_keywords = self.tables.item_keywords

        mapper(Keyword, keywords)
        mapper(Item, items, properties={
            'keywords': relationship(
                Keyword, secondary=item_keywords, lazy='select'),
        })

        query = create_session().query(Item)
        assert self.static.item_keyword_result == query.all()

        eq_(
            self.static.item_keyword_result[0:2],
            query.join('keywords').filter(keywords.c.name == 'red').all()
        )

    def test_uses_get(self):
        """test that a simple many-to-one lazyload optimizes
        to use query.get()."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        # no explicit join, then the same condition written both ways round
        join_conditions = (
            None,
            users.c.id == addresses.c.user_id,
            addresses.c.user_id == users.c.id,
        )
        for pj in join_conditions:
            user_mapper = mapper(User, users)
            mapper(Address, addresses, properties={
                'user': relationship(
                    user_mapper, lazy='select', primaryjoin=pj),
            })

            session = create_session()

            # load address
            address = session.query(Address).filter_by(
                email_address="ed@wood.com").one()

            # load user that is attached to the address
            user = session.query(User).get(8)

            def check_identity():
                # lazy load of address.user should get it from the session
                assert address.user is user
            self.assert_sql_count(testing.db, check_identity, 0)

            # mappers are rebuilt on the next iteration
            sa.orm.clear_mappers()

    def test_uses_get_compatible_types(self):
        """test the use_get optimization with compatible
        but non-identical types"""
        User = self.classes.User
        Address = self.classes.Address

        class IntDecorator(TypeDecorator):
            impl = Integer

        class SmallintDecorator(TypeDecorator):
            impl = SmallInteger

        class SomeDBInteger(sa.Integer):
            pass

        fk_types = (
            Integer,
            SmallInteger,
            IntDecorator,
            SmallintDecorator,
            SomeDBInteger,
        )
        for tt in fk_types:
            # fresh table definitions per type; only user_id's type varies
            m = sa.MetaData()
            users = Table(
                'users', m,
                Column(
                    'id', Integer, primary_key=True,
                    test_needs_autoincrement=True),
                Column('name', String(30), nullable=False),
            )
            addresses = Table(
                'addresses', m,
                Column(
                    'id', Integer, primary_key=True,
                    test_needs_autoincrement=True),
                Column('user_id', tt, ForeignKey('users.id')),
                Column('email_address', String(50), nullable=False),
            )

            user_mapper = mapper(User, users)
            mapper(Address, addresses, properties={
                'user': relationship(user_mapper),
            })

            session = create_session(bind=testing.db)

            # load address
            address = session.query(Address).filter_by(
                email_address="ed@wood.com").one()

            # load user that is attached to the address
            user = session.query(User).get(8)

            def check_identity():
                # lazy load of address.user should get it from the session
                assert address.user is user
            self.assert_sql_count(testing.db, check_identity, 0)
            sa.orm.clear_mappers()

    def test_many_to_one(self):
        """A lazily loaded many-to-one is the same object the session
        returns from get()."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        user_mapper = mapper(User, users)
        mapper(Address, addresses, properties={
            'user': relationship(user_mapper, lazy='select'),
        })

        session = create_session()
        address = session.query(Address).filter(addresses.c.id == 1).one()

        assert address.user is not None

        user = session.query(User).get(7)

        assert address.user is user

    def test_backrefs_dont_lazyload(self):
        """Setting the many-to-one side runs with a SQL count of 0, while
        each first read of the unloaded collection runs with a count of 1."""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(User, users, properties=dict(
            addresses=relationship(Address, backref='user'),
        ))
        mapper(Address, addresses)

        session = create_session()
        address = session.query(Address).filter_by(id=1).one()
        assert address.user.id == 7

        def clear_user():
            address.user = None
            assert address.user is None
        self.assert_sql_count(testing.db, clear_user, 0)

        user = session.query(User).filter_by(id=7).one()

        def address_absent():
            assert address not in user.addresses
        self.assert_sql_count(testing.db, address_absent, 1)

        session.expire(user, ['addresses'])

        def address_present():
            assert address in user.addresses
        self.assert_sql_count(testing.db, address_present, 1)

        session.expire(user, ['addresses'])
        new_address = Address()

        def assign_user():
            new_address.user = user
            assert new_address.user is user
        self.assert_sql_count(testing.db, assign_user, 0)

        def new_address_present():
            assert new_address in user.addresses
        self.assert_sql_count(testing.db, new_address_present, 1)
554
555
class GetterStateTest(_fixtures.FixtureTest):

    """Check what the attribute impl's get() / get_history() return for
    a lazy many-to-one under different ``passive`` flags, and that the
    instance ``__dict__`` is left untouched where asserted.

    Also covers lazy loads whose parameters are missing or unhashable."""

    run_inserts = None

    def _unhashable_fixture(self, metadata, load_on_pending=False):
        """Build Article/Category joined on a dict-valued column.

        :param metadata: MetaData the two tables are declared on; its
         ``create_all()`` is called here.
        :param load_on_pending: passed to the relationship; when true the
         Article is added to the session only after the flush.
        :return: ``(Category, Article, sess, a1, c1)``
        """
        class MyHashType(sa.TypeDecorator):
            # round-trips a dict through a "k=v;k=v" string
            impl = sa.String(100)

            def process_bind_param(self, value, dialect):
                pairs = sorted(value.items(), key=lambda item: item[0])
                return ";".join("%s=%s" % pair for pair in pairs)

            def process_result_value(self, value, dialect):
                return dict(
                    token.split("=", 1) for token in value.split(";"))

        category = Table(
            'category', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', MyHashType())
        )
        article = Table(
            'article', metadata,
            Column('id', Integer, primary_key=True),
            Column('data', MyHashType())
        )

        class Category(fixtures.ComparableEntity):
            pass

        class Article(fixtures.ComparableEntity):
            pass

        mapper(Category, category)
        data_join = orm.foreign(article.c.data) == category.c.data
        mapper(Article, article, properties=dict(
            category=relationship(
                Category,
                primaryjoin=data_join,
                load_on_pending=load_on_pending),
        ))

        metadata.create_all()
        sess = Session(autoflush=False)

        # both objects share the same dict value for the joined column
        data = {"im": "unhashable"}
        a1 = Article(id=1, data=data)
        c1 = Category(id=1, data=data)

        if not load_on_pending:
            sess.add_all([c1, a1])
        else:
            sess.add(c1)
        sess.flush()

        if load_on_pending:
            # added after the flush, so a1 is not flushed by this fixture
            sess.add(a1)
        return Category, Article, sess, a1, c1

    def _u_ad_fixture(self, populate_user, dont_use_get=False):
        """Map User <-> Address and return a flushed Address.

        :param populate_user: when true, a User named 'ed' is attached
         before the flush and everything is expired afterwards.
        :param dont_use_get: when true, Address.user gets an explicit
         primaryjoin with an extra ``users.c.id != 27`` criterion.
        :return: ``(User, Address, sess, a1)``
        """
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(User, users, properties=dict(
            addresses=relationship(Address, back_populates='user'),
        ))

        user_join = None
        if dont_use_get:
            user_join = and_(
                users.c.id == addresses.c.user_id, users.c.id != 27)
        mapper(Address, addresses, properties=dict(
            user=relationship(
                User,
                primaryjoin=user_join,
                back_populates='addresses'),
        ))

        sess = create_session()
        a1 = Address(email_address='a1')
        sess.add(a1)
        if populate_user:
            a1.user = User(name='ed')
        sess.flush()
        if populate_user:
            sess.expire_all()
        return User, Address, sess, a1

    def test_no_use_get_params_missing(self):
        User, Address, sess, a1 = self._u_ad_fixture(False, True)

        def read_user():
            eq_(a1.user, None)

        # doesn't emit SQL
        self.assert_sql_count(testing.db, read_user, 0)

    @testing.provide_metadata
    def test_no_use_get_params_not_hashable(self):
        fixture = self._unhashable_fixture(self.metadata)
        Category, Article, sess, a1, c1 = fixture

        def read_category():
            eq_(a1.category, c1)

        self.assert_sql_count(testing.db, read_category, 1)

    @testing.provide_metadata
    def test_no_use_get_params_not_hashable_on_pending(self):
        fixture = self._unhashable_fixture(
            self.metadata, load_on_pending=True)
        Category, Article, sess, a1, c1 = fixture

        def read_category():
            eq_(a1.category, c1)

        self.assert_sql_count(testing.db, read_category, 1)

    def test_get_empty_passive_return_never_set(self):
        User, Address, sess, a1 = self._u_ad_fixture(False)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        value = Address.user.impl.get(
            state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET)
        eq_(value, attributes.NEVER_SET)

        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_history_empty_passive_return_never_set(self):
        User, Address, sess, a1 = self._u_ad_fixture(False)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        history = Address.user.impl.get_history(
            state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET)
        eq_(history, ((), (), ()))

        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_get_empty_passive_no_initialize(self):
        User, Address, sess, a1 = self._u_ad_fixture(False)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        value = Address.user.impl.get(
            state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE)
        eq_(value, attributes.PASSIVE_NO_RESULT)

        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_history_empty_passive_no_initialize(self):
        User, Address, sess, a1 = self._u_ad_fixture(False)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        history = Address.user.impl.get_history(
            state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE)
        eq_(history, attributes.HISTORY_BLANK)

        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_get_populated_passive_no_initialize(self):
        User, Address, sess, a1 = self._u_ad_fixture(True)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        value = Address.user.impl.get(
            state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE)
        eq_(value, attributes.PASSIVE_NO_RESULT)

        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_history_populated_passive_no_initialize(self):
        User, Address, sess, a1 = self._u_ad_fixture(True)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        history = Address.user.impl.get_history(
            state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE)
        eq_(history, attributes.HISTORY_BLANK)

        for key in ('user_id', 'user'):
            assert key not in a1.__dict__

    def test_get_populated_passive_return_never_set(self):
        User, Address, sess, a1 = self._u_ad_fixture(True)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        value = Address.user.impl.get(
            state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET)
        eq_(value, User(name='ed'))

    def test_history_populated_passive_return_never_set(self):
        User, Address, sess, a1 = self._u_ad_fixture(True)
        state = attributes.instance_state(a1)
        dict_ = attributes.instance_dict(a1)

        history = Address.user.impl.get_history(
            state, dict_, passive=attributes.PASSIVE_RETURN_NEVER_SET)
        eq_(history, ((), [User(name='ed'), ], ()))
778
779
class M2OGetTest(_fixtures.FixtureTest):
    """Many-to-one lazy loading when the foreign key is NULL."""

    run_inserts = 'once'
    run_deletes = None

    def test_m2o_noload(self):
        """test that a NULL foreign key doesn't trigger a lazy load"""
        User = self.classes.User
        Address = self.classes.Address
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(User, users)
        mapper(Address, addresses, properties=dict(
            user=relationship(User),
        ))

        session = create_session()

        # an address with no user, i.e. no user_id value was supplied
        orphan = Address(email_address='somenewaddress', id=12)
        session.add(orphan)
        session.flush()
        session.expunge_all()

        with_user = session.query(Address).get(1)
        without_user = session.query(Address).get(orphan.id)

        def read_users():
            # one lazy load
            assert with_user.user.name == 'jack'
            # no lazy load
            assert without_user.user is None
        self.assert_sql_count(testing.db, read_users, 1)
814
815
class CorrelatedTest(fixtures.MappedTest):
    """Lazy load through a primaryjoin that embeds a correlated scalar
    subquery (the id of each user's most recently dated ``stuff`` row)."""

    @classmethod
    def define_tables(cls, meta):
        """Declare the ``user_t`` and ``stuff`` tables on ``meta``."""
        Table('user_t', meta,
              Column('id', Integer, primary_key=True),
              Column('name', String(50)))

        Table('stuff', meta,
              Column('id', Integer, primary_key=True),
              Column('date', sa.Date),
              Column('user_id', Integer, ForeignKey('user_t.id')))

    @classmethod
    def insert_data(cls):
        """Insert three users and five dated ``stuff`` rows."""
        stuff, user_t = cls.tables.stuff, cls.tables.user_t

        user_t.insert().execute(
            {'id': 1, 'name': 'user1'},
            {'id': 2, 'name': 'user2'},
            {'id': 3, 'name': 'user3'})

        # user 1 has three rows; the latest date belongs to id=2
        stuff.insert().execute(
            {'id': 1, 'user_id': 1, 'date': datetime.date(2007, 10, 15)},
            {'id': 2, 'user_id': 1, 'date': datetime.date(2007, 12, 15)},
            {'id': 3, 'user_id': 1, 'date': datetime.date(2007, 11, 15)},
            {'id': 4, 'user_id': 2, 'date': datetime.date(2008, 1, 15)},
            {'id': 5, 'user_id': 3, 'date': datetime.date(2007, 6, 15)})

    def test_correlated_lazyload(self):
        """Each user's ``stuff`` collection holds only the row with the
        latest date."""
        stuff, user_t = self.tables.stuff, self.tables.user_t

        class User(fixtures.ComparableEntity):
            pass

        class Stuff(fixtures.ComparableEntity):
            pass

        mapper(Stuff, stuff)

        # id of the newest stuff row for the enclosing user_t row
        stuff_view = sa.select([stuff.c.id]).\
            where(stuff.c.user_id == user_t.c.id).correlate(user_t).\
            order_by(sa.desc(stuff.c.date)).limit(1)

        mapper(User, user_t, properties={
            'stuff': relationship(
                Stuff,
                primaryjoin=sa.and_(
                    user_t.c.id == stuff.c.user_id,
                    stuff.c.id == (stuff_view.as_scalar())))
        })

        sess = create_session()

        eq_(
            sess.query(User).all(),
            [
                User(
                    name='user1',
                    stuff=[Stuff(date=datetime.date(2007, 12, 15), id=2)]),
                User(
                    name='user2',
                    stuff=[Stuff(id=4, date=datetime.date(2008, 1, 15))]),
                User(
                    name='user3',
                    stuff=[Stuff(id=5, date=datetime.date(2007, 6, 15))])
            ]
        )
884
885
886class O2MWOSideFixedTest(fixtures.MappedTest):
887    # test #2948 - o2m backref with a "m2o does/does not count"
888    # criteria doesn't scan the "o" table
889
890    @classmethod
891    def define_tables(self, meta):
892        Table('city', meta,
893              Column('id', Integer, primary_key=True),
894              Column('deleted', Boolean),
895              )
896        Table('person', meta,
897              Column('id', Integer, primary_key=True),
898              Column('city_id', ForeignKey('city.id'))
899              )
900
    @classmethod
    def setup_classes(cls):
        """Declare the Person and City classes used by these tests.

        Both derive from ``cls.Basic``; ``setup_mappers`` reads them back
        as ``cls.classes.Person`` and ``cls.classes.City``.
        """
        # NOTE(review): presumably subclassing cls.Basic is what registers
        # each class under cls.classes -- confirm against MappedTest.
        class Person(cls.Basic):
            pass

        class City(cls.Basic):
            pass
908
909    @classmethod
910    def setup_mappers(cls):
911        Person, City = cls.classes.Person, cls.classes.City
912        city, person = cls.tables.city, cls.tables.person
913
914        mapper(Person, person, properties={
915            'city': relationship(City,
916                                 primaryjoin=and_(
917                                     person.c.city_id == city.c.id,
918                                     city.c.deleted == False),
919                                 backref='people'
920                                 )
921        })
922        mapper(City, city)
923
924    def _fixture(self, include_other):
925        city, person = self.tables.city, self.tables.person
926
927        if include_other:
928            city.insert().execute(
929                {"id": 1, "deleted": False},
930            )
931
932            person.insert().execute(
933                {"id": 1, "city_id": 1},
934                {"id": 2, "city_id": 1},
935            )
936
937        city.insert().execute(
938            {"id": 2, "deleted": True},
939        )
940
941        person.insert().execute(
942            {"id": 3, "city_id": 2},
943            {"id": 4, "city_id": 2},
944        )
945
946    def test_lazyload_assert_expected_sql(self):
947        self._fixture(True)
948        City = self.classes.City
949        sess = Session(testing.db)
950        c1, c2 = sess.query(City).order_by(City.id).all()
951
952        def go():
953            eq_(
954                [p.id for p in c2.people],
955                []
956            )
957
958        self.assert_sql_execution(
959            testing.db,
960            go,
961            CompiledSQL(
962                "SELECT person.id AS person_id, person.city_id AS "
963                "person_city_id FROM person "
964                "WHERE person.city_id = :param_1 AND :param_2 = 0",
965                {"param_1": 2, "param_2": 1}
966            )
967        )
968
969    def test_lazyload_people_other_exists(self):
970        self._fixture(True)
971        City = self.classes.City
972        sess = Session(testing.db)
973        c1, c2 = sess.query(City).order_by(City.id).all()
974        eq_(
975            [p.id for p in c1.people],
976            [1, 2]
977        )
978
979        eq_(
980            [p.id for p in c2.people],
981            []
982        )
983
984    def test_lazyload_people_no_other_exists(self):
985        # note that if we revert #2948, *this still passes!*
986        # e.g. due to the scan of the "o" table, whether or not *another*
987        # row exists determines if this works.
988
989        self._fixture(False)
990        City = self.classes.City
991        sess = Session(testing.db)
992        c2, = sess.query(City).order_by(City.id).all()
993
994        eq_(
995            [p.id for p in c2.people],
996            []
997        )
998
999
class RefersToSelfLazyLoadInterferenceTest(fixtures.MappedTest):
    """Regression test for [issue:3145].

    The scenario is an object that refers to itself, which isn't
    entirely a supported use case.  It can be fixed here, but long term
    it's not clear whether future needs will affect this, and the use
    case is not super-critical.

    """

    @classmethod
    def define_tables(cls, metadata):
        # "a" and "c" each refer to "b"; "b" refers to itself via parent_id.
        Table('a', metadata,
              Column('a_id', Integer, primary_key=True),
              Column('b_id', ForeignKey('b.b_id')))

        Table('b', metadata,
              Column('b_id', Integer, primary_key=True),
              Column('parent_id', ForeignKey('b.b_id')))

        Table('c', metadata,
              Column('c_id', Integer, primary_key=True),
              Column('b_id', ForeignKey('b.b_id')))

    @classmethod
    def setup_classes(cls):
        class A(cls.Basic):
            pass

        class B(cls.Basic):
            pass

        class C(cls.Basic):
            pass

    @classmethod
    def setup_mappers(cls):
        A, B, C = cls.classes.A, cls.classes.B, cls.classes.C
        a_table, b_table, c_table = cls.tables.a, cls.tables.b, cls.tables.c

        mapper(A, a_table, properties={"b": relationship(B)})

        b_properties = {
            "parent": relationship(B, remote_side=b_table.c.b_id),
            "zc": relationship(C),
        }
        b_mapper = mapper(B, b_table, properties=b_properties)
        mapper(C, c_table)

        # Take hold of B's property collection first, then configure.
        b_props = b_mapper._props
        configure_mappers()
        # The bug is order-dependent: the "zc" property has to be
        # sorted to the end.
        b_props.sort()

    def test_lazy_doesnt_interfere(self):
        _a, B, C = self.classes("A", "B", "C")

        sess = Session()
        b1 = B()
        sess.add(b1)
        sess.flush()

        # Point the row at itself as its own parent.
        b1.parent_id = b1.b_id

        for _ in range(2):
            b1.zc.append(C())
        sess.commit()

        # If the bug is present, running this query throws an exception.
        parent_then_zc = sa.orm.joinedload('parent').joinedload('zc')
        sess.query(B).options(parent_then_zc).all()
1075
1076