1from sqlalchemy import (
2    testing, null, exists, text, union, literal, literal_column, func, between,
3    Unicode, desc, and_, bindparam, select, distinct, or_, collate, insert,
4    Integer, String, Boolean, exc as sa_exc, util, cast)
5from sqlalchemy.sql import operators, expression
6from sqlalchemy import column, table
7from sqlalchemy.engine import default
8from sqlalchemy.orm import (
9    attributes, mapper, relationship, create_session, synonym, Session,
10    aliased, column_property, joinedload_all, joinedload, Query, Bundle,
11    subqueryload, backref, lazyload, defer)
12from sqlalchemy.testing.assertsql import CompiledSQL
13from sqlalchemy.testing.schema import Table, Column
14import sqlalchemy as sa
15from sqlalchemy.testing.assertions import (
16    eq_, assert_raises, assert_raises_message, expect_warnings)
17from sqlalchemy.testing import fixtures, AssertsCompiledSQL, assert_warnings
18from test.orm import _fixtures
19from sqlalchemy.orm.util import join, with_parent
20import contextlib
21from sqlalchemy.testing import mock, is_, is_not_
22from sqlalchemy import inspect
23
24
25class QueryTest(_fixtures.FixtureTest):
26    run_setup_mappers = 'once'
27    run_inserts = 'once'
28    run_deletes = None
29
30    @classmethod
31    def setup_mappers(cls):
32        cls._setup_stock_mapping()
33
34
35class MiscTest(QueryTest):
36    run_create_tables = None
37    run_inserts = None
38
39    def test_with_session(self):
40        User = self.classes.User
41        s1 = Session()
42        s2 = Session()
43        q1 = s1.query(User)
44        q2 = q1.with_session(s2)
45        assert q2.session is s2
46        assert q1.session is s1
47
48
49class RowTupleTest(QueryTest):
50    run_setup_mappers = None
51
52    def test_custom_names(self):
53        User, users = self.classes.User, self.tables.users
54
55        mapper(User, users, properties={'uname': users.c.name})
56
57        row = create_session().query(User.id, User.uname).\
58            filter(User.id == 7).first()
59        assert row.id == 7
60        assert row.uname == 'jack'
61
62    def test_column_metadata(self):
63        users, Address, addresses, User = (self.tables.users,
64                                self.classes.Address,
65                                self.tables.addresses,
66                                self.classes.User)
67
68        mapper(User, users)
69        mapper(Address, addresses)
70        sess = create_session()
71        user_alias = aliased(User)
72        user_alias_id_label = user_alias.id.label('foo')
73        address_alias = aliased(Address, name='aalias')
74        fn = func.count(User.id)
75        name_label = User.name.label('uname')
76        bundle = Bundle('b1', User.id, User.name)
77        cte = sess.query(User.id).cte()
78        for q, asserted in [
79            (
80                sess.query(User),
81                [
82                    {
83                        'name': 'User', 'type': User, 'aliased': False,
84                        'expr': User, 'entity': User}]
85            ),
86            (
87                sess.query(User.id, User),
88                [
89                    {
90                        'name': 'id', 'type': users.c.id.type,
91                        'aliased': False, 'expr': User.id, 'entity': User},
92                    {
93                        'name': 'User', 'type': User, 'aliased': False,
94                        'expr': User, 'entity': User}
95                ]
96            ),
97            (
98                sess.query(User.id, user_alias),
99                [
100                    {
101                        'name': 'id', 'type': users.c.id.type,
102                        'aliased': False, 'expr': User.id, 'entity': User},
103                    {
104                        'name': None, 'type': User, 'aliased': True,
105                        'expr': user_alias, 'entity': user_alias}
106                ]
107            ),
108            (
109                sess.query(user_alias.id),
110                [
111                    {
112                        'name': 'id', 'type': users.c.id.type,
113                        'aliased': True, 'expr': user_alias.id,
114                        'entity': user_alias},
115                ]
116            ),
117            (
118                sess.query(user_alias_id_label),
119                [
120                    {
121                        'name': 'foo', 'type': users.c.id.type,
122                        'aliased': True, 'expr': user_alias_id_label,
123                        'entity': user_alias},
124                ]
125            ),
126            (
127                sess.query(address_alias),
128                [
129                    {
130                        'name': 'aalias', 'type': Address, 'aliased': True,
131                        'expr': address_alias, 'entity': address_alias}
132                ]
133            ),
134            (
135                sess.query(name_label, fn),
136                [
137                    {
138                        'name': 'uname', 'type': users.c.name.type,
139                        'aliased': False, 'expr': name_label, 'entity': User},
140                    {
141                        'name': None, 'type': fn.type, 'aliased': False,
142                        'expr': fn, 'entity': User},
143                ]
144            ),
145            (
146                sess.query(cte),
147                [
148                {
149                    'aliased': False,
150                    'expr': cte.c.id, 'type': cte.c.id.type,
151                    'name': 'id', 'entity': None
152                }]
153            ),
154            (
155                sess.query(users),
156                [
157                    {'aliased': False,
158                     'expr': users.c.id, 'type': users.c.id.type,
159                     'name': 'id', 'entity': None},
160                    {'aliased': False,
161                     'expr': users.c.name, 'type': users.c.name.type,
162                     'name': 'name', 'entity': None}
163                ]
164            ),
165            (
166                sess.query(users.c.name),
167                [{
168                    "name": "name", "type": users.c.name.type,
169                    "aliased": False, "expr": users.c.name, "entity": None
170                }]
171            ),
172            (
173                sess.query(bundle),
174                [
175                    {
176                        'aliased': False,
177                        'expr': bundle,
178                        'type': Bundle,
179                        'name': 'b1', 'entity': User
180                    }
181                ]
182            )
183        ]:
184            eq_(
185                q.column_descriptions,
186                asserted
187            )
188
189    def test_unhashable_type(self):
190        from sqlalchemy.types import TypeDecorator, Integer
191        from sqlalchemy.sql import type_coerce
192
193        class MyType(TypeDecorator):
194            impl = Integer
195            hashable = False
196
197            def process_result_value(self, value, dialect):
198                return [value]
199
200        User, users = self.classes.User, self.tables.users
201
202        mapper(User, users)
203
204        s = Session()
205        q = s.query(User, type_coerce(users.c.id, MyType).label('foo')).\
206            filter(User.id == 7)
207        row = q.first()
208        eq_(
209            row, (User(id=7), [7])
210        )
211
212
213class RawSelectTest(QueryTest, AssertsCompiledSQL):
214    __dialect__ = 'default'
215
216    def test_select_from_entity(self):
217        User = self.classes.User
218
219        self.assert_compile(
220            select(['*']).select_from(User),
221            "SELECT * FROM users"
222        )
223
224    def test_where_relationship(self):
225        User = self.classes.User
226
227        self.assert_compile(
228            select([User]).where(User.addresses),
229            "SELECT users.id, users.name FROM users, addresses "
230            "WHERE users.id = addresses.user_id"
231        )
232
233    def test_where_m2m_relationship(self):
234        Item = self.classes.Item
235
236        self.assert_compile(
237            select([Item]).where(Item.keywords),
238            "SELECT items.id, items.description FROM items, "
239            "item_keywords AS item_keywords_1, keywords "
240            "WHERE items.id = item_keywords_1.item_id "
241            "AND keywords.id = item_keywords_1.keyword_id"
242        )
243
244    def test_inline_select_from_entity(self):
245        User = self.classes.User
246
247        self.assert_compile(
248            select(['*'], from_obj=User),
249            "SELECT * FROM users"
250        )
251
252    def test_select_from_aliased_entity(self):
253        User = self.classes.User
254        ua = aliased(User, name="ua")
255        self.assert_compile(
256            select(['*']).select_from(ua),
257            "SELECT * FROM users AS ua"
258        )
259
260    def test_correlate_entity(self):
261        User = self.classes.User
262        Address = self.classes.Address
263
264        self.assert_compile(
265            select(
266                [
267                    User.name, Address.id,
268                    select([func.count(Address.id)]).
269                    where(User.id == Address.user_id).
270                    correlate(User).as_scalar()]),
271            "SELECT users.name, addresses.id, "
272            "(SELECT count(addresses.id) AS count_1 "
273            "FROM addresses WHERE users.id = addresses.user_id) AS anon_1 "
274            "FROM users, addresses"
275        )
276
277    def test_correlate_aliased_entity(self):
278        User = self.classes.User
279        Address = self.classes.Address
280        uu = aliased(User, name="uu")
281
282        self.assert_compile(
283            select(
284                [
285                    uu.name, Address.id,
286                    select([func.count(Address.id)]).
287                    where(uu.id == Address.user_id).
288                    correlate(uu).as_scalar()]),
289            # for a long time, "uu.id = address.user_id" was reversed;
290            # this was resolved as of #2872 and had to do with
291            # InstrumentedAttribute.__eq__() taking precedence over
292            # QueryableAttribute.__eq__()
293            "SELECT uu.name, addresses.id, "
294            "(SELECT count(addresses.id) AS count_1 "
295            "FROM addresses WHERE uu.id = addresses.user_id) AS anon_1 "
296            "FROM users AS uu, addresses"
297        )
298
299    def test_columns_clause_entity(self):
300        User = self.classes.User
301
302        self.assert_compile(
303            select([User]),
304            "SELECT users.id, users.name FROM users"
305        )
306
307    def test_columns_clause_columns(self):
308        User = self.classes.User
309
310        self.assert_compile(
311            select([User.id, User.name]),
312            "SELECT users.id, users.name FROM users"
313        )
314
315    def test_columns_clause_aliased_columns(self):
316        User = self.classes.User
317        ua = aliased(User, name='ua')
318        self.assert_compile(
319            select([ua.id, ua.name]),
320            "SELECT ua.id, ua.name FROM users AS ua"
321        )
322
323    def test_columns_clause_aliased_entity(self):
324        User = self.classes.User
325        ua = aliased(User, name='ua')
326        self.assert_compile(
327            select([ua]),
328            "SELECT ua.id, ua.name FROM users AS ua"
329        )
330
331    def test_core_join(self):
332        User = self.classes.User
333        Address = self.classes.Address
334        from sqlalchemy.sql import join
335        self.assert_compile(
336            select([User]).select_from(join(User, Address)),
337            "SELECT users.id, users.name FROM users "
338            "JOIN addresses ON users.id = addresses.user_id"
339        )
340
341    def test_insert_from_query(self):
342        User = self.classes.User
343        Address = self.classes.Address
344
345        s = Session()
346        q = s.query(User.id, User.name).filter_by(name='ed')
347        self.assert_compile(
348            insert(Address).from_select(('id', 'email_address'), q),
349            "INSERT INTO addresses (id, email_address) "
350            "SELECT users.id AS users_id, users.name AS users_name "
351            "FROM users WHERE users.name = :name_1"
352        )
353
354    def test_insert_from_query_col_attr(self):
355        User = self.classes.User
356        Address = self.classes.Address
357
358        s = Session()
359        q = s.query(User.id, User.name).filter_by(name='ed')
360        self.assert_compile(
361            insert(Address).from_select(
362                (Address.id, Address.email_address), q),
363            "INSERT INTO addresses (id, email_address) "
364            "SELECT users.id AS users_id, users.name AS users_name "
365            "FROM users WHERE users.name = :name_1"
366        )
367
368    def test_update_from_entity(self):
369        from sqlalchemy.sql import update
370        User = self.classes.User
371        self.assert_compile(
372            update(User),
373            "UPDATE users SET id=:id, name=:name"
374        )
375
376        self.assert_compile(
377            update(User).values(name='ed').where(User.id == 5),
378            "UPDATE users SET name=:name WHERE users.id = :id_1",
379            checkparams={"id_1": 5, "name": "ed"}
380        )
381
382    def test_delete_from_entity(self):
383        from sqlalchemy.sql import delete
384        User = self.classes.User
385        self.assert_compile(
386            delete(User),
387            "DELETE FROM users"
388        )
389
390        self.assert_compile(
391            delete(User).where(User.id == 5),
392            "DELETE FROM users WHERE users.id = :id_1",
393            checkparams={"id_1": 5}
394        )
395
396    def test_insert_from_entity(self):
397        from sqlalchemy.sql import insert
398        User = self.classes.User
399        self.assert_compile(
400            insert(User),
401            "INSERT INTO users (id, name) VALUES (:id, :name)"
402        )
403
404        self.assert_compile(
405            insert(User).values(name="ed"),
406            "INSERT INTO users (name) VALUES (:name)",
407            checkparams={"name": "ed"}
408        )
409
410    def test_col_prop_builtin_function(self):
411        class Foo(object):
412            pass
413
414        mapper(
415            Foo, self.tables.users, properties={
416                'foob': column_property(
417                    func.coalesce(self.tables.users.c.name))
418            })
419
420        self.assert_compile(
421            select([Foo]).where(Foo.foob == 'somename').order_by(Foo.foob),
422            "SELECT users.id, users.name FROM users "
423            "WHERE coalesce(users.name) = :param_1 "
424            "ORDER BY coalesce(users.name)"
425        )
426
427
428class GetTest(QueryTest):
429    def test_get(self):
430        User = self.classes.User
431
432        s = create_session()
433        assert s.query(User).get(19) is None
434        u = s.query(User).get(7)
435        u2 = s.query(User).get(7)
436        assert u is u2
437        s.expunge_all()
438        u2 = s.query(User).get(7)
439        assert u is not u2
440
441    def test_get_composite_pk_no_result(self):
442        CompositePk = self.classes.CompositePk
443
444        s = Session()
445        assert s.query(CompositePk).get((100, 100)) is None
446
447    def test_get_composite_pk_result(self):
448        CompositePk = self.classes.CompositePk
449
450        s = Session()
451        one_two = s.query(CompositePk).get((1, 2))
452        assert one_two.i == 1
453        assert one_two.j == 2
454        assert one_two.k == 3
455
456    def test_get_too_few_params(self):
457        CompositePk = self.classes.CompositePk
458
459        s = Session()
460        q = s.query(CompositePk)
461        assert_raises(sa_exc.InvalidRequestError, q.get, 7)
462
463    def test_get_too_few_params_tuple(self):
464        CompositePk = self.classes.CompositePk
465
466        s = Session()
467        q = s.query(CompositePk)
468        assert_raises(sa_exc.InvalidRequestError, q.get, (7,))
469
470    def test_get_too_many_params(self):
471        CompositePk = self.classes.CompositePk
472
473        s = Session()
474        q = s.query(CompositePk)
475        assert_raises(sa_exc.InvalidRequestError, q.get, (7, 10, 100))
476
477    def test_get_against_col(self):
478        User = self.classes.User
479
480        s = Session()
481        q = s.query(User.id)
482        assert_raises(sa_exc.InvalidRequestError, q.get, (5, ))
483
484    def test_get_null_pk(self):
485        """test that a mapping which can have None in a
486        PK (i.e. map to an outerjoin) works with get()."""
487
488        users, addresses = self.tables.users, self.tables.addresses
489
490        s = users.outerjoin(addresses)
491
492        class UserThing(fixtures.ComparableEntity):
493            pass
494
495        mapper(
496            UserThing, s, properties={
497                'id': (users.c.id, addresses.c.user_id),
498                'address_id': addresses.c.id,
499                })
500        sess = create_session()
501        u10 = sess.query(UserThing).get((10, None))
502        eq_(u10, UserThing(id=10))
503
504    def test_no_criterion(self):
505        """test that get()/load() does not use preexisting filter/etc.
506        criterion"""
507
508        User, Address = self.classes.User, self.classes.Address
509
510        s = create_session()
511
512        q = s.query(User).join('addresses').filter(Address.user_id == 8)
513        assert_raises(sa_exc.InvalidRequestError, q.get, 7)
514        assert_raises(
515            sa_exc.InvalidRequestError,
516            s.query(User).filter(User.id == 7).get, 19)
517
518        # order_by()/get() doesn't raise
519        s.query(User).order_by(User.id).get(8)
520
521    def test_no_criterion_when_already_loaded(self):
522        """test that get()/load() does not use preexisting filter/etc.
523        criterion, even when we're only using the identity map."""
524
525        User, Address = self.classes.User, self.classes.Address
526
527        s = create_session()
528
529        s.query(User).get(7)
530
531        q = s.query(User).join('addresses').filter(Address.user_id == 8)
532        assert_raises(sa_exc.InvalidRequestError, q.get, 7)
533
534    def test_unique_param_names(self):
535        users = self.tables.users
536
537        class SomeUser(object):
538            pass
539        s = users.select(users.c.id != 12).alias('users')
540        m = mapper(SomeUser, s)
541        assert s.primary_key == m.primary_key
542
543        sess = create_session()
544        assert sess.query(SomeUser).get(7).name == 'jack'
545
546    def test_load(self):
547        User, Address = self.classes.User, self.classes.Address
548
549        s = create_session()
550
551        assert s.query(User).populate_existing().get(19) is None
552
553        u = s.query(User).populate_existing().get(7)
554        u2 = s.query(User).populate_existing().get(7)
555        assert u is u2
556        s.expunge_all()
557        u2 = s.query(User).populate_existing().get(7)
558        assert u is not u2
559
560        u2.name = 'some name'
561        a = Address(email_address='some other name')
562        u2.addresses.append(a)
563        assert u2 in s.dirty
564        assert a in u2.addresses
565
566        s.query(User).populate_existing().get(7)
567        assert u2 not in s.dirty
568        assert u2.name == 'jack'
569        assert a not in u2.addresses
570
571    @testing.provide_metadata
572    @testing.requires.unicode_connections
573    def test_unicode(self):
574        """test that Query.get properly sets up the type for the bind
575        parameter. using unicode would normally fail on postgresql, mysql and
576        oracle unless it is converted to an encoded string"""
577
578        metadata = self.metadata
579        table = Table(
580            'unicode_data', metadata,
581            Column(
582                'id', Unicode(40), primary_key=True,
583                test_needs_autoincrement=True),
584            Column('data', Unicode(40)))
585        metadata.create_all()
586        ustring = util.b('petit voix m\xe2\x80\x99a').decode('utf-8')
587
588        table.insert().execute(id=ustring, data=ustring)
589
590        class LocalFoo(self.classes.Base):
591            pass
592        mapper(LocalFoo, table)
593        eq_(
594            create_session().query(LocalFoo).get(ustring),
595            LocalFoo(id=ustring, data=ustring))
596
597    def test_populate_existing(self):
598        User, Address = self.classes.User, self.classes.Address
599
600        s = create_session()
601
602        userlist = s.query(User).all()
603
604        u = userlist[0]
605        u.name = 'foo'
606        a = Address(name='ed')
607        u.addresses.append(a)
608
609        self.assert_(a in u.addresses)
610
611        s.query(User).populate_existing().all()
612
613        self.assert_(u not in s.dirty)
614
615        self.assert_(u.name == 'jack')
616
617        self.assert_(a not in u.addresses)
618
619        u.addresses[0].email_address = 'lala'
620        u.orders[1].items[2].description = 'item 12'
621        # test that lazy load doesn't change child items
622        s.query(User).populate_existing().all()
623        assert u.addresses[0].email_address == 'lala'
624        assert u.orders[1].items[2].description == 'item 12'
625
626        # eager load does
627        s.query(User). \
628            options(joinedload('addresses'), joinedload_all('orders.items')). \
629            populate_existing().all()
630        assert u.addresses[0].email_address == 'jack@bean.com'
631        assert u.orders[1].items[2].description == 'item 5'
632
633
634class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL):
635    def test_no_limit_offset(self):
636        User = self.classes.User
637
638        s = create_session()
639
640        for q in (
641            s.query(User).limit(2),
642            s.query(User).offset(2),
643            s.query(User).limit(2).offset(2)
644        ):
645            assert_raises(sa_exc.InvalidRequestError, q.join, "addresses")
646
647            assert_raises(
648                sa_exc.InvalidRequestError, q.filter, User.name == 'ed')
649
650            assert_raises(sa_exc.InvalidRequestError, q.filter_by, name='ed')
651
652            assert_raises(sa_exc.InvalidRequestError, q.order_by, 'foo')
653
654            assert_raises(sa_exc.InvalidRequestError, q.group_by, 'foo')
655
656            assert_raises(sa_exc.InvalidRequestError, q.having, 'foo')
657
658            q.enable_assertions(False).join("addresses")
659            q.enable_assertions(False).filter(User.name == 'ed')
660            q.enable_assertions(False).order_by('foo')
661            q.enable_assertions(False).group_by('foo')
662
663    def test_no_from(self):
664        users, User = self.tables.users, self.classes.User
665
666        s = create_session()
667
668        q = s.query(User).select_from(users)
669        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
670
671        q = s.query(User).join('addresses')
672        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
673
674        q = s.query(User).order_by(User.id)
675        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
676
677        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
678
679        q.enable_assertions(False).select_from(users)
680
681        # this is fine, however
682        q.from_self()
683
684    def test_invalid_select_from(self):
685        User = self.classes.User
686
687        s = create_session()
688        q = s.query(User)
689        assert_raises(sa_exc.ArgumentError, q.select_from, User.id == 5)
690        assert_raises(sa_exc.ArgumentError, q.select_from, User.id)
691
692    def test_invalid_from_statement(self):
693        User, addresses, users = (self.classes.User,
694                                self.tables.addresses,
695                                self.tables.users)
696
697        s = create_session()
698        q = s.query(User)
699        assert_raises(sa_exc.ArgumentError, q.from_statement, User.id == 5)
700        assert_raises(
701            sa_exc.ArgumentError, q.from_statement, users.join(addresses))
702
703    def test_invalid_column(self):
704        User = self.classes.User
705
706        s = create_session()
707        q = s.query(User)
708        assert_raises(sa_exc.InvalidRequestError, q.add_column, object())
709
710    def test_invalid_column_tuple(self):
711        User = self.classes.User
712
713        s = create_session()
714        q = s.query(User)
715        assert_raises(sa_exc.InvalidRequestError, q.add_column, (1, 1))
716
717    def test_distinct(self):
718        """test that a distinct() call is not valid before 'clauseelement'
719        conditions."""
720
721        User = self.classes.User
722
723        s = create_session()
724        q = s.query(User).distinct()
725        assert_raises(sa_exc.InvalidRequestError, q.select_from, User)
726        assert_raises(
727            sa_exc.InvalidRequestError, q.from_statement,
728            text("select * from table"))
729        assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User)
730
731    def test_order_by(self):
732        """test that an order_by() call is not valid before 'clauseelement'
733        conditions."""
734
735        User = self.classes.User
736
737        s = create_session()
738        q = s.query(User).order_by(User.id)
739        assert_raises(sa_exc.InvalidRequestError, q.select_from, User)
740        assert_raises(
741            sa_exc.InvalidRequestError, q.from_statement,
742            text("select * from table"))
743        assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User)
744
745    def test_mapper_zero(self):
746        User, Address = self.classes.User, self.classes.Address
747
748        s = create_session()
749
750        q = s.query(User, Address)
751        assert_raises(sa_exc.InvalidRequestError, q.get, 5)
752
753    def test_from_statement(self):
754        User = self.classes.User
755
756        s = create_session()
757
758        for meth, arg, kw in [
759            (Query.filter, (User.id == 5,), {}),
760            (Query.filter_by, (), {'id': 5}),
761            (Query.limit, (5, ), {}),
762            (Query.group_by, (User.name,), {}),
763            (Query.order_by, (User.name,), {})
764        ]:
765            q = s.query(User)
766            q = meth(q, *arg, **kw)
767            assert_raises(
768                sa_exc.InvalidRequestError,
769                q.from_statement, text("x")
770            )
771
772            q = s.query(User)
773            q = q.from_statement(text("x"))
774            assert_raises(
775                sa_exc.InvalidRequestError,
776                meth, q, *arg, **kw
777            )
778
779
780class OperatorTest(QueryTest, AssertsCompiledSQL):
781    """test sql.Comparator implementation for MapperProperties"""
782
783    __dialect__ = 'default'
784
785    def _test(self, clause, expected, entity=None, checkparams=None):
786        dialect = default.DefaultDialect()
787        if entity is not None:
788            # specify a lead entity, so that when we are testing
789            # correlation, the correlation actually happens
790            sess = Session()
791            lead = sess.query(entity)
792            context = lead._compile_context()
793            context.statement.use_labels = True
794            lead = context.statement.compile(dialect=dialect)
795            expected = (str(lead) + " WHERE " + expected).replace("\n", "")
796            clause = sess.query(entity).filter(clause)
797        self.assert_compile(clause, expected, checkparams=checkparams)
798
799    def _test_filter_aliases(
800            self,
801            clause, expected, from_, onclause, checkparams=None):
802        dialect = default.DefaultDialect()
803        sess = Session()
804        lead = sess.query(from_).join(onclause, aliased=True)
805        full = lead.filter(clause)
806        context = lead._compile_context()
807        context.statement.use_labels = True
808        lead = context.statement.compile(dialect=dialect)
809        expected = (str(lead) + " WHERE " + expected).replace("\n", "")
810
811        self.assert_compile(full, expected, checkparams=checkparams)
812
813    def test_arithmetic(self):
814        User = self.classes.User
815
816        create_session().query(User)
817        for (py_op, sql_op) in ((operators.add, '+'), (operators.mul, '*'),
818                                (operators.sub, '-'),
819                                (operators.truediv, '/'),
820                                (operators.div, '/'),
821                                ):
822            for (lhs, rhs, res) in (
823                (5, User.id, ':id_1 %s users.id'),
824                (5, literal(6), ':param_1 %s :param_2'),
825                (User.id, 5, 'users.id %s :id_1'),
826                (User.id, literal('b'), 'users.id %s :param_1'),
827                (User.id, User.id, 'users.id %s users.id'),
828                (literal(5), 'b', ':param_1 %s :param_2'),
829                (literal(5), User.id, ':param_1 %s users.id'),
830                (literal(5), literal(6), ':param_1 %s :param_2'),
831            ):
832                self._test(py_op(lhs, rhs), res % sql_op)
833
834    def test_comparison(self):
835        User = self.classes.User
836
837        create_session().query(User)
838        ualias = aliased(User)
839
840        for (py_op, fwd_op, rev_op) in ((operators.lt, '<', '>'),
841                                        (operators.gt, '>', '<'),
842                                        (operators.eq, '=', '='),
843                                        (operators.ne, '!=', '!='),
844                                        (operators.le, '<=', '>='),
845                                        (operators.ge, '>=', '<=')):
846            for (lhs, rhs, l_sql, r_sql) in (
847                    ('a', User.id, ':id_1', 'users.id'),
848                    ('a', literal('b'), ':param_2', ':param_1'),  # note swap!
849                    (User.id, 'b', 'users.id', ':id_1'),
850                    (User.id, literal('b'), 'users.id', ':param_1'),
851                    (User.id, User.id, 'users.id', 'users.id'),
852                    (literal('a'), 'b', ':param_1', ':param_2'),
853                    (literal('a'), User.id, ':param_1', 'users.id'),
854                    (literal('a'), literal('b'), ':param_1', ':param_2'),
855                    (ualias.id, literal('b'), 'users_1.id', ':param_1'),
856                    (User.id, ualias.name, 'users.id', 'users_1.name'),
857                    (User.name, ualias.name, 'users.name', 'users_1.name'),
858                    (ualias.name, User.name, 'users_1.name', 'users.name'),
859            ):
860
861                # the compiled clause should match either (e.g.):
862                # 'a' < 'b' -or- 'b' > 'a'.
863                compiled = str(py_op(lhs, rhs).compile(
864                    dialect=default.DefaultDialect()))
865                fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql)
866                rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql)
867
868                self.assert_(compiled == fwd_sql or compiled == rev_sql,
869                             "\n'" + compiled + "'\n does not match\n'" +
870                             fwd_sql + "'\n or\n'" + rev_sql + "'")
871
872    def test_o2m_compare_to_null(self):
873        User = self.classes.User
874
875        self._test(User.id == None, "users.id IS NULL")
876        self._test(User.id != None, "users.id IS NOT NULL")
877        self._test(~(User.id == None), "users.id IS NOT NULL")
878        self._test(~(User.id != None), "users.id IS NULL")
879        self._test(None == User.id, "users.id IS NULL")
880        self._test(~(None == User.id), "users.id IS NOT NULL")
881
882    def test_m2o_compare_to_null(self):
883        Address = self.classes.Address
884        self._test(Address.user == None, "addresses.user_id IS NULL")
885        self._test(~(Address.user == None), "addresses.user_id IS NOT NULL")
886        self._test(~(Address.user != None), "addresses.user_id IS NULL")
887        self._test(None == Address.user, "addresses.user_id IS NULL")
888        self._test(~(None == Address.user), "addresses.user_id IS NOT NULL")
889
890    def test_o2m_compare_to_null_orm_adapt(self):
891        User, Address = self.classes.User, self.classes.Address
892        self._test_filter_aliases(
893            User.id == None,
894            "users_1.id IS NULL", Address, Address.user),
895        self._test_filter_aliases(
896            User.id != None,
897            "users_1.id IS NOT NULL", Address, Address.user),
898        self._test_filter_aliases(
899            ~(User.id == None),
900            "users_1.id IS NOT NULL", Address, Address.user),
901        self._test_filter_aliases(
902            ~(User.id != None),
903            "users_1.id IS NULL", Address, Address.user),
904
905    def test_m2o_compare_to_null_orm_adapt(self):
906        User, Address = self.classes.User, self.classes.Address
907        self._test_filter_aliases(
908            Address.user == None,
909            "addresses_1.user_id IS NULL", User, User.addresses),
910        self._test_filter_aliases(
911            Address.user != None,
912            "addresses_1.user_id IS NOT NULL", User, User.addresses),
913        self._test_filter_aliases(
914            ~(Address.user == None),
915            "addresses_1.user_id IS NOT NULL", User, User.addresses),
916        self._test_filter_aliases(
917            ~(Address.user != None),
918            "addresses_1.user_id IS NULL", User, User.addresses),
919
920    def test_o2m_compare_to_null_aliased(self):
921        User = self.classes.User
922        u1 = aliased(User)
923        self._test(u1.id == None, "users_1.id IS NULL")
924        self._test(u1.id != None, "users_1.id IS NOT NULL")
925        self._test(~(u1.id == None), "users_1.id IS NOT NULL")
926        self._test(~(u1.id != None), "users_1.id IS NULL")
927
928    def test_m2o_compare_to_null_aliased(self):
929        Address = self.classes.Address
930        a1 = aliased(Address)
931        self._test(a1.user == None, "addresses_1.user_id IS NULL")
932        self._test(~(a1.user == None), "addresses_1.user_id IS NOT NULL")
933        self._test(a1.user != None, "addresses_1.user_id IS NOT NULL")
934        self._test(~(a1.user != None), "addresses_1.user_id IS NULL")
935
936    def test_relationship_unimplemented(self):
937        User = self.classes.User
938        for op in [
939            User.addresses.like,
940            User.addresses.ilike,
941            User.addresses.__le__,
942            User.addresses.__gt__,
943        ]:
944            assert_raises(NotImplementedError, op, "x")
945
946    def test_o2m_any(self):
947        User, Address = self.classes.User, self.classes.Address
948        self._test(
949            User.addresses.any(Address.id == 17),
950            "EXISTS (SELECT 1 FROM addresses "
951            "WHERE users.id = addresses.user_id AND addresses.id = :id_1)",
952            entity=User
953        )
954
955    def test_o2m_any_aliased(self):
956        User, Address = self.classes.User, self.classes.Address
957        u1 = aliased(User)
958        a1 = aliased(Address)
959        self._test(
960            u1.addresses.of_type(a1).any(a1.id == 17),
961            "EXISTS (SELECT 1 FROM addresses AS addresses_1 "
962            "WHERE users_1.id = addresses_1.user_id AND "
963            "addresses_1.id = :id_1)",
964            entity=u1
965        )
966
967    def test_o2m_any_orm_adapt(self):
968        User, Address = self.classes.User, self.classes.Address
969        self._test_filter_aliases(
970            User.addresses.any(Address.id == 17),
971            "EXISTS (SELECT 1 FROM addresses "
972            "WHERE users_1.id = addresses.user_id AND addresses.id = :id_1)",
973            Address, Address.user
974        )
975
976    def test_m2o_compare_instance(self):
977        User, Address = self.classes.User, self.classes.Address
978        u7 = User(id=5)
979        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
980        u7.id = 7
981
982        self._test(Address.user == u7, ":param_1 = addresses.user_id")
983
984    def test_m2o_compare_instance_negated(self):
985        User, Address = self.classes.User, self.classes.Address
986        u7 = User(id=5)
987        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
988        u7.id = 7
989
990        self._test(
991            Address.user != u7,
992            "addresses.user_id != :user_id_1 OR addresses.user_id IS NULL",
993            checkparams={'user_id_1': 7})
994
995    def test_m2o_compare_instance_orm_adapt(self):
996        User, Address = self.classes.User, self.classes.Address
997        u7 = User(id=5)
998        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
999        u7.id = 7
1000
1001        self._test_filter_aliases(
1002            Address.user == u7,
1003            ":param_1 = addresses_1.user_id", User, User.addresses,
1004            checkparams={'param_1': 7}
1005        )
1006
1007    def test_m2o_compare_instance_negated_warn_on_none(self):
1008        User, Address = self.classes.User, self.classes.Address
1009
1010        u7_transient = User(id=None)
1011
1012        with expect_warnings("Got None for value of column users.id; "):
1013            self._test_filter_aliases(
1014                Address.user != u7_transient,
1015                "addresses_1.user_id != :user_id_1 "
1016                "OR addresses_1.user_id IS NULL",
1017                User, User.addresses,
1018                checkparams={'user_id_1': None}
1019            )
1020
1021    def test_m2o_compare_instance_negated_orm_adapt(self):
1022        User, Address = self.classes.User, self.classes.Address
1023        u7 = User(id=5)
1024        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
1025        u7.id = 7
1026
1027        u7_transient = User(id=7)
1028
1029        self._test_filter_aliases(
1030            Address.user != u7,
1031            "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL",
1032            User, User.addresses,
1033            checkparams={'user_id_1': 7}
1034        )
1035
1036        self._test_filter_aliases(
1037            ~(Address.user == u7), ":param_1 != addresses_1.user_id",
1038            User, User.addresses,
1039            checkparams={'param_1': 7}
1040        )
1041
1042        self._test_filter_aliases(
1043            ~(Address.user != u7),
1044            "NOT (addresses_1.user_id != :user_id_1 "
1045            "OR addresses_1.user_id IS NULL)", User, User.addresses,
1046            checkparams={'user_id_1': 7}
1047        )
1048
1049        self._test_filter_aliases(
1050            Address.user != u7_transient,
1051            "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL",
1052            User, User.addresses,
1053            checkparams={'user_id_1': 7}
1054        )
1055
1056        self._test_filter_aliases(
1057            ~(Address.user == u7_transient), ":param_1 != addresses_1.user_id",
1058            User, User.addresses,
1059            checkparams={'param_1': 7}
1060        )
1061
1062        self._test_filter_aliases(
1063            ~(Address.user != u7_transient),
1064            "NOT (addresses_1.user_id != :user_id_1 "
1065            "OR addresses_1.user_id IS NULL)", User, User.addresses,
1066            checkparams={'user_id_1': 7}
1067        )
1068
1069    def test_m2o_compare_instance_aliased(self):
1070        User, Address = self.classes.User, self.classes.Address
1071        u7 = User(id=5)
1072        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
1073        u7.id = 7
1074
1075        u7_transient = User(id=7)
1076
1077        a1 = aliased(Address)
1078        self._test(
1079            a1.user == u7,
1080            ":param_1 = addresses_1.user_id",
1081            checkparams={'param_1': 7})
1082
1083        self._test(
1084            a1.user != u7,
1085            "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL",
1086            checkparams={'user_id_1': 7})
1087
1088        a1 = aliased(Address)
1089        self._test(
1090            a1.user == u7_transient,
1091            ":param_1 = addresses_1.user_id",
1092            checkparams={'param_1': 7})
1093
1094        self._test(
1095            a1.user != u7_transient,
1096            "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL",
1097            checkparams={'user_id_1': 7})
1098
1099    def test_selfref_relationship(self):
1100
1101        Node = self.classes.Node
1102
1103        nalias = aliased(Node)
1104
1105        # auto self-referential aliasing
1106        self._test(
1107            Node.children.any(Node.data == 'n1'),
1108            "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
1109            "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)",
1110            entity=Node,
1111            checkparams={'data_1': 'n1'}
1112        )
1113
1114        # needs autoaliasing
1115        self._test(
1116            Node.children == None,
1117            "NOT (EXISTS (SELECT 1 FROM nodes AS nodes_1 "
1118            "WHERE nodes.id = nodes_1.parent_id))",
1119            entity=Node,
1120            checkparams={}
1121        )
1122
1123        self._test(
1124            Node.parent == None,
1125            "nodes.parent_id IS NULL",
1126            checkparams={}
1127        )
1128
1129        self._test(
1130            nalias.parent == None,
1131            "nodes_1.parent_id IS NULL",
1132            checkparams={}
1133        )
1134
1135        self._test(
1136            nalias.parent != None,
1137            "nodes_1.parent_id IS NOT NULL",
1138            checkparams={}
1139        )
1140
1141        self._test(
1142            nalias.children == None,
1143            "NOT (EXISTS ("
1144            "SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))",
1145            entity=nalias,
1146            checkparams={}
1147        )
1148
1149        self._test(
1150            nalias.children.any(Node.data == 'some data'),
1151            "EXISTS (SELECT 1 FROM nodes WHERE "
1152            "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)",
1153            entity=nalias,
1154            checkparams={'data_1': 'some data'}
1155            )
1156
1157        # this fails because self-referential any() is auto-aliasing;
1158        # the fact that we use "nalias" here means we get two aliases.
1159        #self._test(
1160        #        Node.children.any(nalias.data == 'some data'),
1161        #        "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
1162        #        "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)",
1163        #        entity=Node
1164        #        )
1165
1166        self._test(
1167            nalias.parent.has(Node.data == 'some data'),
1168            "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id "
1169            "AND nodes.data = :data_1)",
1170            entity=nalias,
1171            checkparams={'data_1': 'some data'}
1172        )
1173
1174        self._test(
1175            Node.parent.has(Node.data == 'some data'),
1176            "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
1177            "nodes_1.id = nodes.parent_id AND nodes_1.data = :data_1)",
1178            entity=Node,
1179            checkparams={'data_1': 'some data'}
1180        )
1181
1182        self._test(
1183            Node.parent == Node(id=7),
1184            ":param_1 = nodes.parent_id",
1185            checkparams={"param_1": 7}
1186        )
1187
1188        self._test(
1189            nalias.parent == Node(id=7),
1190            ":param_1 = nodes_1.parent_id",
1191            checkparams={"param_1": 7}
1192        )
1193
1194        self._test(
1195            nalias.parent != Node(id=7),
1196            'nodes_1.parent_id != :parent_id_1 '
1197            'OR nodes_1.parent_id IS NULL',
1198            checkparams={"parent_id_1": 7}
1199        )
1200
1201        self._test(
1202            nalias.parent != Node(id=7),
1203            'nodes_1.parent_id != :parent_id_1 '
1204            'OR nodes_1.parent_id IS NULL',
1205            checkparams={"parent_id_1": 7}
1206        )
1207
1208        self._test(
1209            nalias.children.contains(Node(id=7, parent_id=12)),
1210            "nodes_1.id = :param_1",
1211            checkparams={"param_1": 12}
1212        )
1213
1214    def test_multilevel_any(self):
1215        User, Address, Dingaling = \
1216            self.classes.User, self.classes.Address, self.classes.Dingaling
1217        sess = Session()
1218
1219        q = sess.query(User).filter(
1220            User.addresses.any(
1221                and_(Address.id == Dingaling.address_id,
1222                    Dingaling.data == 'x')))
1223        # new since #2746 - correlate_except() now takes context into account
1224        # so its usage in any() is not as disrupting.
1225        self.assert_compile(
1226            q,
1227            "SELECT users.id AS users_id, users.name AS users_name "
1228            "FROM users "
1229            "WHERE EXISTS (SELECT 1 "
1230            "FROM addresses, dingalings "
1231            "WHERE users.id = addresses.user_id AND "
1232            "addresses.id = dingalings.address_id AND "
1233            "dingalings.data = :data_1)"
1234        )
1235
1236    def test_op(self):
1237        User = self.classes.User
1238
1239        self._test(User.name.op('ilike')('17'), "users.name ilike :name_1")
1240
1241    def test_in(self):
1242        User = self.classes.User
1243
1244        self._test(User.id.in_(['a', 'b']), "users.id IN (:id_1, :id_2)")
1245
1246    def test_in_on_relationship_not_supported(self):
1247        User, Address = self.classes.User, self.classes.Address
1248
1249        assert_raises(NotImplementedError, Address.user.in_, [User(id=5)])
1250
1251    def test_neg(self):
1252        User = self.classes.User
1253
1254        self._test(-User.id, "-users.id")
1255        self._test(User.id + -User.id, "users.id + -users.id")
1256
1257    def test_between(self):
1258        User = self.classes.User
1259
1260        self._test(
1261            User.id.between('a', 'b'), "users.id BETWEEN :id_1 AND :id_2")
1262
1263    def test_collate(self):
1264        User = self.classes.User
1265
1266        self._test(collate(User.id, 'binary'), "users.id COLLATE binary")
1267
1268        self._test(User.id.collate('binary'), "users.id COLLATE binary")
1269
1270    def test_selfref_between(self):
1271        User = self.classes.User
1272
1273        ualias = aliased(User)
1274        self._test(
1275            User.id.between(ualias.id, ualias.id),
1276            "users.id BETWEEN users_1.id AND users_1.id")
1277        self._test(
1278            ualias.id.between(User.id, User.id),
1279            "users_1.id BETWEEN users.id AND users.id")
1280
1281    def test_clauses(self):
1282        User, Address = self.classes.User, self.classes.Address
1283
1284        for (expr, compare) in (
1285            (func.max(User.id), "max(users.id)"),
1286            (User.id.desc(), "users.id DESC"),
1287            (between(5, User.id, Address.id),
1288            ":param_1 BETWEEN users.id AND addresses.id"),
1289            # this one would require adding compile() to
1290            # InstrumentedScalarAttribute.  do we want this ?
1291            # (User.id, "users.id")
1292        ):
1293            c = expr.compile(dialect=default.DefaultDialect())
1294            assert str(c) == compare, "%s != %s" % (str(c), compare)
1295
1296
1297class ExpressionTest(QueryTest, AssertsCompiledSQL):
1298    __dialect__ = 'default'
1299
1300    def test_deferred_instances(self):
1301        User, addresses, Address = (self.classes.User,
1302                                self.tables.addresses,
1303                                self.classes.Address)
1304
1305        session = create_session()
1306        s = session.query(User).filter(
1307            and_(addresses.c.email_address == bindparam('emailad'),
1308            Address.user_id == User.id)).statement
1309
1310        l = list(
1311            session.query(User).instances(s.execute(emailad='jack@bean.com')))
1312        eq_([User(id=7)], l)
1313
1314    def test_aliased_sql_construct(self):
1315        User, Address = self.classes.User, self.classes.Address
1316
1317        j = join(User, Address)
1318        a1 = aliased(j)
1319        self.assert_compile(
1320            a1.select(),
1321            "SELECT anon_1.users_id, anon_1.users_name, anon_1.addresses_id, "
1322            "anon_1.addresses_user_id, anon_1.addresses_email_address "
1323            "FROM (SELECT users.id AS users_id, users.name AS users_name, "
1324            "addresses.id AS addresses_id, addresses.user_id AS "
1325            "addresses_user_id, addresses.email_address AS "
1326            "addresses_email_address FROM users JOIN addresses "
1327            "ON users.id = addresses.user_id) AS anon_1"
1328        )
1329
1330    def test_aliased_sql_construct_raises_adapt_on_names(self):
1331        User, Address = self.classes.User, self.classes.Address
1332
1333        j = join(User, Address)
1334        assert_raises_message(
1335            sa_exc.ArgumentError,
1336            "adapt_on_names only applies to ORM elements",
1337            aliased, j, adapt_on_names=True
1338        )
1339
1340    def test_scalar_subquery_compile_whereclause(self):
1341        User = self.classes.User
1342        Address = self.classes.Address
1343
1344        session = create_session()
1345
1346        q = session.query(User.id).filter(User.id == 7)
1347
1348        q = session.query(Address).filter(Address.user_id == q)
1349        assert isinstance(q._criterion.right, expression.ColumnElement)
1350        self.assert_compile(
1351            q,
1352            "SELECT addresses.id AS addresses_id, addresses.user_id "
1353            "AS addresses_user_id, addresses.email_address AS "
1354            "addresses_email_address FROM addresses WHERE "
1355            "addresses.user_id = (SELECT users.id AS users_id "
1356            "FROM users WHERE users.id = :id_1)"
1357        )
1358
1359    def test_named_subquery(self):
1360        User = self.classes.User
1361
1362        session = create_session()
1363        a1 = session.query(User.id).filter(User.id == 7).subquery('foo1')
1364        a2 = session.query(User.id).filter(User.id == 7).subquery(name='foo2')
1365        a3 = session.query(User.id).filter(User.id == 7).subquery()
1366
1367        eq_(a1.name, 'foo1')
1368        eq_(a2.name, 'foo2')
1369        eq_(a3.name, '%%(%d anon)s' % id(a3))
1370
1371    def test_labeled_subquery(self):
1372        User = self.classes.User
1373
1374        session = create_session()
1375        a1 = session.query(User.id).filter(User.id == 7). \
1376            subquery(with_labels=True)
1377        assert a1.c.users_id is not None
1378
1379    def test_reduced_subquery(self):
1380        User = self.classes.User
1381        ua = aliased(User)
1382
1383        session = create_session()
1384        a1 = session.query(User.id, ua.id, ua.name).\
1385            filter(User.id == ua.id).subquery(reduce_columns=True)
1386        self.assert_compile(a1,
1387                "SELECT users.id, users_1.name FROM "
1388                "users, users AS users_1 WHERE users.id = users_1.id")
1389
1390    def test_label(self):
1391        User = self.classes.User
1392
1393        session = create_session()
1394
1395        q = session.query(User.id).filter(User.id == 7).label('foo')
1396        self.assert_compile(
1397            session.query(q),
1398            "SELECT (SELECT users.id FROM users WHERE users.id = :id_1) AS foo"
1399        )
1400
1401    def test_as_scalar(self):
1402        User = self.classes.User
1403
1404        session = create_session()
1405
1406        q = session.query(User.id).filter(User.id == 7).as_scalar()
1407
1408        self.assert_compile(session.query(User).filter(User.id.in_(q)),
1409                            'SELECT users.id AS users_id, users.name '
1410                            'AS users_name FROM users WHERE users.id '
1411                            'IN (SELECT users.id FROM users WHERE '
1412                            'users.id = :id_1)')
1413
1414    def test_param_transfer(self):
1415        User = self.classes.User
1416
1417        session = create_session()
1418
1419        q = session.query(User.id).filter(User.id == bindparam('foo')).\
1420            params(foo=7).subquery()
1421
1422        q = session.query(User).filter(User.id.in_(q))
1423
1424        eq_(User(id=7), q.one())
1425
1426    def test_in(self):
1427        User, Address = self.classes.User, self.classes.Address
1428
1429        session = create_session()
1430        s = session.query(User.id).join(User.addresses).group_by(User.id).\
1431            having(func.count(Address.id) > 2)
1432        eq_(session.query(User).filter(User.id.in_(s)).all(), [User(id=8)])
1433
1434    def test_union(self):
1435        User = self.classes.User
1436
1437        s = create_session()
1438
1439        q1 = s.query(User).filter(User.name == 'ed').with_labels()
1440        q2 = s.query(User).filter(User.name == 'fred').with_labels()
1441        eq_(
1442            s.query(User).from_statement(union(q1, q2).
1443            order_by('users_name')).all(), [User(name='ed'), User(name='fred')]
1444        )
1445
1446    def test_select(self):
1447        User = self.classes.User
1448
1449        s = create_session()
1450
1451        # this is actually not legal on most DBs since the subquery has no
1452        # alias
1453        q1 = s.query(User).filter(User.name == 'ed')
1454
1455        self.assert_compile(
1456            select([q1]),
1457            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1458            "users.name AS users_name FROM users WHERE users.name = :name_1)"
1459        )
1460
1461    def test_join(self):
1462        User, Address = self.classes.User, self.classes.Address
1463
1464        s = create_session()
1465
1466        # TODO: do we want aliased() to detect a query and convert to
1467        # subquery() automatically ?
1468        q1 = s.query(Address).filter(Address.email_address == 'jack@bean.com')
1469        adalias = aliased(Address, q1.subquery())
1470        eq_(
1471            s.query(User, adalias).join(adalias, User.id == adalias.user_id).
1472            all(),
1473            [
1474                (
1475                    User(id=7, name='jack'),
1476                    Address(email_address='jack@bean.com', user_id=7, id=1))])
1477
1478
1479class ColumnPropertyTest(_fixtures.FixtureTest, AssertsCompiledSQL):
1480    __dialect__ = 'default'
1481    run_setup_mappers = 'each'
1482
1483    def _fixture(self, label=True, polymorphic=False):
1484        User, Address = self.classes("User", "Address")
1485        users, addresses = self.tables("users", "addresses")
1486        stmt = select([func.max(addresses.c.email_address)]).\
1487            where(addresses.c.user_id == users.c.id).\
1488            correlate(users)
1489        if label:
1490            stmt = stmt.label("email_ad")
1491
1492        mapper(User, users, properties={
1493            "ead": column_property(stmt)
1494        }, with_polymorphic="*" if polymorphic else None)
1495        mapper(Address, addresses)
1496
1497    def _func_fixture(self, label=False):
1498        User = self.classes.User
1499        users = self.tables.users
1500
1501        if label:
1502            mapper(User, users, properties={
1503                "foobar": column_property(
1504                    func.foob(users.c.name).label(None)
1505                )
1506            })
1507        else:
1508            mapper(User, users, properties={
1509                "foobar": column_property(
1510                    func.foob(users.c.name)
1511                )
1512            })
1513
1514    def test_anon_label_function_auto(self):
1515        self._func_fixture()
1516        User = self.classes.User
1517
1518        s = Session()
1519
1520        u1 = aliased(User)
1521        self.assert_compile(
1522            s.query(User.foobar, u1.foobar),
1523            "SELECT foob(users.name) AS foob_1, foob(users_1.name) AS foob_2 "
1524            "FROM users, users AS users_1"
1525        )
1526
1527    def test_anon_label_function_manual(self):
1528        self._func_fixture(label=True)
1529        User = self.classes.User
1530
1531        s = Session()
1532
1533        u1 = aliased(User)
1534        self.assert_compile(
1535            s.query(User.foobar, u1.foobar),
1536            "SELECT foob(users.name) AS foob_1, foob(users_1.name) AS foob_2 "
1537            "FROM users, users AS users_1"
1538        )
1539
1540    def test_anon_label_ad_hoc_labeling(self):
1541        self._func_fixture()
1542        User = self.classes.User
1543
1544        s = Session()
1545
1546        u1 = aliased(User)
1547        self.assert_compile(
1548            s.query(User.foobar.label('x'), u1.foobar.label('y')),
1549            "SELECT foob(users.name) AS x, foob(users_1.name) AS y "
1550            "FROM users, users AS users_1"
1551        )
1552
1553
1554    def test_order_by_column_prop_string(self):
1555        User, Address = self.classes("User", "Address")
1556        self._fixture(label=True)
1557
1558        s = Session()
1559        q = s.query(User).order_by("email_ad")
1560        self.assert_compile(
1561            q,
1562            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1563            "FROM addresses "
1564            "WHERE addresses.user_id = users.id) AS email_ad, "
1565            "users.id AS users_id, users.name AS users_name "
1566            "FROM users ORDER BY email_ad"
1567        )
1568
1569    def test_order_by_column_prop_aliased_string(self):
1570        User, Address = self.classes("User", "Address")
1571        self._fixture(label=True)
1572
1573        s = Session()
1574        ua = aliased(User)
1575        q = s.query(ua).order_by("email_ad")
1576
1577        def go():
1578            self.assert_compile(
1579                q,
1580                "SELECT (SELECT max(addresses.email_address) AS max_1 "
1581                "FROM addresses WHERE addresses.user_id = users_1.id) "
1582                "AS anon_1, users_1.id AS users_1_id, "
1583                "users_1.name AS users_1_name FROM users AS users_1 "
1584                "ORDER BY email_ad"
1585            )
1586        assert_warnings(
1587            go,
1588            ["Can't resolve label reference 'email_ad'"], regex=True)
1589
1590    def test_order_by_column_labeled_prop_attr_aliased_one(self):
1591        User = self.classes.User
1592        self._fixture(label=True)
1593
1594        ua = aliased(User)
1595        s = Session()
1596        q = s.query(ua).order_by(ua.ead)
1597        self.assert_compile(
1598            q,
1599            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1600            "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
1601            "users_1.id AS users_1_id, users_1.name AS users_1_name "
1602            "FROM users AS users_1 ORDER BY anon_1"
1603        )
1604
1605    def test_order_by_column_labeled_prop_attr_aliased_two(self):
1606        User = self.classes.User
1607        self._fixture(label=True)
1608
1609        ua = aliased(User)
1610        s = Session()
1611        q = s.query(ua.ead).order_by(ua.ead)
1612        self.assert_compile(
1613            q,
1614            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1615            "FROM addresses, "
1616            "users AS users_1 WHERE addresses.user_id = users_1.id) "
1617            "AS anon_1 ORDER BY anon_1"
1618        )
1619
1620        # we're also testing that the state of "ua" is OK after the
1621        # previous call, so the batching into one test is intentional
1622        q = s.query(ua).order_by(ua.ead)
1623        self.assert_compile(
1624            q,
1625            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1626            "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
1627            "users_1.id AS users_1_id, users_1.name AS users_1_name "
1628            "FROM users AS users_1 ORDER BY anon_1"
1629        )
1630
1631    def test_order_by_column_labeled_prop_attr_aliased_three(self):
1632        User = self.classes.User
1633        self._fixture(label=True)
1634
1635        ua = aliased(User)
1636        s = Session()
1637        q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead)
1638        self.assert_compile(
1639            q,
1640            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1641            "FROM addresses, users WHERE addresses.user_id = users.id) "
1642            "AS email_ad, (SELECT max(addresses.email_address) AS max_1 "
1643            "FROM addresses, users AS users_1 WHERE addresses.user_id = "
1644            "users_1.id) AS anon_1 ORDER BY email_ad, anon_1"
1645        )
1646
1647        q = s.query(User, ua).order_by(User.ead, ua.ead)
1648        self.assert_compile(
1649            q,
1650            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1651            "FROM addresses WHERE addresses.user_id = users.id) AS "
1652            "email_ad, users.id AS users_id, users.name AS users_name, "
1653            "(SELECT max(addresses.email_address) AS max_1 FROM addresses "
1654            "WHERE addresses.user_id = users_1.id) AS anon_1, users_1.id "
1655            "AS users_1_id, users_1.name AS users_1_name FROM users, "
1656            "users AS users_1 ORDER BY email_ad, anon_1"
1657        )
1658
1659    def test_order_by_column_labeled_prop_attr_aliased_four(self):
1660        User = self.classes.User
1661        self._fixture(label=True, polymorphic=True)
1662
1663        ua = aliased(User)
1664        s = Session()
1665        q = s.query(ua, User.id).order_by(ua.ead)
1666        self.assert_compile(
1667            q,
1668            "SELECT (SELECT max(addresses.email_address) AS max_1 FROM "
1669            "addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
1670            "users_1.id AS users_1_id, users_1.name AS users_1_name, "
1671            "users.id AS users_id FROM users AS users_1, users ORDER BY anon_1"
1672        )
1673
1674
1675    def test_order_by_column_unlabeled_prop_attr_aliased_one(self):
1676        User = self.classes.User
1677        self._fixture(label=False)
1678
1679        ua = aliased(User)
1680        s = Session()
1681        q = s.query(ua).order_by(ua.ead)
1682        self.assert_compile(
1683            q,
1684            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1685            "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
1686            "users_1.id AS users_1_id, users_1.name AS users_1_name "
1687            "FROM users AS users_1 ORDER BY anon_1"
1688        )
1689
1690    def test_order_by_column_unlabeled_prop_attr_aliased_two(self):
1691        User = self.classes.User
1692        self._fixture(label=False)
1693
1694        ua = aliased(User)
1695        s = Session()
1696        q = s.query(ua.ead).order_by(ua.ead)
1697        self.assert_compile(
1698            q,
1699            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1700            "FROM addresses, "
1701            "users AS users_1 WHERE addresses.user_id = users_1.id) "
1702            "AS anon_1 ORDER BY anon_1"
1703        )
1704
1705        # we're also testing that the state of "ua" is OK after the
1706        # previous call, so the batching into one test is intentional
1707        q = s.query(ua).order_by(ua.ead)
1708        self.assert_compile(
1709            q,
1710            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1711            "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
1712            "users_1.id AS users_1_id, users_1.name AS users_1_name "
1713            "FROM users AS users_1 ORDER BY anon_1"
1714        )
1715
1716    def test_order_by_column_unlabeled_prop_attr_aliased_three(self):
1717        User = self.classes.User
1718        self._fixture(label=False)
1719
1720        ua = aliased(User)
1721        s = Session()
1722        q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead)
1723        self.assert_compile(
1724            q,
1725            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1726            "FROM addresses, users WHERE addresses.user_id = users.id) "
1727            "AS anon_1, (SELECT max(addresses.email_address) AS max_1 "
1728            "FROM addresses, users AS users_1 "
1729            "WHERE addresses.user_id = users_1.id) AS anon_2 "
1730            "ORDER BY anon_1, anon_2"
1731        )
1732
1733        q = s.query(User, ua).order_by(User.ead, ua.ead)
1734        self.assert_compile(
1735            q,
1736            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1737            "FROM addresses WHERE addresses.user_id = users.id) AS "
1738            "anon_1, users.id AS users_id, users.name AS users_name, "
1739            "(SELECT max(addresses.email_address) AS max_1 FROM addresses "
1740            "WHERE addresses.user_id = users_1.id) AS anon_2, users_1.id "
1741            "AS users_1_id, users_1.name AS users_1_name FROM users, "
1742            "users AS users_1 ORDER BY anon_1, anon_2"
1743        )
1744
1745    def test_order_by_column_prop_attr(self):
1746        User, Address = self.classes("User", "Address")
1747        self._fixture(label=True)
1748
1749        s = Session()
1750        q = s.query(User).order_by(User.ead)
1751        # this one is a bit of a surprise; this is compiler
1752        # label-order-by logic kicking in, but won't work in more
1753        # complex cases.
1754        self.assert_compile(
1755            q,
1756            "SELECT (SELECT max(addresses.email_address) AS max_1 "
1757            "FROM addresses "
1758            "WHERE addresses.user_id = users.id) AS email_ad, "
1759            "users.id AS users_id, users.name AS users_name "
1760            "FROM users ORDER BY email_ad"
1761        )
1762
1763    def test_order_by_column_prop_attr_non_present(self):
1764        User, Address = self.classes("User", "Address")
1765        self._fixture(label=True)
1766
1767        s = Session()
1768        q = s.query(User).options(defer(User.ead)).order_by(User.ead)
1769        self.assert_compile(
1770            q,
1771            "SELECT users.id AS users_id, users.name AS users_name "
1772            "FROM users ORDER BY (SELECT max(addresses.email_address) AS max_1 "
1773            "FROM addresses "
1774            "WHERE addresses.user_id = users.id)"
1775        )
1776
1777
1778class ComparatorTest(QueryTest):
1779    def test_clause_element_query_resolve(self):
1780        from sqlalchemy.orm.properties import ColumnProperty
1781        User = self.classes.User
1782
1783        class Comparator(ColumnProperty.Comparator):
1784            def __init__(self, expr):
1785                self.expr = expr
1786
1787            def __clause_element__(self):
1788                return self.expr
1789
1790        sess = Session()
1791        eq_(
1792            sess.query(Comparator(User.id)).order_by(Comparator(User.id)).all(),
1793            [(7, ), (8, ), (9, ), (10, )]
1794        )
1795
1796
1797# more slice tests are available in test/orm/generative.py
1798class SliceTest(QueryTest):
1799    def test_first(self):
1800        User = self.classes.User
1801
1802        assert User(id=7) == create_session().query(User).first()
1803
1804        assert create_session().query(User).filter(User.id == 27). \
1805            first() is None
1806
1807    def test_limit_offset_applies(self):
1808        """Test that the expected LIMIT/OFFSET is applied for slices.
1809
1810        The LIMIT/OFFSET syntax differs slightly on all databases, and
1811        query[x:y] executes immediately, so we are asserting against
1812        SQL strings using sqlite's syntax.
1813
1814        """
1815
1816        User = self.classes.User
1817
1818        sess = create_session()
1819        q = sess.query(User)
1820
1821        self.assert_sql(
1822            testing.db, lambda: q[10:20], [
1823                (
1824                    "SELECT users.id AS users_id, users.name "
1825                    "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
1826                    {'param_1': 10, 'param_2': 10})])
1827
1828        self.assert_sql(
1829            testing.db, lambda: q[:20], [
1830                (
1831                    "SELECT users.id AS users_id, users.name "
1832                    "AS users_name FROM users LIMIT :param_1",
1833                    {'param_1': 20})])
1834
1835        self.assert_sql(
1836            testing.db, lambda: q[5:], [
1837                (
1838                    "SELECT users.id AS users_id, users.name "
1839                    "AS users_name FROM users LIMIT -1 OFFSET :param_1",
1840                    {'param_1': 5})])
1841
1842        self.assert_sql(testing.db, lambda: q[2:2], [])
1843
1844        self.assert_sql(testing.db, lambda: q[-2:-5], [])
1845
1846        self.assert_sql(
1847            testing.db, lambda: q[-5:-2], [
1848                (
1849                    "SELECT users.id AS users_id, users.name AS users_name "
1850                    "FROM users", {})])
1851
1852        self.assert_sql(
1853            testing.db, lambda: q[-5:], [
1854                (
1855                    "SELECT users.id AS users_id, users.name AS users_name "
1856                    "FROM users", {})])
1857
1858        self.assert_sql(
1859            testing.db, lambda: q[:], [
1860                (
1861                    "SELECT users.id AS users_id, users.name AS users_name "
1862                    "FROM users", {})])
1863
1864
1865class FilterTest(QueryTest, AssertsCompiledSQL):
1866    __dialect__ = 'default'
1867
1868    def test_basic(self):
1869        User = self.classes.User
1870
1871        users = create_session().query(User).all()
1872        eq_([User(id=7), User(id=8), User(id=9), User(id=10)], users)
1873
1874    @testing.requires.offset
1875    def test_limit_offset(self):
1876        User = self.classes.User
1877
1878        sess = create_session()
1879
1880        assert [User(id=8), User(id=9)] == \
1881            sess.query(User).order_by(User.id).limit(2).offset(1).all()
1882
1883        assert [User(id=8), User(id=9)] == \
1884            list(sess.query(User).order_by(User.id)[1:3])
1885
1886        assert User(id=8) == sess.query(User).order_by(User.id)[1]
1887
1888        assert [] == sess.query(User).order_by(User.id)[3:3]
1889        assert [] == sess.query(User).order_by(User.id)[0:0]
1890
1891    @testing.requires.bound_limit_offset
1892    def test_select_with_bindparam_offset_limit(self):
1893        """Does a query allow bindparam for the limit?"""
1894        User = self.classes.User
1895        sess = create_session()
1896        q1 = sess.query(self.classes.User).\
1897            order_by(self.classes.User.id).limit(bindparam('n'))
1898
1899        for n in range(1, 4):
1900            result = q1.params(n=n).all()
1901            eq_(len(result), n)
1902
1903        eq_(
1904            sess.query(User).order_by(User.id).limit(bindparam('limit')).
1905            offset(bindparam('offset')).params(limit=2, offset=1).all(),
1906            [User(id=8), User(id=9)]
1907        )
1908
1909    @testing.fails_on("mysql", "doesn't like CAST in the limit clause")
1910    @testing.requires.bound_limit_offset
1911    def test_select_with_bindparam_offset_limit_w_cast(self):
1912        User = self.classes.User
1913        sess = create_session()
1914        q1 = sess.query(self.classes.User).\
1915            order_by(self.classes.User.id).limit(bindparam('n'))
1916        eq_(
1917            list(
1918                sess.query(User).params(a=1, b=3).order_by(User.id)
1919                [cast(bindparam('a'), Integer):cast(bindparam('b'), Integer)]),
1920            [User(id=8), User(id=9)]
1921        )
1922
1923    @testing.requires.boolean_col_expressions
1924    def test_exists(self):
1925        User = self.classes.User
1926
1927        sess = create_session(testing.db)
1928
1929        assert sess.query(exists().where(User.id == 9)).scalar()
1930        assert not sess.query(exists().where(User.id == 29)).scalar()
1931
1932    def test_one_filter(self):
1933        User = self.classes.User
1934
1935        assert [User(id=8), User(id=9)] == \
1936            create_session().query(User).filter(User.name.endswith('ed')).all()
1937
1938    def test_contains(self):
1939        """test comparing a collection to an object instance."""
1940
1941        User, Address = self.classes.User, self.classes.Address
1942
1943        sess = create_session()
1944        address = sess.query(Address).get(3)
1945        assert [User(id=8)] == \
1946            sess.query(User).filter(User.addresses.contains(address)).all()
1947
1948        try:
1949            sess.query(User).filter(User.addresses == address)
1950            assert False
1951        except sa_exc.InvalidRequestError:
1952            assert True
1953
1954        assert [User(id=10)] == \
1955            sess.query(User).filter(User.addresses == None).all()
1956
1957        try:
1958            assert [User(id=7), User(id=9), User(id=10)] == \
1959                sess.query(User).filter(User.addresses != address).all()
1960            assert False
1961        except sa_exc.InvalidRequestError:
1962            assert True
1963
1964        # assert [User(id=7), User(id=9), User(id=10)] ==
1965        # sess.query(User).filter(User.addresses!=address).all()
1966
1967    def test_clause_element_ok(self):
1968        User = self.classes.User
1969        s = Session()
1970        self.assert_compile(
1971            s.query(User).filter(User.addresses),
1972            "SELECT users.id AS users_id, users.name AS users_name "
1973            "FROM users, addresses WHERE users.id = addresses.user_id"
1974        )
1975
1976    def test_unique_binds_join_cond(self):
1977        """test that binds used when the lazyclause is used in criterion are
1978        unique"""
1979
1980        User, Address = self.classes.User, self.classes.Address
1981        sess = Session()
1982        a1, a2 = sess.query(Address).order_by(Address.id)[0:2]
1983        self.assert_compile(
1984            sess.query(User).filter(User.addresses.contains(a1)).union(
1985                sess.query(User).filter(User.addresses.contains(a2))
1986            ),
1987            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
1988            "anon_1_users_name FROM (SELECT users.id AS users_id, "
1989            "users.name AS users_name FROM users WHERE users.id = :param_1 "
1990            "UNION SELECT users.id AS users_id, users.name AS users_name "
1991            "FROM users WHERE users.id = :param_2) AS anon_1",
1992            checkparams={'param_1': 7, 'param_2': 8}
1993        )
1994
1995    def test_any(self):
1996        User, Address = self.classes.User, self.classes.Address
1997
1998        sess = create_session()
1999
2000        assert [User(id=8), User(id=9)] == \
2001            sess.query(User). \
2002            filter(
2003                User.addresses.any(Address.email_address.like('%ed%'))).all()
2004
2005        assert [User(id=8)] == \
2006            sess.query(User). \
2007            filter(
2008                User.addresses.any(
2009                    Address.email_address.like('%ed%'), id=4)).all()
2010
2011        assert [User(id=8)] == \
2012            sess.query(User). \
2013            filter(User.addresses.any(Address.email_address.like('%ed%'))).\
2014            filter(User.addresses.any(id=4)).all()
2015
2016        assert [User(id=9)] == \
2017            sess.query(User). \
2018            filter(User.addresses.any(email_address='fred@fred.com')).all()
2019
2020        # test that any() doesn't overcorrelate
2021        assert [User(id=7), User(id=8)] == \
2022            sess.query(User).join("addresses"). \
2023            filter(
2024                ~User.addresses.any(
2025                    Address.email_address == 'fred@fred.com')).all()
2026
2027        # test that the contents are not adapted by the aliased join
2028        assert [User(id=7), User(id=8)] == \
2029            sess.query(User).join("addresses", aliased=True). \
2030            filter(
2031                ~User.addresses.any(
2032                    Address.email_address == 'fred@fred.com')).all()
2033
2034        assert [User(id=10)] == \
2035            sess.query(User).outerjoin("addresses", aliased=True). \
2036            filter(~User.addresses.any()).all()
2037
2038    def test_has(self):
2039        Dingaling, User, Address = (
2040            self.classes.Dingaling, self.classes.User, self.classes.Address)
2041
2042        sess = create_session()
2043        assert [Address(id=5)] == \
2044            sess.query(Address).filter(Address.user.has(name='fred')).all()
2045
2046        assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] \
2047            == sess.query(Address). \
2048            filter(Address.user.has(User.name.like('%ed%'))). \
2049            order_by(Address.id).all()
2050
2051        assert [Address(id=2), Address(id=3), Address(id=4)] == \
2052            sess.query(Address). \
2053            filter(Address.user.has(User.name.like('%ed%'), id=8)). \
2054            order_by(Address.id).all()
2055
2056        # test has() doesn't overcorrelate
2057        assert [Address(id=2), Address(id=3), Address(id=4)] == \
2058            sess.query(Address).join("user"). \
2059            filter(Address.user.has(User.name.like('%ed%'), id=8)). \
2060            order_by(Address.id).all()
2061
2062        # test has() doesn't get subquery contents adapted by aliased join
2063        assert [Address(id=2), Address(id=3), Address(id=4)] == \
2064            sess.query(Address).join("user", aliased=True). \
2065            filter(Address.user.has(User.name.like('%ed%'), id=8)). \
2066            order_by(Address.id).all()
2067
2068        dingaling = sess.query(Dingaling).get(2)
2069        assert [User(id=9)] == \
2070            sess.query(User). \
2071            filter(User.addresses.any(Address.dingaling == dingaling)).all()
2072
2073    def test_contains_m2m(self):
2074        Item, Order = self.classes.Item, self.classes.Order
2075
2076        sess = create_session()
2077        item = sess.query(Item).get(3)
2078
2079        eq_(
2080            sess.query(Order).filter(Order.items.contains(item)).
2081            order_by(Order.id).all(),
2082            [Order(id=1), Order(id=2), Order(id=3)]
2083        )
2084        eq_(
2085            sess.query(Order).filter(~Order.items.contains(item)).
2086            order_by(Order.id).all(),
2087            [Order(id=4), Order(id=5)]
2088        )
2089
2090        item2 = sess.query(Item).get(5)
2091        eq_(
2092            sess.query(Order).filter(Order.items.contains(item)).
2093            filter(Order.items.contains(item2)).all(),
2094            [Order(id=3)]
2095        )
2096
2097    def test_comparison(self):
2098        """test scalar comparison to an object instance"""
2099
2100        Item, Order, Dingaling, User, Address = (
2101            self.classes.Item, self.classes.Order, self.classes.Dingaling,
2102            self.classes.User, self.classes.Address)
2103
2104        sess = create_session()
2105        user = sess.query(User).get(8)
2106        assert [Address(id=2), Address(id=3), Address(id=4)] == \
2107            sess.query(Address).filter(Address.user == user).all()
2108
2109        assert [Address(id=1), Address(id=5)] == \
2110            sess.query(Address).filter(Address.user != user).all()
2111
2112        # generates an IS NULL
2113        assert [] == sess.query(Address).filter(Address.user == None).all()
2114        assert [] == sess.query(Address).filter(Address.user == null()).all()
2115
2116        assert [Order(id=5)] == \
2117            sess.query(Order).filter(Order.address == None).all()
2118
2119        # o2o
2120        dingaling = sess.query(Dingaling).get(2)
2121        assert [Address(id=5)] == \
2122            sess.query(Address).filter(Address.dingaling == dingaling).all()
2123
2124        # m2m
2125        eq_(
2126            sess.query(Item).filter(Item.keywords == None).
2127            order_by(Item.id).all(), [Item(id=4), Item(id=5)])
2128        eq_(
2129            sess.query(Item).filter(Item.keywords != None).
2130            order_by(Item.id).all(), [Item(id=1), Item(id=2), Item(id=3)])
2131
2132    def test_filter_by(self):
2133        User, Address = self.classes.User, self.classes.Address
2134
2135        sess = create_session()
2136        user = sess.query(User).get(8)
2137        assert [Address(id=2), Address(id=3), Address(id=4)] == \
2138            sess.query(Address).filter_by(user=user).all()
2139
2140        # many to one generates IS NULL
2141        assert [] == sess.query(Address).filter_by(user=None).all()
2142        assert [] == sess.query(Address).filter_by(user=null()).all()
2143
2144        # one to many generates WHERE NOT EXISTS
2145        assert [User(name='chuck')] == \
2146            sess.query(User).filter_by(addresses=None).all()
2147        assert [User(name='chuck')] == \
2148            sess.query(User).filter_by(addresses=null()).all()
2149
2150
2151    def test_filter_by_tables(self):
2152        users = self.tables.users
2153        addresses = self.tables.addresses
2154        sess = create_session()
2155        self.assert_compile(
2156            sess.query(users).filter_by(name='ed').
2157            join(addresses, users.c.id == addresses.c.user_id).
2158            filter_by(email_address='ed@ed.com'),
2159            "SELECT users.id AS users_id, users.name AS users_name "
2160            "FROM users JOIN addresses ON users.id = addresses.user_id "
2161            "WHERE users.name = :name_1 AND "
2162            "addresses.email_address = :email_address_1",
2163            checkparams={'email_address_1': 'ed@ed.com', 'name_1': 'ed'}
2164        )
2165
2166    def test_filter_by_no_property(self):
2167        addresses = self.tables.addresses
2168        sess = create_session()
2169        assert_raises_message(
2170            sa.exc.InvalidRequestError,
2171            "Entity 'addresses' has no property 'name'",
2172            sess.query(addresses).filter_by, name='ed'
2173        )
2174
2175    def test_none_comparison(self):
2176        Order, User, Address = (
2177            self.classes.Order, self.classes.User, self.classes.Address)
2178
2179        sess = create_session()
2180
2181        # scalar
2182        eq_(
2183            [Order(description="order 5")],
2184            sess.query(Order).filter(Order.address_id == None).all()
2185        )
2186        eq_(
2187            [Order(description="order 5")],
2188            sess.query(Order).filter(Order.address_id == null()).all()
2189        )
2190
2191        # o2o
2192        eq_(
2193            [Address(id=1), Address(id=3), Address(id=4)],
2194            sess.query(Address).filter(Address.dingaling == None).
2195            order_by(Address.id).all())
2196        eq_(
2197            [Address(id=1), Address(id=3), Address(id=4)],
2198            sess.query(Address).filter(Address.dingaling == null()).
2199            order_by(Address.id).all())
2200        eq_(
2201            [Address(id=2), Address(id=5)],
2202            sess.query(Address).filter(Address.dingaling != None).
2203            order_by(Address.id).all())
2204        eq_(
2205            [Address(id=2), Address(id=5)],
2206            sess.query(Address).filter(Address.dingaling != null()).
2207            order_by(Address.id).all())
2208
2209        # m2o
2210        eq_(
2211            [Order(id=5)],
2212            sess.query(Order).filter(Order.address == None).all())
2213        eq_(
2214            [Order(id=1), Order(id=2), Order(id=3), Order(id=4)],
2215            sess.query(Order).order_by(Order.id).
2216            filter(Order.address != None).all())
2217
2218        # o2m
2219        eq_(
2220            [User(id=10)],
2221            sess.query(User).filter(User.addresses == None).all())
2222        eq_(
2223            [User(id=7), User(id=8), User(id=9)],
2224            sess.query(User).filter(User.addresses != None).
2225            order_by(User.id).all())
2226
2227    def test_blank_filter_by(self):
2228        User = self.classes.User
2229
2230        eq_(
2231            [(7,), (8,), (9,), (10,)],
2232            create_session().query(User.id).filter_by().order_by(User.id).all()
2233        )
2234        eq_(
2235            [(7,), (8,), (9,), (10,)],
2236            create_session().query(User.id).filter_by(**{}).
2237            order_by(User.id).all()
2238        )
2239
2240    def test_text_coerce(self):
2241        User = self.classes.User
2242        s = create_session()
2243        self.assert_compile(
2244            s.query(User).filter(text("name='ed'")),
2245            "SELECT users.id AS users_id, users.name "
2246            "AS users_name FROM users WHERE name='ed'"
2247        )
2248
2249
2250class SetOpsTest(QueryTest, AssertsCompiledSQL):
2251    __dialect__ = 'default'
2252
2253    def test_union(self):
2254        User = self.classes.User
2255
2256        s = create_session()
2257
2258        fred = s.query(User).filter(User.name == 'fred')
2259        ed = s.query(User).filter(User.name == 'ed')
2260        jack = s.query(User).filter(User.name == 'jack')
2261
2262        eq_(
2263            fred.union(ed).order_by(User.name).all(),
2264            [User(name='ed'), User(name='fred')]
2265        )
2266
2267        eq_(
2268            fred.union(ed, jack).order_by(User.name).all(),
2269            [User(name='ed'), User(name='fred'), User(name='jack')]
2270        )
2271
2272    def test_statement_labels(self):
2273        """test that label conflicts don't occur with joins etc."""
2274
2275        User, Address = self.classes.User, self.classes.Address
2276
2277        s = create_session()
2278        q1 = s.query(User, Address).join(User.addresses).\
2279            filter(Address.email_address == "ed@wood.com")
2280        q2 = s.query(User, Address).join(User.addresses).\
2281            filter(Address.email_address == "jack@bean.com")
2282        q3 = q1.union(q2).order_by(User.name)
2283
2284        eq_(
2285            q3.all(),
2286            [
2287                (User(name='ed'), Address(email_address="ed@wood.com")),
2288                (User(name='jack'), Address(email_address="jack@bean.com")),
2289            ]
2290        )
2291
2292    def test_union_literal_expressions_compile(self):
2293        """test that column expressions translate during
2294            the _from_statement() portion of union(), others"""
2295
2296        User = self.classes.User
2297
2298        s = Session()
2299        q1 = s.query(User, literal("x"))
2300        q2 = s.query(User, literal_column("'y'"))
2301        q3 = q1.union(q2)
2302
2303        self.assert_compile(
2304            q3,
2305            "SELECT anon_1.users_id AS anon_1_users_id, "
2306            "anon_1.users_name AS anon_1_users_name, "
2307            "anon_1.param_1 AS anon_1_param_1 "
2308            "FROM (SELECT users.id AS users_id, users.name AS "
2309            "users_name, :param_1 AS param_1 "
2310            "FROM users UNION SELECT users.id AS users_id, "
2311            "users.name AS users_name, 'y' FROM users) AS anon_1"
2312        )
2313
2314    def test_union_literal_expressions_results(self):
2315        User = self.classes.User
2316
2317        s = Session()
2318
2319        q1 = s.query(User, literal("x"))
2320        q2 = s.query(User, literal_column("'y'"))
2321        q3 = q1.union(q2)
2322
2323        q4 = s.query(User, literal_column("'x'").label('foo'))
2324        q5 = s.query(User, literal("y"))
2325        q6 = q4.union(q5)
2326
2327        eq_(
2328            [x['name'] for x in q6.column_descriptions],
2329            ['User', 'foo']
2330        )
2331
2332        for q in (
2333                q3.order_by(User.id, text("anon_1_param_1")),
2334                q6.order_by(User.id, "foo")):
2335            eq_(
2336                q.all(),
2337                [
2338                    (User(id=7, name='jack'), 'x'),
2339                    (User(id=7, name='jack'), 'y'),
2340                    (User(id=8, name='ed'), 'x'),
2341                    (User(id=8, name='ed'), 'y'),
2342                    (User(id=9, name='fred'), 'x'),
2343                    (User(id=9, name='fred'), 'y'),
2344                    (User(id=10, name='chuck'), 'x'),
2345                    (User(id=10, name='chuck'), 'y')
2346                ]
2347            )
2348
2349    def test_union_labeled_anonymous_columns(self):
2350        User = self.classes.User
2351
2352        s = Session()
2353
2354        c1, c2 = column('c1'), column('c2')
2355        q1 = s.query(User, c1.label('foo'), c1.label('bar'))
2356        q2 = s.query(User, c1.label('foo'), c2.label('bar'))
2357        q3 = q1.union(q2)
2358
2359        eq_(
2360            [x['name'] for x in q3.column_descriptions],
2361            ['User', 'foo', 'bar']
2362        )
2363
2364        self.assert_compile(
2365            q3,
2366            "SELECT anon_1.users_id AS anon_1_users_id, "
2367            "anon_1.users_name AS anon_1_users_name, "
2368            "anon_1.foo AS anon_1_foo, anon_1.bar AS anon_1_bar "
2369            "FROM (SELECT users.id AS users_id, users.name AS users_name, "
2370            "c1 AS foo, c1 AS bar FROM users UNION SELECT users.id AS "
2371            "users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
2372            "FROM users) AS anon_1"
2373        )
2374
2375    def test_order_by_anonymous_col(self):
2376        User = self.classes.User
2377
2378        s = Session()
2379
2380        c1, c2 = column('c1'), column('c2')
2381        f = c1.label('foo')
2382        q1 = s.query(User, f, c2.label('bar'))
2383        q2 = s.query(User, c1.label('foo'), c2.label('bar'))
2384        q3 = q1.union(q2)
2385
2386        self.assert_compile(
2387            q3.order_by(c1),
2388            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
2389            "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
2390            "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
2391            "users_name, c1 AS foo, c2 AS bar "
2392            "FROM users UNION SELECT users.id "
2393            "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
2394            "FROM users) AS anon_1 ORDER BY anon_1.foo"
2395        )
2396
2397        self.assert_compile(
2398            q3.order_by(f),
2399            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
2400            "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
2401            "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
2402            "users_name, c1 AS foo, c2 AS bar "
2403            "FROM users UNION SELECT users.id "
2404            "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
2405            "FROM users) AS anon_1 ORDER BY anon_1.foo"
2406        )
2407
2408    def test_union_mapped_colnames_preserved_across_subquery(self):
2409        User = self.classes.User
2410
2411        s = Session()
2412        q1 = s.query(User.name)
2413        q2 = s.query(User.name)
2414
2415        # the label names in the subquery are the typical anonymized ones
2416        self.assert_compile(
2417            q1.union(q2),
2418            "SELECT anon_1.users_name AS anon_1_users_name "
2419            "FROM (SELECT users.name AS users_name FROM users "
2420            "UNION SELECT users.name AS users_name FROM users) AS anon_1"
2421        )
2422
2423        # but in the returned named tuples,
2424        # due to [ticket:1942], this should be 'name', not 'users_name'
2425        eq_(
2426            [x['name'] for x in q1.union(q2).column_descriptions],
2427            ['name']
2428        )
2429
2430    @testing.requires.intersect
2431    def test_intersect(self):
2432        User = self.classes.User
2433
2434        s = create_session()
2435
2436        fred = s.query(User).filter(User.name == 'fred')
2437        ed = s.query(User).filter(User.name == 'ed')
2438        jack = s.query(User).filter(User.name == 'jack')
2439        eq_(fred.intersect(ed, jack).all(), [])
2440
2441        eq_(fred.union(ed).intersect(ed.union(jack)).all(), [User(name='ed')])
2442
2443    def test_eager_load(self):
2444        User, Address = self.classes.User, self.classes.Address
2445
2446        s = create_session()
2447
2448        fred = s.query(User).filter(User.name == 'fred')
2449        ed = s.query(User).filter(User.name == 'ed')
2450
2451        def go():
2452            eq_(
2453                fred.union(ed).order_by(User.name).
2454                options(joinedload(User.addresses)).all(), [
2455                    User(
2456                        name='ed', addresses=[Address(), Address(),
2457                        Address()]),
2458                    User(name='fred', addresses=[Address()])]
2459            )
2460        self.assert_sql_count(testing.db, go, 1)
2461
2462
2463class AggregateTest(QueryTest):
2464
2465    def test_sum(self):
2466        Order = self.classes.Order
2467
2468        sess = create_session()
2469        orders = sess.query(Order).filter(Order.id.in_([2, 3, 4]))
2470        eq_(
2471            next(orders.values(func.sum(Order.user_id * Order.address_id))),
2472            (79,))
2473        eq_(orders.value(func.sum(Order.user_id * Order.address_id)), 79)
2474
2475    def test_apply(self):
2476        Order = self.classes.Order
2477
2478        sess = create_session()
2479        assert sess.query(func.sum(Order.user_id * Order.address_id)). \
2480            filter(Order.id.in_([2, 3, 4])).one() == (79,)
2481
2482    def test_having(self):
2483        User, Address = self.classes.User, self.classes.Address
2484
2485        sess = create_session()
2486        assert [User(name='ed', id=8)] == \
2487            sess.query(User).order_by(User.id).group_by(User). \
2488            join('addresses').having(func.count(Address.id) > 2).all()
2489
2490        assert [User(name='jack', id=7), User(name='fred', id=9)] == \
2491            sess.query(User).order_by(User.id).group_by(User). \
2492            join('addresses').having(func.count(Address.id) < 2).all()
2493
2494
2495class ExistsTest(QueryTest, AssertsCompiledSQL):
2496    __dialect__ = 'default'
2497
2498    def test_exists(self):
2499        User = self.classes.User
2500        sess = create_session()
2501
2502        q1 = sess.query(User)
2503        self.assert_compile(
2504            sess.query(q1.exists()),
2505            'SELECT EXISTS ('
2506            'SELECT 1 FROM users'
2507            ') AS anon_1'
2508        )
2509
2510        q2 = sess.query(User).filter(User.name == 'fred')
2511        self.assert_compile(
2512            sess.query(q2.exists()),
2513            'SELECT EXISTS ('
2514            'SELECT 1 FROM users WHERE users.name = :name_1'
2515            ') AS anon_1'
2516        )
2517
2518    def test_exists_col_warning(self):
2519        User = self.classes.User
2520        Address = self.classes.Address
2521        sess = create_session()
2522
2523        q1 = sess.query(User, Address).filter(User.id == Address.user_id)
2524        self.assert_compile(
2525            sess.query(q1.exists()),
2526            'SELECT EXISTS ('
2527            'SELECT 1 FROM users, addresses '
2528            'WHERE users.id = addresses.user_id'
2529            ') AS anon_1'
2530        )
2531
2532    def test_exists_w_select_from(self):
2533        User = self.classes.User
2534        sess = create_session()
2535
2536        q1 = sess.query().select_from(User).exists()
2537        self.assert_compile(
2538            sess.query(q1),
2539            'SELECT EXISTS (SELECT 1 FROM users) AS anon_1'
2540        )
2541
2542
2543class CountTest(QueryTest):
2544    def test_basic(self):
2545        users, User = self.tables.users, self.classes.User
2546
2547        s = create_session()
2548
2549        eq_(s.query(User).count(), 4)
2550
2551        eq_(s.query(User).filter(users.c.name.endswith('ed')).count(), 2)
2552
2553    def test_count_char(self):
2554        User = self.classes.User
2555        s = create_session()
2556        # '*' is favored here as the most common character,
2557        # it is reported that Informix doesn't like count(1),
2558        # rumors about Oracle preferring count(1) don't appear
2559        # to be well founded.
2560        self.assert_sql_execution(
2561            testing.db, s.query(User).count, CompiledSQL(
2562                "SELECT count(*) AS count_1 FROM "
2563                "(SELECT users.id AS users_id, users.name "
2564                "AS users_name FROM users) AS anon_1", {}
2565            )
2566        )
2567
2568    def test_multiple_entity(self):
2569        User, Address = self.classes.User, self.classes.Address
2570
2571        s = create_session()
2572        q = s.query(User, Address)
2573        eq_(q.count(), 20)  # cartesian product
2574
2575        q = s.query(User, Address).join(User.addresses)
2576        eq_(q.count(), 5)
2577
2578    def test_nested(self):
2579        User, Address = self.classes.User, self.classes.Address
2580
2581        s = create_session()
2582        q = s.query(User, Address).limit(2)
2583        eq_(q.count(), 2)
2584
2585        q = s.query(User, Address).limit(100)
2586        eq_(q.count(), 20)
2587
2588        q = s.query(User, Address).join(User.addresses).limit(100)
2589        eq_(q.count(), 5)
2590
2591    def test_cols(self):
2592        """test that column-based queries always nest."""
2593
2594        User, Address = self.classes.User, self.classes.Address
2595
2596        s = create_session()
2597
2598        q = s.query(func.count(distinct(User.name)))
2599        eq_(q.count(), 1)
2600
2601        q = s.query(func.count(distinct(User.name))).distinct()
2602        eq_(q.count(), 1)
2603
2604        q = s.query(User.name)
2605        eq_(q.count(), 4)
2606
2607        q = s.query(User.name, Address)
2608        eq_(q.count(), 20)
2609
2610        q = s.query(Address.user_id)
2611        eq_(q.count(), 5)
2612        eq_(q.distinct().count(), 3)
2613
2614
2615class DistinctTest(QueryTest):
2616    def test_basic(self):
2617        User = self.classes.User
2618
2619        eq_(
2620            [User(id=7), User(id=8), User(id=9), User(id=10)],
2621            create_session().query(User).order_by(User.id).distinct().all()
2622        )
2623        eq_(
2624            [User(id=7), User(id=9), User(id=8), User(id=10)],
2625            create_session().query(User).distinct().
2626            order_by(desc(User.name)).all()
2627        )
2628
2629    def test_joined(self):
2630        """test that orderbys from a joined table get placed into the columns
2631        clause when DISTINCT is used"""
2632
2633        User, Address = self.classes.User, self.classes.Address
2634
2635        sess = create_session()
2636        q = sess.query(User).join('addresses').distinct(). \
2637            order_by(desc(Address.email_address))
2638
2639        assert [User(id=7), User(id=9), User(id=8)] == q.all()
2640
2641        sess.expunge_all()
2642
2643        # test that it works on embedded joinedload/LIMIT subquery
2644        q = sess.query(User).join('addresses').distinct(). \
2645            options(joinedload('addresses')).\
2646            order_by(desc(Address.email_address)).limit(2)
2647
2648        def go():
2649            assert [
2650                User(id=7, addresses=[
2651                    Address(id=1)
2652                ]),
2653                User(id=9, addresses=[
2654                    Address(id=5)
2655                ]),
2656            ] == q.all()
2657        self.assert_sql_count(testing.db, go, 1)
2658
2659
2660class PrefixWithTest(QueryTest, AssertsCompiledSQL):
2661
2662    def test_one_prefix(self):
2663        User = self.classes.User
2664        sess = create_session()
2665        query = sess.query(User.name)\
2666            .prefix_with('PREFIX_1')
2667        expected = "SELECT PREFIX_1 "\
2668            "users.name AS users_name FROM users"
2669        self.assert_compile(query, expected, dialect=default.DefaultDialect())
2670
2671    def test_many_prefixes(self):
2672        User = self.classes.User
2673        sess = create_session()
2674        query = sess.query(User.name).prefix_with('PREFIX_1', 'PREFIX_2')
2675        expected = "SELECT PREFIX_1 PREFIX_2 "\
2676            "users.name AS users_name FROM users"
2677        self.assert_compile(query, expected, dialect=default.DefaultDialect())
2678
2679    def test_chained_prefixes(self):
2680        User = self.classes.User
2681        sess = create_session()
2682        query = sess.query(User.name)\
2683            .prefix_with('PREFIX_1')\
2684            .prefix_with('PREFIX_2', 'PREFIX_3')
2685        expected = "SELECT PREFIX_1 PREFIX_2 PREFIX_3 "\
2686            "users.name AS users_name FROM users"
2687        self.assert_compile(query, expected, dialect=default.DefaultDialect())
2688
2689
2690class YieldTest(_fixtures.FixtureTest):
2691    run_setup_mappers = 'each'
2692    run_inserts = 'each'
2693
2694    def _eagerload_mappings(self, addresses_lazy=True, user_lazy=True):
2695        User, Address = self.classes("User", "Address")
2696        users, addresses = self.tables("users", "addresses")
2697        mapper(User, users, properties={
2698            "addresses": relationship(
2699                Address, lazy=addresses_lazy,
2700                backref=backref("user", lazy=user_lazy)
2701            )
2702        })
2703        mapper(Address, addresses)
2704
2705    def test_basic(self):
2706        self._eagerload_mappings()
2707
2708        User = self.classes.User
2709
2710        sess = create_session()
2711        q = iter(
2712            sess.query(User).yield_per(1).from_statement(
2713                text("select * from users")))
2714
2715        ret = []
2716        eq_(len(sess.identity_map), 0)
2717        ret.append(next(q))
2718        ret.append(next(q))
2719        eq_(len(sess.identity_map), 2)
2720        ret.append(next(q))
2721        ret.append(next(q))
2722        eq_(len(sess.identity_map), 4)
2723        try:
2724            next(q)
2725            assert False
2726        except StopIteration:
2727            pass
2728
2729    def test_yield_per_and_execution_options(self):
2730        self._eagerload_mappings()
2731
2732        User = self.classes.User
2733
2734        sess = create_session()
2735        q = sess.query(User).yield_per(15)
2736        q = q.execution_options(foo='bar')
2737        assert q._yield_per
2738        eq_(
2739            q._execution_options,
2740            {"stream_results": True, "foo": "bar", "max_row_buffer": 15})
2741
2742    def test_no_joinedload_opt(self):
2743        self._eagerload_mappings()
2744
2745        User = self.classes.User
2746        sess = create_session()
2747        q = sess.query(User).options(joinedload("addresses")).yield_per(1)
2748        assert_raises_message(
2749            sa_exc.InvalidRequestError,
2750            "The yield_per Query option is currently not compatible with "
2751            "joined collection eager loading.  Please specify ",
2752            q.all
2753        )
2754
2755    def test_no_subqueryload_opt(self):
2756        self._eagerload_mappings()
2757
2758        User = self.classes.User
2759        sess = create_session()
2760        q = sess.query(User).options(subqueryload("addresses")).yield_per(1)
2761        assert_raises_message(
2762            sa_exc.InvalidRequestError,
2763            "The yield_per Query option is currently not compatible with "
2764            "subquery eager loading.  Please specify ",
2765            q.all
2766        )
2767
2768    def test_no_subqueryload_mapping(self):
2769        self._eagerload_mappings(addresses_lazy="subquery")
2770
2771        User = self.classes.User
2772        sess = create_session()
2773        q = sess.query(User).yield_per(1)
2774        assert_raises_message(
2775            sa_exc.InvalidRequestError,
2776            "The yield_per Query option is currently not compatible with "
2777            "subquery eager loading.  Please specify ",
2778            q.all
2779        )
2780
2781    def test_joinedload_m2o_ok(self):
2782        self._eagerload_mappings(user_lazy="joined")
2783        Address = self.classes.Address
2784        sess = create_session()
2785        q = sess.query(Address).yield_per(1)
2786        q.all()
2787
2788    def test_eagerload_opt_disable(self):
2789        self._eagerload_mappings()
2790
2791        User = self.classes.User
2792        sess = create_session()
2793        q = sess.query(User).options(subqueryload("addresses")).\
2794            enable_eagerloads(False).yield_per(1)
2795        q.all()
2796
2797        q = sess.query(User).options(joinedload("addresses")).\
2798            enable_eagerloads(False).yield_per(1)
2799        q.all()
2800
2801    def test_m2o_joinedload_not_others(self):
2802        self._eagerload_mappings(addresses_lazy="joined")
2803        Address = self.classes.Address
2804        sess = create_session()
2805        q = sess.query(Address).options(
2806            lazyload('*'), joinedload("user")).yield_per(1).filter_by(id=1)
2807
2808        def go():
2809            result = q.all()
2810            assert result[0].user
2811        self.assert_sql_count(testing.db, go, 1)
2812
2813
2814class HintsTest(QueryTest, AssertsCompiledSQL):
2815    __dialect__ = 'default'
2816
2817    def test_hints(self):
2818        User = self.classes.User
2819
2820        from sqlalchemy.dialects import mysql
2821        dialect = mysql.dialect()
2822
2823        sess = create_session()
2824
2825        self.assert_compile(
2826            sess.query(User).with_hint(
2827                User, 'USE INDEX (col1_index,col2_index)'),
2828            "SELECT users.id AS users_id, users.name AS users_name "
2829            "FROM users USE INDEX (col1_index,col2_index)",
2830            dialect=dialect
2831        )
2832
2833        self.assert_compile(
2834            sess.query(User).with_hint(
2835                User, 'WITH INDEX col1_index', 'sybase'),
2836            "SELECT users.id AS users_id, users.name AS users_name "
2837            "FROM users", dialect=dialect
2838        )
2839
2840        ualias = aliased(User)
2841        self.assert_compile(
2842            sess.query(User, ualias).with_hint(
2843                ualias, 'USE INDEX (col1_index,col2_index)').
2844            join(ualias, ualias.id > User.id),
2845            "SELECT users.id AS users_id, users.name AS users_name, "
2846            "users_1.id AS users_1_id, users_1.name AS users_1_name "
2847            "FROM users INNER JOIN users AS users_1 "
2848            "USE INDEX (col1_index,col2_index) "
2849            "ON users_1.id > users.id", dialect=dialect
2850        )
2851
2852    def test_statement_hints(self):
2853        User = self.classes.User
2854
2855        sess = create_session()
2856        stmt = sess.query(User).\
2857            with_statement_hint("test hint one").\
2858            with_statement_hint("test hint two").\
2859            with_statement_hint("test hint three", "postgresql")
2860
2861        self.assert_compile(
2862            stmt,
2863            "SELECT users.id AS users_id, users.name AS users_name "
2864            "FROM users test hint one test hint two",
2865        )
2866
2867        self.assert_compile(
2868            stmt,
2869            "SELECT users.id AS users_id, users.name AS users_name "
2870            "FROM users test hint one test hint two test hint three",
2871            dialect='postgresql'
2872        )
2873
2874
2875class TextTest(QueryTest, AssertsCompiledSQL):
2876    __dialect__ = 'default'
2877
2878    def test_fulltext(self):
2879        User = self.classes.User
2880
2881        with expect_warnings("Textual SQL"):
2882            eq_(
2883                create_session().query(User).
2884                from_statement("select * from users order by id").all(),
2885                [User(id=7), User(id=8), User(id=9), User(id=10)]
2886            )
2887
2888        eq_(
2889            create_session().query(User).from_statement(
2890                text("select * from users order by id")).first(), User(id=7)
2891        )
2892        eq_(
2893            create_session().query(User).from_statement(
2894                text("select * from users where name='nonexistent'")).first(),
2895            None)
2896
2897    def test_fragment(self):
2898        User = self.classes.User
2899
2900        with expect_warnings("Textual SQL expression"):
2901            eq_(
2902                create_session().query(User).filter("id in (8, 9)").all(),
2903                [User(id=8), User(id=9)]
2904
2905            )
2906
2907            eq_(
2908                create_session().query(User).filter("name='fred'").
2909                filter("id=9").all(), [User(id=9)]
2910            )
2911            eq_(
2912                create_session().query(User).filter("name='fred'").
2913                filter(User.id == 9).all(), [User(id=9)]
2914            )
2915
2916    def test_binds_coerce(self):
2917        User = self.classes.User
2918
2919        with expect_warnings("Textual SQL expression"):
2920            eq_(
2921                create_session().query(User).filter("id in (:id1, :id2)").
2922                params(id1=8, id2=9).all(), [User(id=8), User(id=9)]
2923            )
2924
2925    def test_as_column(self):
2926        User = self.classes.User
2927
2928        s = create_session()
2929        assert_raises(
2930            sa_exc.InvalidRequestError, s.query,
2931            User.id, text("users.name"))
2932
2933        eq_(
2934            s.query(User.id, "name").order_by(User.id).all(),
2935            [(7, 'jack'), (8, 'ed'), (9, 'fred'), (10, 'chuck')])
2936
2937    def test_via_select(self):
2938        User = self.classes.User
2939        s = create_session()
2940        eq_(
2941            s.query(User).from_statement(
2942                select([column('id'), column('name')]).
2943                select_from(table('users')).order_by('id'),
2944            ).all(),
2945            [User(id=7), User(id=8), User(id=9), User(id=10)]
2946        )
2947
2948    def test_via_textasfrom_from_statement(self):
2949        User = self.classes.User
2950        s = create_session()
2951
2952        eq_(
2953            s.query(User).from_statement(
2954                text("select * from users order by id").
2955                columns(id=Integer, name=String)).all(),
2956            [User(id=7), User(id=8), User(id=9), User(id=10)]
2957        )
2958
2959    def test_via_textasfrom_use_mapped_columns(self):
2960        User = self.classes.User
2961        s = create_session()
2962
2963        eq_(
2964            s.query(User).from_statement(
2965                text("select * from users order by id").
2966                columns(User.id, User.name)).all(),
2967            [User(id=7), User(id=8), User(id=9), User(id=10)]
2968        )
2969
2970    def test_via_textasfrom_select_from(self):
2971        User = self.classes.User
2972        s = create_session()
2973
2974        eq_(
2975            s.query(User).select_from(
2976                text("select * from users").columns(id=Integer, name=String)
2977            ).order_by(User.id).all(),
2978            [User(id=7), User(id=8), User(id=9), User(id=10)]
2979        )
2980
2981    def test_group_by_accepts_text(self):
2982        User = self.classes.User
2983        s = create_session()
2984
2985        q = s.query(User).group_by(text("name"))
2986        self.assert_compile(
2987            q,
2988            "SELECT users.id AS users_id, users.name AS users_name "
2989            "FROM users GROUP BY name"
2990        )
2991
2992    def test_orm_columns_accepts_text(self):
2993        from sqlalchemy.orm.base import _orm_columns
2994        t = text("x")
2995        eq_(
2996            _orm_columns(t),
2997            [t]
2998        )
2999
3000    def test_order_by_w_eager_one(self):
3001        User = self.classes.User
3002        s = create_session()
3003
3004        # from 1.0.0 thru 1.0.2, the "name" symbol here was considered
3005        # to be part of the things we need to ORDER BY and it was being
3006        # placed into the inner query's columns clause, as part of
3007        # query._compound_eager_statement where we add unwrap_order_by()
3008        # to the columns clause.  However, as #3392 illustrates, unlocatable
3009        # string expressions like "name desc" will only fail in this scenario,
3010        # so in general the changing of the query structure with string labels
3011        # is dangerous.
3012        #
3013        # the queries here are again "invalid" from a SQL perspective, as the
3014        # "name" field isn't matched up to anything.
3015        #
3016        with expect_warnings("Can't resolve label reference 'name';"):
3017            self.assert_compile(
3018                s.query(User).options(joinedload("addresses")).
3019                order_by(desc("name")).limit(1),
3020                "SELECT anon_1.users_id AS anon_1_users_id, "
3021                "anon_1.users_name AS anon_1_users_name, "
3022                "addresses_1.id AS addresses_1_id, "
3023                "addresses_1.user_id AS addresses_1_user_id, "
3024                "addresses_1.email_address AS addresses_1_email_address "
3025                "FROM (SELECT users.id AS users_id, users.name AS users_name "
3026                "FROM users ORDER BY users.name "
3027                "DESC LIMIT :param_1) AS anon_1 "
3028                "LEFT OUTER JOIN addresses AS addresses_1 "
3029                "ON anon_1.users_id = addresses_1.user_id "
3030                "ORDER BY name DESC, addresses_1.id"
3031            )
3032
3033    def test_order_by_w_eager_two(self):
3034        User = self.classes.User
3035        s = create_session()
3036
3037        with expect_warnings("Can't resolve label reference 'name';"):
3038            self.assert_compile(
3039                s.query(User).options(joinedload("addresses")).
3040                order_by("name").limit(1),
3041                "SELECT anon_1.users_id AS anon_1_users_id, "
3042                "anon_1.users_name AS anon_1_users_name, "
3043                "addresses_1.id AS addresses_1_id, "
3044                "addresses_1.user_id AS addresses_1_user_id, "
3045                "addresses_1.email_address AS addresses_1_email_address "
3046                "FROM (SELECT users.id AS users_id, users.name AS users_name "
3047                "FROM users ORDER BY users.name "
3048                "LIMIT :param_1) AS anon_1 "
3049                "LEFT OUTER JOIN addresses AS addresses_1 "
3050                "ON anon_1.users_id = addresses_1.user_id "
3051                "ORDER BY name, addresses_1.id"
3052            )
3053
3054    def test_order_by_w_eager_three(self):
3055        User = self.classes.User
3056        s = create_session()
3057
3058        self.assert_compile(
3059            s.query(User).options(joinedload("addresses")).
3060            order_by("users_name").limit(1),
3061            "SELECT anon_1.users_id AS anon_1_users_id, "
3062            "anon_1.users_name AS anon_1_users_name, "
3063            "addresses_1.id AS addresses_1_id, "
3064            "addresses_1.user_id AS addresses_1_user_id, "
3065            "addresses_1.email_address AS addresses_1_email_address "
3066            "FROM (SELECT users.id AS users_id, users.name AS users_name "
3067            "FROM users ORDER BY users.name "
3068            "LIMIT :param_1) AS anon_1 "
3069            "LEFT OUTER JOIN addresses AS addresses_1 "
3070            "ON anon_1.users_id = addresses_1.user_id "
3071            "ORDER BY anon_1.users_name, addresses_1.id"
3072        )
3073
3074        # however! this works (again?)
3075        eq_(
3076            s.query(User).options(joinedload("addresses")).
3077            order_by("users_name").first(),
3078            User(name='chuck', addresses=[])
3079        )
3080
3081    def test_order_by_w_eager_four(self):
3082        User = self.classes.User
3083        Address = self.classes.Address
3084        s = create_session()
3085
3086        self.assert_compile(
3087            s.query(User).options(joinedload("addresses")).
3088            order_by(desc("users_name")).limit(1),
3089            "SELECT anon_1.users_id AS anon_1_users_id, "
3090            "anon_1.users_name AS anon_1_users_name, "
3091            "addresses_1.id AS addresses_1_id, "
3092            "addresses_1.user_id AS addresses_1_user_id, "
3093            "addresses_1.email_address AS addresses_1_email_address "
3094            "FROM (SELECT users.id AS users_id, users.name AS users_name "
3095            "FROM users ORDER BY users.name DESC "
3096            "LIMIT :param_1) AS anon_1 "
3097            "LEFT OUTER JOIN addresses AS addresses_1 "
3098            "ON anon_1.users_id = addresses_1.user_id "
3099            "ORDER BY anon_1.users_name DESC, addresses_1.id"
3100        )
3101
3102        # however! this works (again?)
3103        eq_(
3104            s.query(User).options(joinedload("addresses")).
3105            order_by(desc("users_name")).first(),
3106            User(name='jack', addresses=[Address()])
3107        )
3108
3109    def test_order_by_w_eager_five(self):
3110        """essentially the same as test_eager_relations -> test_limit_3,
3111        but test for textual label elements that are freeform.
3112        this is again #3392."""
3113
3114        User = self.classes.User
3115        Address = self.classes.Address
3116        Order = self.classes.Order
3117
3118        sess = create_session()
3119
3120        q = sess.query(User, Address.email_address.label('email_address'))
3121
3122        l = q.join('addresses').options(joinedload(User.orders)).\
3123            order_by(
3124            "email_address desc").limit(1).offset(0)
3125        with expect_warnings(
3126                "Can't resolve label reference 'email_address desc'"):
3127            eq_(
3128                [
3129                    (User(
3130                        id=7,
3131                        orders=[Order(id=1), Order(id=3), Order(id=5)],
3132                        addresses=[Address(id=1)]
3133                    ), 'jack@bean.com')
3134                ],
3135                l.all())
3136
3137
3138class TextWarningTest(QueryTest, AssertsCompiledSQL):
3139    def _test(self, fn, arg, offending_clause, expected):
3140        assert_raises_message(
3141            sa.exc.SAWarning,
3142            r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
3143            r"explicitly declared (?:with|as) text\(%(stmt)r\)" % {
3144                "stmt": util.ellipses_string(offending_clause),
3145            },
3146            fn, arg
3147        )
3148
3149        with expect_warnings("Textual "):
3150            stmt = fn(arg)
3151            self.assert_compile(stmt, expected)
3152
3153    def test_filter(self):
3154        User = self.classes.User
3155        self._test(
3156            Session().query(User.id).filter, "myid == 5", "myid == 5",
3157            "SELECT users.id AS users_id FROM users WHERE myid == 5"
3158        )
3159
3160    def test_having(self):
3161        User = self.classes.User
3162        self._test(
3163            Session().query(User.id).having, "myid == 5", "myid == 5",
3164            "SELECT users.id AS users_id FROM users HAVING myid == 5"
3165        )
3166
3167    def test_from_statement(self):
3168        User = self.classes.User
3169        self._test(
3170            Session().query(User.id).from_statement,
3171            "select id from user",
3172            "select id from user",
3173            "select id from user",
3174        )
3175
3176
3177class ParentTest(QueryTest, AssertsCompiledSQL):
3178    __dialect__ = 'default'
3179
3180    def test_o2m(self):
3181        User, orders, Order = (
3182            self.classes.User, self.tables.orders, self.classes.Order)
3183
3184        sess = create_session()
3185        q = sess.query(User)
3186
3187        u1 = q.filter_by(name='jack').one()
3188
3189        # test auto-lookup of property
3190        o = sess.query(Order).with_parent(u1).all()
3191        assert [Order(description="order 1"), Order(description="order 3"),
3192            Order(description="order 5")] == o
3193
3194        # test with explicit property
3195        o = sess.query(Order).with_parent(u1, property='orders').all()
3196        assert [Order(description="order 1"), Order(description="order 3"),
3197            Order(description="order 5")] == o
3198
3199        o = sess.query(Order).with_parent(u1, property=User.orders).all()
3200        assert [Order(description="order 1"), Order(description="order 3"),
3201            Order(description="order 5")] == o
3202
3203        o = sess.query(Order).filter(with_parent(u1, User.orders)).all()
3204        assert [
3205            Order(description="order 1"), Order(description="order 3"),
3206            Order(description="order 5")] == o
3207
3208        # test generative criterion
3209        o = sess.query(Order).with_parent(u1).filter(orders.c.id > 2).all()
3210        assert [
3211            Order(description="order 3"), Order(description="order 5")] == o
3212
3213        # test against None for parent? this can't be done with the current
3214        # API since we don't know what mapper to use
3215        # assert
3216        #     sess.query(Order).with_parent(None, property='addresses').all()
3217        #     == [Order(description="order 5")]
3218
3219    def test_select_from(self):
3220        User, Address = self.classes.User, self.classes.Address
3221
3222        sess = create_session()
3223        u1 = sess.query(User).get(7)
3224        q = sess.query(Address).select_from(Address).with_parent(u1)
3225        self.assert_compile(
3226            q,
3227            "SELECT addresses.id AS addresses_id, "
3228            "addresses.user_id AS addresses_user_id, "
3229            "addresses.email_address AS addresses_email_address "
3230            "FROM addresses WHERE :param_1 = addresses.user_id",
3231            {'param_1': 7}
3232        )
3233
3234    @testing.fails("issue #3607")
3235    def test_select_from_alias(self):
3236        User, Address = self.classes.User, self.classes.Address
3237
3238        sess = create_session()
3239        u1 = sess.query(User).get(7)
3240        a1 = aliased(Address)
3241        q = sess.query(a1).with_parent(u1)
3242        self.assert_compile(
3243            q,
3244            "SELECT addresses_1.id AS addresses_1_id, "
3245            "addresses_1.user_id AS addresses_1_user_id, "
3246            "addresses_1.email_address AS addresses_1_email_address "
3247            "FROM addresses AS addresses_1 "
3248            "WHERE :param_1 = addresses_1.user_id",
3249            {'param_1': 7}
3250        )
3251
3252    def test_noparent(self):
3253        Item, User = self.classes.Item, self.classes.User
3254
3255        sess = create_session()
3256        q = sess.query(User)
3257
3258        u1 = q.filter_by(name='jack').one()
3259
3260        try:
3261            q = sess.query(Item).with_parent(u1)
3262            assert False
3263        except sa_exc.InvalidRequestError as e:
3264            assert str(e) \
3265                == "Could not locate a property which relates "\
3266                "instances of class 'Item' to instances of class 'User'"
3267
3268    def test_m2m(self):
3269        Item, Keyword = self.classes.Item, self.classes.Keyword
3270
3271        sess = create_session()
3272        i1 = sess.query(Item).filter_by(id=2).one()
3273        k = sess.query(Keyword).with_parent(i1).all()
3274        assert [
3275            Keyword(name='red'), Keyword(name='small'),
3276            Keyword(name='square')] == k
3277
3278    def test_with_transient(self):
3279        User, Order = self.classes.User, self.classes.Order
3280
3281        sess = Session()
3282
3283        q = sess.query(User)
3284        u1 = q.filter_by(name='jack').one()
3285        utrans = User(id=u1.id)
3286        o = sess.query(Order).with_parent(utrans, 'orders')
3287        eq_(
3288            [
3289                Order(description="order 1"), Order(description="order 3"),
3290                Order(description="order 5")],
3291            o.all()
3292        )
3293
3294        o = sess.query(Order).filter(with_parent(utrans, 'orders'))
3295        eq_(
3296            [
3297                Order(description="order 1"), Order(description="order 3"),
3298                Order(description="order 5")],
3299            o.all()
3300        )
3301
3302
3303    def test_with_pending_autoflush(self):
3304        Order, User = self.classes.Order, self.classes.User
3305
3306        sess = Session()
3307
3308        o1 = sess.query(Order).first()
3309        opending = Order(id=20, user_id=o1.user_id)
3310        sess.add(opending)
3311        eq_(
3312            sess.query(User).with_parent(opending, 'user').one(),
3313            User(id=o1.user_id)
3314        )
3315        eq_(
3316            sess.query(User).filter(with_parent(opending, 'user')).one(),
3317            User(id=o1.user_id)
3318        )
3319
3320    def test_with_pending_no_autoflush(self):
3321        Order, User = self.classes.Order, self.classes.User
3322
3323        sess = Session(autoflush=False)
3324
3325        o1 = sess.query(Order).first()
3326        opending = Order(user_id=o1.user_id)
3327        sess.add(opending)
3328        eq_(
3329            sess.query(User).with_parent(opending, 'user').one(),
3330            User(id=o1.user_id)
3331        )
3332
3333    def test_unique_binds_union(self):
3334        """bindparams used in the 'parent' query are unique"""
3335        User, Address = self.classes.User, self.classes.Address
3336
3337        sess = Session()
3338        u1, u2 = sess.query(User).order_by(User.id)[0:2]
3339
3340        q1 = sess.query(Address).with_parent(u1, 'addresses')
3341        q2 = sess.query(Address).with_parent(u2, 'addresses')
3342
3343        self.assert_compile(
3344            q1.union(q2),
3345            "SELECT anon_1.addresses_id AS anon_1_addresses_id, "
3346            "anon_1.addresses_user_id AS anon_1_addresses_user_id, "
3347            "anon_1.addresses_email_address AS "
3348            "anon_1_addresses_email_address FROM (SELECT addresses.id AS "
3349            "addresses_id, addresses.user_id AS addresses_user_id, "
3350            "addresses.email_address AS addresses_email_address FROM "
3351            "addresses WHERE :param_1 = addresses.user_id UNION SELECT "
3352            "addresses.id AS addresses_id, addresses.user_id AS "
3353            "addresses_user_id, addresses.email_address "
3354            "AS addresses_email_address "
3355            "FROM addresses WHERE :param_2 = addresses.user_id) AS anon_1",
3356            checkparams={'param_1': 7, 'param_2': 8},
3357        )
3358
3359    def test_unique_binds_or(self):
3360        User, Address = self.classes.User, self.classes.Address
3361
3362        sess = Session()
3363        u1, u2 = sess.query(User).order_by(User.id)[0:2]
3364
3365        self.assert_compile(
3366            sess.query(Address).filter(
3367                or_(with_parent(u1, 'addresses'), with_parent(u2, 'addresses'))
3368            ),
3369            "SELECT addresses.id AS addresses_id, addresses.user_id AS "
3370            "addresses_user_id, addresses.email_address AS "
3371            "addresses_email_address FROM addresses WHERE "
3372            ":param_1 = addresses.user_id OR :param_2 = addresses.user_id",
3373            checkparams={'param_1': 7, 'param_2': 8},
3374        )
3375
3376
3377class WithTransientOnNone(_fixtures.FixtureTest, AssertsCompiledSQL):
3378    run_inserts = None
3379    __dialect__ = 'default'
3380
3381    def _fixture1(self):
3382        User, Address = self.classes.User, self.classes.Address
3383        users, addresses = self.tables.users, self.tables.addresses
3384
3385        mapper(User, users)
3386        mapper(Address, addresses, properties={
3387            'user': relationship(User),
3388            'special_user': relationship(
3389                User, primaryjoin=and_(
3390                    users.c.id == addresses.c.user_id,
3391                    users.c.name == addresses.c.email_address))
3392        })
3393
3394    def test_filter_with_transient_assume_pk(self):
3395        self._fixture1()
3396        User, Address = self.classes.User, self.classes.Address
3397
3398        sess = Session()
3399
3400        q = sess.query(Address).filter(Address.user == User())
3401        with expect_warnings("Got None for value of column "):
3402            self.assert_compile(
3403                q,
3404                "SELECT addresses.id AS addresses_id, "
3405                "addresses.user_id AS addresses_user_id, "
3406                "addresses.email_address AS addresses_email_address "
3407                "FROM addresses WHERE :param_1 = addresses.user_id",
3408                checkparams={'param_1': None}
3409            )
3410
3411    def test_filter_with_transient_warn_for_none_against_non_pk(self):
3412        self._fixture1()
3413        User, Address = self.classes.User, self.classes.Address
3414
3415        s = Session()
3416        q = s.query(Address).filter(Address.special_user == User())
3417        with expect_warnings("Got None for value of column"):
3418
3419            self.assert_compile(
3420                q,
3421                "SELECT addresses.id AS addresses_id, "
3422                "addresses.user_id AS addresses_user_id, "
3423                "addresses.email_address AS addresses_email_address "
3424                "FROM addresses WHERE :param_1 = addresses.user_id "
3425                "AND :param_2 = addresses.email_address",
3426                checkparams={"param_1": None, "param_2": None}
3427            )
3428
3429    def test_with_parent_with_transient_assume_pk(self):
3430        self._fixture1()
3431        User, Address = self.classes.User, self.classes.Address
3432
3433        sess = Session()
3434
3435        q = sess.query(User).with_parent(Address(), "user")
3436        with expect_warnings("Got None for value of column"):
3437            self.assert_compile(
3438                q,
3439                "SELECT users.id AS users_id, users.name AS users_name "
3440                "FROM users WHERE users.id = :param_1",
3441                checkparams={'param_1': None}
3442            )
3443
3444    def test_with_parent_with_transient_warn_for_none_against_non_pk(self):
3445        self._fixture1()
3446        User, Address = self.classes.User, self.classes.Address
3447
3448        s = Session()
3449        q = s.query(User).with_parent(Address(), "special_user")
3450        with expect_warnings("Got None for value of column"):
3451
3452            self.assert_compile(
3453                q,
3454                "SELECT users.id AS users_id, users.name AS users_name "
3455                "FROM users WHERE users.id = :param_1 "
3456                "AND users.name = :param_2",
3457                checkparams={"param_1": None, "param_2": None}
3458            )
3459
3460    def test_negated_contains_or_equals_plain_m2o(self):
3461        self._fixture1()
3462        User, Address = self.classes.User, self.classes.Address
3463
3464        s = Session()
3465        q = s.query(Address).filter(Address.user != User())
3466        with expect_warnings("Got None for value of column"):
3467            self.assert_compile(
3468                q,
3469
3470                "SELECT addresses.id AS addresses_id, "
3471                "addresses.user_id AS addresses_user_id, "
3472                "addresses.email_address AS addresses_email_address "
3473                "FROM addresses "
3474                "WHERE addresses.user_id != :user_id_1 "
3475                "OR addresses.user_id IS NULL",
3476                checkparams={'user_id_1': None}
3477            )
3478
3479    def test_negated_contains_or_equals_complex_rel(self):
3480        self._fixture1()
3481        User, Address = self.classes.User, self.classes.Address
3482
3483        s = Session()
3484
3485        # this one does *not* warn because we do the criteria
3486        # without deferral
3487        q = s.query(Address).filter(Address.special_user != User())
3488        self.assert_compile(
3489            q,
3490            "SELECT addresses.id AS addresses_id, "
3491            "addresses.user_id AS addresses_user_id, "
3492            "addresses.email_address AS addresses_email_address "
3493            "FROM addresses "
3494            "WHERE NOT (EXISTS (SELECT 1 "
3495            "FROM users "
3496            "WHERE users.id = addresses.user_id AND "
3497            "users.name = addresses.email_address AND users.id IS NULL))",
3498            checkparams={}
3499        )
3500
3501
3502class SynonymTest(QueryTest, AssertsCompiledSQL):
3503    __dialect__ = 'default'
3504
3505    @classmethod
3506    def setup_mappers(cls):
3507        users, Keyword, items, order_items, orders, Item, User, \
3508            Address, keywords, Order, item_keywords, addresses = \
3509            cls.tables.users, cls.classes.Keyword, cls.tables.items, \
3510            cls.tables.order_items, cls.tables.orders, \
3511            cls.classes.Item, cls.classes.User, cls.classes.Address, \
3512            cls.tables.keywords, cls.classes.Order, \
3513            cls.tables.item_keywords, cls.tables.addresses
3514
3515        mapper(User, users, properties={
3516            'name_syn': synonym('name'),
3517            'addresses': relationship(Address),
3518            'orders': relationship(
3519                Order, backref='user', order_by=orders.c.id),  # o2m, m2o
3520            'orders_syn': synonym('orders'),
3521            'orders_syn_2': synonym('orders_syn')
3522        })
3523        mapper(Address, addresses)
3524        mapper(Order, orders, properties={
3525            'items': relationship(Item, secondary=order_items),  # m2m
3526            'address': relationship(Address),  # m2o
3527            'items_syn': synonym('items')
3528        })
3529        mapper(Item, items, properties={
3530            'keywords': relationship(Keyword, secondary=item_keywords)  # m2m
3531        })
3532        mapper(Keyword, keywords)
3533
3534    def test_options(self):
3535        User, Order = self.classes.User, self.classes.Order
3536
3537        s = create_session()
3538
3539        def go():
3540            result = s.query(User).filter_by(name='jack').\
3541                options(joinedload(User.orders_syn)).all()
3542            eq_(result, [
3543                User(id=7, name='jack', orders=[
3544                    Order(description='order 1'),
3545                    Order(description='order 3'),
3546                    Order(description='order 5')
3547                ])
3548            ])
3549        self.assert_sql_count(testing.db, go, 1)
3550
3551    def test_options_syn_of_syn(self):
3552        User, Order = self.classes.User, self.classes.Order
3553
3554        s = create_session()
3555
3556        def go():
3557            result = s.query(User).filter_by(name='jack').\
3558                options(joinedload(User.orders_syn_2)).all()
3559            eq_(result, [
3560                User(id=7, name='jack', orders=[
3561                    Order(description='order 1'),
3562                    Order(description='order 3'),
3563                    Order(description='order 5')
3564                ])
3565            ])
3566        self.assert_sql_count(testing.db, go, 1)
3567
3568    def test_options_syn_of_syn_string(self):
3569        User, Order = self.classes.User, self.classes.Order
3570
3571        s = create_session()
3572
3573        def go():
3574            result = s.query(User).filter_by(name='jack').\
3575                options(joinedload('orders_syn_2')).all()
3576            eq_(result, [
3577                User(id=7, name='jack', orders=[
3578                    Order(description='order 1'),
3579                    Order(description='order 3'),
3580                    Order(description='order 5')
3581                ])
3582            ])
3583        self.assert_sql_count(testing.db, go, 1)
3584
3585    def test_joins(self):
3586        User, Order = self.classes.User, self.classes.Order
3587
3588        for j in (
3589            ['orders', 'items'],
3590            ['orders_syn', 'items'],
3591            [User.orders_syn, Order.items],
3592            ['orders_syn_2', 'items'],
3593            [User.orders_syn_2, 'items'],
3594            ['orders', 'items_syn'],
3595            ['orders_syn', 'items_syn'],
3596            ['orders_syn_2', 'items_syn'],
3597        ):
3598            result = create_session().query(User).join(*j).filter_by(id=3). \
3599                all()
3600            assert [User(id=7, name='jack'), User(id=9, name='fred')] == result
3601
3602    def test_with_parent(self):
3603        Order, User = self.classes.Order, self.classes.User
3604
3605        for nameprop, orderprop in (
3606            ('name', 'orders'),
3607            ('name_syn', 'orders'),
3608            ('name', 'orders_syn'),
3609            ('name', 'orders_syn_2'),
3610            ('name_syn', 'orders_syn'),
3611            ('name_syn', 'orders_syn_2'),
3612        ):
3613            sess = create_session()
3614            q = sess.query(User)
3615
3616            u1 = q.filter_by(**{nameprop: 'jack'}).one()
3617
3618            o = sess.query(Order).with_parent(u1, property=orderprop).all()
3619            assert [
3620                Order(description="order 1"), Order(description="order 3"),
3621                Order(description="order 5")] == o
3622
3623    def test_froms_aliased_col(self):
3624        Address, User = self.classes.Address, self.classes.User
3625
3626        sess = create_session()
3627        ua = aliased(User)
3628
3629        q = sess.query(ua.name_syn).join(
3630            Address, ua.id == Address.user_id)
3631        self.assert_compile(
3632            q,
3633            "SELECT users_1.name AS users_1_name FROM "
3634            "users AS users_1 JOIN addresses ON users_1.id = addresses.user_id"
3635        )
3636
3637
3638class ImmediateTest(_fixtures.FixtureTest):
3639    run_inserts = 'once'
3640    run_deletes = None
3641
3642    @classmethod
3643    def setup_mappers(cls):
3644        Address, addresses, users, User = (cls.classes.Address,
3645                                cls.tables.addresses,
3646                                cls.tables.users,
3647                                cls.classes.User)
3648
3649        mapper(Address, addresses)
3650
3651        mapper(User, users, properties=dict(
3652            addresses=relationship(Address)))
3653
3654    def test_one(self):
3655        User, Address = self.classes.User, self.classes.Address
3656
3657        sess = create_session()
3658
3659        assert_raises(
3660            sa.orm.exc.NoResultFound,
3661            sess.query(User).filter(User.id == 99).one)
3662
3663        eq_(sess.query(User).filter(User.id == 7).one().id, 7)
3664
3665        assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).one)
3666
3667        assert_raises(
3668            sa.orm.exc.NoResultFound,
3669            sess.query(User.id, User.name).filter(User.id == 99).one)
3670
3671        eq_(sess.query(User.id, User.name).filter(User.id == 7).one(),
3672            (7, 'jack'))
3673
3674        assert_raises(
3675            sa.orm.exc.MultipleResultsFound,
3676            sess.query(User.id, User.name).one)
3677
3678        assert_raises(
3679            sa.orm.exc.NoResultFound,
3680            (sess.query(User, Address).join(User.addresses).
3681           filter(Address.id == 99)).one)
3682
3683        eq_((sess.query(User, Address).
3684            join(User.addresses).
3685            filter(Address.id == 4)).one(),
3686           (User(id=8), Address(id=4)))
3687
3688        assert_raises(
3689            sa.orm.exc.MultipleResultsFound,
3690            sess.query(User, Address).join(User.addresses).one)
3691
3692        # this result returns multiple rows, the first
3693        # two rows being the same.  but uniquing is
3694        # not applied for a column based result.
3695        assert_raises(
3696            sa.orm.exc.MultipleResultsFound,
3697            sess.query(User.id).join(User.addresses).
3698            filter(User.id.in_([8, 9])).order_by(User.id).one)
3699
3700        # test that a join which ultimately returns
3701        # multiple identities across many rows still
3702        # raises, even though the first two rows are of
3703        # the same identity and unique filtering
3704        # is applied ([ticket:1688])
3705        assert_raises(
3706            sa.orm.exc.MultipleResultsFound,
3707            sess.query(User).join(User.addresses).filter(User.id.in_([8, 9])).
3708            order_by(User.id).one)
3709
3710    def test_one_or_none(self):
3711        User, Address = self.classes.User, self.classes.Address
3712
3713        sess = create_session()
3714
3715        eq_(sess.query(User).filter(User.id == 99).one_or_none(), None)
3716
3717        eq_(sess.query(User).filter(User.id == 7).one_or_none().id, 7)
3718
3719        assert_raises_message(
3720            sa.orm.exc.MultipleResultsFound,
3721            "Multiple rows were found for one_or_none\(\)",
3722            sess.query(User).one_or_none)
3723
3724        eq_(sess.query(User.id, User.name).filter(User.id == 99).one_or_none(), None)
3725
3726        eq_(sess.query(User.id, User.name).filter(User.id == 7).one_or_none(),
3727            (7, 'jack'))
3728
3729        assert_raises(
3730            sa.orm.exc.MultipleResultsFound,
3731            sess.query(User.id, User.name).one_or_none)
3732
3733        eq_(
3734            (sess.query(User, Address).join(User.addresses).
3735           filter(Address.id == 99)).one_or_none(), None)
3736
3737        eq_((sess.query(User, Address).
3738            join(User.addresses).
3739            filter(Address.id == 4)).one_or_none(),
3740           (User(id=8), Address(id=4)))
3741
3742        assert_raises(
3743            sa.orm.exc.MultipleResultsFound,
3744            sess.query(User, Address).join(User.addresses).one_or_none)
3745
3746        # this result returns multiple rows, the first
3747        # two rows being the same.  but uniquing is
3748        # not applied for a column based result.
3749        assert_raises(
3750            sa.orm.exc.MultipleResultsFound,
3751            sess.query(User.id).join(User.addresses).
3752            filter(User.id.in_([8, 9])).order_by(User.id).one_or_none)
3753
3754        # test that a join which ultimately returns
3755        # multiple identities across many rows still
3756        # raises, even though the first two rows are of
3757        # the same identity and unique filtering
3758        # is applied ([ticket:1688])
3759        assert_raises(
3760            sa.orm.exc.MultipleResultsFound,
3761            sess.query(User).join(User.addresses).filter(User.id.in_([8, 9])).
3762            order_by(User.id).one_or_none)
3763
3764    @testing.future
3765    def test_getslice(self):
3766        assert False
3767
3768    def test_scalar(self):
3769        User = self.classes.User
3770
3771        sess = create_session()
3772
3773        eq_(sess.query(User.id).filter_by(id=7).scalar(), 7)
3774        eq_(sess.query(User.id, User.name).filter_by(id=7).scalar(), 7)
3775        eq_(sess.query(User.id).filter_by(id=0).scalar(), None)
3776        eq_(sess.query(User).filter_by(id=7).scalar(),
3777            sess.query(User).filter_by(id=7).one())
3778
3779        assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).scalar)
3780        assert_raises(
3781            sa.orm.exc.MultipleResultsFound,
3782            sess.query(User.id, User.name).scalar)
3783
3784    def test_value(self):
3785        User = self.classes.User
3786
3787        sess = create_session()
3788
3789        eq_(sess.query(User).filter_by(id=7).value(User.id), 7)
3790        eq_(sess.query(User.id, User.name).filter_by(id=7).value(User.id), 7)
3791        eq_(sess.query(User).filter_by(id=0).value(User.id), None)
3792
3793        sess.bind = testing.db
3794        eq_(sess.query().value(sa.literal_column('1').label('x')), 1)
3795
3796
3797class ExecutionOptionsTest(QueryTest):
3798
3799    def test_option_building(self):
3800        User = self.classes.User
3801
3802        sess = create_session(bind=testing.db, autocommit=False)
3803
3804        q1 = sess.query(User)
3805        assert q1._execution_options == dict()
3806        q2 = q1.execution_options(foo='bar', stream_results=True)
3807        # q1's options should be unchanged.
3808        assert q1._execution_options == dict()
3809        # q2 should have them set.
3810        assert q2._execution_options == dict(foo='bar', stream_results=True)
3811        q3 = q2.execution_options(foo='not bar', answer=42)
3812        assert q2._execution_options == dict(foo='bar', stream_results=True)
3813
3814        q3_options = dict(foo='not bar', stream_results=True, answer=42)
3815        assert q3._execution_options == q3_options
3816
3817    def test_options_in_connection(self):
3818        User = self.classes.User
3819
3820        execution_options = dict(foo='bar', stream_results=True)
3821
3822        class TQuery(Query):
3823            def instances(self, result, ctx):
3824                try:
3825                    eq_(
3826                        result.connection._execution_options,
3827                        execution_options)
3828                finally:
3829                    result.close()
3830                return iter([])
3831
3832        sess = create_session(
3833            bind=testing.db, autocommit=False, query_cls=TQuery)
3834        q1 = sess.query(User).execution_options(**execution_options)
3835        q1.all()
3836
3837
3838class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL):
3839    """test standalone booleans being wrapped in an AsBoolean, as well
3840    as true/false compilation."""
3841
3842    def _dialect(self, native_boolean):
3843        d = default.DefaultDialect()
3844        d.supports_native_boolean = native_boolean
3845        return d
3846
3847    def test_one(self):
3848        s = Session()
3849        c = column('x', Boolean)
3850        self.assert_compile(
3851            s.query(c).filter(c),
3852            "SELECT x WHERE x",
3853            dialect=self._dialect(True)
3854        )
3855
3856    def test_two(self):
3857        s = Session()
3858        c = column('x', Boolean)
3859        self.assert_compile(
3860            s.query(c).filter(c),
3861            "SELECT x WHERE x = 1",
3862            dialect=self._dialect(False)
3863        )
3864
3865    def test_three(self):
3866        s = Session()
3867        c = column('x', Boolean)
3868        self.assert_compile(
3869            s.query(c).filter(~c),
3870            "SELECT x WHERE x = 0",
3871            dialect=self._dialect(False)
3872        )
3873
3874    def test_four(self):
3875        s = Session()
3876        c = column('x', Boolean)
3877        self.assert_compile(
3878            s.query(c).filter(~c),
3879            "SELECT x WHERE NOT x",
3880            dialect=self._dialect(True)
3881        )
3882
3883    def test_five(self):
3884        s = Session()
3885        c = column('x', Boolean)
3886        self.assert_compile(
3887            s.query(c).having(c),
3888            "SELECT x HAVING x = 1",
3889            dialect=self._dialect(False)
3890        )
3891
3892
3893class SessionBindTest(QueryTest):
3894
3895    @contextlib.contextmanager
3896    def _assert_bind_args(self, session):
3897        get_bind = mock.Mock(side_effect=session.get_bind)
3898        with mock.patch.object(session, "get_bind", get_bind):
3899            yield
3900        for call_ in get_bind.mock_calls:
3901            is_(call_[1][0], inspect(self.classes.User))
3902            is_not_(call_[2]['clause'], None)
3903
3904    def test_single_entity_q(self):
3905        User = self.classes.User
3906        session = Session()
3907        with self._assert_bind_args(session):
3908            session.query(User).all()
3909
3910    def test_sql_expr_entity_q(self):
3911        User = self.classes.User
3912        session = Session()
3913        with self._assert_bind_args(session):
3914            session.query(User.id).all()
3915
3916    def test_count(self):
3917        User = self.classes.User
3918        session = Session()
3919        with self._assert_bind_args(session):
3920            session.query(User).count()
3921
3922    def test_aggregate_fn(self):
3923        User = self.classes.User
3924        session = Session()
3925        with self._assert_bind_args(session):
3926            session.query(func.max(User.name)).all()
3927
3928    def test_bulk_update_no_sync(self):
3929        User = self.classes.User
3930        session = Session()
3931        with self._assert_bind_args(session):
3932            session.query(User).filter(User.id == 15).update(
3933                {"name": "foob"}, synchronize_session=False)
3934
3935    def test_bulk_delete_no_sync(self):
3936        User = self.classes.User
3937        session = Session()
3938        with self._assert_bind_args(session):
3939            session.query(User).filter(User.id == 15).delete(
3940                synchronize_session=False)
3941
3942    def test_bulk_update_fetch_sync(self):
3943        User = self.classes.User
3944        session = Session()
3945        with self._assert_bind_args(session):
3946            session.query(User).filter(User.id == 15).update(
3947                {"name": "foob"}, synchronize_session='fetch')
3948
3949    def test_bulk_delete_fetch_sync(self):
3950        User = self.classes.User
3951        session = Session()
3952        with self._assert_bind_args(session):
3953            session.query(User).filter(User.id == 15).delete(
3954                synchronize_session='fetch')
3955
3956    def test_column_property(self):
3957        User = self.classes.User
3958
3959        mapper = inspect(User)
3960        mapper.add_property(
3961            "score",
3962            column_property(func.coalesce(self.tables.users.c.name, None)))
3963        session = Session()
3964        with self._assert_bind_args(session):
3965            session.query(func.max(User.score)).scalar()
3966
3967    def test_column_property_select(self):
3968        User = self.classes.User
3969        Address = self.classes.Address
3970
3971        mapper = inspect(User)
3972        mapper.add_property(
3973            "score",
3974            column_property(
3975                select([func.sum(Address.id)]).
3976                where(Address.user_id == User.id).as_scalar()
3977            )
3978        )
3979        session = Session()
3980
3981        with self._assert_bind_args(session):
3982            session.query(func.max(User.score)).scalar()
3983
3984