1import contextlib
2
3import sqlalchemy as sa
4from sqlalchemy import and_
5from sqlalchemy import between
6from sqlalchemy import bindparam
7from sqlalchemy import Boolean
8from sqlalchemy import cast
9from sqlalchemy import collate
10from sqlalchemy import column
11from sqlalchemy import desc
12from sqlalchemy import distinct
13from sqlalchemy import exc as sa_exc
14from sqlalchemy import exists
15from sqlalchemy import ForeignKey
16from sqlalchemy import func
17from sqlalchemy import insert
18from sqlalchemy import inspect
19from sqlalchemy import Integer
20from sqlalchemy import literal
21from sqlalchemy import literal_column
22from sqlalchemy import MetaData
23from sqlalchemy import null
24from sqlalchemy import or_
25from sqlalchemy import select
26from sqlalchemy import String
27from sqlalchemy import table
28from sqlalchemy import testing
29from sqlalchemy import text
30from sqlalchemy import Unicode
31from sqlalchemy import union
32from sqlalchemy import util
33from sqlalchemy.engine import default
34from sqlalchemy.orm import aliased
35from sqlalchemy.orm import attributes
36from sqlalchemy.orm import backref
37from sqlalchemy.orm import Bundle
38from sqlalchemy.orm import column_property
39from sqlalchemy.orm import create_session
40from sqlalchemy.orm import defer
41from sqlalchemy.orm import joinedload
42from sqlalchemy.orm import joinedload_all
43from sqlalchemy.orm import lazyload
44from sqlalchemy.orm import mapper
45from sqlalchemy.orm import Query
46from sqlalchemy.orm import relationship
47from sqlalchemy.orm import Session
48from sqlalchemy.orm import subqueryload
49from sqlalchemy.orm import synonym
50from sqlalchemy.orm.util import join
51from sqlalchemy.orm.util import with_parent
52from sqlalchemy.sql import expression
53from sqlalchemy.sql import operators
54from sqlalchemy.testing import assert_warnings
55from sqlalchemy.testing import AssertsCompiledSQL
56from sqlalchemy.testing import fixtures
57from sqlalchemy.testing import is_
58from sqlalchemy.testing import is_not_
59from sqlalchemy.testing import mock
60from sqlalchemy.testing.assertions import assert_raises
61from sqlalchemy.testing.assertions import assert_raises_message
62from sqlalchemy.testing.assertions import eq_
63from sqlalchemy.testing.assertions import eq_ignore_whitespace
64from sqlalchemy.testing.assertions import expect_warnings
65from sqlalchemy.testing.assertsql import CompiledSQL
66from sqlalchemy.testing.schema import Column
67from sqlalchemy.testing.schema import Table
68from test.orm import _fixtures
69
70
71class QueryTest(_fixtures.FixtureTest):
72    run_setup_mappers = "once"
73    run_inserts = "once"
74    run_deletes = None
75
76    @classmethod
77    def setup_mappers(cls):
78        cls._setup_stock_mapping()
79
80
81class MiscTest(QueryTest):
82    run_create_tables = None
83    run_inserts = None
84
85    def test_with_session(self):
86        User = self.classes.User
87        s1 = Session()
88        s2 = Session()
89        q1 = s1.query(User)
90        q2 = q1.with_session(s2)
91        assert q2.session is s2
92        assert q1.session is s1
93
94
95class OnlyReturnTuplesTest(QueryTest):
96    def test_single_entity_false(self):
97        User = self.classes.User
98        row = create_session().query(User).only_return_tuples(False).first()
99        assert isinstance(row, User)
100
101    def test_single_entity_true(self):
102        User = self.classes.User
103        row = create_session().query(User).only_return_tuples(True).first()
104        assert isinstance(row, tuple)
105
106    def test_multiple_entity_false(self):
107        User = self.classes.User
108        row = (
109            create_session()
110            .query(User.id, User)
111            .only_return_tuples(False)
112            .first()
113        )
114        assert isinstance(row, tuple)
115
116    def test_multiple_entity_true(self):
117        User = self.classes.User
118        row = (
119            create_session()
120            .query(User.id, User)
121            .only_return_tuples(True)
122            .first()
123        )
124        assert isinstance(row, tuple)
125
126
127class RowTupleTest(QueryTest):
128    run_setup_mappers = None
129
130    def test_custom_names(self):
131        User, users = self.classes.User, self.tables.users
132
133        mapper(User, users, properties={"uname": users.c.name})
134
135        row = (
136            create_session()
137            .query(User.id, User.uname)
138            .filter(User.id == 7)
139            .first()
140        )
141        assert row.id == 7
142        assert row.uname == "jack"
143
144    def test_column_metadata(self):
145        users, Address, addresses, User = (
146            self.tables.users,
147            self.classes.Address,
148            self.tables.addresses,
149            self.classes.User,
150        )
151
152        mapper(User, users)
153        mapper(Address, addresses)
154        sess = create_session()
155        user_alias = aliased(User)
156        user_alias_id_label = user_alias.id.label("foo")
157        address_alias = aliased(Address, name="aalias")
158        fn = func.count(User.id)
159        name_label = User.name.label("uname")
160        bundle = Bundle("b1", User.id, User.name)
161        cte = sess.query(User.id).cte()
162        for q, asserted in [
163            (
164                sess.query(User),
165                [
166                    {
167                        "name": "User",
168                        "type": User,
169                        "aliased": False,
170                        "expr": User,
171                        "entity": User,
172                    }
173                ],
174            ),
175            (
176                sess.query(User.id, User),
177                [
178                    {
179                        "name": "id",
180                        "type": users.c.id.type,
181                        "aliased": False,
182                        "expr": User.id,
183                        "entity": User,
184                    },
185                    {
186                        "name": "User",
187                        "type": User,
188                        "aliased": False,
189                        "expr": User,
190                        "entity": User,
191                    },
192                ],
193            ),
194            (
195                sess.query(User.id, user_alias),
196                [
197                    {
198                        "name": "id",
199                        "type": users.c.id.type,
200                        "aliased": False,
201                        "expr": User.id,
202                        "entity": User,
203                    },
204                    {
205                        "name": None,
206                        "type": User,
207                        "aliased": True,
208                        "expr": user_alias,
209                        "entity": user_alias,
210                    },
211                ],
212            ),
213            (
214                sess.query(user_alias.id),
215                [
216                    {
217                        "name": "id",
218                        "type": users.c.id.type,
219                        "aliased": True,
220                        "expr": user_alias.id,
221                        "entity": user_alias,
222                    }
223                ],
224            ),
225            (
226                sess.query(user_alias_id_label),
227                [
228                    {
229                        "name": "foo",
230                        "type": users.c.id.type,
231                        "aliased": True,
232                        "expr": user_alias_id_label,
233                        "entity": user_alias,
234                    }
235                ],
236            ),
237            (
238                sess.query(address_alias),
239                [
240                    {
241                        "name": "aalias",
242                        "type": Address,
243                        "aliased": True,
244                        "expr": address_alias,
245                        "entity": address_alias,
246                    }
247                ],
248            ),
249            (
250                sess.query(name_label, fn),
251                [
252                    {
253                        "name": "uname",
254                        "type": users.c.name.type,
255                        "aliased": False,
256                        "expr": name_label,
257                        "entity": User,
258                    },
259                    {
260                        "name": None,
261                        "type": fn.type,
262                        "aliased": False,
263                        "expr": fn,
264                        "entity": User,
265                    },
266                ],
267            ),
268            (
269                sess.query(cte),
270                [
271                    {
272                        "aliased": False,
273                        "expr": cte.c.id,
274                        "type": cte.c.id.type,
275                        "name": "id",
276                        "entity": None,
277                    }
278                ],
279            ),
280            (
281                sess.query(users),
282                [
283                    {
284                        "aliased": False,
285                        "expr": users.c.id,
286                        "type": users.c.id.type,
287                        "name": "id",
288                        "entity": None,
289                    },
290                    {
291                        "aliased": False,
292                        "expr": users.c.name,
293                        "type": users.c.name.type,
294                        "name": "name",
295                        "entity": None,
296                    },
297                ],
298            ),
299            (
300                sess.query(users.c.name),
301                [
302                    {
303                        "name": "name",
304                        "type": users.c.name.type,
305                        "aliased": False,
306                        "expr": users.c.name,
307                        "entity": None,
308                    }
309                ],
310            ),
311            (
312                sess.query(bundle),
313                [
314                    {
315                        "aliased": False,
316                        "expr": bundle,
317                        "type": Bundle,
318                        "name": "b1",
319                        "entity": User,
320                    }
321                ],
322            ),
323        ]:
324            eq_(q.column_descriptions, asserted)
325
326    def test_unhashable_type(self):
327        from sqlalchemy.types import TypeDecorator, Integer
328        from sqlalchemy.sql import type_coerce
329
330        class MyType(TypeDecorator):
331            impl = Integer
332            hashable = False
333
334            def process_result_value(self, value, dialect):
335                return [value]
336
337        User, users = self.classes.User, self.tables.users
338
339        mapper(User, users)
340
341        s = Session()
342        q = s.query(User, type_coerce(users.c.id, MyType).label("foo")).filter(
343            User.id == 7
344        )
345        row = q.first()
346        eq_(row, (User(id=7), [7]))
347
348
349class BindSensitiveStringifyTest(fixtures.TestBase):
350    def _fixture(self, bind_to=None):
351        # building a totally separate metadata /mapping here
352        # because we need to control if the MetaData is bound or not
353
354        class User(object):
355            pass
356
357        m = MetaData(bind=bind_to)
358        user_table = Table(
359            "users",
360            m,
361            Column("id", Integer, primary_key=True),
362            Column("name", String(50)),
363        )
364
365        mapper(User, user_table)
366        return User
367
368    def _dialect_fixture(self):
369        class MyDialect(default.DefaultDialect):
370            default_paramstyle = "qmark"
371
372        from sqlalchemy.engine import base
373
374        return base.Engine(mock.Mock(), MyDialect(), mock.Mock())
375
376    def _test(
377        self, bound_metadata, bound_session, session_present, expect_bound
378    ):
379        if bound_metadata or bound_session:
380            eng = self._dialect_fixture()
381        else:
382            eng = None
383
384        User = self._fixture(bind_to=eng if bound_metadata else None)
385
386        s = Session(eng if bound_session else None)
387        q = s.query(User).filter(User.id == 7)
388        if not session_present:
389            q = q.with_session(None)
390
391        eq_ignore_whitespace(
392            str(q),
393            "SELECT users.id AS users_id, users.name AS users_name "
394            "FROM users WHERE users.id = ?"
395            if expect_bound
396            else "SELECT users.id AS users_id, users.name AS users_name "
397            "FROM users WHERE users.id = :id_1",
398        )
399
400    def test_query_unbound_metadata_bound_session(self):
401        self._test(False, True, True, True)
402
403    def test_query_bound_metadata_unbound_session(self):
404        self._test(True, False, True, True)
405
406    def test_query_unbound_metadata_no_session(self):
407        self._test(False, False, False, False)
408
409    def test_query_unbound_metadata_unbound_session(self):
410        self._test(False, False, True, False)
411
412    def test_query_bound_metadata_bound_session(self):
413        self._test(True, True, True, True)
414
415
416class RawSelectTest(QueryTest, AssertsCompiledSQL):
417    __dialect__ = "default"
418
419    def test_select_from_entity(self):
420        User = self.classes.User
421
422        self.assert_compile(
423            select(["*"]).select_from(User), "SELECT * FROM users"
424        )
425
426    def test_where_relationship(self):
427        User = self.classes.User
428
429        self.assert_compile(
430            select([User]).where(User.addresses),
431            "SELECT users.id, users.name FROM users, addresses "
432            "WHERE users.id = addresses.user_id",
433        )
434
435    def test_where_m2m_relationship(self):
436        Item = self.classes.Item
437
438        self.assert_compile(
439            select([Item]).where(Item.keywords),
440            "SELECT items.id, items.description FROM items, "
441            "item_keywords AS item_keywords_1, keywords "
442            "WHERE items.id = item_keywords_1.item_id "
443            "AND keywords.id = item_keywords_1.keyword_id",
444        )
445
446    def test_inline_select_from_entity(self):
447        User = self.classes.User
448
449        self.assert_compile(
450            select(["*"], from_obj=User), "SELECT * FROM users"
451        )
452
453    def test_select_from_aliased_entity(self):
454        User = self.classes.User
455        ua = aliased(User, name="ua")
456        self.assert_compile(
457            select(["*"]).select_from(ua), "SELECT * FROM users AS ua"
458        )
459
460    def test_correlate_entity(self):
461        User = self.classes.User
462        Address = self.classes.Address
463
464        self.assert_compile(
465            select(
466                [
467                    User.name,
468                    Address.id,
469                    select([func.count(Address.id)])
470                    .where(User.id == Address.user_id)
471                    .correlate(User)
472                    .as_scalar(),
473                ]
474            ),
475            "SELECT users.name, addresses.id, "
476            "(SELECT count(addresses.id) AS count_1 "
477            "FROM addresses WHERE users.id = addresses.user_id) AS anon_1 "
478            "FROM users, addresses",
479        )
480
481    def test_correlate_aliased_entity(self):
482        User = self.classes.User
483        Address = self.classes.Address
484        uu = aliased(User, name="uu")
485
486        self.assert_compile(
487            select(
488                [
489                    uu.name,
490                    Address.id,
491                    select([func.count(Address.id)])
492                    .where(uu.id == Address.user_id)
493                    .correlate(uu)
494                    .as_scalar(),
495                ]
496            ),
497            # for a long time, "uu.id = address.user_id" was reversed;
498            # this was resolved as of #2872 and had to do with
499            # InstrumentedAttribute.__eq__() taking precedence over
500            # QueryableAttribute.__eq__()
501            "SELECT uu.name, addresses.id, "
502            "(SELECT count(addresses.id) AS count_1 "
503            "FROM addresses WHERE uu.id = addresses.user_id) AS anon_1 "
504            "FROM users AS uu, addresses",
505        )
506
507    def test_columns_clause_entity(self):
508        User = self.classes.User
509
510        self.assert_compile(
511            select([User]), "SELECT users.id, users.name FROM users"
512        )
513
514    def test_columns_clause_columns(self):
515        User = self.classes.User
516
517        self.assert_compile(
518            select([User.id, User.name]),
519            "SELECT users.id, users.name FROM users",
520        )
521
522    def test_columns_clause_aliased_columns(self):
523        User = self.classes.User
524        ua = aliased(User, name="ua")
525        self.assert_compile(
526            select([ua.id, ua.name]), "SELECT ua.id, ua.name FROM users AS ua"
527        )
528
529    def test_columns_clause_aliased_entity(self):
530        User = self.classes.User
531        ua = aliased(User, name="ua")
532        self.assert_compile(
533            select([ua]), "SELECT ua.id, ua.name FROM users AS ua"
534        )
535
536    def test_core_join(self):
537        User = self.classes.User
538        Address = self.classes.Address
539        from sqlalchemy.sql import join
540
541        self.assert_compile(
542            select([User]).select_from(join(User, Address)),
543            "SELECT users.id, users.name FROM users "
544            "JOIN addresses ON users.id = addresses.user_id",
545        )
546
547    def test_insert_from_query(self):
548        User = self.classes.User
549        Address = self.classes.Address
550
551        s = Session()
552        q = s.query(User.id, User.name).filter_by(name="ed")
553        self.assert_compile(
554            insert(Address).from_select(("id", "email_address"), q),
555            "INSERT INTO addresses (id, email_address) "
556            "SELECT users.id AS users_id, users.name AS users_name "
557            "FROM users WHERE users.name = :name_1",
558        )
559
560    def test_insert_from_query_col_attr(self):
561        User = self.classes.User
562        Address = self.classes.Address
563
564        s = Session()
565        q = s.query(User.id, User.name).filter_by(name="ed")
566        self.assert_compile(
567            insert(Address).from_select(
568                (Address.id, Address.email_address), q
569            ),
570            "INSERT INTO addresses (id, email_address) "
571            "SELECT users.id AS users_id, users.name AS users_name "
572            "FROM users WHERE users.name = :name_1",
573        )
574
575    def test_update_from_entity(self):
576        from sqlalchemy.sql import update
577
578        User = self.classes.User
579        self.assert_compile(
580            update(User), "UPDATE users SET id=:id, name=:name"
581        )
582
583        self.assert_compile(
584            update(User).values(name="ed").where(User.id == 5),
585            "UPDATE users SET name=:name WHERE users.id = :id_1",
586            checkparams={"id_1": 5, "name": "ed"},
587        )
588
589    def test_delete_from_entity(self):
590        from sqlalchemy.sql import delete
591
592        User = self.classes.User
593        self.assert_compile(delete(User), "DELETE FROM users")
594
595        self.assert_compile(
596            delete(User).where(User.id == 5),
597            "DELETE FROM users WHERE users.id = :id_1",
598            checkparams={"id_1": 5},
599        )
600
601    def test_insert_from_entity(self):
602        from sqlalchemy.sql import insert
603
604        User = self.classes.User
605        self.assert_compile(
606            insert(User), "INSERT INTO users (id, name) VALUES (:id, :name)"
607        )
608
609        self.assert_compile(
610            insert(User).values(name="ed"),
611            "INSERT INTO users (name) VALUES (:name)",
612            checkparams={"name": "ed"},
613        )
614
615    def test_col_prop_builtin_function(self):
616        class Foo(object):
617            pass
618
619        mapper(
620            Foo,
621            self.tables.users,
622            properties={
623                "foob": column_property(
624                    func.coalesce(self.tables.users.c.name)
625                )
626            },
627        )
628
629        self.assert_compile(
630            select([Foo]).where(Foo.foob == "somename").order_by(Foo.foob),
631            "SELECT users.id, users.name FROM users "
632            "WHERE coalesce(users.name) = :param_1 "
633            "ORDER BY coalesce(users.name)",
634        )
635
636
637class GetTest(QueryTest):
638    def test_get(self):
639        User = self.classes.User
640
641        s = create_session()
642        assert s.query(User).get(19) is None
643        u = s.query(User).get(7)
644        u2 = s.query(User).get(7)
645        assert u is u2
646        s.expunge_all()
647        u2 = s.query(User).get(7)
648        assert u is not u2
649
650    def test_get_composite_pk_no_result(self):
651        CompositePk = self.classes.CompositePk
652
653        s = Session()
654        assert s.query(CompositePk).get((100, 100)) is None
655
656    def test_get_composite_pk_result(self):
657        CompositePk = self.classes.CompositePk
658
659        s = Session()
660        one_two = s.query(CompositePk).get((1, 2))
661        assert one_two.i == 1
662        assert one_two.j == 2
663        assert one_two.k == 3
664
665    def test_get_too_few_params(self):
666        CompositePk = self.classes.CompositePk
667
668        s = Session()
669        q = s.query(CompositePk)
670        assert_raises(sa_exc.InvalidRequestError, q.get, 7)
671
672    def test_get_too_few_params_tuple(self):
673        CompositePk = self.classes.CompositePk
674
675        s = Session()
676        q = s.query(CompositePk)
677        assert_raises(sa_exc.InvalidRequestError, q.get, (7,))
678
679    def test_get_too_many_params(self):
680        CompositePk = self.classes.CompositePk
681
682        s = Session()
683        q = s.query(CompositePk)
684        assert_raises(sa_exc.InvalidRequestError, q.get, (7, 10, 100))
685
686    def test_get_against_col(self):
687        User = self.classes.User
688
689        s = Session()
690        q = s.query(User.id)
691        assert_raises(sa_exc.InvalidRequestError, q.get, (5,))
692
693    def test_get_null_pk(self):
694        """test that a mapping which can have None in a
695        PK (i.e. map to an outerjoin) works with get()."""
696
697        users, addresses = self.tables.users, self.tables.addresses
698
699        s = users.outerjoin(addresses)
700
701        class UserThing(fixtures.ComparableEntity):
702            pass
703
704        mapper(
705            UserThing,
706            s,
707            properties={
708                "id": (users.c.id, addresses.c.user_id),
709                "address_id": addresses.c.id,
710            },
711        )
712        sess = create_session()
713        u10 = sess.query(UserThing).get((10, None))
714        eq_(u10, UserThing(id=10))
715
716    def test_no_criterion(self):
717        """test that get()/load() does not use preexisting filter/etc.
718        criterion"""
719
720        User, Address = self.classes.User, self.classes.Address
721
722        s = create_session()
723
724        q = s.query(User).join("addresses").filter(Address.user_id == 8)
725        assert_raises(sa_exc.InvalidRequestError, q.get, 7)
726        assert_raises(
727            sa_exc.InvalidRequestError,
728            s.query(User).filter(User.id == 7).get,
729            19,
730        )
731
732        # order_by()/get() doesn't raise
733        s.query(User).order_by(User.id).get(8)
734
735    def test_no_criterion_when_already_loaded(self):
736        """test that get()/load() does not use preexisting filter/etc.
737        criterion, even when we're only using the identity map."""
738
739        User, Address = self.classes.User, self.classes.Address
740
741        s = create_session()
742
743        s.query(User).get(7)
744
745        q = s.query(User).join("addresses").filter(Address.user_id == 8)
746        assert_raises(sa_exc.InvalidRequestError, q.get, 7)
747
748    def test_unique_param_names(self):
749        users = self.tables.users
750
751        class SomeUser(object):
752            pass
753
754        s = users.select(users.c.id != 12).alias("users")
755        m = mapper(SomeUser, s)
756        assert s.primary_key == m.primary_key
757
758        sess = create_session()
759        assert sess.query(SomeUser).get(7).name == "jack"
760
761    def test_load(self):
762        User, Address = self.classes.User, self.classes.Address
763
764        s = create_session()
765
766        assert s.query(User).populate_existing().get(19) is None
767
768        u = s.query(User).populate_existing().get(7)
769        u2 = s.query(User).populate_existing().get(7)
770        assert u is u2
771        s.expunge_all()
772        u2 = s.query(User).populate_existing().get(7)
773        assert u is not u2
774
775        u2.name = "some name"
776        a = Address(email_address="some other name")
777        u2.addresses.append(a)
778        assert u2 in s.dirty
779        assert a in u2.addresses
780
781        s.query(User).populate_existing().get(7)
782        assert u2 not in s.dirty
783        assert u2.name == "jack"
784        assert a not in u2.addresses
785
786    @testing.provide_metadata
787    @testing.requires.unicode_connections
788    def test_unicode(self):
789        """test that Query.get properly sets up the type for the bind
790        parameter. using unicode would normally fail on postgresql, mysql and
791        oracle unless it is converted to an encoded string"""
792
793        metadata = self.metadata
794        table = Table(
795            "unicode_data",
796            metadata,
797            Column("id", Unicode(40), primary_key=True),
798            Column("data", Unicode(40)),
799        )
800        metadata.create_all()
801        ustring = util.b("petit voix m\xe2\x80\x99a").decode("utf-8")
802
803        table.insert().execute(id=ustring, data=ustring)
804
805        class LocalFoo(self.classes.Base):
806            pass
807
808        mapper(LocalFoo, table)
809        eq_(
810            create_session().query(LocalFoo).get(ustring),
811            LocalFoo(id=ustring, data=ustring),
812        )
813
814    def test_populate_existing(self):
815        User, Address = self.classes.User, self.classes.Address
816
817        s = create_session()
818
819        userlist = s.query(User).all()
820
821        u = userlist[0]
822        u.name = "foo"
823        a = Address(name="ed")
824        u.addresses.append(a)
825
826        self.assert_(a in u.addresses)
827
828        s.query(User).populate_existing().all()
829
830        self.assert_(u not in s.dirty)
831
832        self.assert_(u.name == "jack")
833
834        self.assert_(a not in u.addresses)
835
836        u.addresses[0].email_address = "lala"
837        u.orders[1].items[2].description = "item 12"
838        # test that lazy load doesn't change child items
839        s.query(User).populate_existing().all()
840        assert u.addresses[0].email_address == "lala"
841        assert u.orders[1].items[2].description == "item 12"
842
843        # eager load does
844        s.query(User).options(
845            joinedload("addresses"), joinedload_all("orders.items")
846        ).populate_existing().all()
847        assert u.addresses[0].email_address == "jack@bean.com"
848        assert u.orders[1].items[2].description == "item 5"
849
850
851class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL):
852    def test_no_limit_offset(self):
853        User = self.classes.User
854
855        s = create_session()
856
857        for q in (
858            s.query(User).limit(2),
859            s.query(User).offset(2),
860            s.query(User).limit(2).offset(2),
861        ):
862            assert_raises(sa_exc.InvalidRequestError, q.join, "addresses")
863
864            assert_raises(
865                sa_exc.InvalidRequestError, q.filter, User.name == "ed"
866            )
867
868            assert_raises(sa_exc.InvalidRequestError, q.filter_by, name="ed")
869
870            assert_raises(sa_exc.InvalidRequestError, q.order_by, "foo")
871
872            assert_raises(sa_exc.InvalidRequestError, q.group_by, "foo")
873
874            assert_raises(sa_exc.InvalidRequestError, q.having, "foo")
875
876            q.enable_assertions(False).join("addresses")
877            q.enable_assertions(False).filter(User.name == "ed")
878            q.enable_assertions(False).order_by("foo")
879            q.enable_assertions(False).group_by("foo")
880
881    def test_no_from(self):
882        users, User = self.tables.users, self.classes.User
883
884        s = create_session()
885
886        q = s.query(User).select_from(users)
887        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
888
889        q = s.query(User).join("addresses")
890        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
891
892        q = s.query(User).order_by(User.id)
893        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
894
895        assert_raises(sa_exc.InvalidRequestError, q.select_from, users)
896
897        q.enable_assertions(False).select_from(users)
898
899        # this is fine, however
900        q.from_self()
901
902    def test_invalid_select_from(self):
903        User = self.classes.User
904
905        s = create_session()
906        q = s.query(User)
907        assert_raises(sa_exc.ArgumentError, q.select_from, User.id == 5)
908        assert_raises(sa_exc.ArgumentError, q.select_from, User.id)
909
910    def test_invalid_from_statement(self):
911        User, addresses, users = (
912            self.classes.User,
913            self.tables.addresses,
914            self.tables.users,
915        )
916
917        s = create_session()
918        q = s.query(User)
919        assert_raises(sa_exc.ArgumentError, q.from_statement, User.id == 5)
920        assert_raises(
921            sa_exc.ArgumentError, q.from_statement, users.join(addresses)
922        )
923
924    def test_invalid_column(self):
925        User = self.classes.User
926
927        s = create_session()
928        q = s.query(User)
929        assert_raises(sa_exc.InvalidRequestError, q.add_column, object())
930
931    def test_invalid_column_tuple(self):
932        User = self.classes.User
933
934        s = create_session()
935        q = s.query(User)
936        assert_raises(sa_exc.InvalidRequestError, q.add_column, (1, 1))
937
938    def test_distinct(self):
939        """test that a distinct() call is not valid before 'clauseelement'
940        conditions."""
941
942        User = self.classes.User
943
944        s = create_session()
945        q = s.query(User).distinct()
946        assert_raises(sa_exc.InvalidRequestError, q.select_from, User)
947        assert_raises(
948            sa_exc.InvalidRequestError,
949            q.from_statement,
950            text("select * from table"),
951        )
952        assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User)
953
954    def test_order_by(self):
955        """test that an order_by() call is not valid before 'clauseelement'
956        conditions."""
957
958        User = self.classes.User
959
960        s = create_session()
961        q = s.query(User).order_by(User.id)
962        assert_raises(sa_exc.InvalidRequestError, q.select_from, User)
963        assert_raises(
964            sa_exc.InvalidRequestError,
965            q.from_statement,
966            text("select * from table"),
967        )
968        assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User)
969
970    def test_only_full_mapper_zero(self):
971        User, Address = self.classes.User, self.classes.Address
972
973        s = create_session()
974
975        q = s.query(User, Address)
976        assert_raises(sa_exc.InvalidRequestError, q.get, 5)
977
978    def test_entity_or_mapper_zero(self):
979        User, Address = self.classes.User, self.classes.Address
980        s = create_session()
981
982        q = s.query(User, Address)
983        is_(q._mapper_zero(), inspect(User))
984        is_(q._entity_zero(), inspect(User))
985
986        u1 = aliased(User)
987        q = s.query(u1, Address)
988        is_(q._mapper_zero(), inspect(User))
989        is_(q._entity_zero(), inspect(u1))
990
991        q = s.query(User).select_from(Address)
992        is_(q._mapper_zero(), inspect(User))
993        is_(q._entity_zero(), inspect(Address))
994
995        q = s.query(User.name, Address)
996        is_(q._mapper_zero(), inspect(User))
997        is_(q._entity_zero(), inspect(User))
998
999        q = s.query(u1.name, Address)
1000        is_(q._mapper_zero(), inspect(User))
1001        is_(q._entity_zero(), inspect(u1))
1002
1003        q1 = s.query(User).exists()
1004        q = s.query(q1)
1005        is_(q._mapper_zero(), None)
1006        is_(q._entity_zero(), None)
1007
1008        q1 = s.query(Bundle("b1", User.id, User.name))
1009        is_(q1._mapper_zero(), inspect(User))
1010        is_(q1._entity_zero(), inspect(User))
1011
1012    def test_from_statement(self):
1013        User = self.classes.User
1014
1015        s = create_session()
1016
1017        for meth, arg, kw in [
1018            (Query.filter, (User.id == 5,), {}),
1019            (Query.filter_by, (), {"id": 5}),
1020            (Query.limit, (5,), {}),
1021            (Query.group_by, (User.name,), {}),
1022            (Query.order_by, (User.name,), {}),
1023        ]:
1024            q = s.query(User)
1025            q = meth(q, *arg, **kw)
1026            assert_raises(
1027                sa_exc.InvalidRequestError, q.from_statement, text("x")
1028            )
1029
1030            q = s.query(User)
1031            q = q.from_statement(text("x"))
1032            assert_raises(sa_exc.InvalidRequestError, meth, q, *arg, **kw)
1033
1034    def test_illegal_coercions(self):
1035        User = self.classes.User
1036
1037        assert_raises_message(
1038            sa_exc.ArgumentError,
1039            "Object .*User.* is not legal as a SQL literal value",
1040            distinct,
1041            User,
1042        )
1043
1044        ua = aliased(User)
1045        assert_raises_message(
1046            sa_exc.ArgumentError,
1047            "Object .*User.* is not legal as a SQL literal value",
1048            distinct,
1049            ua,
1050        )
1051
1052        s = Session()
1053        assert_raises_message(
1054            sa_exc.ArgumentError,
1055            "Object .*User.* is not legal as a SQL literal value",
1056            lambda: s.query(User).filter(User.name == User),
1057        )
1058
1059        u1 = User()
1060        assert_raises_message(
1061            sa_exc.ArgumentError,
1062            "Object .*User.* is not legal as a SQL literal value",
1063            distinct,
1064            u1,
1065        )
1066
1067        assert_raises_message(
1068            sa_exc.ArgumentError,
1069            "Object .*User.* is not legal as a SQL literal value",
1070            lambda: s.query(User).filter(User.name == u1),
1071        )
1072
1073
1074class OperatorTest(QueryTest, AssertsCompiledSQL):
1075    """test sql.Comparator implementation for MapperProperties"""
1076
1077    __dialect__ = "default"
1078
1079    def _test(self, clause, expected, entity=None, checkparams=None):
1080        dialect = default.DefaultDialect()
1081        if entity is not None:
1082            # specify a lead entity, so that when we are testing
1083            # correlation, the correlation actually happens
1084            sess = Session()
1085            lead = sess.query(entity)
1086            context = lead._compile_context()
1087            context.statement.use_labels = True
1088            lead = context.statement.compile(dialect=dialect)
1089            expected = (str(lead) + " WHERE " + expected).replace("\n", "")
1090            clause = sess.query(entity).filter(clause)
1091        self.assert_compile(clause, expected, checkparams=checkparams)
1092
1093    def _test_filter_aliases(
1094        self, clause, expected, from_, onclause, checkparams=None
1095    ):
1096        dialect = default.DefaultDialect()
1097        sess = Session()
1098        lead = sess.query(from_).join(onclause, aliased=True)
1099        full = lead.filter(clause)
1100        context = lead._compile_context()
1101        context.statement.use_labels = True
1102        lead = context.statement.compile(dialect=dialect)
1103        expected = (str(lead) + " WHERE " + expected).replace("\n", "")
1104
1105        self.assert_compile(full, expected, checkparams=checkparams)
1106
1107    def test_arithmetic(self):
1108        User = self.classes.User
1109
1110        create_session().query(User)
1111        for (py_op, sql_op) in (
1112            (operators.add, "+"),
1113            (operators.mul, "*"),
1114            (operators.sub, "-"),
1115            (operators.truediv, "/"),
1116            (operators.div, "/"),
1117        ):
1118            for (lhs, rhs, res) in (
1119                (5, User.id, ":id_1 %s users.id"),
1120                (5, literal(6), ":param_1 %s :param_2"),
1121                (User.id, 5, "users.id %s :id_1"),
1122                (User.id, literal("b"), "users.id %s :param_1"),
1123                (User.id, User.id, "users.id %s users.id"),
1124                (literal(5), "b", ":param_1 %s :param_2"),
1125                (literal(5), User.id, ":param_1 %s users.id"),
1126                (literal(5), literal(6), ":param_1 %s :param_2"),
1127            ):
1128                self._test(py_op(lhs, rhs), res % sql_op)
1129
1130    def test_comparison(self):
1131        User = self.classes.User
1132
1133        create_session().query(User)
1134        ualias = aliased(User)
1135
1136        for (py_op, fwd_op, rev_op) in (
1137            (operators.lt, "<", ">"),
1138            (operators.gt, ">", "<"),
1139            (operators.eq, "=", "="),
1140            (operators.ne, "!=", "!="),
1141            (operators.le, "<=", ">="),
1142            (operators.ge, ">=", "<="),
1143        ):
1144            for (lhs, rhs, l_sql, r_sql) in (
1145                ("a", User.id, ":id_1", "users.id"),
1146                ("a", literal("b"), ":param_2", ":param_1"),  # note swap!
1147                (User.id, "b", "users.id", ":id_1"),
1148                (User.id, literal("b"), "users.id", ":param_1"),
1149                (User.id, User.id, "users.id", "users.id"),
1150                (literal("a"), "b", ":param_1", ":param_2"),
1151                (literal("a"), User.id, ":param_1", "users.id"),
1152                (literal("a"), literal("b"), ":param_1", ":param_2"),
1153                (ualias.id, literal("b"), "users_1.id", ":param_1"),
1154                (User.id, ualias.name, "users.id", "users_1.name"),
1155                (User.name, ualias.name, "users.name", "users_1.name"),
1156                (ualias.name, User.name, "users_1.name", "users.name"),
1157            ):
1158
1159                # the compiled clause should match either (e.g.):
1160                # 'a' < 'b' -or- 'b' > 'a'.
1161                compiled = str(
1162                    py_op(lhs, rhs).compile(dialect=default.DefaultDialect())
1163                )
1164                fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql)
1165                rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql)
1166
1167                self.assert_(
1168                    compiled == fwd_sql or compiled == rev_sql,
1169                    "\n'"
1170                    + compiled
1171                    + "'\n does not match\n'"
1172                    + fwd_sql
1173                    + "'\n or\n'"
1174                    + rev_sql
1175                    + "'",
1176                )
1177
1178    def test_o2m_compare_to_null(self):
1179        User = self.classes.User
1180
1181        self._test(User.id == None, "users.id IS NULL")  # noqa
1182        self._test(User.id != None, "users.id IS NOT NULL")  # noqa
1183        self._test(~(User.id == None), "users.id IS NOT NULL")  # noqa
1184        self._test(~(User.id != None), "users.id IS NULL")  # noqa
1185        self._test(None == User.id, "users.id IS NULL")  # noqa
1186        self._test(~(None == User.id), "users.id IS NOT NULL")  # noqa
1187
1188    def test_m2o_compare_to_null(self):
1189        Address = self.classes.Address
1190        self._test(Address.user == None, "addresses.user_id IS NULL")  # noqa
1191        self._test(
1192            ~(Address.user == None), "addresses.user_id IS NOT NULL"  # noqa
1193        )
1194        self._test(
1195            ~(Address.user != None), "addresses.user_id IS NULL"  # noqa
1196        )
1197        self._test(None == Address.user, "addresses.user_id IS NULL")  # noqa
1198        self._test(
1199            ~(None == Address.user), "addresses.user_id IS NOT NULL"  # noqa
1200        )
1201
1202    def test_o2m_compare_to_null_orm_adapt(self):
1203        User, Address = self.classes.User, self.classes.Address
1204        self._test_filter_aliases(
1205            User.id == None,  # noqa
1206            "users_1.id IS NULL",
1207            Address,
1208            Address.user,
1209        ),
1210        self._test_filter_aliases(
1211            User.id != None,  # noqa
1212            "users_1.id IS NOT NULL",
1213            Address,
1214            Address.user,
1215        ),
1216        self._test_filter_aliases(
1217            ~(User.id == None),  # noqa
1218            "users_1.id IS NOT NULL",
1219            Address,
1220            Address.user,
1221        ),
1222        self._test_filter_aliases(
1223            ~(User.id != None),  # noqa
1224            "users_1.id IS NULL",
1225            Address,
1226            Address.user,
1227        ),
1228
1229    def test_m2o_compare_to_null_orm_adapt(self):
1230        User, Address = self.classes.User, self.classes.Address
1231        self._test_filter_aliases(
1232            Address.user == None,  # noqa
1233            "addresses_1.user_id IS NULL",
1234            User,
1235            User.addresses,
1236        ),
1237        self._test_filter_aliases(
1238            Address.user != None,  # noqa
1239            "addresses_1.user_id IS NOT NULL",
1240            User,
1241            User.addresses,
1242        ),
1243        self._test_filter_aliases(
1244            ~(Address.user == None),  # noqa
1245            "addresses_1.user_id IS NOT NULL",
1246            User,
1247            User.addresses,
1248        ),
1249        self._test_filter_aliases(
1250            ~(Address.user != None),  # noqa
1251            "addresses_1.user_id IS NULL",
1252            User,
1253            User.addresses,
1254        ),
1255
1256    def test_o2m_compare_to_null_aliased(self):
1257        User = self.classes.User
1258        u1 = aliased(User)
1259        self._test(u1.id == None, "users_1.id IS NULL")  # noqa
1260        self._test(u1.id != None, "users_1.id IS NOT NULL")  # noqa
1261        self._test(~(u1.id == None), "users_1.id IS NOT NULL")  # noqa
1262        self._test(~(u1.id != None), "users_1.id IS NULL")  # noqa
1263
1264    def test_m2o_compare_to_null_aliased(self):
1265        Address = self.classes.Address
1266        a1 = aliased(Address)
1267        self._test(a1.user == None, "addresses_1.user_id IS NULL")  # noqa
1268        self._test(
1269            ~(a1.user == None), "addresses_1.user_id IS NOT NULL"  # noqa
1270        )
1271        self._test(a1.user != None, "addresses_1.user_id IS NOT NULL")  # noqa
1272        self._test(~(a1.user != None), "addresses_1.user_id IS NULL")  # noqa
1273
1274    def test_relationship_unimplemented(self):
1275        User = self.classes.User
1276        for op in [
1277            User.addresses.like,
1278            User.addresses.ilike,
1279            User.addresses.__le__,
1280            User.addresses.__gt__,
1281        ]:
1282            assert_raises(NotImplementedError, op, "x")
1283
1284    def test_o2m_any(self):
1285        User, Address = self.classes.User, self.classes.Address
1286        self._test(
1287            User.addresses.any(Address.id == 17),
1288            "EXISTS (SELECT 1 FROM addresses "
1289            "WHERE users.id = addresses.user_id AND addresses.id = :id_1)",
1290            entity=User,
1291        )
1292
1293    def test_o2m_any_aliased(self):
1294        User, Address = self.classes.User, self.classes.Address
1295        u1 = aliased(User)
1296        a1 = aliased(Address)
1297        self._test(
1298            u1.addresses.of_type(a1).any(a1.id == 17),
1299            "EXISTS (SELECT 1 FROM addresses AS addresses_1 "
1300            "WHERE users_1.id = addresses_1.user_id AND "
1301            "addresses_1.id = :id_1)",
1302            entity=u1,
1303        )
1304
1305    def test_o2m_any_orm_adapt(self):
1306        User, Address = self.classes.User, self.classes.Address
1307        self._test_filter_aliases(
1308            User.addresses.any(Address.id == 17),
1309            "EXISTS (SELECT 1 FROM addresses "
1310            "WHERE users_1.id = addresses.user_id AND addresses.id = :id_1)",
1311            Address,
1312            Address.user,
1313        )
1314
1315    def test_m2o_compare_instance(self):
1316        User, Address = self.classes.User, self.classes.Address
1317        u7 = User(id=5)
1318        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
1319        u7.id = 7
1320
1321        self._test(Address.user == u7, ":param_1 = addresses.user_id")
1322
1323    def test_m2o_compare_instance_negated(self):
1324        User, Address = self.classes.User, self.classes.Address
1325        u7 = User(id=5)
1326        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
1327        u7.id = 7
1328
1329        self._test(
1330            Address.user != u7,
1331            "addresses.user_id != :user_id_1 OR addresses.user_id IS NULL",
1332            checkparams={"user_id_1": 7},
1333        )
1334
1335    def test_m2o_compare_instance_orm_adapt(self):
1336        User, Address = self.classes.User, self.classes.Address
1337        u7 = User(id=5)
1338        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
1339        u7.id = 7
1340
1341        self._test_filter_aliases(
1342            Address.user == u7,
1343            ":param_1 = addresses_1.user_id",
1344            User,
1345            User.addresses,
1346            checkparams={"param_1": 7},
1347        )
1348
1349    def test_m2o_compare_instance_negated_warn_on_none(self):
1350        User, Address = self.classes.User, self.classes.Address
1351
1352        u7_transient = User(id=None)
1353
1354        with expect_warnings("Got None for value of column users.id; "):
1355            self._test_filter_aliases(
1356                Address.user != u7_transient,
1357                "addresses_1.user_id != :user_id_1 "
1358                "OR addresses_1.user_id IS NULL",
1359                User,
1360                User.addresses,
1361                checkparams={"user_id_1": None},
1362            )
1363
1364    def test_m2o_compare_instance_negated_orm_adapt(self):
1365        User, Address = self.classes.User, self.classes.Address
1366        u7 = User(id=5)
1367        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
1368        u7.id = 7
1369
1370        u7_transient = User(id=7)
1371
1372        self._test_filter_aliases(
1373            Address.user != u7,
1374            "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL",
1375            User,
1376            User.addresses,
1377            checkparams={"user_id_1": 7},
1378        )
1379
1380        self._test_filter_aliases(
1381            ~(Address.user == u7),
1382            ":param_1 != addresses_1.user_id",
1383            User,
1384            User.addresses,
1385            checkparams={"param_1": 7},
1386        )
1387
1388        self._test_filter_aliases(
1389            ~(Address.user != u7),
1390            "NOT (addresses_1.user_id != :user_id_1 "
1391            "OR addresses_1.user_id IS NULL)",
1392            User,
1393            User.addresses,
1394            checkparams={"user_id_1": 7},
1395        )
1396
1397        self._test_filter_aliases(
1398            Address.user != u7_transient,
1399            "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL",
1400            User,
1401            User.addresses,
1402            checkparams={"user_id_1": 7},
1403        )
1404
1405        self._test_filter_aliases(
1406            ~(Address.user == u7_transient),
1407            ":param_1 != addresses_1.user_id",
1408            User,
1409            User.addresses,
1410            checkparams={"param_1": 7},
1411        )
1412
1413        self._test_filter_aliases(
1414            ~(Address.user != u7_transient),
1415            "NOT (addresses_1.user_id != :user_id_1 "
1416            "OR addresses_1.user_id IS NULL)",
1417            User,
1418            User.addresses,
1419            checkparams={"user_id_1": 7},
1420        )
1421
1422    def test_m2o_compare_instance_aliased(self):
1423        User, Address = self.classes.User, self.classes.Address
1424        u7 = User(id=5)
1425        attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7))
1426        u7.id = 7
1427
1428        u7_transient = User(id=7)
1429
1430        a1 = aliased(Address)
1431        self._test(
1432            a1.user == u7,
1433            ":param_1 = addresses_1.user_id",
1434            checkparams={"param_1": 7},
1435        )
1436
1437        self._test(
1438            a1.user != u7,
1439            "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL",
1440            checkparams={"user_id_1": 7},
1441        )
1442
1443        a1 = aliased(Address)
1444        self._test(
1445            a1.user == u7_transient,
1446            ":param_1 = addresses_1.user_id",
1447            checkparams={"param_1": 7},
1448        )
1449
1450        self._test(
1451            a1.user != u7_transient,
1452            "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL",
1453            checkparams={"user_id_1": 7},
1454        )
1455
1456    def test_selfref_relationship(self):
1457
1458        Node = self.classes.Node
1459
1460        nalias = aliased(Node)
1461
1462        # auto self-referential aliasing
1463        self._test(
1464            Node.children.any(Node.data == "n1"),
1465            "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
1466            "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)",
1467            entity=Node,
1468            checkparams={"data_1": "n1"},
1469        )
1470
1471        # needs autoaliasing
1472        self._test(
1473            Node.children == None,  # noqa
1474            "NOT (EXISTS (SELECT 1 FROM nodes AS nodes_1 "
1475            "WHERE nodes.id = nodes_1.parent_id))",
1476            entity=Node,
1477            checkparams={},
1478        )
1479
1480        self._test(
1481            Node.parent == None,  # noqa
1482            "nodes.parent_id IS NULL",
1483            checkparams={},
1484        )
1485
1486        self._test(
1487            nalias.parent == None,  # noqa
1488            "nodes_1.parent_id IS NULL",
1489            checkparams={},
1490        )
1491
1492        self._test(
1493            nalias.parent != None,  # noqa
1494            "nodes_1.parent_id IS NOT NULL",
1495            checkparams={},
1496        )
1497
1498        self._test(
1499            nalias.children == None,  # noqa
1500            "NOT (EXISTS ("
1501            "SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))",
1502            entity=nalias,
1503            checkparams={},
1504        )
1505
1506        self._test(
1507            nalias.children.any(Node.data == "some data"),
1508            "EXISTS (SELECT 1 FROM nodes WHERE "
1509            "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)",
1510            entity=nalias,
1511            checkparams={"data_1": "some data"},
1512        )
1513
1514        # this fails because self-referential any() is auto-aliasing;
1515        # the fact that we use "nalias" here means we get two aliases.
1516        # self._test(
1517        #        Node.children.any(nalias.data == 'some data'),
1518        #        "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
1519        #        "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)",
1520        #        entity=Node
1521        #        )
1522
1523        self._test(
1524            nalias.parent.has(Node.data == "some data"),
1525            "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id "
1526            "AND nodes.data = :data_1)",
1527            entity=nalias,
1528            checkparams={"data_1": "some data"},
1529        )
1530
1531        self._test(
1532            Node.parent.has(Node.data == "some data"),
1533            "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE "
1534            "nodes_1.id = nodes.parent_id AND nodes_1.data = :data_1)",
1535            entity=Node,
1536            checkparams={"data_1": "some data"},
1537        )
1538
1539        self._test(
1540            Node.parent == Node(id=7),
1541            ":param_1 = nodes.parent_id",
1542            checkparams={"param_1": 7},
1543        )
1544
1545        self._test(
1546            nalias.parent == Node(id=7),
1547            ":param_1 = nodes_1.parent_id",
1548            checkparams={"param_1": 7},
1549        )
1550
1551        self._test(
1552            nalias.parent != Node(id=7),
1553            "nodes_1.parent_id != :parent_id_1 "
1554            "OR nodes_1.parent_id IS NULL",
1555            checkparams={"parent_id_1": 7},
1556        )
1557
1558        self._test(
1559            nalias.parent != Node(id=7),
1560            "nodes_1.parent_id != :parent_id_1 "
1561            "OR nodes_1.parent_id IS NULL",
1562            checkparams={"parent_id_1": 7},
1563        )
1564
1565        self._test(
1566            nalias.children.contains(Node(id=7, parent_id=12)),
1567            "nodes_1.id = :param_1",
1568            checkparams={"param_1": 12},
1569        )
1570
1571    def test_multilevel_any(self):
1572        User, Address, Dingaling = (
1573            self.classes.User,
1574            self.classes.Address,
1575            self.classes.Dingaling,
1576        )
1577        sess = Session()
1578
1579        q = sess.query(User).filter(
1580            User.addresses.any(
1581                and_(Address.id == Dingaling.address_id, Dingaling.data == "x")
1582            )
1583        )
1584        # new since #2746 - correlate_except() now takes context into account
1585        # so its usage in any() is not as disrupting.
1586        self.assert_compile(
1587            q,
1588            "SELECT users.id AS users_id, users.name AS users_name "
1589            "FROM users "
1590            "WHERE EXISTS (SELECT 1 "
1591            "FROM addresses, dingalings "
1592            "WHERE users.id = addresses.user_id AND "
1593            "addresses.id = dingalings.address_id AND "
1594            "dingalings.data = :data_1)",
1595        )
1596
1597    def test_op(self):
1598        User = self.classes.User
1599
1600        self._test(User.name.op("ilike")("17"), "users.name ilike :name_1")
1601
1602    def test_in(self):
1603        User = self.classes.User
1604
1605        self._test(User.id.in_(["a", "b"]), "users.id IN (:id_1, :id_2)")
1606
1607    def test_in_on_relationship_not_supported(self):
1608        User, Address = self.classes.User, self.classes.Address
1609
1610        assert_raises(NotImplementedError, Address.user.in_, [User(id=5)])
1611
1612    def test_neg(self):
1613        User = self.classes.User
1614
1615        self._test(-User.id, "-users.id")
1616        self._test(User.id + -User.id, "users.id + -users.id")
1617
1618    def test_between(self):
1619        User = self.classes.User
1620
1621        self._test(
1622            User.id.between("a", "b"), "users.id BETWEEN :id_1 AND :id_2"
1623        )
1624
1625    def test_collate(self):
1626        User = self.classes.User
1627
1628        self._test(collate(User.id, "utf8_bin"), "users.id COLLATE utf8_bin")
1629
1630        self._test(User.id.collate("utf8_bin"), "users.id COLLATE utf8_bin")
1631
1632    def test_selfref_between(self):
1633        User = self.classes.User
1634
1635        ualias = aliased(User)
1636        self._test(
1637            User.id.between(ualias.id, ualias.id),
1638            "users.id BETWEEN users_1.id AND users_1.id",
1639        )
1640        self._test(
1641            ualias.id.between(User.id, User.id),
1642            "users_1.id BETWEEN users.id AND users.id",
1643        )
1644
1645    def test_clauses(self):
1646        User, Address = self.classes.User, self.classes.Address
1647
1648        for (expr, compare) in (
1649            (func.max(User.id), "max(users.id)"),
1650            (User.id.desc(), "users.id DESC"),
1651            (
1652                between(5, User.id, Address.id),
1653                ":param_1 BETWEEN users.id AND addresses.id",
1654            ),
1655            # this one would require adding compile() to
1656            # InstrumentedScalarAttribute.  do we want this ?
1657            # (User.id, "users.id")
1658        ):
1659            c = expr.compile(dialect=default.DefaultDialect())
1660            assert str(c) == compare, "%s != %s" % (str(c), compare)
1661
1662
1663class ExpressionTest(QueryTest, AssertsCompiledSQL):
1664    __dialect__ = "default"
1665
1666    def test_deferred_instances(self):
1667        User, addresses, Address = (
1668            self.classes.User,
1669            self.tables.addresses,
1670            self.classes.Address,
1671        )
1672
1673        session = create_session()
1674        s = (
1675            session.query(User)
1676            .filter(
1677                and_(
1678                    addresses.c.email_address == bindparam("emailad"),
1679                    Address.user_id == User.id,
1680                )
1681            )
1682            .statement
1683        )
1684
1685        result = list(
1686            session.query(User).instances(s.execute(emailad="jack@bean.com"))
1687        )
1688        eq_([User(id=7)], result)
1689
1690    def test_aliased_sql_construct(self):
1691        User, Address = self.classes.User, self.classes.Address
1692
1693        j = join(User, Address)
1694        a1 = aliased(j)
1695        self.assert_compile(
1696            a1.select(),
1697            "SELECT anon_1.users_id, anon_1.users_name, anon_1.addresses_id, "
1698            "anon_1.addresses_user_id, anon_1.addresses_email_address "
1699            "FROM (SELECT users.id AS users_id, users.name AS users_name, "
1700            "addresses.id AS addresses_id, addresses.user_id AS "
1701            "addresses_user_id, addresses.email_address AS "
1702            "addresses_email_address FROM users JOIN addresses "
1703            "ON users.id = addresses.user_id) AS anon_1",
1704        )
1705
1706    def test_aliased_sql_construct_raises_adapt_on_names(self):
1707        User, Address = self.classes.User, self.classes.Address
1708
1709        j = join(User, Address)
1710        assert_raises_message(
1711            sa_exc.ArgumentError,
1712            "adapt_on_names only applies to ORM elements",
1713            aliased,
1714            j,
1715            adapt_on_names=True,
1716        )
1717
1718    def test_scalar_subquery_compile_whereclause(self):
1719        User = self.classes.User
1720        Address = self.classes.Address
1721
1722        session = create_session()
1723
1724        q = session.query(User.id).filter(User.id == 7)
1725
1726        q = session.query(Address).filter(Address.user_id == q)
1727        assert isinstance(q._criterion.right, expression.ColumnElement)
1728        self.assert_compile(
1729            q,
1730            "SELECT addresses.id AS addresses_id, addresses.user_id "
1731            "AS addresses_user_id, addresses.email_address AS "
1732            "addresses_email_address FROM addresses WHERE "
1733            "addresses.user_id = (SELECT users.id AS users_id "
1734            "FROM users WHERE users.id = :id_1)",
1735        )
1736
1737    def test_subquery_no_eagerloads(self):
1738        User = self.classes.User
1739        s = Session()
1740
1741        self.assert_compile(
1742            s.query(User).options(joinedload(User.addresses)).subquery(),
1743            "SELECT users.id, users.name FROM users",
1744        )
1745
1746    def test_exists_no_eagerloads(self):
1747        User = self.classes.User
1748        s = Session()
1749
1750        self.assert_compile(
1751            s.query(
1752                s.query(User).options(joinedload(User.addresses)).exists()
1753            ),
1754            "SELECT EXISTS (SELECT 1 FROM users) AS anon_1",
1755        )
1756
1757    def test_named_subquery(self):
1758        User = self.classes.User
1759
1760        session = create_session()
1761        a1 = session.query(User.id).filter(User.id == 7).subquery("foo1")
1762        a2 = session.query(User.id).filter(User.id == 7).subquery(name="foo2")
1763        a3 = session.query(User.id).filter(User.id == 7).subquery()
1764
1765        eq_(a1.name, "foo1")
1766        eq_(a2.name, "foo2")
1767        eq_(a3.name, "%%(%d anon)s" % id(a3))
1768
1769    def test_labeled_subquery(self):
1770        User = self.classes.User
1771
1772        session = create_session()
1773        a1 = (
1774            session.query(User.id)
1775            .filter(User.id == 7)
1776            .subquery(with_labels=True)
1777        )
1778        assert a1.c.users_id is not None
1779
1780    def test_reduced_subquery(self):
1781        User = self.classes.User
1782        ua = aliased(User)
1783
1784        session = create_session()
1785        a1 = (
1786            session.query(User.id, ua.id, ua.name)
1787            .filter(User.id == ua.id)
1788            .subquery(reduce_columns=True)
1789        )
1790        self.assert_compile(
1791            a1,
1792            "SELECT users.id, users_1.name FROM "
1793            "users, users AS users_1 "
1794            "WHERE users.id = users_1.id",
1795        )
1796
1797    def test_label(self):
1798        User = self.classes.User
1799
1800        session = create_session()
1801
1802        q = session.query(User.id).filter(User.id == 7).label("foo")
1803        self.assert_compile(
1804            session.query(q),
1805            "SELECT (SELECT users.id FROM users "
1806            "WHERE users.id = :id_1) AS foo",
1807        )
1808
1809    def test_as_scalar(self):
1810        User = self.classes.User
1811
1812        session = create_session()
1813
1814        q = session.query(User.id).filter(User.id == 7).as_scalar()
1815
1816        self.assert_compile(
1817            session.query(User).filter(User.id.in_(q)),
1818            "SELECT users.id AS users_id, users.name "
1819            "AS users_name FROM users WHERE users.id "
1820            "IN (SELECT users.id FROM users WHERE "
1821            "users.id = :id_1)",
1822        )
1823
1824    def test_param_transfer(self):
1825        User = self.classes.User
1826
1827        session = create_session()
1828
1829        q = (
1830            session.query(User.id)
1831            .filter(User.id == bindparam("foo"))
1832            .params(foo=7)
1833            .subquery()
1834        )
1835
1836        q = session.query(User).filter(User.id.in_(q))
1837
1838        eq_(User(id=7), q.one())
1839
1840    def test_in(self):
1841        User, Address = self.classes.User, self.classes.Address
1842
1843        session = create_session()
1844        s = (
1845            session.query(User.id)
1846            .join(User.addresses)
1847            .group_by(User.id)
1848            .having(func.count(Address.id) > 2)
1849        )
1850        eq_(session.query(User).filter(User.id.in_(s)).all(), [User(id=8)])
1851
1852    def test_union(self):
1853        User = self.classes.User
1854
1855        s = create_session()
1856
1857        q1 = s.query(User).filter(User.name == "ed").with_labels()
1858        q2 = s.query(User).filter(User.name == "fred").with_labels()
1859        eq_(
1860            s.query(User)
1861            .from_statement(union(q1, q2).order_by("users_name"))
1862            .all(),
1863            [User(name="ed"), User(name="fred")],
1864        )
1865
1866    def test_select(self):
1867        User = self.classes.User
1868
1869        s = create_session()
1870
1871        # this is actually not legal on most DBs since the subquery has no
1872        # alias
1873        q1 = s.query(User).filter(User.name == "ed")
1874
1875        self.assert_compile(
1876            select([q1]),
1877            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1878            "users.name AS users_name FROM users WHERE users.name = :name_1)",
1879        )
1880
1881    def test_join(self):
1882        User, Address = self.classes.User, self.classes.Address
1883
1884        s = create_session()
1885
1886        # TODO: do we want aliased() to detect a query and convert to
1887        # subquery() automatically ?
1888        q1 = s.query(Address).filter(Address.email_address == "jack@bean.com")
1889        adalias = aliased(Address, q1.subquery())
1890        eq_(
1891            s.query(User, adalias)
1892            .join(adalias, User.id == adalias.user_id)
1893            .all(),
1894            [
1895                (
1896                    User(id=7, name="jack"),
1897                    Address(email_address="jack@bean.com", user_id=7, id=1),
1898                )
1899            ],
1900        )
1901
1902    def test_group_by_plain(self):
1903        User = self.classes.User
1904        s = create_session()
1905
1906        q1 = s.query(User.id, User.name).group_by(User.name)
1907        self.assert_compile(
1908            select([q1]),
1909            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1910            "users.name AS users_name FROM users GROUP BY users.name)",
1911        )
1912
1913    def test_group_by_append(self):
1914        User = self.classes.User
1915        s = create_session()
1916
1917        q1 = s.query(User.id, User.name).group_by(User.name)
1918
1919        # test append something to group_by
1920        self.assert_compile(
1921            select([q1.group_by(User.id)]),
1922            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1923            "users.name AS users_name FROM users "
1924            "GROUP BY users.name, users.id)",
1925        )
1926
1927    def test_group_by_cancellation(self):
1928        User = self.classes.User
1929        s = create_session()
1930
1931        q1 = s.query(User.id, User.name).group_by(User.name)
1932        # test cancellation by using None, replacement with something else
1933        self.assert_compile(
1934            select([q1.group_by(None).group_by(User.id)]),
1935            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1936            "users.name AS users_name FROM users GROUP BY users.id)",
1937        )
1938
1939        # test cancellation by using None, replacement with nothing
1940        self.assert_compile(
1941            select([q1.group_by(None)]),
1942            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1943            "users.name AS users_name FROM users)",
1944        )
1945
1946    def test_group_by_cancelled_still_present(self):
1947        User = self.classes.User
1948        s = create_session()
1949
1950        q1 = s.query(User.id, User.name).group_by(User.name).group_by(None)
1951
1952        q1._no_criterion_assertion("foo")
1953
1954    def test_order_by_plain(self):
1955        User = self.classes.User
1956        s = create_session()
1957
1958        q1 = s.query(User.id, User.name).order_by(User.name)
1959        self.assert_compile(
1960            select([q1]),
1961            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1962            "users.name AS users_name FROM users ORDER BY users.name)",
1963        )
1964
1965    def test_order_by_append(self):
1966        User = self.classes.User
1967        s = create_session()
1968
1969        q1 = s.query(User.id, User.name).order_by(User.name)
1970
1971        # test append something to order_by
1972        self.assert_compile(
1973            select([q1.order_by(User.id)]),
1974            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1975            "users.name AS users_name FROM users "
1976            "ORDER BY users.name, users.id)",
1977        )
1978
1979    def test_order_by_cancellation(self):
1980        User = self.classes.User
1981        s = create_session()
1982
1983        q1 = s.query(User.id, User.name).order_by(User.name)
1984        # test cancellation by using None, replacement with something else
1985        self.assert_compile(
1986            select([q1.order_by(None).order_by(User.id)]),
1987            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1988            "users.name AS users_name FROM users ORDER BY users.id)",
1989        )
1990
1991        # test cancellation by using None, replacement with nothing
1992        self.assert_compile(
1993            select([q1.order_by(None)]),
1994            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
1995            "users.name AS users_name FROM users)",
1996        )
1997
1998    def test_order_by_cancellation_false(self):
1999        User = self.classes.User
2000        s = create_session()
2001
2002        q1 = s.query(User.id, User.name).order_by(User.name)
2003        # test cancellation by using None, replacement with something else
2004        self.assert_compile(
2005            select([q1.order_by(False).order_by(User.id)]),
2006            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
2007            "users.name AS users_name FROM users ORDER BY users.id)",
2008        )
2009
2010        # test cancellation by using None, replacement with nothing
2011        self.assert_compile(
2012            select([q1.order_by(False)]),
2013            "SELECT users_id, users_name FROM (SELECT users.id AS users_id, "
2014            "users.name AS users_name FROM users)",
2015        )
2016
2017    def test_order_by_cancelled_allows_assertions(self):
2018        User = self.classes.User
2019        s = create_session()
2020
2021        q1 = s.query(User.id, User.name).order_by(User.name).order_by(None)
2022
2023        q1._no_criterion_assertion("foo")
2024
2025    def test_legacy_order_by_cancelled_allows_assertions(self):
2026        User = self.classes.User
2027        s = create_session()
2028
2029        q1 = s.query(User.id, User.name).order_by(User.name).order_by(False)
2030
2031        q1._no_criterion_assertion("foo")
2032
2033
2034class ColumnPropertyTest(_fixtures.FixtureTest, AssertsCompiledSQL):
2035    __dialect__ = "default"
2036    run_setup_mappers = "each"
2037
2038    def _fixture(self, label=True, polymorphic=False):
2039        User, Address = self.classes("User", "Address")
2040        users, addresses = self.tables("users", "addresses")
2041        stmt = (
2042            select([func.max(addresses.c.email_address)])
2043            .where(addresses.c.user_id == users.c.id)
2044            .correlate(users)
2045        )
2046        if label:
2047            stmt = stmt.label("email_ad")
2048
2049        mapper(
2050            User,
2051            users,
2052            properties={"ead": column_property(stmt)},
2053            with_polymorphic="*" if polymorphic else None,
2054        )
2055        mapper(Address, addresses)
2056
2057    def _func_fixture(self, label=False):
2058        User = self.classes.User
2059        users = self.tables.users
2060
2061        if label:
2062            mapper(
2063                User,
2064                users,
2065                properties={
2066                    "foobar": column_property(
2067                        func.foob(users.c.name).label(None)
2068                    )
2069                },
2070            )
2071        else:
2072            mapper(
2073                User,
2074                users,
2075                properties={
2076                    "foobar": column_property(func.foob(users.c.name))
2077                },
2078            )
2079
2080    def test_anon_label_function_auto(self):
2081        self._func_fixture()
2082        User = self.classes.User
2083
2084        s = Session()
2085
2086        u1 = aliased(User)
2087        self.assert_compile(
2088            s.query(User.foobar, u1.foobar),
2089            "SELECT foob(users.name) AS foob_1, foob(users_1.name) AS foob_2 "
2090            "FROM users, users AS users_1",
2091        )
2092
2093    def test_anon_label_function_manual(self):
2094        self._func_fixture(label=True)
2095        User = self.classes.User
2096
2097        s = Session()
2098
2099        u1 = aliased(User)
2100        self.assert_compile(
2101            s.query(User.foobar, u1.foobar),
2102            "SELECT foob(users.name) AS foob_1, foob(users_1.name) AS foob_2 "
2103            "FROM users, users AS users_1",
2104        )
2105
2106    def test_anon_label_ad_hoc_labeling(self):
2107        self._func_fixture()
2108        User = self.classes.User
2109
2110        s = Session()
2111
2112        u1 = aliased(User)
2113        self.assert_compile(
2114            s.query(User.foobar.label("x"), u1.foobar.label("y")),
2115            "SELECT foob(users.name) AS x, foob(users_1.name) AS y "
2116            "FROM users, users AS users_1",
2117        )
2118
2119    def test_order_by_column_prop_string(self):
2120        User, Address = self.classes("User", "Address")
2121        self._fixture(label=True)
2122
2123        s = Session()
2124        q = s.query(User).order_by("email_ad")
2125        self.assert_compile(
2126            q,
2127            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2128            "FROM addresses "
2129            "WHERE addresses.user_id = users.id) AS email_ad, "
2130            "users.id AS users_id, users.name AS users_name "
2131            "FROM users ORDER BY email_ad",
2132        )
2133
2134    def test_order_by_column_prop_aliased_string(self):
2135        User, Address = self.classes("User", "Address")
2136        self._fixture(label=True)
2137
2138        s = Session()
2139        ua = aliased(User)
2140        q = s.query(ua).order_by("email_ad")
2141
2142        def go():
2143            self.assert_compile(
2144                q,
2145                "SELECT (SELECT max(addresses.email_address) AS max_1 "
2146                "FROM addresses WHERE addresses.user_id = users_1.id) "
2147                "AS anon_1, users_1.id AS users_1_id, "
2148                "users_1.name AS users_1_name FROM users AS users_1 "
2149                "ORDER BY email_ad",
2150            )
2151
2152        assert_warnings(
2153            go, ["Can't resolve label reference 'email_ad'"], regex=True
2154        )
2155
2156    def test_order_by_column_labeled_prop_attr_aliased_one(self):
2157        User = self.classes.User
2158        self._fixture(label=True)
2159
2160        ua = aliased(User)
2161        s = Session()
2162        q = s.query(ua).order_by(ua.ead)
2163        self.assert_compile(
2164            q,
2165            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2166            "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
2167            "users_1.id AS users_1_id, users_1.name AS users_1_name "
2168            "FROM users AS users_1 ORDER BY anon_1",
2169        )
2170
2171    def test_order_by_column_labeled_prop_attr_aliased_two(self):
2172        User = self.classes.User
2173        self._fixture(label=True)
2174
2175        ua = aliased(User)
2176        s = Session()
2177        q = s.query(ua.ead).order_by(ua.ead)
2178        self.assert_compile(
2179            q,
2180            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2181            "FROM addresses, "
2182            "users AS users_1 WHERE addresses.user_id = users_1.id) "
2183            "AS anon_1 ORDER BY anon_1",
2184        )
2185
2186        # we're also testing that the state of "ua" is OK after the
2187        # previous call, so the batching into one test is intentional
2188        q = s.query(ua).order_by(ua.ead)
2189        self.assert_compile(
2190            q,
2191            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2192            "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
2193            "users_1.id AS users_1_id, users_1.name AS users_1_name "
2194            "FROM users AS users_1 ORDER BY anon_1",
2195        )
2196
2197    def test_order_by_column_labeled_prop_attr_aliased_three(self):
2198        User = self.classes.User
2199        self._fixture(label=True)
2200
2201        ua = aliased(User)
2202        s = Session()
2203        q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead)
2204        self.assert_compile(
2205            q,
2206            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2207            "FROM addresses, users WHERE addresses.user_id = users.id) "
2208            "AS email_ad, (SELECT max(addresses.email_address) AS max_1 "
2209            "FROM addresses, users AS users_1 WHERE addresses.user_id = "
2210            "users_1.id) AS anon_1 ORDER BY email_ad, anon_1",
2211        )
2212
2213        q = s.query(User, ua).order_by(User.ead, ua.ead)
2214        self.assert_compile(
2215            q,
2216            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2217            "FROM addresses WHERE addresses.user_id = users.id) AS "
2218            "email_ad, users.id AS users_id, users.name AS users_name, "
2219            "(SELECT max(addresses.email_address) AS max_1 FROM addresses "
2220            "WHERE addresses.user_id = users_1.id) AS anon_1, users_1.id "
2221            "AS users_1_id, users_1.name AS users_1_name FROM users, "
2222            "users AS users_1 ORDER BY email_ad, anon_1",
2223        )
2224
2225    def test_order_by_column_labeled_prop_attr_aliased_four(self):
2226        User = self.classes.User
2227        self._fixture(label=True, polymorphic=True)
2228
2229        ua = aliased(User)
2230        s = Session()
2231        q = s.query(ua, User.id).order_by(ua.ead)
2232        self.assert_compile(
2233            q,
2234            "SELECT (SELECT max(addresses.email_address) AS max_1 FROM "
2235            "addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
2236            "users_1.id AS users_1_id, users_1.name AS users_1_name, "
2237            "users.id AS users_id FROM users AS users_1, "
2238            "users ORDER BY anon_1",
2239        )
2240
2241    def test_order_by_column_unlabeled_prop_attr_aliased_one(self):
2242        User = self.classes.User
2243        self._fixture(label=False)
2244
2245        ua = aliased(User)
2246        s = Session()
2247        q = s.query(ua).order_by(ua.ead)
2248        self.assert_compile(
2249            q,
2250            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2251            "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
2252            "users_1.id AS users_1_id, users_1.name AS users_1_name "
2253            "FROM users AS users_1 ORDER BY anon_1",
2254        )
2255
2256    def test_order_by_column_unlabeled_prop_attr_aliased_two(self):
2257        User = self.classes.User
2258        self._fixture(label=False)
2259
2260        ua = aliased(User)
2261        s = Session()
2262        q = s.query(ua.ead).order_by(ua.ead)
2263        self.assert_compile(
2264            q,
2265            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2266            "FROM addresses, "
2267            "users AS users_1 WHERE addresses.user_id = users_1.id) "
2268            "AS anon_1 ORDER BY anon_1",
2269        )
2270
2271        # we're also testing that the state of "ua" is OK after the
2272        # previous call, so the batching into one test is intentional
2273        q = s.query(ua).order_by(ua.ead)
2274        self.assert_compile(
2275            q,
2276            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2277            "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, "
2278            "users_1.id AS users_1_id, users_1.name AS users_1_name "
2279            "FROM users AS users_1 ORDER BY anon_1",
2280        )
2281
2282    def test_order_by_column_unlabeled_prop_attr_aliased_three(self):
2283        User = self.classes.User
2284        self._fixture(label=False)
2285
2286        ua = aliased(User)
2287        s = Session()
2288        q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead)
2289        self.assert_compile(
2290            q,
2291            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2292            "FROM addresses, users WHERE addresses.user_id = users.id) "
2293            "AS anon_1, (SELECT max(addresses.email_address) AS max_1 "
2294            "FROM addresses, users AS users_1 "
2295            "WHERE addresses.user_id = users_1.id) AS anon_2 "
2296            "ORDER BY anon_1, anon_2",
2297        )
2298
2299        q = s.query(User, ua).order_by(User.ead, ua.ead)
2300        self.assert_compile(
2301            q,
2302            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2303            "FROM addresses WHERE addresses.user_id = users.id) AS "
2304            "anon_1, users.id AS users_id, users.name AS users_name, "
2305            "(SELECT max(addresses.email_address) AS max_1 FROM addresses "
2306            "WHERE addresses.user_id = users_1.id) AS anon_2, users_1.id "
2307            "AS users_1_id, users_1.name AS users_1_name FROM users, "
2308            "users AS users_1 ORDER BY anon_1, anon_2",
2309        )
2310
2311    def test_order_by_column_prop_attr(self):
2312        User, Address = self.classes("User", "Address")
2313        self._fixture(label=True)
2314
2315        s = Session()
2316        q = s.query(User).order_by(User.ead)
2317        # this one is a bit of a surprise; this is compiler
2318        # label-order-by logic kicking in, but won't work in more
2319        # complex cases.
2320        self.assert_compile(
2321            q,
2322            "SELECT (SELECT max(addresses.email_address) AS max_1 "
2323            "FROM addresses "
2324            "WHERE addresses.user_id = users.id) AS email_ad, "
2325            "users.id AS users_id, users.name AS users_name "
2326            "FROM users ORDER BY email_ad",
2327        )
2328
2329    def test_order_by_column_prop_attr_non_present(self):
2330        User, Address = self.classes("User", "Address")
2331        self._fixture(label=True)
2332
2333        s = Session()
2334        q = s.query(User).options(defer(User.ead)).order_by(User.ead)
2335        self.assert_compile(
2336            q,
2337            "SELECT users.id AS users_id, users.name AS users_name "
2338            "FROM users ORDER BY "
2339            "(SELECT max(addresses.email_address) AS max_1 "
2340            "FROM addresses "
2341            "WHERE addresses.user_id = users.id)",
2342        )
2343
2344
2345class ComparatorTest(QueryTest):
2346    def test_clause_element_query_resolve(self):
2347        from sqlalchemy.orm.properties import ColumnProperty
2348
2349        User = self.classes.User
2350
2351        class Comparator(ColumnProperty.Comparator):
2352            def __init__(self, expr):
2353                self.expr = expr
2354
2355            def __clause_element__(self):
2356                return self.expr
2357
2358        sess = Session()
2359        eq_(
2360            sess.query(Comparator(User.id))
2361            .order_by(Comparator(User.id))
2362            .all(),
2363            [(7,), (8,), (9,), (10,)],
2364        )
2365
2366
2367# more slice tests are available in test/orm/generative.py
2368class SliceTest(QueryTest):
2369    def test_first(self):
2370        User = self.classes.User
2371
2372        assert User(id=7) == create_session().query(User).first()
2373
2374        assert (
2375            create_session().query(User).filter(User.id == 27).first() is None
2376        )
2377
2378    def test_limit_offset_applies(self):
2379        """Test that the expected LIMIT/OFFSET is applied for slices.
2380
2381        The LIMIT/OFFSET syntax differs slightly on all databases, and
2382        query[x:y] executes immediately, so we are asserting against
2383        SQL strings using sqlite's syntax.
2384
2385        """
2386
2387        User = self.classes.User
2388
2389        sess = create_session()
2390        q = sess.query(User).order_by(User.id)
2391
2392        self.assert_sql(
2393            testing.db,
2394            lambda: q[10:20],
2395            [
2396                (
2397                    "SELECT users.id AS users_id, users.name "
2398                    "AS users_name FROM users ORDER BY users.id "
2399                    "LIMIT :param_1 OFFSET :param_2",
2400                    {"param_1": 10, "param_2": 10},
2401                )
2402            ],
2403        )
2404
2405        self.assert_sql(
2406            testing.db,
2407            lambda: q[:20],
2408            [
2409                (
2410                    "SELECT users.id AS users_id, users.name "
2411                    "AS users_name FROM users ORDER BY users.id "
2412                    "LIMIT :param_1",
2413                    {"param_1": 20},
2414                )
2415            ],
2416        )
2417
2418        self.assert_sql(
2419            testing.db,
2420            lambda: q[5:],
2421            [
2422                (
2423                    "SELECT users.id AS users_id, users.name "
2424                    "AS users_name FROM users ORDER BY users.id "
2425                    "LIMIT -1 OFFSET :param_1",
2426                    {"param_1": 5},
2427                )
2428            ],
2429        )
2430
2431        self.assert_sql(testing.db, lambda: q[2:2], [])
2432
2433        self.assert_sql(testing.db, lambda: q[-2:-5], [])
2434
2435        self.assert_sql(
2436            testing.db,
2437            lambda: q[-5:-2],
2438            [
2439                (
2440                    "SELECT users.id AS users_id, users.name AS users_name "
2441                    "FROM users ORDER BY users.id",
2442                    {},
2443                )
2444            ],
2445        )
2446
2447        self.assert_sql(
2448            testing.db,
2449            lambda: q[-5:],
2450            [
2451                (
2452                    "SELECT users.id AS users_id, users.name AS users_name "
2453                    "FROM users ORDER BY users.id",
2454                    {},
2455                )
2456            ],
2457        )
2458
2459        self.assert_sql(
2460            testing.db,
2461            lambda: q[:],
2462            [
2463                (
2464                    "SELECT users.id AS users_id, users.name AS users_name "
2465                    "FROM users ORDER BY users.id",
2466                    {},
2467                )
2468            ],
2469        )
2470
2471
2472class FilterTest(QueryTest, AssertsCompiledSQL):
2473    __dialect__ = "default"
2474
2475    def test_basic(self):
2476        User = self.classes.User
2477
2478        users = create_session().query(User).all()
2479        eq_([User(id=7), User(id=8), User(id=9), User(id=10)], users)
2480
2481    @testing.requires.offset
2482    def test_limit_offset(self):
2483        User = self.classes.User
2484
2485        sess = create_session()
2486
2487        assert [User(id=8), User(id=9)] == sess.query(User).order_by(
2488            User.id
2489        ).limit(2).offset(1).all()
2490
2491        assert [User(id=8), User(id=9)] == list(
2492            sess.query(User).order_by(User.id)[1:3]
2493        )
2494
2495        assert User(id=8) == sess.query(User).order_by(User.id)[1]
2496
2497        assert [] == sess.query(User).order_by(User.id)[3:3]
2498        assert [] == sess.query(User).order_by(User.id)[0:0]
2499
2500    @testing.requires.bound_limit_offset
2501    def test_select_with_bindparam_offset_limit(self):
2502        """Does a query allow bindparam for the limit?"""
2503        User = self.classes.User
2504        sess = create_session()
2505        q1 = (
2506            sess.query(self.classes.User)
2507            .order_by(self.classes.User.id)
2508            .limit(bindparam("n"))
2509        )
2510
2511        for n in range(1, 4):
2512            result = q1.params(n=n).all()
2513            eq_(len(result), n)
2514
2515        eq_(
2516            sess.query(User)
2517            .order_by(User.id)
2518            .limit(bindparam("limit"))
2519            .offset(bindparam("offset"))
2520            .params(limit=2, offset=1)
2521            .all(),
2522            [User(id=8), User(id=9)],
2523        )
2524
2525    @testing.fails_on("mysql", "doesn't like CAST in the limit clause")
2526    @testing.requires.bound_limit_offset
2527    def test_select_with_bindparam_offset_limit_w_cast(self):
2528        User = self.classes.User
2529        sess = create_session()
2530        q1 = (
2531            sess.query(self.classes.User)
2532            .order_by(self.classes.User.id)
2533            .limit(bindparam("n"))
2534        )
2535        eq_(
2536            list(
2537                sess.query(User)
2538                .params(a=1, b=3)
2539                .order_by(User.id)[
2540                    cast(bindparam("a"), Integer) : cast(
2541                        bindparam("b"), Integer
2542                    )
2543                ]
2544            ),
2545            [User(id=8), User(id=9)],
2546        )
2547
2548    @testing.requires.boolean_col_expressions
2549    def test_exists(self):
2550        User = self.classes.User
2551
2552        sess = create_session(testing.db)
2553
2554        assert sess.query(exists().where(User.id == 9)).scalar()
2555        assert not sess.query(exists().where(User.id == 29)).scalar()
2556
2557    def test_one_filter(self):
2558        User = self.classes.User
2559
2560        assert [User(id=8), User(id=9)] == create_session().query(User).filter(
2561            User.name.endswith("ed")
2562        ).all()
2563
2564    def test_contains(self):
2565        """test comparing a collection to an object instance."""
2566
2567        User, Address = self.classes.User, self.classes.Address
2568
2569        sess = create_session()
2570        address = sess.query(Address).get(3)
2571        assert [User(id=8)] == sess.query(User).filter(
2572            User.addresses.contains(address)
2573        ).all()
2574
2575        try:
2576            sess.query(User).filter(User.addresses == address)
2577            assert False
2578        except sa_exc.InvalidRequestError:
2579            assert True
2580
2581        assert [User(id=10)] == sess.query(User).filter(
2582            User.addresses == None
2583        ).all()  # noqa
2584
2585        try:
2586            assert [User(id=7), User(id=9), User(id=10)] == sess.query(
2587                User
2588            ).filter(User.addresses != address).all()
2589            assert False
2590        except sa_exc.InvalidRequestError:
2591            assert True
2592
2593        # assert [User(id=7), User(id=9), User(id=10)] ==
2594        # sess.query(User).filter(User.addresses!=address).all()
2595
2596    def test_clause_element_ok(self):
2597        User = self.classes.User
2598        s = Session()
2599        self.assert_compile(
2600            s.query(User).filter(User.addresses),
2601            "SELECT users.id AS users_id, users.name AS users_name "
2602            "FROM users, addresses WHERE users.id = addresses.user_id",
2603        )
2604
2605    def test_unique_binds_join_cond(self):
2606        """test that binds used when the lazyclause is used in criterion are
2607        unique"""
2608
2609        User, Address = self.classes.User, self.classes.Address
2610        sess = Session()
2611        a1, a2 = sess.query(Address).order_by(Address.id)[0:2]
2612        self.assert_compile(
2613            sess.query(User)
2614            .filter(User.addresses.contains(a1))
2615            .union(sess.query(User).filter(User.addresses.contains(a2))),
2616            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
2617            "anon_1_users_name FROM (SELECT users.id AS users_id, "
2618            "users.name AS users_name FROM users WHERE users.id = :param_1 "
2619            "UNION SELECT users.id AS users_id, users.name AS users_name "
2620            "FROM users WHERE users.id = :param_2) AS anon_1",
2621            checkparams={"param_1": 7, "param_2": 8},
2622        )
2623
2624    def test_any(self):
2625        # see also HasAnyTest, a newer suite which tests these at the level of
2626        # SQL compilation
2627        User, Address = self.classes.User, self.classes.Address
2628
2629        sess = create_session()
2630
2631        assert [User(id=8), User(id=9)] == sess.query(User).filter(
2632            User.addresses.any(Address.email_address.like("%ed%"))
2633        ).all()
2634
2635        assert [User(id=8)] == sess.query(User).filter(
2636            User.addresses.any(Address.email_address.like("%ed%"), id=4)
2637        ).all()
2638
2639        assert [User(id=8)] == sess.query(User).filter(
2640            User.addresses.any(Address.email_address.like("%ed%"))
2641        ).filter(User.addresses.any(id=4)).all()
2642
2643        assert [User(id=9)] == sess.query(User).filter(
2644            User.addresses.any(email_address="fred@fred.com")
2645        ).all()
2646
2647        # test that the contents are not adapted by the aliased join
2648        assert (
2649            [User(id=7), User(id=8)]
2650            == sess.query(User)
2651            .join("addresses", aliased=True)
2652            .filter(
2653                ~User.addresses.any(Address.email_address == "fred@fred.com")
2654            )
2655            .all()
2656        )
2657
2658        assert [User(id=10)] == sess.query(User).outerjoin(
2659            "addresses", aliased=True
2660        ).filter(~User.addresses.any()).all()
2661
2662    def test_any_doesnt_overcorrelate(self):
2663        # see also HasAnyTest, a newer suite which tests these at the level of
2664        # SQL compilation
2665        User, Address = self.classes.User, self.classes.Address
2666
2667        sess = create_session()
2668
2669        # test that any() doesn't overcorrelate
2670        assert (
2671            [User(id=7), User(id=8)]
2672            == sess.query(User)
2673            .join("addresses")
2674            .filter(
2675                ~User.addresses.any(Address.email_address == "fred@fred.com")
2676            )
2677            .all()
2678        )
2679
2680    def test_has(self):
2681        # see also HasAnyTest, a newer suite which tests these at the level of
2682        # SQL compilation
2683        Dingaling, User, Address = (
2684            self.classes.Dingaling,
2685            self.classes.User,
2686            self.classes.Address,
2687        )
2688
2689        sess = create_session()
2690        assert [Address(id=5)] == sess.query(Address).filter(
2691            Address.user.has(name="fred")
2692        ).all()
2693
2694        assert (
2695            [Address(id=2), Address(id=3), Address(id=4), Address(id=5)]
2696            == sess.query(Address)
2697            .filter(Address.user.has(User.name.like("%ed%")))
2698            .order_by(Address.id)
2699            .all()
2700        )
2701
2702        assert (
2703            [Address(id=2), Address(id=3), Address(id=4)]
2704            == sess.query(Address)
2705            .filter(Address.user.has(User.name.like("%ed%"), id=8))
2706            .order_by(Address.id)
2707            .all()
2708        )
2709
2710        # test has() doesn't overcorrelate
2711        assert (
2712            [Address(id=2), Address(id=3), Address(id=4)]
2713            == sess.query(Address)
2714            .join("user")
2715            .filter(Address.user.has(User.name.like("%ed%"), id=8))
2716            .order_by(Address.id)
2717            .all()
2718        )
2719
2720        # test has() doesn't get subquery contents adapted by aliased join
2721        assert (
2722            [Address(id=2), Address(id=3), Address(id=4)]
2723            == sess.query(Address)
2724            .join("user", aliased=True)
2725            .filter(Address.user.has(User.name.like("%ed%"), id=8))
2726            .order_by(Address.id)
2727            .all()
2728        )
2729
2730        dingaling = sess.query(Dingaling).get(2)
2731        assert [User(id=9)] == sess.query(User).filter(
2732            User.addresses.any(Address.dingaling == dingaling)
2733        ).all()
2734
2735    def test_contains_m2m(self):
2736        Item, Order = self.classes.Item, self.classes.Order
2737
2738        sess = create_session()
2739        item = sess.query(Item).get(3)
2740
2741        eq_(
2742            sess.query(Order)
2743            .filter(Order.items.contains(item))
2744            .order_by(Order.id)
2745            .all(),
2746            [Order(id=1), Order(id=2), Order(id=3)],
2747        )
2748        eq_(
2749            sess.query(Order)
2750            .filter(~Order.items.contains(item))
2751            .order_by(Order.id)
2752            .all(),
2753            [Order(id=4), Order(id=5)],
2754        )
2755
2756        item2 = sess.query(Item).get(5)
2757        eq_(
2758            sess.query(Order)
2759            .filter(Order.items.contains(item))
2760            .filter(Order.items.contains(item2))
2761            .all(),
2762            [Order(id=3)],
2763        )
2764
2765    def test_comparison(self):
2766        """test scalar comparison to an object instance"""
2767
2768        Item, Order, Dingaling, User, Address = (
2769            self.classes.Item,
2770            self.classes.Order,
2771            self.classes.Dingaling,
2772            self.classes.User,
2773            self.classes.Address,
2774        )
2775
2776        sess = create_session()
2777        user = sess.query(User).get(8)
2778        assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
2779            Address
2780        ).filter(Address.user == user).all()
2781
2782        assert [Address(id=1), Address(id=5)] == sess.query(Address).filter(
2783            Address.user != user
2784        ).all()
2785
2786        # generates an IS NULL
2787        assert (
2788            [] == sess.query(Address).filter(Address.user == None).all()
2789        )  # noqa
2790        assert [] == sess.query(Address).filter(Address.user == null()).all()
2791
2792        assert [Order(id=5)] == sess.query(Order).filter(
2793            Order.address == None
2794        ).all()  # noqa
2795
2796        # o2o
2797        dingaling = sess.query(Dingaling).get(2)
2798        assert [Address(id=5)] == sess.query(Address).filter(
2799            Address.dingaling == dingaling
2800        ).all()
2801
2802        # m2m
2803        eq_(
2804            sess.query(Item)
2805            .filter(Item.keywords == None)
2806            .order_by(Item.id)  # noqa
2807            .all(),
2808            [Item(id=4), Item(id=5)],
2809        )
2810        eq_(
2811            sess.query(Item)
2812            .filter(Item.keywords != None)
2813            .order_by(Item.id)  # noqa
2814            .all(),
2815            [Item(id=1), Item(id=2), Item(id=3)],
2816        )
2817
2818    def test_filter_by(self):
2819        User, Address = self.classes.User, self.classes.Address
2820
2821        sess = create_session()
2822        user = sess.query(User).get(8)
2823        assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
2824            Address
2825        ).filter_by(user=user).all()
2826
2827        # many to one generates IS NULL
2828        assert [] == sess.query(Address).filter_by(user=None).all()
2829        assert [] == sess.query(Address).filter_by(user=null()).all()
2830
2831        # one to many generates WHERE NOT EXISTS
2832        assert [User(name="chuck")] == sess.query(User).filter_by(
2833            addresses=None
2834        ).all()
2835        assert [User(name="chuck")] == sess.query(User).filter_by(
2836            addresses=null()
2837        ).all()
2838
2839    def test_filter_by_tables(self):
2840        users = self.tables.users
2841        addresses = self.tables.addresses
2842        sess = create_session()
2843        self.assert_compile(
2844            sess.query(users)
2845            .filter_by(name="ed")
2846            .join(addresses, users.c.id == addresses.c.user_id)
2847            .filter_by(email_address="ed@ed.com"),
2848            "SELECT users.id AS users_id, users.name AS users_name "
2849            "FROM users JOIN addresses ON users.id = addresses.user_id "
2850            "WHERE users.name = :name_1 AND "
2851            "addresses.email_address = :email_address_1",
2852            checkparams={"email_address_1": "ed@ed.com", "name_1": "ed"},
2853        )
2854
2855    def test_filter_by_no_property(self):
2856        addresses = self.tables.addresses
2857        sess = create_session()
2858        assert_raises_message(
2859            sa.exc.InvalidRequestError,
2860            "Entity 'addresses' has no property 'name'",
2861            sess.query(addresses).filter_by,
2862            name="ed",
2863        )
2864
2865    def test_none_comparison(self):
2866        Order, User, Address = (
2867            self.classes.Order,
2868            self.classes.User,
2869            self.classes.Address,
2870        )
2871
2872        sess = create_session()
2873
2874        # scalar
2875        eq_(
2876            [Order(description="order 5")],
2877            sess.query(Order).filter(Order.address_id == None).all(),  # noqa
2878        )
2879        eq_(
2880            [Order(description="order 5")],
2881            sess.query(Order).filter(Order.address_id == null()).all(),
2882        )
2883
2884        # o2o
2885        eq_(
2886            [Address(id=1), Address(id=3), Address(id=4)],
2887            sess.query(Address)
2888            .filter(Address.dingaling == None)
2889            .order_by(Address.id)  # noqa
2890            .all(),
2891        )
2892        eq_(
2893            [Address(id=1), Address(id=3), Address(id=4)],
2894            sess.query(Address)
2895            .filter(Address.dingaling == null())
2896            .order_by(Address.id)
2897            .all(),
2898        )
2899        eq_(
2900            [Address(id=2), Address(id=5)],
2901            sess.query(Address)
2902            .filter(Address.dingaling != None)
2903            .order_by(Address.id)  # noqa
2904            .all(),
2905        )
2906        eq_(
2907            [Address(id=2), Address(id=5)],
2908            sess.query(Address)
2909            .filter(Address.dingaling != null())
2910            .order_by(Address.id)
2911            .all(),
2912        )
2913
2914        # m2o
2915        eq_(
2916            [Order(id=5)],
2917            sess.query(Order).filter(Order.address == None).all(),
2918        )  # noqa
2919        eq_(
2920            [Order(id=1), Order(id=2), Order(id=3), Order(id=4)],
2921            sess.query(Order)
2922            .order_by(Order.id)
2923            .filter(Order.address != None)
2924            .all(),
2925        )  # noqa
2926
2927        # o2m
2928        eq_(
2929            [User(id=10)],
2930            sess.query(User).filter(User.addresses == None).all(),
2931        )  # noqa
2932        eq_(
2933            [User(id=7), User(id=8), User(id=9)],
2934            sess.query(User)
2935            .filter(User.addresses != None)
2936            .order_by(User.id)  # noqa
2937            .all(),
2938        )
2939
2940    def test_blank_filter_by(self):
2941        User = self.classes.User
2942
2943        eq_(
2944            [(7,), (8,), (9,), (10,)],
2945            create_session()
2946            .query(User.id)
2947            .filter_by()
2948            .order_by(User.id)
2949            .all(),
2950        )
2951        eq_(
2952            [(7,), (8,), (9,), (10,)],
2953            create_session()
2954            .query(User.id)
2955            .filter_by(**{})
2956            .order_by(User.id)
2957            .all(),
2958        )
2959
2960    def test_text_coerce(self):
2961        User = self.classes.User
2962        s = create_session()
2963        self.assert_compile(
2964            s.query(User).filter(text("name='ed'")),
2965            "SELECT users.id AS users_id, users.name "
2966            "AS users_name FROM users WHERE name='ed'",
2967        )
2968
2969
2970class HasAnyTest(fixtures.DeclarativeMappedTest, AssertsCompiledSQL):
2971    __dialect__ = "default"
2972
2973    @classmethod
2974    def setup_classes(cls):
2975        Base = cls.DeclarativeBasic
2976
2977        class D(Base):
2978            __tablename__ = "d"
2979            id = Column(Integer, primary_key=True)
2980
2981        class C(Base):
2982            __tablename__ = "c"
2983            id = Column(Integer, primary_key=True)
2984            d_id = Column(ForeignKey(D.id))
2985
2986            bs = relationship("B", back_populates="c")
2987
2988        b_d = Table(
2989            "b_d",
2990            Base.metadata,
2991            Column("bid", ForeignKey("b.id")),
2992            Column("did", ForeignKey("d.id")),
2993        )
2994
2995        # note we are using the ForeignKey pattern identified as a bug
2996        # in [ticket:4367]
2997        class B(Base):
2998            __tablename__ = "b"
2999            id = Column(Integer, primary_key=True)
3000            c_id = Column(ForeignKey(C.id))
3001
3002            c = relationship("C", back_populates="bs")
3003
3004            d = relationship("D", secondary=b_d)
3005
3006        class A(Base):
3007            __tablename__ = "a"
3008            id = Column(Integer, primary_key=True)
3009            b_id = Column(ForeignKey(B.id))
3010
3011            d = relationship(
3012                "D",
3013                secondary="join(B, C)",
3014                primaryjoin="A.b_id == B.id",
3015                secondaryjoin="C.d_id == D.id",
3016                uselist=False,
3017            )
3018
3019    def test_has_composite_secondary(self):
3020        A, D = self.classes("A", "D")
3021        s = Session()
3022        self.assert_compile(
3023            s.query(A).filter(A.d.has(D.id == 1)),
3024            "SELECT a.id AS a_id, a.b_id AS a_b_id FROM a WHERE EXISTS "
3025            "(SELECT 1 FROM d, b JOIN c ON c.id = b.c_id "
3026            "WHERE a.b_id = b.id AND c.d_id = d.id AND d.id = :id_1)",
3027        )
3028
3029    def test_has_many_to_one(self):
3030        B, C = self.classes("B", "C")
3031        s = Session()
3032        self.assert_compile(
3033            s.query(B).filter(B.c.has(C.id == 1)),
3034            "SELECT b.id AS b_id, b.c_id AS b_c_id FROM b WHERE "
3035            "EXISTS (SELECT 1 FROM c WHERE c.id = b.c_id AND c.id = :id_1)",
3036        )
3037
3038    def test_any_many_to_many(self):
3039        B, D = self.classes("B", "D")
3040        s = Session()
3041        self.assert_compile(
3042            s.query(B).filter(B.d.any(D.id == 1)),
3043            "SELECT b.id AS b_id, b.c_id AS b_c_id FROM b WHERE "
3044            "EXISTS (SELECT 1 FROM b_d, d WHERE b.id = b_d.bid "
3045            "AND d.id = b_d.did AND d.id = :id_1)",
3046        )
3047
3048    def test_any_one_to_many(self):
3049        B, C = self.classes("B", "C")
3050        s = Session()
3051        self.assert_compile(
3052            s.query(C).filter(C.bs.any(B.id == 1)),
3053            "SELECT c.id AS c_id, c.d_id AS c_d_id FROM c WHERE "
3054            "EXISTS (SELECT 1 FROM b WHERE c.id = b.c_id AND b.id = :id_1)",
3055        )
3056
3057    def test_any_many_to_many_doesnt_overcorrelate(self):
3058        B, D = self.classes("B", "D")
3059        s = Session()
3060
3061        self.assert_compile(
3062            s.query(B).join(B.d).filter(B.d.any(D.id == 1)),
3063            "SELECT b.id AS b_id, b.c_id AS b_c_id FROM "
3064            "b JOIN b_d AS b_d_1 ON b.id = b_d_1.bid "
3065            "JOIN d ON d.id = b_d_1.did WHERE "
3066            "EXISTS (SELECT 1 FROM b_d, d WHERE b.id = b_d.bid "
3067            "AND d.id = b_d.did AND d.id = :id_1)",
3068        )
3069
3070    def test_has_doesnt_overcorrelate(self):
3071        B, C = self.classes("B", "C")
3072        s = Session()
3073
3074        self.assert_compile(
3075            s.query(B).join(B.c).filter(B.c.has(C.id == 1)),
3076            "SELECT b.id AS b_id, b.c_id AS b_c_id "
3077            "FROM b JOIN c ON c.id = b.c_id "
3078            "WHERE EXISTS "
3079            "(SELECT 1 FROM c WHERE c.id = b.c_id AND c.id = :id_1)",
3080        )
3081
3082    def test_has_doesnt_get_aliased_join_subq(self):
3083        B, C = self.classes("B", "C")
3084        s = Session()
3085
3086        self.assert_compile(
3087            s.query(B).join(B.c, aliased=True).filter(B.c.has(C.id == 1)),
3088            "SELECT b.id AS b_id, b.c_id AS b_c_id "
3089            "FROM b JOIN c AS c_1 ON c_1.id = b.c_id "
3090            "WHERE EXISTS "
3091            "(SELECT 1 FROM c WHERE c.id = b.c_id AND c.id = :id_1)",
3092        )
3093
3094    def test_any_many_to_many_doesnt_get_aliased_join_subq(self):
3095        B, D = self.classes("B", "D")
3096        s = Session()
3097
3098        self.assert_compile(
3099            s.query(B).join(B.d, aliased=True).filter(B.d.any(D.id == 1)),
3100            "SELECT b.id AS b_id, b.c_id AS b_c_id "
3101            "FROM b JOIN b_d AS b_d_1 ON b.id = b_d_1.bid "
3102            "JOIN d AS d_1 ON d_1.id = b_d_1.did "
3103            "WHERE EXISTS "
3104            "(SELECT 1 FROM b_d, d WHERE b.id = b_d.bid "
3105            "AND d.id = b_d.did AND d.id = :id_1)",
3106        )
3107
3108
3109class HasMapperEntitiesTest(QueryTest):
3110    def test_entity(self):
3111        User = self.classes.User
3112        s = Session()
3113
3114        q = s.query(User)
3115
3116        assert q._has_mapper_entities
3117
3118    def test_cols(self):
3119        User = self.classes.User
3120        s = Session()
3121
3122        q = s.query(User.id)
3123
3124        assert not q._has_mapper_entities
3125
3126    def test_cols_set_entities(self):
3127        User = self.classes.User
3128        s = Session()
3129
3130        q = s.query(User.id)
3131
3132        q._set_entities(User)
3133        assert q._has_mapper_entities
3134
3135    def test_entity_set_entities(self):
3136        User = self.classes.User
3137        s = Session()
3138
3139        q = s.query(User)
3140
3141        q._set_entities(User.id)
3142        assert not q._has_mapper_entities
3143
3144
3145class SetOpsTest(QueryTest, AssertsCompiledSQL):
3146    __dialect__ = "default"
3147
3148    def test_union(self):
3149        User = self.classes.User
3150
3151        s = create_session()
3152
3153        fred = s.query(User).filter(User.name == "fred")
3154        ed = s.query(User).filter(User.name == "ed")
3155        jack = s.query(User).filter(User.name == "jack")
3156
3157        eq_(
3158            fred.union(ed).order_by(User.name).all(),
3159            [User(name="ed"), User(name="fred")],
3160        )
3161
3162        eq_(
3163            fred.union(ed, jack).order_by(User.name).all(),
3164            [User(name="ed"), User(name="fred"), User(name="jack")],
3165        )
3166
3167    def test_statement_labels(self):
3168        """test that label conflicts don't occur with joins etc."""
3169
3170        User, Address = self.classes.User, self.classes.Address
3171
3172        s = create_session()
3173        q1 = (
3174            s.query(User, Address)
3175            .join(User.addresses)
3176            .filter(Address.email_address == "ed@wood.com")
3177        )
3178        q2 = (
3179            s.query(User, Address)
3180            .join(User.addresses)
3181            .filter(Address.email_address == "jack@bean.com")
3182        )
3183        q3 = q1.union(q2).order_by(User.name)
3184
3185        eq_(
3186            q3.all(),
3187            [
3188                (User(name="ed"), Address(email_address="ed@wood.com")),
3189                (User(name="jack"), Address(email_address="jack@bean.com")),
3190            ],
3191        )
3192
3193    def test_union_literal_expressions_compile(self):
3194        """test that column expressions translate during
3195            the _from_statement() portion of union(), others"""
3196
3197        User = self.classes.User
3198
3199        s = Session()
3200        q1 = s.query(User, literal("x"))
3201        q2 = s.query(User, literal_column("'y'"))
3202        q3 = q1.union(q2)
3203
3204        self.assert_compile(
3205            q3,
3206            "SELECT anon_1.users_id AS anon_1_users_id, "
3207            "anon_1.users_name AS anon_1_users_name, "
3208            "anon_1.param_1 AS anon_1_param_1 "
3209            "FROM (SELECT users.id AS users_id, users.name AS "
3210            "users_name, :param_1 AS param_1 "
3211            "FROM users UNION SELECT users.id AS users_id, "
3212            "users.name AS users_name, 'y' FROM users) AS anon_1",
3213        )
3214
3215    def test_union_literal_expressions_results(self):
3216        User = self.classes.User
3217
3218        s = Session()
3219
3220        q1 = s.query(User, literal("x"))
3221        q2 = s.query(User, literal_column("'y'"))
3222        q3 = q1.union(q2)
3223
3224        q4 = s.query(User, literal_column("'x'").label("foo"))
3225        q5 = s.query(User, literal("y"))
3226        q6 = q4.union(q5)
3227
3228        eq_([x["name"] for x in q6.column_descriptions], ["User", "foo"])
3229
3230        for q in (
3231            q3.order_by(User.id, text("anon_1_param_1")),
3232            q6.order_by(User.id, "foo"),
3233        ):
3234            eq_(
3235                q.all(),
3236                [
3237                    (User(id=7, name="jack"), "x"),
3238                    (User(id=7, name="jack"), "y"),
3239                    (User(id=8, name="ed"), "x"),
3240                    (User(id=8, name="ed"), "y"),
3241                    (User(id=9, name="fred"), "x"),
3242                    (User(id=9, name="fred"), "y"),
3243                    (User(id=10, name="chuck"), "x"),
3244                    (User(id=10, name="chuck"), "y"),
3245                ],
3246            )
3247
3248    def test_union_labeled_anonymous_columns(self):
3249        User = self.classes.User
3250
3251        s = Session()
3252
3253        c1, c2 = column("c1"), column("c2")
3254        q1 = s.query(User, c1.label("foo"), c1.label("bar"))
3255        q2 = s.query(User, c1.label("foo"), c2.label("bar"))
3256        q3 = q1.union(q2)
3257
3258        eq_(
3259            [x["name"] for x in q3.column_descriptions], ["User", "foo", "bar"]
3260        )
3261
3262        self.assert_compile(
3263            q3,
3264            "SELECT anon_1.users_id AS anon_1_users_id, "
3265            "anon_1.users_name AS anon_1_users_name, "
3266            "anon_1.foo AS anon_1_foo, anon_1.bar AS anon_1_bar "
3267            "FROM (SELECT users.id AS users_id, users.name AS users_name, "
3268            "c1 AS foo, c1 AS bar FROM users UNION SELECT users.id AS "
3269            "users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
3270            "FROM users) AS anon_1",
3271        )
3272
3273    def test_order_by_anonymous_col(self):
3274        User = self.classes.User
3275
3276        s = Session()
3277
3278        c1, c2 = column("c1"), column("c2")
3279        f = c1.label("foo")
3280        q1 = s.query(User, f, c2.label("bar"))
3281        q2 = s.query(User, c1.label("foo"), c2.label("bar"))
3282        q3 = q1.union(q2)
3283
3284        self.assert_compile(
3285            q3.order_by(c1),
3286            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
3287            "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
3288            "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
3289            "users_name, c1 AS foo, c2 AS bar "
3290            "FROM users UNION SELECT users.id "
3291            "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
3292            "FROM users) AS anon_1 ORDER BY anon_1.foo",
3293        )
3294
3295        self.assert_compile(
3296            q3.order_by(f),
3297            "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS "
3298            "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS "
3299            "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS "
3300            "users_name, c1 AS foo, c2 AS bar "
3301            "FROM users UNION SELECT users.id "
3302            "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar "
3303            "FROM users) AS anon_1 ORDER BY anon_1.foo",
3304        )
3305
3306    def test_union_mapped_colnames_preserved_across_subquery(self):
3307        User = self.classes.User
3308
3309        s = Session()
3310        q1 = s.query(User.name)
3311        q2 = s.query(User.name)
3312
3313        # the label names in the subquery are the typical anonymized ones
3314        self.assert_compile(
3315            q1.union(q2),
3316            "SELECT anon_1.users_name AS anon_1_users_name "
3317            "FROM (SELECT users.name AS users_name FROM users "
3318            "UNION SELECT users.name AS users_name FROM users) AS anon_1",
3319        )
3320
3321        # but in the returned named tuples,
3322        # due to [ticket:1942], this should be 'name', not 'users_name'
3323        eq_([x["name"] for x in q1.union(q2).column_descriptions], ["name"])
3324
3325    @testing.requires.intersect
3326    def test_intersect(self):
3327        User = self.classes.User
3328
3329        s = create_session()
3330
3331        fred = s.query(User).filter(User.name == "fred")
3332        ed = s.query(User).filter(User.name == "ed")
3333        jack = s.query(User).filter(User.name == "jack")
3334        eq_(fred.intersect(ed, jack).all(), [])
3335
3336        eq_(fred.union(ed).intersect(ed.union(jack)).all(), [User(name="ed")])
3337
3338    def test_eager_load(self):
3339        User, Address = self.classes.User, self.classes.Address
3340
3341        s = create_session()
3342
3343        fred = s.query(User).filter(User.name == "fred")
3344        ed = s.query(User).filter(User.name == "ed")
3345
3346        def go():
3347            eq_(
3348                fred.union(ed)
3349                .order_by(User.name)
3350                .options(joinedload(User.addresses))
3351                .all(),
3352                [
3353                    User(
3354                        name="ed", addresses=[Address(), Address(), Address()]
3355                    ),
3356                    User(name="fred", addresses=[Address()]),
3357                ],
3358            )
3359
3360        self.assert_sql_count(testing.db, go, 1)
3361
3362
3363class AggregateTest(QueryTest):
3364    def test_sum(self):
3365        Order = self.classes.Order
3366
3367        sess = create_session()
3368        orders = sess.query(Order).filter(Order.id.in_([2, 3, 4]))
3369        eq_(
3370            next(orders.values(func.sum(Order.user_id * Order.address_id))),
3371            (79,),
3372        )
3373        eq_(orders.value(func.sum(Order.user_id * Order.address_id)), 79)
3374
3375    def test_apply(self):
3376        Order = self.classes.Order
3377
3378        sess = create_session()
3379        assert sess.query(func.sum(Order.user_id * Order.address_id)).filter(
3380            Order.id.in_([2, 3, 4])
3381        ).one() == (79,)
3382
3383    def test_having(self):
3384        User, Address = self.classes.User, self.classes.Address
3385
3386        sess = create_session()
3387        assert (
3388            [User(name="ed", id=8)]
3389            == sess.query(User)
3390            .order_by(User.id)
3391            .group_by(User)
3392            .join("addresses")
3393            .having(func.count(Address.id) > 2)
3394            .all()
3395        )
3396
3397        assert (
3398            [User(name="jack", id=7), User(name="fred", id=9)]
3399            == sess.query(User)
3400            .order_by(User.id)
3401            .group_by(User)
3402            .join("addresses")
3403            .having(func.count(Address.id) < 2)
3404            .all()
3405        )
3406
3407
3408class ExistsTest(QueryTest, AssertsCompiledSQL):
3409    __dialect__ = "default"
3410
3411    def test_exists(self):
3412        User = self.classes.User
3413        sess = create_session()
3414
3415        q1 = sess.query(User)
3416        self.assert_compile(
3417            sess.query(q1.exists()),
3418            "SELECT EXISTS (" "SELECT 1 FROM users" ") AS anon_1",
3419        )
3420
3421        q2 = sess.query(User).filter(User.name == "fred")
3422        self.assert_compile(
3423            sess.query(q2.exists()),
3424            "SELECT EXISTS ("
3425            "SELECT 1 FROM users WHERE users.name = :name_1"
3426            ") AS anon_1",
3427        )
3428
3429    def test_exists_col_warning(self):
3430        User = self.classes.User
3431        Address = self.classes.Address
3432        sess = create_session()
3433
3434        q1 = sess.query(User, Address).filter(User.id == Address.user_id)
3435        self.assert_compile(
3436            sess.query(q1.exists()),
3437            "SELECT EXISTS ("
3438            "SELECT 1 FROM users, addresses "
3439            "WHERE users.id = addresses.user_id"
3440            ") AS anon_1",
3441        )
3442
3443    def test_exists_w_select_from(self):
3444        User = self.classes.User
3445        sess = create_session()
3446
3447        q1 = sess.query().select_from(User).exists()
3448        self.assert_compile(
3449            sess.query(q1), "SELECT EXISTS (SELECT 1 FROM users) AS anon_1"
3450        )
3451
3452
3453class CountTest(QueryTest):
3454    def test_basic(self):
3455        users, User = self.tables.users, self.classes.User
3456
3457        s = create_session()
3458
3459        eq_(s.query(User).count(), 4)
3460
3461        eq_(s.query(User).filter(users.c.name.endswith("ed")).count(), 2)
3462
3463    def test_count_char(self):
3464        User = self.classes.User
3465        s = create_session()
3466        # '*' is favored here as the most common character,
3467        # it is reported that Informix doesn't like count(1),
3468        # rumors about Oracle preferring count(1) don't appear
3469        # to be well founded.
3470        self.assert_sql_execution(
3471            testing.db,
3472            s.query(User).count,
3473            CompiledSQL(
3474                "SELECT count(*) AS count_1 FROM "
3475                "(SELECT users.id AS users_id, users.name "
3476                "AS users_name FROM users) AS anon_1",
3477                {},
3478            ),
3479        )
3480
3481    def test_multiple_entity(self):
3482        User, Address = self.classes.User, self.classes.Address
3483
3484        s = create_session()
3485        q = s.query(User, Address)
3486        eq_(q.count(), 20)  # cartesian product
3487
3488        q = s.query(User, Address).join(User.addresses)
3489        eq_(q.count(), 5)
3490
3491    def test_nested(self):
3492        User, Address = self.classes.User, self.classes.Address
3493
3494        s = create_session()
3495        q = s.query(User, Address).limit(2)
3496        eq_(q.count(), 2)
3497
3498        q = s.query(User, Address).limit(100)
3499        eq_(q.count(), 20)
3500
3501        q = s.query(User, Address).join(User.addresses).limit(100)
3502        eq_(q.count(), 5)
3503
3504    def test_cols(self):
3505        """test that column-based queries always nest."""
3506
3507        User, Address = self.classes.User, self.classes.Address
3508
3509        s = create_session()
3510
3511        q = s.query(func.count(distinct(User.name)))
3512        eq_(q.count(), 1)
3513
3514        q = s.query(func.count(distinct(User.name))).distinct()
3515        eq_(q.count(), 1)
3516
3517        q = s.query(User.name)
3518        eq_(q.count(), 4)
3519
3520        q = s.query(User.name, Address)
3521        eq_(q.count(), 20)
3522
3523        q = s.query(Address.user_id)
3524        eq_(q.count(), 5)
3525        eq_(q.distinct().count(), 3)
3526
3527
3528class DistinctTest(QueryTest, AssertsCompiledSQL):
3529    __dialect__ = "default"
3530
3531    def test_basic(self):
3532        User = self.classes.User
3533
3534        eq_(
3535            [User(id=7), User(id=8), User(id=9), User(id=10)],
3536            create_session().query(User).order_by(User.id).distinct().all(),
3537        )
3538        eq_(
3539            [User(id=7), User(id=9), User(id=8), User(id=10)],
3540            create_session()
3541            .query(User)
3542            .distinct()
3543            .order_by(desc(User.name))
3544            .all(),
3545        )
3546
3547    def test_columns_augmented_roundtrip_one(self):
3548        User, Address = self.classes.User, self.classes.Address
3549
3550        sess = create_session()
3551        q = (
3552            sess.query(User)
3553            .join("addresses")
3554            .distinct()
3555            .order_by(desc(Address.email_address))
3556        )
3557
3558        eq_([User(id=7), User(id=9), User(id=8)], q.all())
3559
3560    def test_columns_augmented_roundtrip_two(self):
3561        User, Address = self.classes.User, self.classes.Address
3562
3563        sess = create_session()
3564
3565        # test that it works on embedded joinedload/LIMIT subquery
3566        q = (
3567            sess.query(User)
3568            .join("addresses")
3569            .distinct()
3570            .options(joinedload("addresses"))
3571            .order_by(desc(Address.email_address))
3572            .limit(2)
3573        )
3574
3575        def go():
3576            assert [
3577                User(id=7, addresses=[Address(id=1)]),
3578                User(id=9, addresses=[Address(id=5)]),
3579            ] == q.all()
3580
3581        self.assert_sql_count(testing.db, go, 1)
3582
3583    def test_columns_augmented_roundtrip_three(self):
3584        User, Address = self.classes.User, self.classes.Address
3585
3586        sess = create_session()
3587
3588        q = (
3589            sess.query(User.id, User.name.label("foo"), Address.id)
3590            .filter(User.name == "jack")
3591            .distinct()
3592            .order_by(User.id, User.name, Address.email_address)
3593        )
3594
3595        # even though columns are added, they aren't in the result
3596        eq_(
3597            q.all(),
3598            [
3599                (7, "jack", 3),
3600                (7, "jack", 4),
3601                (7, "jack", 2),
3602                (7, "jack", 5),
3603                (7, "jack", 1),
3604            ],
3605        )
3606        for row in q:
3607            eq_(row.keys(), ["id", "foo", "id"])
3608
3609    def test_columns_augmented_sql_one(self):
3610        User, Address = self.classes.User, self.classes.Address
3611
3612        sess = create_session()
3613
3614        q = (
3615            sess.query(User.id, User.name.label("foo"), Address.id)
3616            .distinct()
3617            .order_by(User.id, User.name, Address.email_address)
3618        )
3619
3620        # Address.email_address is added because of DISTINCT,
3621        # however User.id, User.name are not b.c. they're already there,
3622        # even though User.name is labeled
3623        self.assert_compile(
3624            q,
3625            "SELECT DISTINCT users.id AS users_id, users.name AS foo, "
3626            "addresses.id AS addresses_id, "
3627            "addresses.email_address AS addresses_email_address FROM users, "
3628            "addresses ORDER BY users.id, users.name, addresses.email_address",
3629        )
3630
3631    def test_columns_augmented_sql_two(self):
3632        User, Address = self.classes.User, self.classes.Address
3633
3634        sess = create_session()
3635
3636        q = (
3637            sess.query(User)
3638            .options(joinedload(User.addresses))
3639            .distinct()
3640            .order_by(User.name, Address.email_address)
3641            .limit(5)
3642        )
3643
3644        # addresses.email_address is added to inner query so that
3645        # it is available in ORDER BY
3646        self.assert_compile(
3647            q,
3648            "SELECT anon_1.users_id AS anon_1_users_id, "
3649            "anon_1.users_name AS anon_1_users_name, "
3650            "anon_1.addresses_email_address AS "
3651            "anon_1_addresses_email_address, "
3652            "addresses_1.id AS addresses_1_id, "
3653            "addresses_1.user_id AS addresses_1_user_id, "
3654            "addresses_1.email_address AS addresses_1_email_address "
3655            "FROM (SELECT DISTINCT users.id AS users_id, "
3656            "users.name AS users_name, "
3657            "addresses.email_address AS addresses_email_address "
3658            "FROM users, addresses "
3659            "ORDER BY users.name, addresses.email_address "
3660            "LIMIT :param_1) AS anon_1 LEFT OUTER JOIN "
3661            "addresses AS addresses_1 "
3662            "ON anon_1.users_id = addresses_1.user_id "
3663            "ORDER BY anon_1.users_name, "
3664            "anon_1.addresses_email_address, addresses_1.id",
3665        )
3666
3667    def test_columns_augmented_sql_three(self):
3668        User, Address = self.classes.User, self.classes.Address
3669
3670        sess = create_session()
3671
3672        q = (
3673            sess.query(User.id, User.name.label("foo"), Address.id)
3674            .distinct(User.name)
3675            .order_by(User.id, User.name, Address.email_address)
3676        )
3677
3678        # no columns are added when DISTINCT ON is used
3679        self.assert_compile(
3680            q,
3681            "SELECT DISTINCT ON (users.name) users.id AS users_id, "
3682            "users.name AS foo, addresses.id AS addresses_id FROM users, "
3683            "addresses ORDER BY users.id, users.name, addresses.email_address",
3684            dialect="postgresql",
3685        )
3686
3687    def test_columns_augmented_sql_four(self):
3688        User, Address = self.classes.User, self.classes.Address
3689
3690        sess = create_session()
3691
3692        q = (
3693            sess.query(User)
3694            .join("addresses")
3695            .distinct(Address.email_address)
3696            .options(joinedload("addresses"))
3697            .order_by(desc(Address.email_address))
3698            .limit(2)
3699        )
3700
3701        # but for the subquery / eager load case, we still need to make
3702        # the inner columns available for the ORDER BY even though its
3703        # a DISTINCT ON
3704        self.assert_compile(
3705            q,
3706            "SELECT anon_1.users_id AS anon_1_users_id, "
3707            "anon_1.users_name AS anon_1_users_name, "
3708            "anon_1.addresses_email_address AS "
3709            "anon_1_addresses_email_address, "
3710            "addresses_1.id AS addresses_1_id, "
3711            "addresses_1.user_id AS addresses_1_user_id, "
3712            "addresses_1.email_address AS addresses_1_email_address "
3713            "FROM (SELECT DISTINCT ON (addresses.email_address) "
3714            "users.id AS users_id, users.name AS users_name, "
3715            "addresses.email_address AS addresses_email_address "
3716            "FROM users JOIN addresses ON users.id = addresses.user_id "
3717            "ORDER BY addresses.email_address DESC  "
3718            "LIMIT %(param_1)s) AS anon_1 "
3719            "LEFT OUTER JOIN addresses AS addresses_1 "
3720            "ON anon_1.users_id = addresses_1.user_id "
3721            "ORDER BY anon_1.addresses_email_address DESC, addresses_1.id",
3722            dialect="postgresql",
3723        )
3724
3725
3726class PrefixWithTest(QueryTest, AssertsCompiledSQL):
3727    def test_one_prefix(self):
3728        User = self.classes.User
3729        sess = create_session()
3730        query = sess.query(User.name).prefix_with("PREFIX_1")
3731        expected = "SELECT PREFIX_1 " "users.name AS users_name FROM users"
3732        self.assert_compile(query, expected, dialect=default.DefaultDialect())
3733
3734    def test_many_prefixes(self):
3735        User = self.classes.User
3736        sess = create_session()
3737        query = sess.query(User.name).prefix_with("PREFIX_1", "PREFIX_2")
3738        expected = (
3739            "SELECT PREFIX_1 PREFIX_2 " "users.name AS users_name FROM users"
3740        )
3741        self.assert_compile(query, expected, dialect=default.DefaultDialect())
3742
3743    def test_chained_prefixes(self):
3744        User = self.classes.User
3745        sess = create_session()
3746        query = (
3747            sess.query(User.name)
3748            .prefix_with("PREFIX_1")
3749            .prefix_with("PREFIX_2", "PREFIX_3")
3750        )
3751        expected = (
3752            "SELECT PREFIX_1 PREFIX_2 PREFIX_3 "
3753            "users.name AS users_name FROM users"
3754        )
3755        self.assert_compile(query, expected, dialect=default.DefaultDialect())
3756
3757
3758class YieldTest(_fixtures.FixtureTest):
3759    run_setup_mappers = "each"
3760    run_inserts = "each"
3761
3762    def _eagerload_mappings(self, addresses_lazy=True, user_lazy=True):
3763        User, Address = self.classes("User", "Address")
3764        users, addresses = self.tables("users", "addresses")
3765        mapper(
3766            User,
3767            users,
3768            properties={
3769                "addresses": relationship(
3770                    Address,
3771                    lazy=addresses_lazy,
3772                    backref=backref("user", lazy=user_lazy),
3773                )
3774            },
3775        )
3776        mapper(Address, addresses)
3777
3778    def test_basic(self):
3779        self._eagerload_mappings()
3780
3781        User = self.classes.User
3782
3783        sess = create_session()
3784        q = iter(
3785            sess.query(User)
3786            .yield_per(1)
3787            .from_statement(text("select * from users"))
3788        )
3789
3790        ret = []
3791        eq_(len(sess.identity_map), 0)
3792        ret.append(next(q))
3793        ret.append(next(q))
3794        eq_(len(sess.identity_map), 2)
3795        ret.append(next(q))
3796        ret.append(next(q))
3797        eq_(len(sess.identity_map), 4)
3798        try:
3799            next(q)
3800            assert False
3801        except StopIteration:
3802            pass
3803
3804    def test_yield_per_and_execution_options(self):
3805        self._eagerload_mappings()
3806
3807        User = self.classes.User
3808
3809        sess = create_session()
3810        q = sess.query(User).yield_per(15)
3811        q = q.execution_options(foo="bar")
3812        assert q._yield_per
3813        eq_(
3814            q._execution_options,
3815            {"stream_results": True, "foo": "bar", "max_row_buffer": 15},
3816        )
3817
3818    def test_no_joinedload_opt(self):
3819        self._eagerload_mappings()
3820
3821        User = self.classes.User
3822        sess = create_session()
3823        q = sess.query(User).options(joinedload("addresses")).yield_per(1)
3824        assert_raises_message(
3825            sa_exc.InvalidRequestError,
3826            "The yield_per Query option is currently not compatible with "
3827            "joined collection eager loading.  Please specify ",
3828            q.all,
3829        )
3830
3831    def test_no_subqueryload_opt(self):
3832        self._eagerload_mappings()
3833
3834        User = self.classes.User
3835        sess = create_session()
3836        q = sess.query(User).options(subqueryload("addresses")).yield_per(1)
3837        assert_raises_message(
3838            sa_exc.InvalidRequestError,
3839            "The yield_per Query option is currently not compatible with "
3840            "subquery eager loading.  Please specify ",
3841            q.all,
3842        )
3843
3844    def test_no_subqueryload_mapping(self):
3845        self._eagerload_mappings(addresses_lazy="subquery")
3846
3847        User = self.classes.User
3848        sess = create_session()
3849        q = sess.query(User).yield_per(1)
3850        assert_raises_message(
3851            sa_exc.InvalidRequestError,
3852            "The yield_per Query option is currently not compatible with "
3853            "subquery eager loading.  Please specify ",
3854            q.all,
3855        )
3856
3857    def test_joinedload_m2o_ok(self):
3858        self._eagerload_mappings(user_lazy="joined")
3859        Address = self.classes.Address
3860        sess = create_session()
3861        q = sess.query(Address).yield_per(1)
3862        q.all()
3863
3864    def test_eagerload_opt_disable(self):
3865        self._eagerload_mappings()
3866
3867        User = self.classes.User
3868        sess = create_session()
3869        q = (
3870            sess.query(User)
3871            .options(subqueryload("addresses"))
3872            .enable_eagerloads(False)
3873            .yield_per(1)
3874        )
3875        q.all()
3876
3877        q = (
3878            sess.query(User)
3879            .options(joinedload("addresses"))
3880            .enable_eagerloads(False)
3881            .yield_per(1)
3882        )
3883        q.all()
3884
3885    def test_m2o_joinedload_not_others(self):
3886        self._eagerload_mappings(addresses_lazy="joined")
3887        Address = self.classes.Address
3888        sess = create_session()
3889        q = (
3890            sess.query(Address)
3891            .options(lazyload("*"), joinedload("user"))
3892            .yield_per(1)
3893            .filter_by(id=1)
3894        )
3895
3896        def go():
3897            result = q.all()
3898            assert result[0].user
3899
3900        self.assert_sql_count(testing.db, go, 1)
3901
3902
3903class HintsTest(QueryTest, AssertsCompiledSQL):
3904    __dialect__ = "default"
3905
3906    def test_hints(self):
3907        User = self.classes.User
3908
3909        from sqlalchemy.dialects import mysql
3910
3911        dialect = mysql.dialect()
3912
3913        sess = create_session()
3914
3915        self.assert_compile(
3916            sess.query(User).with_hint(
3917                User, "USE INDEX (col1_index,col2_index)"
3918            ),
3919            "SELECT users.id AS users_id, users.name AS users_name "
3920            "FROM users USE INDEX (col1_index,col2_index)",
3921            dialect=dialect,
3922        )
3923
3924        self.assert_compile(
3925            sess.query(User).with_hint(
3926                User, "WITH INDEX col1_index", "sybase"
3927            ),
3928            "SELECT users.id AS users_id, users.name AS users_name "
3929            "FROM users",
3930            dialect=dialect,
3931        )
3932
3933        ualias = aliased(User)
3934        self.assert_compile(
3935            sess.query(User, ualias)
3936            .with_hint(ualias, "USE INDEX (col1_index,col2_index)")
3937            .join(ualias, ualias.id > User.id),
3938            "SELECT users.id AS users_id, users.name AS users_name, "
3939            "users_1.id AS users_1_id, users_1.name AS users_1_name "
3940            "FROM users INNER JOIN users AS users_1 "
3941            "USE INDEX (col1_index,col2_index) "
3942            "ON users_1.id > users.id",
3943            dialect=dialect,
3944        )
3945
3946    def test_statement_hints(self):
3947        User = self.classes.User
3948
3949        sess = create_session()
3950        stmt = (
3951            sess.query(User)
3952            .with_statement_hint("test hint one")
3953            .with_statement_hint("test hint two")
3954            .with_statement_hint("test hint three", "postgresql")
3955        )
3956
3957        self.assert_compile(
3958            stmt,
3959            "SELECT users.id AS users_id, users.name AS users_name "
3960            "FROM users test hint one test hint two",
3961        )
3962
3963        self.assert_compile(
3964            stmt,
3965            "SELECT users.id AS users_id, users.name AS users_name "
3966            "FROM users test hint one test hint two test hint three",
3967            dialect="postgresql",
3968        )
3969
3970
3971class TextTest(QueryTest, AssertsCompiledSQL):
3972    __dialect__ = "default"
3973
3974    def test_fulltext(self):
3975        User = self.classes.User
3976
3977        with expect_warnings("Textual SQL"):
3978            eq_(
3979                create_session()
3980                .query(User)
3981                .from_statement("select * from users order by id")
3982                .all(),
3983                [User(id=7), User(id=8), User(id=9), User(id=10)],
3984            )
3985
3986        eq_(
3987            create_session()
3988            .query(User)
3989            .from_statement(text("select * from users order by id"))
3990            .first(),
3991            User(id=7),
3992        )
3993        eq_(
3994            create_session()
3995            .query(User)
3996            .from_statement(
3997                text("select * from users where name='nonexistent'")
3998            )
3999            .first(),
4000            None,
4001        )
4002
4003    def test_fragment(self):
4004        User = self.classes.User
4005
4006        with expect_warnings("Textual SQL expression"):
4007            eq_(
4008                create_session().query(User).filter("id in (8, 9)").all(),
4009                [User(id=8), User(id=9)],
4010            )
4011
4012            eq_(
4013                create_session()
4014                .query(User)
4015                .filter("name='fred'")
4016                .filter("id=9")
4017                .all(),
4018                [User(id=9)],
4019            )
4020            eq_(
4021                create_session()
4022                .query(User)
4023                .filter("name='fred'")
4024                .filter(User.id == 9)
4025                .all(),
4026                [User(id=9)],
4027            )
4028
4029    def test_binds_coerce(self):
4030        User = self.classes.User
4031
4032        with expect_warnings("Textual SQL expression"):
4033            eq_(
4034                create_session()
4035                .query(User)
4036                .filter("id in (:id1, :id2)")
4037                .params(id1=8, id2=9)
4038                .all(),
4039                [User(id=8), User(id=9)],
4040            )
4041
4042    def test_as_column(self):
4043        User = self.classes.User
4044
4045        s = create_session()
4046        assert_raises(
4047            sa_exc.InvalidRequestError, s.query, User.id, text("users.name")
4048        )
4049
4050        eq_(
4051            s.query(User.id, "name").order_by(User.id).all(),
4052            [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")],
4053        )
4054
4055    def test_via_select(self):
4056        User = self.classes.User
4057        s = create_session()
4058        eq_(
4059            s.query(User)
4060            .from_statement(
4061                select([column("id"), column("name")])
4062                .select_from(table("users"))
4063                .order_by("id")
4064            )
4065            .all(),
4066            [User(id=7), User(id=8), User(id=9), User(id=10)],
4067        )
4068
4069    def test_via_textasfrom_from_statement(self):
4070        User = self.classes.User
4071        s = create_session()
4072
4073        eq_(
4074            s.query(User)
4075            .from_statement(
4076                text("select * from users order by id").columns(
4077                    id=Integer, name=String
4078                )
4079            )
4080            .all(),
4081            [User(id=7), User(id=8), User(id=9), User(id=10)],
4082        )
4083
4084    def test_via_textasfrom_use_mapped_columns(self):
4085        User = self.classes.User
4086        s = create_session()
4087
4088        eq_(
4089            s.query(User)
4090            .from_statement(
4091                text("select * from users order by id").columns(
4092                    User.id, User.name
4093                )
4094            )
4095            .all(),
4096            [User(id=7), User(id=8), User(id=9), User(id=10)],
4097        )
4098
4099    def test_via_textasfrom_select_from(self):
4100        User = self.classes.User
4101        s = create_session()
4102
4103        eq_(
4104            s.query(User)
4105            .select_from(
4106                text("select * from users").columns(id=Integer, name=String)
4107            )
4108            .order_by(User.id)
4109            .all(),
4110            [User(id=7), User(id=8), User(id=9), User(id=10)],
4111        )
4112
4113    def test_group_by_accepts_text(self):
4114        User = self.classes.User
4115        s = create_session()
4116
4117        q = s.query(User).group_by(text("name"))
4118        self.assert_compile(
4119            q,
4120            "SELECT users.id AS users_id, users.name AS users_name "
4121            "FROM users GROUP BY name",
4122        )
4123
4124    def test_orm_columns_accepts_text(self):
4125        from sqlalchemy.orm.base import _orm_columns
4126
4127        t = text("x")
4128        eq_(_orm_columns(t), [t])
4129
4130    def test_order_by_w_eager_one(self):
4131        User = self.classes.User
4132        s = create_session()
4133
4134        # from 1.0.0 thru 1.0.2, the "name" symbol here was considered
4135        # to be part of the things we need to ORDER BY and it was being
4136        # placed into the inner query's columns clause, as part of
4137        # query._compound_eager_statement where we add unwrap_order_by()
4138        # to the columns clause.  However, as #3392 illustrates, unlocatable
4139        # string expressions like "name desc" will only fail in this scenario,
4140        # so in general the changing of the query structure with string labels
4141        # is dangerous.
4142        #
4143        # the queries here are again "invalid" from a SQL perspective, as the
4144        # "name" field isn't matched up to anything.
4145        #
4146        with expect_warnings("Can't resolve label reference 'name';"):
4147            self.assert_compile(
4148                s.query(User)
4149                .options(joinedload("addresses"))
4150                .order_by(desc("name"))
4151                .limit(1),
4152                "SELECT anon_1.users_id AS anon_1_users_id, "
4153                "anon_1.users_name AS anon_1_users_name, "
4154                "addresses_1.id AS addresses_1_id, "
4155                "addresses_1.user_id AS addresses_1_user_id, "
4156                "addresses_1.email_address AS addresses_1_email_address "
4157                "FROM (SELECT users.id AS users_id, users.name AS users_name "
4158                "FROM users ORDER BY users.name "
4159                "DESC LIMIT :param_1) AS anon_1 "
4160                "LEFT OUTER JOIN addresses AS addresses_1 "
4161                "ON anon_1.users_id = addresses_1.user_id "
4162                "ORDER BY name DESC, addresses_1.id",
4163            )
4164
4165    def test_order_by_w_eager_two(self):
4166        User = self.classes.User
4167        s = create_session()
4168
4169        with expect_warnings("Can't resolve label reference 'name';"):
4170            self.assert_compile(
4171                s.query(User)
4172                .options(joinedload("addresses"))
4173                .order_by("name")
4174                .limit(1),
4175                "SELECT anon_1.users_id AS anon_1_users_id, "
4176                "anon_1.users_name AS anon_1_users_name, "
4177                "addresses_1.id AS addresses_1_id, "
4178                "addresses_1.user_id AS addresses_1_user_id, "
4179                "addresses_1.email_address AS addresses_1_email_address "
4180                "FROM (SELECT users.id AS users_id, users.name AS users_name "
4181                "FROM users ORDER BY users.name "
4182                "LIMIT :param_1) AS anon_1 "
4183                "LEFT OUTER JOIN addresses AS addresses_1 "
4184                "ON anon_1.users_id = addresses_1.user_id "
4185                "ORDER BY name, addresses_1.id",
4186            )
4187
4188    def test_order_by_w_eager_three(self):
4189        User = self.classes.User
4190        s = create_session()
4191
4192        self.assert_compile(
4193            s.query(User)
4194            .options(joinedload("addresses"))
4195            .order_by("users_name")
4196            .limit(1),
4197            "SELECT anon_1.users_id AS anon_1_users_id, "
4198            "anon_1.users_name AS anon_1_users_name, "
4199            "addresses_1.id AS addresses_1_id, "
4200            "addresses_1.user_id AS addresses_1_user_id, "
4201            "addresses_1.email_address AS addresses_1_email_address "
4202            "FROM (SELECT users.id AS users_id, users.name AS users_name "
4203            "FROM users ORDER BY users.name "
4204            "LIMIT :param_1) AS anon_1 "
4205            "LEFT OUTER JOIN addresses AS addresses_1 "
4206            "ON anon_1.users_id = addresses_1.user_id "
4207            "ORDER BY anon_1.users_name, addresses_1.id",
4208        )
4209
4210        # however! this works (again?)
4211        eq_(
4212            s.query(User)
4213            .options(joinedload("addresses"))
4214            .order_by("users_name")
4215            .first(),
4216            User(name="chuck", addresses=[]),
4217        )
4218
4219    def test_order_by_w_eager_four(self):
4220        User = self.classes.User
4221        Address = self.classes.Address
4222        s = create_session()
4223
4224        self.assert_compile(
4225            s.query(User)
4226            .options(joinedload("addresses"))
4227            .order_by(desc("users_name"))
4228            .limit(1),
4229            "SELECT anon_1.users_id AS anon_1_users_id, "
4230            "anon_1.users_name AS anon_1_users_name, "
4231            "addresses_1.id AS addresses_1_id, "
4232            "addresses_1.user_id AS addresses_1_user_id, "
4233            "addresses_1.email_address AS addresses_1_email_address "
4234            "FROM (SELECT users.id AS users_id, users.name AS users_name "
4235            "FROM users ORDER BY users.name DESC "
4236            "LIMIT :param_1) AS anon_1 "
4237            "LEFT OUTER JOIN addresses AS addresses_1 "
4238            "ON anon_1.users_id = addresses_1.user_id "
4239            "ORDER BY anon_1.users_name DESC, addresses_1.id",
4240        )
4241
4242        # however! this works (again?)
4243        eq_(
4244            s.query(User)
4245            .options(joinedload("addresses"))
4246            .order_by(desc("users_name"))
4247            .first(),
4248            User(name="jack", addresses=[Address()]),
4249        )
4250
4251    def test_order_by_w_eager_five(self):
4252        """essentially the same as test_eager_relations -> test_limit_3,
4253        but test for textual label elements that are freeform.
4254        this is again #3392."""
4255
4256        User = self.classes.User
4257        Address = self.classes.Address
4258        Order = self.classes.Order
4259
4260        sess = create_session()
4261
4262        q = sess.query(User, Address.email_address.label("email_address"))
4263
4264        result = (
4265            q.join("addresses")
4266            .options(joinedload(User.orders))
4267            .order_by("email_address desc")
4268            .limit(1)
4269            .offset(0)
4270        )
4271        with expect_warnings(
4272            "Can't resolve label reference 'email_address desc'"
4273        ):
4274            eq_(
4275                [
4276                    (
4277                        User(
4278                            id=7,
4279                            orders=[Order(id=1), Order(id=3), Order(id=5)],
4280                            addresses=[Address(id=1)],
4281                        ),
4282                        "jack@bean.com",
4283                    )
4284                ],
4285                result.all(),
4286            )
4287
4288
4289class TextWarningTest(QueryTest, AssertsCompiledSQL):
4290    def _test(self, fn, arg, offending_clause, expected):
4291        assert_raises_message(
4292            sa.exc.SAWarning,
4293            r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
4294            r"explicitly declared (?:with|as) text\(%(stmt)r\)"
4295            % {"stmt": util.ellipses_string(offending_clause)},
4296            fn,
4297            arg,
4298        )
4299
4300        with expect_warnings("Textual "):
4301            stmt = fn(arg)
4302            self.assert_compile(stmt, expected)
4303
4304    def test_filter(self):
4305        User = self.classes.User
4306        self._test(
4307            Session().query(User.id).filter,
4308            "myid == 5",
4309            "myid == 5",
4310            "SELECT users.id AS users_id FROM users WHERE myid == 5",
4311        )
4312
4313    def test_having(self):
4314        User = self.classes.User
4315        self._test(
4316            Session().query(User.id).having,
4317            "myid == 5",
4318            "myid == 5",
4319            "SELECT users.id AS users_id FROM users HAVING myid == 5",
4320        )
4321
4322    def test_from_statement(self):
4323        User = self.classes.User
4324        self._test(
4325            Session().query(User.id).from_statement,
4326            "select id from user",
4327            "select id from user",
4328            "select id from user",
4329        )
4330
4331
4332class ParentTest(QueryTest, AssertsCompiledSQL):
4333    __dialect__ = "default"
4334
4335    def test_o2m(self):
4336        User, orders, Order = (
4337            self.classes.User,
4338            self.tables.orders,
4339            self.classes.Order,
4340        )
4341
4342        sess = create_session()
4343        q = sess.query(User)
4344
4345        u1 = q.filter_by(name="jack").one()
4346
4347        # test auto-lookup of property
4348        o = sess.query(Order).with_parent(u1).all()
4349        assert [
4350            Order(description="order 1"),
4351            Order(description="order 3"),
4352            Order(description="order 5"),
4353        ] == o
4354
4355        # test with explicit property
4356        o = sess.query(Order).with_parent(u1, property="orders").all()
4357        assert [
4358            Order(description="order 1"),
4359            Order(description="order 3"),
4360            Order(description="order 5"),
4361        ] == o
4362
4363        o = sess.query(Order).with_parent(u1, property=User.orders).all()
4364        assert [
4365            Order(description="order 1"),
4366            Order(description="order 3"),
4367            Order(description="order 5"),
4368        ] == o
4369
4370        o = sess.query(Order).filter(with_parent(u1, User.orders)).all()
4371        assert [
4372            Order(description="order 1"),
4373            Order(description="order 3"),
4374            Order(description="order 5"),
4375        ] == o
4376
4377        # test generative criterion
4378        o = sess.query(Order).with_parent(u1).filter(orders.c.id > 2).all()
4379        assert [
4380            Order(description="order 3"),
4381            Order(description="order 5"),
4382        ] == o
4383
4384        # test against None for parent? this can't be done with the current
4385        # API since we don't know what mapper to use
4386        # assert
4387        #     sess.query(Order).with_parent(None, property='addresses').all()
4388        #     == [Order(description="order 5")]
4389
4390    def test_select_from(self):
4391        User, Address = self.classes.User, self.classes.Address
4392
4393        sess = create_session()
4394        u1 = sess.query(User).get(7)
4395        q = sess.query(Address).select_from(Address).with_parent(u1)
4396        self.assert_compile(
4397            q,
4398            "SELECT addresses.id AS addresses_id, "
4399            "addresses.user_id AS addresses_user_id, "
4400            "addresses.email_address AS addresses_email_address "
4401            "FROM addresses WHERE :param_1 = addresses.user_id",
4402            {"param_1": 7},
4403        )
4404
4405    def test_from_entity_standalone_fn(self):
4406        User, Address = self.classes.User, self.classes.Address
4407
4408        sess = create_session()
4409        u1 = sess.query(User).get(7)
4410        q = sess.query(User, Address).filter(
4411            with_parent(u1, "addresses", from_entity=Address)
4412        )
4413        self.assert_compile(
4414            q,
4415            "SELECT users.id AS users_id, users.name AS users_name, "
4416            "addresses.id AS addresses_id, addresses.user_id "
4417            "AS addresses_user_id, "
4418            "addresses.email_address AS addresses_email_address "
4419            "FROM users, addresses "
4420            "WHERE :param_1 = addresses.user_id",
4421            {"param_1": 7},
4422        )
4423
4424    def test_from_entity_query_entity(self):
4425        User, Address = self.classes.User, self.classes.Address
4426
4427        sess = create_session()
4428        u1 = sess.query(User).get(7)
4429        q = sess.query(User, Address).with_parent(
4430            u1, "addresses", from_entity=Address
4431        )
4432        self.assert_compile(
4433            q,
4434            "SELECT users.id AS users_id, users.name AS users_name, "
4435            "addresses.id AS addresses_id, addresses.user_id "
4436            "AS addresses_user_id, "
4437            "addresses.email_address AS addresses_email_address "
4438            "FROM users, addresses "
4439            "WHERE :param_1 = addresses.user_id",
4440            {"param_1": 7},
4441        )
4442
4443    def test_select_from_alias(self):
4444        User, Address = self.classes.User, self.classes.Address
4445
4446        sess = create_session()
4447        u1 = sess.query(User).get(7)
4448        a1 = aliased(Address)
4449        q = sess.query(a1).with_parent(u1)
4450        self.assert_compile(
4451            q,
4452            "SELECT addresses_1.id AS addresses_1_id, "
4453            "addresses_1.user_id AS addresses_1_user_id, "
4454            "addresses_1.email_address AS addresses_1_email_address "
4455            "FROM addresses AS addresses_1 "
4456            "WHERE :param_1 = addresses_1.user_id",
4457            {"param_1": 7},
4458        )
4459
4460    def test_select_from_alias_explicit_prop(self):
4461        User, Address = self.classes.User, self.classes.Address
4462
4463        sess = create_session()
4464        u1 = sess.query(User).get(7)
4465        a1 = aliased(Address)
4466        q = sess.query(a1).with_parent(u1, "addresses")
4467        self.assert_compile(
4468            q,
4469            "SELECT addresses_1.id AS addresses_1_id, "
4470            "addresses_1.user_id AS addresses_1_user_id, "
4471            "addresses_1.email_address AS addresses_1_email_address "
4472            "FROM addresses AS addresses_1 "
4473            "WHERE :param_1 = addresses_1.user_id",
4474            {"param_1": 7},
4475        )
4476
4477    def test_noparent(self):
4478        Item, User = self.classes.Item, self.classes.User
4479
4480        sess = create_session()
4481        q = sess.query(User)
4482
4483        u1 = q.filter_by(name="jack").one()
4484
4485        try:
4486            q = sess.query(Item).with_parent(u1)
4487            assert False
4488        except sa_exc.InvalidRequestError as e:
4489            assert (
4490                str(e) == "Could not locate a property which relates "
4491                "instances of class 'Item' to instances of class 'User'"
4492            )
4493
4494    def test_m2m(self):
4495        Item, Keyword = self.classes.Item, self.classes.Keyword
4496
4497        sess = create_session()
4498        i1 = sess.query(Item).filter_by(id=2).one()
4499        k = sess.query(Keyword).with_parent(i1).all()
4500        assert [
4501            Keyword(name="red"),
4502            Keyword(name="small"),
4503            Keyword(name="square"),
4504        ] == k
4505
4506    def test_with_transient(self):
4507        User, Order = self.classes.User, self.classes.Order
4508
4509        sess = Session()
4510
4511        q = sess.query(User)
4512        u1 = q.filter_by(name="jack").one()
4513        utrans = User(id=u1.id)
4514        o = sess.query(Order).with_parent(utrans, "orders")
4515        eq_(
4516            [
4517                Order(description="order 1"),
4518                Order(description="order 3"),
4519                Order(description="order 5"),
4520            ],
4521            o.all(),
4522        )
4523
4524        o = sess.query(Order).filter(with_parent(utrans, "orders"))
4525        eq_(
4526            [
4527                Order(description="order 1"),
4528                Order(description="order 3"),
4529                Order(description="order 5"),
4530            ],
4531            o.all(),
4532        )
4533
4534    def test_with_pending_autoflush(self):
4535        Order, User = self.classes.Order, self.classes.User
4536
4537        sess = Session()
4538
4539        o1 = sess.query(Order).first()
4540        opending = Order(id=20, user_id=o1.user_id)
4541        sess.add(opending)
4542        eq_(
4543            sess.query(User).with_parent(opending, "user").one(),
4544            User(id=o1.user_id),
4545        )
4546        eq_(
4547            sess.query(User).filter(with_parent(opending, "user")).one(),
4548            User(id=o1.user_id),
4549        )
4550
4551    def test_with_pending_no_autoflush(self):
4552        Order, User = self.classes.Order, self.classes.User
4553
4554        sess = Session(autoflush=False)
4555
4556        o1 = sess.query(Order).first()
4557        opending = Order(user_id=o1.user_id)
4558        sess.add(opending)
4559        eq_(
4560            sess.query(User).with_parent(opending, "user").one(),
4561            User(id=o1.user_id),
4562        )
4563
4564    def test_unique_binds_union(self):
4565        """bindparams used in the 'parent' query are unique"""
4566        User, Address = self.classes.User, self.classes.Address
4567
4568        sess = Session()
4569        u1, u2 = sess.query(User).order_by(User.id)[0:2]
4570
4571        q1 = sess.query(Address).with_parent(u1, "addresses")
4572        q2 = sess.query(Address).with_parent(u2, "addresses")
4573
4574        self.assert_compile(
4575            q1.union(q2),
4576            "SELECT anon_1.addresses_id AS anon_1_addresses_id, "
4577            "anon_1.addresses_user_id AS anon_1_addresses_user_id, "
4578            "anon_1.addresses_email_address AS "
4579            "anon_1_addresses_email_address FROM (SELECT addresses.id AS "
4580            "addresses_id, addresses.user_id AS addresses_user_id, "
4581            "addresses.email_address AS addresses_email_address FROM "
4582            "addresses WHERE :param_1 = addresses.user_id UNION SELECT "
4583            "addresses.id AS addresses_id, addresses.user_id AS "
4584            "addresses_user_id, addresses.email_address "
4585            "AS addresses_email_address "
4586            "FROM addresses WHERE :param_2 = addresses.user_id) AS anon_1",
4587            checkparams={"param_1": 7, "param_2": 8},
4588        )
4589
4590    def test_unique_binds_or(self):
4591        User, Address = self.classes.User, self.classes.Address
4592
4593        sess = Session()
4594        u1, u2 = sess.query(User).order_by(User.id)[0:2]
4595
4596        self.assert_compile(
4597            sess.query(Address).filter(
4598                or_(with_parent(u1, "addresses"), with_parent(u2, "addresses"))
4599            ),
4600            "SELECT addresses.id AS addresses_id, addresses.user_id AS "
4601            "addresses_user_id, addresses.email_address AS "
4602            "addresses_email_address FROM addresses WHERE "
4603            ":param_1 = addresses.user_id OR :param_2 = addresses.user_id",
4604            checkparams={"param_1": 7, "param_2": 8},
4605        )
4606
4607
4608class WithTransientOnNone(_fixtures.FixtureTest, AssertsCompiledSQL):
4609    run_inserts = None
4610    __dialect__ = "default"
4611
4612    def _fixture1(self):
4613        User, Address = self.classes.User, self.classes.Address
4614        users, addresses = self.tables.users, self.tables.addresses
4615
4616        mapper(User, users)
4617        mapper(
4618            Address,
4619            addresses,
4620            properties={
4621                "user": relationship(User),
4622                "special_user": relationship(
4623                    User,
4624                    primaryjoin=and_(
4625                        users.c.id == addresses.c.user_id,
4626                        users.c.name == addresses.c.email_address,
4627                    ),
4628                ),
4629            },
4630        )
4631
4632    def test_filter_with_transient_assume_pk(self):
4633        self._fixture1()
4634        User, Address = self.classes.User, self.classes.Address
4635
4636        sess = Session()
4637
4638        q = sess.query(Address).filter(Address.user == User())
4639        with expect_warnings("Got None for value of column "):
4640            self.assert_compile(
4641                q,
4642                "SELECT addresses.id AS addresses_id, "
4643                "addresses.user_id AS addresses_user_id, "
4644                "addresses.email_address AS addresses_email_address "
4645                "FROM addresses WHERE :param_1 = addresses.user_id",
4646                checkparams={"param_1": None},
4647            )
4648
4649    def test_filter_with_transient_warn_for_none_against_non_pk(self):
4650        self._fixture1()
4651        User, Address = self.classes.User, self.classes.Address
4652
4653        s = Session()
4654        q = s.query(Address).filter(Address.special_user == User())
4655        with expect_warnings("Got None for value of column"):
4656
4657            self.assert_compile(
4658                q,
4659                "SELECT addresses.id AS addresses_id, "
4660                "addresses.user_id AS addresses_user_id, "
4661                "addresses.email_address AS addresses_email_address "
4662                "FROM addresses WHERE :param_1 = addresses.user_id "
4663                "AND :param_2 = addresses.email_address",
4664                checkparams={"param_1": None, "param_2": None},
4665            )
4666
4667    def test_with_parent_with_transient_assume_pk(self):
4668        self._fixture1()
4669        User, Address = self.classes.User, self.classes.Address
4670
4671        sess = Session()
4672
4673        q = sess.query(User).with_parent(Address(), "user")
4674        with expect_warnings("Got None for value of column"):
4675            self.assert_compile(
4676                q,
4677                "SELECT users.id AS users_id, users.name AS users_name "
4678                "FROM users WHERE users.id = :param_1",
4679                checkparams={"param_1": None},
4680            )
4681
4682    def test_with_parent_with_transient_warn_for_none_against_non_pk(self):
4683        self._fixture1()
4684        User, Address = self.classes.User, self.classes.Address
4685
4686        s = Session()
4687        q = s.query(User).with_parent(Address(), "special_user")
4688        with expect_warnings("Got None for value of column"):
4689
4690            self.assert_compile(
4691                q,
4692                "SELECT users.id AS users_id, users.name AS users_name "
4693                "FROM users WHERE users.id = :param_1 "
4694                "AND users.name = :param_2",
4695                checkparams={"param_1": None, "param_2": None},
4696            )
4697
4698    def test_negated_contains_or_equals_plain_m2o(self):
4699        self._fixture1()
4700        User, Address = self.classes.User, self.classes.Address
4701
4702        s = Session()
4703        q = s.query(Address).filter(Address.user != User())
4704        with expect_warnings("Got None for value of column"):
4705            self.assert_compile(
4706                q,
4707                "SELECT addresses.id AS addresses_id, "
4708                "addresses.user_id AS addresses_user_id, "
4709                "addresses.email_address AS addresses_email_address "
4710                "FROM addresses "
4711                "WHERE addresses.user_id != :user_id_1 "
4712                "OR addresses.user_id IS NULL",
4713                checkparams={"user_id_1": None},
4714            )
4715
4716    def test_negated_contains_or_equals_complex_rel(self):
4717        self._fixture1()
4718        User, Address = self.classes.User, self.classes.Address
4719
4720        s = Session()
4721
4722        # this one does *not* warn because we do the criteria
4723        # without deferral
4724        q = s.query(Address).filter(Address.special_user != User())
4725        self.assert_compile(
4726            q,
4727            "SELECT addresses.id AS addresses_id, "
4728            "addresses.user_id AS addresses_user_id, "
4729            "addresses.email_address AS addresses_email_address "
4730            "FROM addresses "
4731            "WHERE NOT (EXISTS (SELECT 1 "
4732            "FROM users "
4733            "WHERE users.id = addresses.user_id AND "
4734            "users.name = addresses.email_address AND users.id IS NULL))",
4735            checkparams={},
4736        )
4737
4738
4739class SynonymTest(QueryTest, AssertsCompiledSQL):
4740    __dialect__ = "default"
4741
4742    @classmethod
4743    def setup_mappers(cls):
4744        (
4745            users,
4746            Keyword,
4747            items,
4748            order_items,
4749            orders,
4750            Item,
4751            User,
4752            Address,
4753            keywords,
4754            Order,
4755            item_keywords,
4756            addresses,
4757        ) = (
4758            cls.tables.users,
4759            cls.classes.Keyword,
4760            cls.tables.items,
4761            cls.tables.order_items,
4762            cls.tables.orders,
4763            cls.classes.Item,
4764            cls.classes.User,
4765            cls.classes.Address,
4766            cls.tables.keywords,
4767            cls.classes.Order,
4768            cls.tables.item_keywords,
4769            cls.tables.addresses,
4770        )
4771
4772        mapper(
4773            User,
4774            users,
4775            properties={
4776                "name_syn": synonym("name"),
4777                "addresses": relationship(Address),
4778                "orders": relationship(
4779                    Order, backref="user", order_by=orders.c.id
4780                ),  # o2m, m2o
4781                "orders_syn": synonym("orders"),
4782                "orders_syn_2": synonym("orders_syn"),
4783            },
4784        )
4785        mapper(Address, addresses)
4786        mapper(
4787            Order,
4788            orders,
4789            properties={
4790                "items": relationship(Item, secondary=order_items),  # m2m
4791                "address": relationship(Address),  # m2o
4792                "items_syn": synonym("items"),
4793            },
4794        )
4795        mapper(
4796            Item,
4797            items,
4798            properties={
4799                "keywords": relationship(
4800                    Keyword, secondary=item_keywords
4801                )  # m2m
4802            },
4803        )
4804        mapper(Keyword, keywords)
4805
4806    def test_options(self):
4807        User, Order = self.classes.User, self.classes.Order
4808
4809        s = create_session()
4810
4811        def go():
4812            result = (
4813                s.query(User)
4814                .filter_by(name="jack")
4815                .options(joinedload(User.orders_syn))
4816                .all()
4817            )
4818            eq_(
4819                result,
4820                [
4821                    User(
4822                        id=7,
4823                        name="jack",
4824                        orders=[
4825                            Order(description="order 1"),
4826                            Order(description="order 3"),
4827                            Order(description="order 5"),
4828                        ],
4829                    )
4830                ],
4831            )
4832
4833        self.assert_sql_count(testing.db, go, 1)
4834
4835    def test_options_syn_of_syn(self):
4836        User, Order = self.classes.User, self.classes.Order
4837
4838        s = create_session()
4839
4840        def go():
4841            result = (
4842                s.query(User)
4843                .filter_by(name="jack")
4844                .options(joinedload(User.orders_syn_2))
4845                .all()
4846            )
4847            eq_(
4848                result,
4849                [
4850                    User(
4851                        id=7,
4852                        name="jack",
4853                        orders=[
4854                            Order(description="order 1"),
4855                            Order(description="order 3"),
4856                            Order(description="order 5"),
4857                        ],
4858                    )
4859                ],
4860            )
4861
4862        self.assert_sql_count(testing.db, go, 1)
4863
4864    def test_options_syn_of_syn_string(self):
4865        User, Order = self.classes.User, self.classes.Order
4866
4867        s = create_session()
4868
4869        def go():
4870            result = (
4871                s.query(User)
4872                .filter_by(name="jack")
4873                .options(joinedload("orders_syn_2"))
4874                .all()
4875            )
4876            eq_(
4877                result,
4878                [
4879                    User(
4880                        id=7,
4881                        name="jack",
4882                        orders=[
4883                            Order(description="order 1"),
4884                            Order(description="order 3"),
4885                            Order(description="order 5"),
4886                        ],
4887                    )
4888                ],
4889            )
4890
4891        self.assert_sql_count(testing.db, go, 1)
4892
4893    def test_joins(self):
4894        User, Order = self.classes.User, self.classes.Order
4895
4896        for j in (
4897            ["orders", "items"],
4898            ["orders_syn", "items"],
4899            [User.orders_syn, Order.items],
4900            ["orders_syn_2", "items"],
4901            [User.orders_syn_2, "items"],
4902            ["orders", "items_syn"],
4903            ["orders_syn", "items_syn"],
4904            ["orders_syn_2", "items_syn"],
4905        ):
4906            result = (
4907                create_session().query(User).join(*j).filter_by(id=3).all()
4908            )
4909            assert [User(id=7, name="jack"), User(id=9, name="fred")] == result
4910
4911    def test_with_parent(self):
4912        Order, User = self.classes.Order, self.classes.User
4913
4914        for nameprop, orderprop in (
4915            ("name", "orders"),
4916            ("name_syn", "orders"),
4917            ("name", "orders_syn"),
4918            ("name", "orders_syn_2"),
4919            ("name_syn", "orders_syn"),
4920            ("name_syn", "orders_syn_2"),
4921        ):
4922            sess = create_session()
4923            q = sess.query(User)
4924
4925            u1 = q.filter_by(**{nameprop: "jack"}).one()
4926
4927            o = sess.query(Order).with_parent(u1, property=orderprop).all()
4928            assert [
4929                Order(description="order 1"),
4930                Order(description="order 3"),
4931                Order(description="order 5"),
4932            ] == o
4933
4934    def test_froms_aliased_col(self):
4935        Address, User = self.classes.Address, self.classes.User
4936
4937        sess = create_session()
4938        ua = aliased(User)
4939
4940        q = sess.query(ua.name_syn).join(Address, ua.id == Address.user_id)
4941        self.assert_compile(
4942            q,
4943            "SELECT users_1.name AS users_1_name FROM "
4944            "users AS users_1 JOIN addresses "
4945            "ON users_1.id = addresses.user_id",
4946        )
4947
4948
4949class ImmediateTest(_fixtures.FixtureTest):
4950    run_inserts = "once"
4951    run_deletes = None
4952
4953    @classmethod
4954    def setup_mappers(cls):
4955        Address, addresses, users, User = (
4956            cls.classes.Address,
4957            cls.tables.addresses,
4958            cls.tables.users,
4959            cls.classes.User,
4960        )
4961
4962        mapper(Address, addresses)
4963
4964        mapper(User, users, properties=dict(addresses=relationship(Address)))
4965
4966    def test_one(self):
4967        User, Address = self.classes.User, self.classes.Address
4968
4969        sess = create_session()
4970
4971        assert_raises_message(
4972            sa.orm.exc.NoResultFound,
4973            r"No row was found for one\(\)",
4974            sess.query(User).filter(User.id == 99).one,
4975        )
4976
4977        eq_(sess.query(User).filter(User.id == 7).one().id, 7)
4978
4979        assert_raises_message(
4980            sa.orm.exc.MultipleResultsFound,
4981            r"Multiple rows were found for one\(\)",
4982            sess.query(User).one,
4983        )
4984
4985        assert_raises(
4986            sa.orm.exc.NoResultFound,
4987            sess.query(User.id, User.name).filter(User.id == 99).one,
4988        )
4989
4990        eq_(
4991            sess.query(User.id, User.name).filter(User.id == 7).one(),
4992            (7, "jack"),
4993        )
4994
4995        assert_raises(
4996            sa.orm.exc.MultipleResultsFound, sess.query(User.id, User.name).one
4997        )
4998
4999        assert_raises(
5000            sa.orm.exc.NoResultFound,
5001            (
5002                sess.query(User, Address)
5003                .join(User.addresses)
5004                .filter(Address.id == 99)
5005            ).one,
5006        )
5007
5008        eq_(
5009            (
5010                sess.query(User, Address)
5011                .join(User.addresses)
5012                .filter(Address.id == 4)
5013            ).one(),
5014            (User(id=8), Address(id=4)),
5015        )
5016
5017        assert_raises(
5018            sa.orm.exc.MultipleResultsFound,
5019            sess.query(User, Address).join(User.addresses).one,
5020        )
5021
5022        # this result returns multiple rows, the first
5023        # two rows being the same.  but uniquing is
5024        # not applied for a column based result.
5025        assert_raises(
5026            sa.orm.exc.MultipleResultsFound,
5027            sess.query(User.id)
5028            .join(User.addresses)
5029            .filter(User.id.in_([8, 9]))
5030            .order_by(User.id)
5031            .one,
5032        )
5033
5034        # test that a join which ultimately returns
5035        # multiple identities across many rows still
5036        # raises, even though the first two rows are of
5037        # the same identity and unique filtering
5038        # is applied ([ticket:1688])
5039        assert_raises(
5040            sa.orm.exc.MultipleResultsFound,
5041            sess.query(User)
5042            .join(User.addresses)
5043            .filter(User.id.in_([8, 9]))
5044            .order_by(User.id)
5045            .one,
5046        )
5047
5048    def test_one_or_none(self):
5049        User, Address = self.classes.User, self.classes.Address
5050
5051        sess = create_session()
5052
5053        eq_(sess.query(User).filter(User.id == 99).one_or_none(), None)
5054
5055        eq_(sess.query(User).filter(User.id == 7).one_or_none().id, 7)
5056
5057        assert_raises_message(
5058            sa.orm.exc.MultipleResultsFound,
5059            r"Multiple rows were found for one_or_none\(\)",
5060            sess.query(User).one_or_none,
5061        )
5062
5063        eq_(
5064            sess.query(User.id, User.name).filter(User.id == 99).one_or_none(),
5065            None,
5066        )
5067
5068        eq_(
5069            sess.query(User.id, User.name).filter(User.id == 7).one_or_none(),
5070            (7, "jack"),
5071        )
5072
5073        assert_raises(
5074            sa.orm.exc.MultipleResultsFound,
5075            sess.query(User.id, User.name).one_or_none,
5076        )
5077
5078        eq_(
5079            (
5080                sess.query(User, Address)
5081                .join(User.addresses)
5082                .filter(Address.id == 99)
5083            ).one_or_none(),
5084            None,
5085        )
5086
5087        eq_(
5088            (
5089                sess.query(User, Address)
5090                .join(User.addresses)
5091                .filter(Address.id == 4)
5092            ).one_or_none(),
5093            (User(id=8), Address(id=4)),
5094        )
5095
5096        assert_raises(
5097            sa.orm.exc.MultipleResultsFound,
5098            sess.query(User, Address).join(User.addresses).one_or_none,
5099        )
5100
5101        # this result returns multiple rows, the first
5102        # two rows being the same.  but uniquing is
5103        # not applied for a column based result.
5104        assert_raises(
5105            sa.orm.exc.MultipleResultsFound,
5106            sess.query(User.id)
5107            .join(User.addresses)
5108            .filter(User.id.in_([8, 9]))
5109            .order_by(User.id)
5110            .one_or_none,
5111        )
5112
5113        # test that a join which ultimately returns
5114        # multiple identities across many rows still
5115        # raises, even though the first two rows are of
5116        # the same identity and unique filtering
5117        # is applied ([ticket:1688])
5118        assert_raises(
5119            sa.orm.exc.MultipleResultsFound,
5120            sess.query(User)
5121            .join(User.addresses)
5122            .filter(User.id.in_([8, 9]))
5123            .order_by(User.id)
5124            .one_or_none,
5125        )
5126
5127    @testing.future
5128    def test_getslice(self):
5129        assert False
5130
5131    def test_scalar(self):
5132        User = self.classes.User
5133
5134        sess = create_session()
5135
5136        eq_(sess.query(User.id).filter_by(id=7).scalar(), 7)
5137        eq_(sess.query(User.id, User.name).filter_by(id=7).scalar(), 7)
5138        eq_(sess.query(User.id).filter_by(id=0).scalar(), None)
5139        eq_(
5140            sess.query(User).filter_by(id=7).scalar(),
5141            sess.query(User).filter_by(id=7).one(),
5142        )
5143
5144        assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).scalar)
5145        assert_raises(
5146            sa.orm.exc.MultipleResultsFound,
5147            sess.query(User.id, User.name).scalar,
5148        )
5149
5150    def test_value(self):
5151        User = self.classes.User
5152
5153        sess = create_session()
5154
5155        eq_(sess.query(User).filter_by(id=7).value(User.id), 7)
5156        eq_(sess.query(User.id, User.name).filter_by(id=7).value(User.id), 7)
5157        eq_(sess.query(User).filter_by(id=0).value(User.id), None)
5158
5159        sess.bind = testing.db
5160        eq_(sess.query().value(sa.literal_column("1").label("x")), 1)
5161
5162
5163class ExecutionOptionsTest(QueryTest):
5164    def test_option_building(self):
5165        User = self.classes.User
5166
5167        sess = create_session(bind=testing.db, autocommit=False)
5168
5169        q1 = sess.query(User)
5170        assert q1._execution_options == dict()
5171        q2 = q1.execution_options(foo="bar", stream_results=True)
5172        # q1's options should be unchanged.
5173        assert q1._execution_options == dict()
5174        # q2 should have them set.
5175        assert q2._execution_options == dict(foo="bar", stream_results=True)
5176        q3 = q2.execution_options(foo="not bar", answer=42)
5177        assert q2._execution_options == dict(foo="bar", stream_results=True)
5178
5179        q3_options = dict(foo="not bar", stream_results=True, answer=42)
5180        assert q3._execution_options == q3_options
5181
5182    def test_options_in_connection(self):
5183        User = self.classes.User
5184
5185        execution_options = dict(foo="bar", stream_results=True)
5186
5187        class TQuery(Query):
5188            def instances(self, result, ctx):
5189                try:
5190                    eq_(
5191                        result.connection._execution_options, execution_options
5192                    )
5193                finally:
5194                    result.close()
5195                return iter([])
5196
5197        sess = create_session(
5198            bind=testing.db, autocommit=False, query_cls=TQuery
5199        )
5200        q1 = sess.query(User).execution_options(**execution_options)
5201        q1.all()
5202
5203
5204class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL):
5205    """test standalone booleans being wrapped in an AsBoolean, as well
5206    as true/false compilation."""
5207
5208    def _dialect(self, native_boolean):
5209        d = default.DefaultDialect()
5210        d.supports_native_boolean = native_boolean
5211        return d
5212
5213    def test_one(self):
5214        s = Session()
5215        c = column("x", Boolean)
5216        self.assert_compile(
5217            s.query(c).filter(c),
5218            "SELECT x WHERE x",
5219            dialect=self._dialect(True),
5220        )
5221
5222    def test_two(self):
5223        s = Session()
5224        c = column("x", Boolean)
5225        self.assert_compile(
5226            s.query(c).filter(c),
5227            "SELECT x WHERE x = 1",
5228            dialect=self._dialect(False),
5229        )
5230
5231    def test_three(self):
5232        s = Session()
5233        c = column("x", Boolean)
5234        self.assert_compile(
5235            s.query(c).filter(~c),
5236            "SELECT x WHERE x = 0",
5237            dialect=self._dialect(False),
5238        )
5239
5240    def test_four(self):
5241        s = Session()
5242        c = column("x", Boolean)
5243        self.assert_compile(
5244            s.query(c).filter(~c),
5245            "SELECT x WHERE NOT x",
5246            dialect=self._dialect(True),
5247        )
5248
5249    def test_five(self):
5250        s = Session()
5251        c = column("x", Boolean)
5252        self.assert_compile(
5253            s.query(c).having(c),
5254            "SELECT x HAVING x = 1",
5255            dialect=self._dialect(False),
5256        )
5257
5258
5259class SessionBindTest(QueryTest):
5260    @contextlib.contextmanager
5261    def _assert_bind_args(self, session):
5262        get_bind = mock.Mock(side_effect=session.get_bind)
5263        with mock.patch.object(session, "get_bind", get_bind):
5264            yield
5265        for call_ in get_bind.mock_calls:
5266            is_(call_[1][0], inspect(self.classes.User))
5267            is_not_(call_[2]["clause"], None)
5268
5269    def test_single_entity_q(self):
5270        User = self.classes.User
5271        session = Session()
5272        with self._assert_bind_args(session):
5273            session.query(User).all()
5274
5275    def test_sql_expr_entity_q(self):
5276        User = self.classes.User
5277        session = Session()
5278        with self._assert_bind_args(session):
5279            session.query(User.id).all()
5280
5281    def test_count(self):
5282        User = self.classes.User
5283        session = Session()
5284        with self._assert_bind_args(session):
5285            session.query(User).count()
5286
5287    def test_aggregate_fn(self):
5288        User = self.classes.User
5289        session = Session()
5290        with self._assert_bind_args(session):
5291            session.query(func.max(User.name)).all()
5292
5293    def test_bulk_update_no_sync(self):
5294        User = self.classes.User
5295        session = Session()
5296        with self._assert_bind_args(session):
5297            session.query(User).filter(User.id == 15).update(
5298                {"name": "foob"}, synchronize_session=False
5299            )
5300
5301    def test_bulk_delete_no_sync(self):
5302        User = self.classes.User
5303        session = Session()
5304        with self._assert_bind_args(session):
5305            session.query(User).filter(User.id == 15).delete(
5306                synchronize_session=False
5307            )
5308
5309    def test_bulk_update_fetch_sync(self):
5310        User = self.classes.User
5311        session = Session()
5312        with self._assert_bind_args(session):
5313            session.query(User).filter(User.id == 15).update(
5314                {"name": "foob"}, synchronize_session="fetch"
5315            )
5316
5317    def test_bulk_delete_fetch_sync(self):
5318        User = self.classes.User
5319        session = Session()
5320        with self._assert_bind_args(session):
5321            session.query(User).filter(User.id == 15).delete(
5322                synchronize_session="fetch"
5323            )
5324
5325    def test_column_property(self):
5326        User = self.classes.User
5327
5328        mapper = inspect(User)
5329        mapper.add_property(
5330            "score",
5331            column_property(func.coalesce(self.tables.users.c.name, None)),
5332        )
5333        session = Session()
5334        with self._assert_bind_args(session):
5335            session.query(func.max(User.score)).scalar()
5336
5337    @testing.requires.nested_aggregates
5338    def test_column_property_select(self):
5339        User = self.classes.User
5340        Address = self.classes.Address
5341
5342        mapper = inspect(User)
5343        mapper.add_property(
5344            "score",
5345            column_property(
5346                select([func.sum(Address.id)])
5347                .where(Address.user_id == User.id)
5348                .as_scalar()
5349            ),
5350        )
5351        session = Session()
5352
5353        with self._assert_bind_args(session):
5354            session.query(func.max(User.score)).scalar()
5355
5356
5357class QueryClsTest(QueryTest):
5358    def _fn_fixture(self):
5359        def query(*arg, **kw):
5360            return Query(*arg, **kw)
5361
5362        return query
5363
5364    def _subclass_fixture(self):
5365        class MyQuery(Query):
5366            pass
5367
5368        return MyQuery
5369
5370    def _callable_fixture(self):
5371        class MyQueryFactory(object):
5372            def __call__(self, *arg, **kw):
5373                return Query(*arg, **kw)
5374
5375        return MyQueryFactory()
5376
5377    def _plain_fixture(self):
5378        return Query
5379
5380    def _test_get(self, fixture):
5381        User = self.classes.User
5382
5383        s = Session(query_cls=fixture())
5384
5385        assert s.query(User).get(19) is None
5386        u = s.query(User).get(7)
5387        u2 = s.query(User).get(7)
5388        assert u is u2
5389
5390    def _test_o2m_lazyload(self, fixture):
5391        User, Address = self.classes("User", "Address")
5392
5393        s = Session(query_cls=fixture())
5394
5395        u1 = s.query(User).filter(User.id == 7).first()
5396        eq_(u1.addresses, [Address(id=1)])
5397
5398    def _test_m2o_lazyload(self, fixture):
5399        User, Address = self.classes("User", "Address")
5400
5401        s = Session(query_cls=fixture())
5402
5403        a1 = s.query(Address).filter(Address.id == 1).first()
5404        eq_(a1.user, User(id=7))
5405
5406    def _test_expr(self, fixture):
5407        User, Address = self.classes("User", "Address")
5408
5409        s = Session(query_cls=fixture())
5410
5411        q = s.query(func.max(User.id).label("max"))
5412        eq_(q.scalar(), 10)
5413
5414    def _test_expr_undocumented_query_constructor(self, fixture):
5415        # see #4269.  not documented but already out there.
5416        User, Address = self.classes("User", "Address")
5417
5418        s = Session(query_cls=fixture())
5419
5420        q = Query(func.max(User.id).label("max")).with_session(s)
5421        eq_(q.scalar(), 10)
5422
5423    def test_plain_get(self):
5424        self._test_get(self._plain_fixture)
5425
5426    def test_callable_get(self):
5427        self._test_get(self._callable_fixture)
5428
5429    def test_subclass_get(self):
5430        self._test_get(self._subclass_fixture)
5431
5432    def test_fn_get(self):
5433        self._test_get(self._fn_fixture)
5434
5435    def test_plain_expr(self):
5436        self._test_expr(self._plain_fixture)
5437
5438    def test_callable_expr(self):
5439        self._test_expr(self._callable_fixture)
5440
5441    def test_subclass_expr(self):
5442        self._test_expr(self._subclass_fixture)
5443
5444    def test_fn_expr(self):
5445        self._test_expr(self._fn_fixture)
5446
5447    def test_plain_expr_undocumented_query_constructor(self):
5448        self._test_expr_undocumented_query_constructor(self._plain_fixture)
5449
5450    def test_callable_expr_undocumented_query_constructor(self):
5451        self._test_expr_undocumented_query_constructor(self._callable_fixture)
5452
5453    def test_subclass_expr_undocumented_query_constructor(self):
5454        self._test_expr_undocumented_query_constructor(self._subclass_fixture)
5455
5456    def test_fn_expr_undocumented_query_constructor(self):
5457        self._test_expr_undocumented_query_constructor(self._fn_fixture)
5458
5459    def test_callable_o2m_lazyload(self):
5460        self._test_o2m_lazyload(self._callable_fixture)
5461
5462    def test_subclass_o2m_lazyload(self):
5463        self._test_o2m_lazyload(self._subclass_fixture)
5464
5465    def test_fn_o2m_lazyload(self):
5466        self._test_o2m_lazyload(self._fn_fixture)
5467
5468    def test_callable_m2o_lazyload(self):
5469        self._test_m2o_lazyload(self._callable_fixture)
5470
5471    def test_subclass_m2o_lazyload(self):
5472        self._test_m2o_lazyload(self._subclass_fixture)
5473
5474    def test_fn_m2o_lazyload(self):
5475        self._test_m2o_lazyload(self._fn_fixture)
5476