1import contextlib 2 3import sqlalchemy as sa 4from sqlalchemy import and_ 5from sqlalchemy import between 6from sqlalchemy import bindparam 7from sqlalchemy import Boolean 8from sqlalchemy import cast 9from sqlalchemy import collate 10from sqlalchemy import column 11from sqlalchemy import desc 12from sqlalchemy import distinct 13from sqlalchemy import exc as sa_exc 14from sqlalchemy import exists 15from sqlalchemy import ForeignKey 16from sqlalchemy import func 17from sqlalchemy import insert 18from sqlalchemy import inspect 19from sqlalchemy import Integer 20from sqlalchemy import literal 21from sqlalchemy import literal_column 22from sqlalchemy import MetaData 23from sqlalchemy import null 24from sqlalchemy import or_ 25from sqlalchemy import select 26from sqlalchemy import String 27from sqlalchemy import table 28from sqlalchemy import testing 29from sqlalchemy import text 30from sqlalchemy import Unicode 31from sqlalchemy import union 32from sqlalchemy import util 33from sqlalchemy.engine import default 34from sqlalchemy.orm import aliased 35from sqlalchemy.orm import attributes 36from sqlalchemy.orm import backref 37from sqlalchemy.orm import Bundle 38from sqlalchemy.orm import column_property 39from sqlalchemy.orm import create_session 40from sqlalchemy.orm import defer 41from sqlalchemy.orm import joinedload 42from sqlalchemy.orm import joinedload_all 43from sqlalchemy.orm import lazyload 44from sqlalchemy.orm import mapper 45from sqlalchemy.orm import Query 46from sqlalchemy.orm import relationship 47from sqlalchemy.orm import Session 48from sqlalchemy.orm import subqueryload 49from sqlalchemy.orm import synonym 50from sqlalchemy.orm.util import join 51from sqlalchemy.orm.util import with_parent 52from sqlalchemy.sql import expression 53from sqlalchemy.sql import operators 54from sqlalchemy.testing import assert_warnings 55from sqlalchemy.testing import AssertsCompiledSQL 56from sqlalchemy.testing import fixtures 57from sqlalchemy.testing import is_ 58from sqlalchemy.testing import is_not_ 59from sqlalchemy.testing import mock 60from sqlalchemy.testing.assertions import assert_raises 61from sqlalchemy.testing.assertions import assert_raises_message 62from sqlalchemy.testing.assertions import eq_ 63from sqlalchemy.testing.assertions import eq_ignore_whitespace 64from sqlalchemy.testing.assertions import expect_warnings 65from sqlalchemy.testing.assertsql import CompiledSQL 66from sqlalchemy.testing.schema import Column 67from sqlalchemy.testing.schema import Table 68from test.orm import _fixtures 69 70 71class QueryTest(_fixtures.FixtureTest): 72 run_setup_mappers = "once" 73 run_inserts = "once" 74 run_deletes = None 75 76 @classmethod 77 def setup_mappers(cls): 78 cls._setup_stock_mapping() 79 80 81class MiscTest(QueryTest): 82 run_create_tables = None 83 run_inserts = None 84 85 def test_with_session(self): 86 User = self.classes.User 87 s1 = Session() 88 s2 = Session() 89 q1 = s1.query(User) 90 q2 = q1.with_session(s2) 91 assert q2.session is s2 92 assert q1.session is s1 93 94 95class OnlyReturnTuplesTest(QueryTest): 96 def test_single_entity_false(self): 97 User = self.classes.User 98 row = create_session().query(User).only_return_tuples(False).first() 99 assert isinstance(row, User) 100 101 def test_single_entity_true(self): 102 User = self.classes.User 103 row = create_session().query(User).only_return_tuples(True).first() 104 assert isinstance(row, tuple) 105 106 def test_multiple_entity_false(self): 107 User = self.classes.User 108 row = ( 109 create_session() 110 .query(User.id, User) 111 .only_return_tuples(False) 112 .first() 113 ) 114 assert isinstance(row, tuple) 115 116 def test_multiple_entity_true(self): 117 User = self.classes.User 118 row = ( 119 create_session() 120 .query(User.id, User) 121 .only_return_tuples(True) 122 .first() 123 ) 124 assert isinstance(row, tuple) 125 126 127class RowTupleTest(QueryTest): 128 run_setup_mappers = None 129 130 def test_custom_names(self): 131 User, users = self.classes.User, self.tables.users 132 133 mapper(User, users, properties={"uname": users.c.name}) 134 135 row = ( 136 create_session() 137 .query(User.id, User.uname) 138 .filter(User.id == 7) 139 .first() 140 ) 141 assert row.id == 7 142 assert row.uname == "jack" 143 144 def test_column_metadata(self): 145 users, Address, addresses, User = ( 146 self.tables.users, 147 self.classes.Address, 148 self.tables.addresses, 149 self.classes.User, 150 ) 151 152 mapper(User, users) 153 mapper(Address, addresses) 154 sess = create_session() 155 user_alias = aliased(User) 156 user_alias_id_label = user_alias.id.label("foo") 157 address_alias = aliased(Address, name="aalias") 158 fn = func.count(User.id) 159 name_label = User.name.label("uname") 160 bundle = Bundle("b1", User.id, User.name) 161 cte = sess.query(User.id).cte() 162 for q, asserted in [ 163 ( 164 sess.query(User), 165 [ 166 { 167 "name": "User", 168 "type": User, 169 "aliased": False, 170 "expr": User, 171 "entity": User, 172 } 173 ], 174 ), 175 ( 176 sess.query(User.id, User), 177 [ 178 { 179 "name": "id", 180 "type": users.c.id.type, 181 "aliased": False, 182 "expr": User.id, 183 "entity": User, 184 }, 185 { 186 "name": "User", 187 "type": User, 188 "aliased": False, 189 "expr": User, 190 "entity": User, 191 }, 192 ], 193 ), 194 ( 195 sess.query(User.id, user_alias), 196 [ 197 { 198 "name": "id", 199 "type": users.c.id.type, 200 "aliased": False, 201 "expr": User.id, 202 "entity": User, 203 }, 204 { 205 "name": None, 206 "type": User, 207 "aliased": True, 208 "expr": user_alias, 209 "entity": user_alias, 210 }, 211 ], 212 ), 213 ( 214 sess.query(user_alias.id), 215 [ 216 { 217 "name": "id", 218 "type": users.c.id.type, 219 "aliased": True, 220 "expr": user_alias.id, 221 "entity": user_alias, 222 } 223 ], 224 ), 225 ( 226 sess.query(user_alias_id_label), 227 [ 228 { 229 "name": "foo", 230 "type": users.c.id.type, 231 "aliased": True, 232 "expr": user_alias_id_label, 233 "entity": user_alias, 234 } 235 ], 236 ), 237 ( 238 sess.query(address_alias), 239 [ 240 { 241 "name": "aalias", 242 "type": Address, 243 "aliased": True, 244 "expr": address_alias, 245 "entity": address_alias, 246 } 247 ], 248 ), 249 ( 250 sess.query(name_label, fn), 251 [ 252 { 253 "name": "uname", 254 "type": users.c.name.type, 255 "aliased": False, 256 "expr": name_label, 257 "entity": User, 258 }, 259 { 260 "name": None, 261 "type": fn.type, 262 "aliased": False, 263 "expr": fn, 264 "entity": User, 265 }, 266 ], 267 ), 268 ( 269 sess.query(cte), 270 [ 271 { 272 "aliased": False, 273 "expr": cte.c.id, 274 "type": cte.c.id.type, 275 "name": "id", 276 "entity": None, 277 } 278 ], 279 ), 280 ( 281 sess.query(users), 282 [ 283 { 284 "aliased": False, 285 "expr": users.c.id, 286 "type": users.c.id.type, 287 "name": "id", 288 "entity": None, 289 }, 290 { 291 "aliased": False, 292 "expr": users.c.name, 293 "type": users.c.name.type, 294 "name": "name", 295 "entity": None, 296 }, 297 ], 298 ), 299 ( 300 sess.query(users.c.name), 301 [ 302 { 303 "name": "name", 304 "type": users.c.name.type, 305 "aliased": False, 306 "expr": users.c.name, 307 "entity": None, 308 } 309 ], 310 ), 311 ( 312 sess.query(bundle), 313 [ 314 { 315 "aliased": False, 316 "expr": bundle, 317 "type": Bundle, 318 "name": "b1", 319 "entity": User, 320 } 321 ], 322 ), 323 ]: 324 eq_(q.column_descriptions, asserted) 325 326 def test_unhashable_type(self): 327 from sqlalchemy.types import TypeDecorator, Integer 328 from sqlalchemy.sql import type_coerce 329 330 class MyType(TypeDecorator): 331 impl = Integer 332 hashable = False 333 334 def process_result_value(self, value, dialect): 335 return [value] 336 337 User, users = self.classes.User, self.tables.users 338 339 mapper(User, users) 340 341 s = Session() 342 q = s.query(User, type_coerce(users.c.id, MyType).label("foo")).filter( 343 User.id == 7 344 ) 345 row = q.first() 346 eq_(row, (User(id=7), [7])) 347 348 349class BindSensitiveStringifyTest(fixtures.TestBase): 350 def _fixture(self, bind_to=None): 351 # building a totally separate metadata /mapping here 352 # because we need to control if the MetaData is bound or not 353 354 class User(object): 355 pass 356 357 m = MetaData(bind=bind_to) 358 user_table = Table( 359 "users", 360 m, 361 Column("id", Integer, primary_key=True), 362 Column("name", String(50)), 363 ) 364 365 mapper(User, user_table) 366 return User 367 368 def _dialect_fixture(self): 369 class MyDialect(default.DefaultDialect): 370 default_paramstyle = "qmark" 371 372 from sqlalchemy.engine import base 373 374 return base.Engine(mock.Mock(), MyDialect(), mock.Mock()) 375 376 def _test( 377 self, bound_metadata, bound_session, session_present, expect_bound 378 ): 379 if bound_metadata or bound_session: 380 eng = self._dialect_fixture() 381 else: 382 eng = None 383 384 User = self._fixture(bind_to=eng if bound_metadata else None) 385 386 s = Session(eng if bound_session else None) 387 q = s.query(User).filter(User.id == 7) 388 if not session_present: 389 q = q.with_session(None) 390 391 eq_ignore_whitespace( 392 str(q), 393 "SELECT users.id AS users_id, users.name AS users_name " 394 "FROM users WHERE users.id = ?" 395 if expect_bound 396 else "SELECT users.id AS users_id, users.name AS users_name " 397 "FROM users WHERE users.id = :id_1", 398 ) 399 400 def test_query_unbound_metadata_bound_session(self): 401 self._test(False, True, True, True) 402 403 def test_query_bound_metadata_unbound_session(self): 404 self._test(True, False, True, True) 405 406 def test_query_unbound_metadata_no_session(self): 407 self._test(False, False, False, False) 408 409 def test_query_unbound_metadata_unbound_session(self): 410 self._test(False, False, True, False) 411 412 def test_query_bound_metadata_bound_session(self): 413 self._test(True, True, True, True) 414 415 416class RawSelectTest(QueryTest, AssertsCompiledSQL): 417 __dialect__ = "default" 418 419 def test_select_from_entity(self): 420 User = self.classes.User 421 422 self.assert_compile( 423 select(["*"]).select_from(User), "SELECT * FROM users" 424 ) 425 426 def test_where_relationship(self): 427 User = self.classes.User 428 429 self.assert_compile( 430 select([User]).where(User.addresses), 431 "SELECT users.id, users.name FROM users, addresses " 432 "WHERE users.id = addresses.user_id", 433 ) 434 435 def test_where_m2m_relationship(self): 436 Item = self.classes.Item 437 438 self.assert_compile( 439 select([Item]).where(Item.keywords), 440 "SELECT items.id, items.description FROM items, " 441 "item_keywords AS item_keywords_1, keywords " 442 "WHERE items.id = item_keywords_1.item_id " 443 "AND keywords.id = item_keywords_1.keyword_id", 444 ) 445 446 def test_inline_select_from_entity(self): 447 User = self.classes.User 448 449 self.assert_compile( 450 select(["*"], from_obj=User), "SELECT * FROM users" 451 ) 452 453 def test_select_from_aliased_entity(self): 454 User = self.classes.User 455 ua = aliased(User, name="ua") 456 self.assert_compile( 457 select(["*"]).select_from(ua), "SELECT * FROM users AS ua" 458 ) 459 460 def test_correlate_entity(self): 461 User = self.classes.User 462 Address = self.classes.Address 463 464 self.assert_compile( 465 select( 466 [ 467 User.name, 468 Address.id, 469 select([func.count(Address.id)]) 470 .where(User.id == Address.user_id) 471 .correlate(User) 472 .as_scalar(), 473 ] 474 ), 475 "SELECT users.name, addresses.id, " 476 "(SELECT count(addresses.id) AS count_1 " 477 "FROM addresses WHERE users.id = addresses.user_id) AS anon_1 " 478 "FROM users, addresses", 479 ) 480 481 def test_correlate_aliased_entity(self): 482 User = self.classes.User 483 Address = self.classes.Address 484 uu = aliased(User, name="uu") 485 486 self.assert_compile( 487 select( 488 [ 489 uu.name, 490 Address.id, 491 select([func.count(Address.id)]) 492 .where(uu.id == Address.user_id) 493 .correlate(uu) 494 .as_scalar(), 495 ] 496 ), 497 # for a long time, "uu.id = address.user_id" was reversed; 498 # this was resolved as of #2872 and had to do with 499 # InstrumentedAttribute.__eq__() taking precedence over 500 # QueryableAttribute.__eq__() 501 "SELECT uu.name, addresses.id, " 502 "(SELECT count(addresses.id) AS count_1 " 503 "FROM addresses WHERE uu.id = addresses.user_id) AS anon_1 " 504 "FROM users AS uu, addresses", 505 ) 506 507 def test_columns_clause_entity(self): 508 User = self.classes.User 509 510 self.assert_compile( 511 select([User]), "SELECT users.id, users.name FROM users" 512 ) 513 514 def test_columns_clause_columns(self): 515 User = self.classes.User 516 517 self.assert_compile( 518 select([User.id, User.name]), 519 "SELECT users.id, users.name FROM users", 520 ) 521 522 def test_columns_clause_aliased_columns(self): 523 User = self.classes.User 524 ua = aliased(User, name="ua") 525 self.assert_compile( 526 select([ua.id, ua.name]), "SELECT ua.id, ua.name FROM users AS ua" 527 ) 528 529 def test_columns_clause_aliased_entity(self): 530 User = self.classes.User 531 ua = aliased(User, name="ua") 532 self.assert_compile( 533 select([ua]), "SELECT ua.id, ua.name FROM users AS ua" 534 ) 535 536 def test_core_join(self): 537 User = self.classes.User 538 Address = self.classes.Address 539 from sqlalchemy.sql import join 540 541 self.assert_compile( 542 select([User]).select_from(join(User, Address)), 543 "SELECT users.id, users.name FROM users " 544 "JOIN addresses ON users.id = addresses.user_id", 545 ) 546 547 def test_insert_from_query(self): 548 User = self.classes.User 549 Address = self.classes.Address 550 551 s = Session() 552 q = s.query(User.id, User.name).filter_by(name="ed") 553 self.assert_compile( 554 insert(Address).from_select(("id", "email_address"), q), 555 "INSERT INTO addresses (id, email_address) " 556 "SELECT users.id AS users_id, users.name AS users_name " 557 "FROM users WHERE users.name = :name_1", 558 ) 559 560 def test_insert_from_query_col_attr(self): 561 User = self.classes.User 562 Address = self.classes.Address 563 564 s = Session() 565 q = s.query(User.id, User.name).filter_by(name="ed") 566 self.assert_compile( 567 insert(Address).from_select( 568 (Address.id, Address.email_address), q 569 ), 570 "INSERT INTO addresses (id, email_address) " 571 "SELECT users.id AS users_id, users.name AS users_name " 572 "FROM users WHERE users.name = :name_1", 573 ) 574 575 def test_update_from_entity(self): 576 from sqlalchemy.sql import update 577 578 User = self.classes.User 579 self.assert_compile( 580 update(User), "UPDATE users SET id=:id, name=:name" 581 ) 582 583 self.assert_compile( 584 update(User).values(name="ed").where(User.id == 5), 585 "UPDATE users SET name=:name WHERE users.id = :id_1", 586 checkparams={"id_1": 5, "name": "ed"}, 587 ) 588 589 def test_delete_from_entity(self): 590 from sqlalchemy.sql import delete 591 592 User = self.classes.User 593 self.assert_compile(delete(User), "DELETE FROM users") 594 595 self.assert_compile( 596 delete(User).where(User.id == 5), 597 "DELETE FROM users WHERE users.id = :id_1", 598 checkparams={"id_1": 5}, 599 ) 600 601 def test_insert_from_entity(self): 602 from sqlalchemy.sql import insert 603 604 User = self.classes.User 605 self.assert_compile( 606 insert(User), "INSERT INTO users (id, name) VALUES (:id, :name)" 607 ) 608 609 self.assert_compile( 610 insert(User).values(name="ed"), 611 "INSERT INTO users (name) VALUES (:name)", 612 checkparams={"name": "ed"}, 613 ) 614 615 def test_col_prop_builtin_function(self): 616 class Foo(object): 617 pass 618 619 mapper( 620 Foo, 621 self.tables.users, 622 properties={ 623 "foob": column_property( 624 func.coalesce(self.tables.users.c.name) 625 ) 626 }, 627 ) 628 629 self.assert_compile( 630 select([Foo]).where(Foo.foob == "somename").order_by(Foo.foob), 631 "SELECT users.id, users.name FROM users " 632 "WHERE coalesce(users.name) = :param_1 " 633 "ORDER BY coalesce(users.name)", 634 ) 635 636 637class GetTest(QueryTest): 638 def test_get(self): 639 User = self.classes.User 640 641 s = create_session() 642 assert s.query(User).get(19) is None 643 u = s.query(User).get(7) 644 u2 = s.query(User).get(7) 645 assert u is u2 646 s.expunge_all() 647 u2 = s.query(User).get(7) 648 assert u is not u2 649 650 def test_get_composite_pk_no_result(self): 651 CompositePk = self.classes.CompositePk 652 653 s = Session() 654 assert s.query(CompositePk).get((100, 100)) is None 655 656 def test_get_composite_pk_result(self): 657 CompositePk = self.classes.CompositePk 658 659 s = Session() 660 one_two = s.query(CompositePk).get((1, 2)) 661 assert one_two.i == 1 662 assert one_two.j == 2 663 assert one_two.k == 3 664 665 def test_get_too_few_params(self): 666 CompositePk = self.classes.CompositePk 667 668 s = Session() 669 q = s.query(CompositePk) 670 assert_raises(sa_exc.InvalidRequestError, q.get, 7) 671 672 def test_get_too_few_params_tuple(self): 673 CompositePk = self.classes.CompositePk 674 675 s = Session() 676 q = s.query(CompositePk) 677 assert_raises(sa_exc.InvalidRequestError, q.get, (7,)) 678 679 def test_get_too_many_params(self): 680 CompositePk = self.classes.CompositePk 681 682 s = Session() 683 q = s.query(CompositePk) 684 assert_raises(sa_exc.InvalidRequestError, q.get, (7, 10, 100)) 685 686 def test_get_against_col(self): 687 User = self.classes.User 688 689 s = Session() 690 q = s.query(User.id) 691 assert_raises(sa_exc.InvalidRequestError, q.get, (5,)) 692 693 def test_get_null_pk(self): 694 """test that a mapping which can have None in a 695 PK (i.e. map to an outerjoin) works with get().""" 696 697 users, addresses = self.tables.users, self.tables.addresses 698 699 s = users.outerjoin(addresses) 700 701 class UserThing(fixtures.ComparableEntity): 702 pass 703 704 mapper( 705 UserThing, 706 s, 707 properties={ 708 "id": (users.c.id, addresses.c.user_id), 709 "address_id": addresses.c.id, 710 }, 711 ) 712 sess = create_session() 713 u10 = sess.query(UserThing).get((10, None)) 714 eq_(u10, UserThing(id=10)) 715 716 def test_no_criterion(self): 717 """test that get()/load() does not use preexisting filter/etc. 718 criterion""" 719 720 User, Address = self.classes.User, self.classes.Address 721 722 s = create_session() 723 724 q = s.query(User).join("addresses").filter(Address.user_id == 8) 725 assert_raises(sa_exc.InvalidRequestError, q.get, 7) 726 assert_raises( 727 sa_exc.InvalidRequestError, 728 s.query(User).filter(User.id == 7).get, 729 19, 730 ) 731 732 # order_by()/get() doesn't raise 733 s.query(User).order_by(User.id).get(8) 734 735 def test_no_criterion_when_already_loaded(self): 736 """test that get()/load() does not use preexisting filter/etc. 737 criterion, even when we're only using the identity map.""" 738 739 User, Address = self.classes.User, self.classes.Address 740 741 s = create_session() 742 743 s.query(User).get(7) 744 745 q = s.query(User).join("addresses").filter(Address.user_id == 8) 746 assert_raises(sa_exc.InvalidRequestError, q.get, 7) 747 748 def test_unique_param_names(self): 749 users = self.tables.users 750 751 class SomeUser(object): 752 pass 753 754 s = users.select(users.c.id != 12).alias("users") 755 m = mapper(SomeUser, s) 756 assert s.primary_key == m.primary_key 757 758 sess = create_session() 759 assert sess.query(SomeUser).get(7).name == "jack" 760 761 def test_load(self): 762 User, Address = self.classes.User, self.classes.Address 763 764 s = create_session() 765 766 assert s.query(User).populate_existing().get(19) is None 767 768 u = s.query(User).populate_existing().get(7) 769 u2 = s.query(User).populate_existing().get(7) 770 assert u is u2 771 s.expunge_all() 772 u2 = s.query(User).populate_existing().get(7) 773 assert u is not u2 774 775 u2.name = "some name" 776 a = Address(email_address="some other name") 777 u2.addresses.append(a) 778 assert u2 in s.dirty 779 assert a in u2.addresses 780 781 s.query(User).populate_existing().get(7) 782 assert u2 not in s.dirty 783 assert u2.name == "jack" 784 assert a not in u2.addresses 785 786 @testing.provide_metadata 787 @testing.requires.unicode_connections 788 def test_unicode(self): 789 """test that Query.get properly sets up the type for the bind 790 parameter. using unicode would normally fail on postgresql, mysql and 791 oracle unless it is converted to an encoded string""" 792 793 metadata = self.metadata 794 table = Table( 795 "unicode_data", 796 metadata, 797 Column("id", Unicode(40), primary_key=True), 798 Column("data", Unicode(40)), 799 ) 800 metadata.create_all() 801 ustring = util.b("petit voix m\xe2\x80\x99a").decode("utf-8") 802 803 table.insert().execute(id=ustring, data=ustring) 804 805 class LocalFoo(self.classes.Base): 806 pass 807 808 mapper(LocalFoo, table) 809 eq_( 810 create_session().query(LocalFoo).get(ustring), 811 LocalFoo(id=ustring, data=ustring), 812 ) 813 814 def test_populate_existing(self): 815 User, Address = self.classes.User, self.classes.Address 816 817 s = create_session() 818 819 userlist = s.query(User).all() 820 821 u = userlist[0] 822 u.name = "foo" 823 a = Address(name="ed") 824 u.addresses.append(a) 825 826 self.assert_(a in u.addresses) 827 828 s.query(User).populate_existing().all() 829 830 self.assert_(u not in s.dirty) 831 832 self.assert_(u.name == "jack") 833 834 self.assert_(a not in u.addresses) 835 836 u.addresses[0].email_address = "lala" 837 u.orders[1].items[2].description = "item 12" 838 # test that lazy load doesn't change child items 839 s.query(User).populate_existing().all() 840 assert u.addresses[0].email_address == "lala" 841 assert u.orders[1].items[2].description == "item 12" 842 843 # eager load does 844 s.query(User).options( 845 joinedload("addresses"), joinedload_all("orders.items") 846 ).populate_existing().all() 847 assert u.addresses[0].email_address == "jack@bean.com" 848 assert u.orders[1].items[2].description == "item 5" 849 850 851class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL): 852 def test_no_limit_offset(self): 853 User = self.classes.User 854 855 s = create_session() 856 857 for q in ( 858 s.query(User).limit(2), 859 s.query(User).offset(2), 860 s.query(User).limit(2).offset(2), 861 ): 862 assert_raises(sa_exc.InvalidRequestError, q.join, "addresses") 863 864 assert_raises( 865 sa_exc.InvalidRequestError, q.filter, User.name == "ed" 866 ) 867 868 assert_raises(sa_exc.InvalidRequestError, q.filter_by, name="ed") 869 870 assert_raises(sa_exc.InvalidRequestError, q.order_by, "foo") 871 872 assert_raises(sa_exc.InvalidRequestError, q.group_by, "foo") 873 874 assert_raises(sa_exc.InvalidRequestError, q.having, "foo") 875 876 q.enable_assertions(False).join("addresses") 877 q.enable_assertions(False).filter(User.name == "ed") 878 q.enable_assertions(False).order_by("foo") 879 q.enable_assertions(False).group_by("foo") 880 881 def test_no_from(self): 882 users, User = self.tables.users, self.classes.User 883 884 s = create_session() 885 886 q = s.query(User).select_from(users) 887 assert_raises(sa_exc.InvalidRequestError, q.select_from, users) 888 889 q = s.query(User).join("addresses") 890 assert_raises(sa_exc.InvalidRequestError, q.select_from, users) 891 892 q = s.query(User).order_by(User.id) 893 assert_raises(sa_exc.InvalidRequestError, q.select_from, users) 894 895 assert_raises(sa_exc.InvalidRequestError, q.select_from, users) 896 897 q.enable_assertions(False).select_from(users) 898 899 # this is fine, however 900 q.from_self() 901 902 def test_invalid_select_from(self): 903 User = self.classes.User 904 905 s = create_session() 906 q = s.query(User) 907 assert_raises(sa_exc.ArgumentError, q.select_from, User.id == 5) 908 assert_raises(sa_exc.ArgumentError, q.select_from, User.id) 909 910 def test_invalid_from_statement(self): 911 User, addresses, users = ( 912 self.classes.User, 913 self.tables.addresses, 914 self.tables.users, 915 ) 916 917 s = create_session() 918 q = s.query(User) 919 assert_raises(sa_exc.ArgumentError, q.from_statement, User.id == 5) 920 assert_raises( 921 sa_exc.ArgumentError, q.from_statement, users.join(addresses) 922 ) 923 924 def test_invalid_column(self): 925 User = self.classes.User 926 927 s = create_session() 928 q = s.query(User) 929 assert_raises(sa_exc.InvalidRequestError, q.add_column, object()) 930 931 def test_invalid_column_tuple(self): 932 User = self.classes.User 933 934 s = create_session() 935 q = s.query(User) 936 assert_raises(sa_exc.InvalidRequestError, q.add_column, (1, 1)) 937 938 def test_distinct(self): 939 """test that a distinct() call is not valid before 'clauseelement' 940 conditions.""" 941 942 User = self.classes.User 943 944 s = create_session() 945 q = s.query(User).distinct() 946 assert_raises(sa_exc.InvalidRequestError, q.select_from, User) 947 assert_raises( 948 sa_exc.InvalidRequestError, 949 q.from_statement, 950 text("select * from table"), 951 ) 952 assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User) 953 954 def test_order_by(self): 955 """test that an order_by() call is not valid before 'clauseelement' 956 conditions.""" 957 958 User = self.classes.User 959 960 s = create_session() 961 q = s.query(User).order_by(User.id) 962 assert_raises(sa_exc.InvalidRequestError, q.select_from, User) 963 assert_raises( 964 sa_exc.InvalidRequestError, 965 q.from_statement, 966 text("select * from table"), 967 ) 968 assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User) 969 970 def test_only_full_mapper_zero(self): 971 User, Address = self.classes.User, self.classes.Address 972 973 s = create_session() 974 975 q = s.query(User, Address) 976 assert_raises(sa_exc.InvalidRequestError, q.get, 5) 977 978 def test_entity_or_mapper_zero(self): 979 User, Address = self.classes.User, self.classes.Address 980 s = create_session() 981 982 q = s.query(User, Address) 983 is_(q._mapper_zero(), inspect(User)) 984 is_(q._entity_zero(), inspect(User)) 985 986 u1 = aliased(User) 987 q = s.query(u1, Address) 988 is_(q._mapper_zero(), inspect(User)) 989 is_(q._entity_zero(), inspect(u1)) 990 991 q = s.query(User).select_from(Address) 992 is_(q._mapper_zero(), inspect(User)) 993 is_(q._entity_zero(), inspect(Address)) 994 995 q = s.query(User.name, Address) 996 is_(q._mapper_zero(), inspect(User)) 997 is_(q._entity_zero(), inspect(User)) 998 999 q = s.query(u1.name, Address) 1000 is_(q._mapper_zero(), inspect(User)) 1001 is_(q._entity_zero(), inspect(u1)) 1002 1003 q1 = s.query(User).exists() 1004 q = s.query(q1) 1005 is_(q._mapper_zero(), None) 1006 is_(q._entity_zero(), None) 1007 1008 q1 = s.query(Bundle("b1", User.id, User.name)) 1009 is_(q1._mapper_zero(), inspect(User)) 1010 is_(q1._entity_zero(), inspect(User)) 1011 1012 def test_from_statement(self): 1013 User = self.classes.User 1014 1015 s = create_session() 1016 1017 for meth, arg, kw in [ 1018 (Query.filter, (User.id == 5,), {}), 1019 (Query.filter_by, (), {"id": 5}), 1020 (Query.limit, (5,), {}), 1021 (Query.group_by, (User.name,), {}), 1022 (Query.order_by, (User.name,), {}), 1023 ]: 1024 q = s.query(User) 1025 q = meth(q, *arg, **kw) 1026 assert_raises( 1027 sa_exc.InvalidRequestError, q.from_statement, text("x") 1028 ) 1029 1030 q = s.query(User) 1031 q = q.from_statement(text("x")) 1032 assert_raises(sa_exc.InvalidRequestError, meth, q, *arg, **kw) 1033 1034 def test_illegal_coercions(self): 1035 User = self.classes.User 1036 1037 assert_raises_message( 1038 sa_exc.ArgumentError, 1039 "Object .*User.* is not legal as a SQL literal value", 1040 distinct, 1041 User, 1042 ) 1043 1044 ua = aliased(User) 1045 assert_raises_message( 1046 sa_exc.ArgumentError, 1047 "Object .*User.* is not legal as a SQL literal value", 1048 distinct, 1049 ua, 1050 ) 1051 1052 s = Session() 1053 assert_raises_message( 1054 sa_exc.ArgumentError, 1055 "Object .*User.* is not legal as a SQL literal value", 1056 lambda: s.query(User).filter(User.name == User), 1057 ) 1058 1059 u1 = User() 1060 assert_raises_message( 1061 sa_exc.ArgumentError, 1062 "Object .*User.* is not legal as a SQL literal value", 1063 distinct, 1064 u1, 1065 ) 1066 1067 assert_raises_message( 1068 sa_exc.ArgumentError, 1069 "Object .*User.* is not legal as a SQL literal value", 1070 lambda: s.query(User).filter(User.name == u1), 1071 ) 1072 1073 1074class OperatorTest(QueryTest, AssertsCompiledSQL): 1075 """test sql.Comparator implementation for MapperProperties""" 1076 1077 __dialect__ = "default" 1078 1079 def _test(self, clause, expected, entity=None, checkparams=None): 1080 dialect = default.DefaultDialect() 1081 if entity is not None: 1082 # specify a lead entity, so that when we are testing 1083 # correlation, the correlation actually happens 1084 sess = Session() 1085 lead = sess.query(entity) 1086 context = lead._compile_context() 1087 context.statement.use_labels = True 1088 lead = context.statement.compile(dialect=dialect) 1089 expected = (str(lead) + " WHERE " + expected).replace("\n", "") 1090 clause = sess.query(entity).filter(clause) 1091 self.assert_compile(clause, expected, checkparams=checkparams) 1092 1093 def _test_filter_aliases( 1094 self, clause, expected, from_, onclause, checkparams=None 1095 ): 1096 dialect = default.DefaultDialect() 1097 sess = Session() 1098 lead = sess.query(from_).join(onclause, aliased=True) 1099 full = lead.filter(clause) 1100 context = lead._compile_context() 1101 context.statement.use_labels = True 1102 lead = context.statement.compile(dialect=dialect) 1103 expected = (str(lead) + " WHERE " + expected).replace("\n", "") 1104 1105 self.assert_compile(full, expected, checkparams=checkparams) 1106 1107 def test_arithmetic(self): 1108 User = self.classes.User 1109 1110 create_session().query(User) 1111 for (py_op, sql_op) in ( 1112 (operators.add, "+"), 1113 (operators.mul, "*"), 1114 (operators.sub, "-"), 1115 (operators.truediv, "/"), 1116 (operators.div, "/"), 1117 ): 1118 for (lhs, rhs, res) in ( 1119 (5, User.id, ":id_1 %s users.id"), 1120 (5, literal(6), ":param_1 %s :param_2"), 1121 (User.id, 5, "users.id %s :id_1"), 1122 (User.id, literal("b"), "users.id %s :param_1"), 1123 (User.id, User.id, "users.id %s users.id"), 1124 (literal(5), "b", ":param_1 %s :param_2"), 1125 (literal(5), User.id, ":param_1 %s users.id"), 1126 (literal(5), literal(6), ":param_1 %s :param_2"), 1127 ): 1128 self._test(py_op(lhs, rhs), res % sql_op) 1129 1130 def test_comparison(self): 1131 User = self.classes.User 1132 1133 create_session().query(User) 1134 ualias = aliased(User) 1135 1136 for (py_op, fwd_op, rev_op) in ( 1137 (operators.lt, "<", ">"), 1138 (operators.gt, ">", "<"), 1139 (operators.eq, "=", "="), 1140 (operators.ne, "!=", "!="), 1141 (operators.le, "<=", ">="), 1142 (operators.ge, ">=", "<="), 1143 ): 1144 for (lhs, rhs, l_sql, r_sql) in ( 1145 ("a", User.id, ":id_1", "users.id"), 1146 ("a", literal("b"), ":param_2", ":param_1"), # note swap! 1147 (User.id, "b", "users.id", ":id_1"), 1148 (User.id, literal("b"), "users.id", ":param_1"), 1149 (User.id, User.id, "users.id", "users.id"), 1150 (literal("a"), "b", ":param_1", ":param_2"), 1151 (literal("a"), User.id, ":param_1", "users.id"), 1152 (literal("a"), literal("b"), ":param_1", ":param_2"), 1153 (ualias.id, literal("b"), "users_1.id", ":param_1"), 1154 (User.id, ualias.name, "users.id", "users_1.name"), 1155 (User.name, ualias.name, "users.name", "users_1.name"), 1156 (ualias.name, User.name, "users_1.name", "users.name"), 1157 ): 1158 1159 # the compiled clause should match either (e.g.): 1160 # 'a' < 'b' -or- 'b' > 'a'. 1161 compiled = str( 1162 py_op(lhs, rhs).compile(dialect=default.DefaultDialect()) 1163 ) 1164 fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql) 1165 rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql) 1166 1167 self.assert_( 1168 compiled == fwd_sql or compiled == rev_sql, 1169 "\n'" 1170 + compiled 1171 + "'\n does not match\n'" 1172 + fwd_sql 1173 + "'\n or\n'" 1174 + rev_sql 1175 + "'", 1176 ) 1177 1178 def test_o2m_compare_to_null(self): 1179 User = self.classes.User 1180 1181 self._test(User.id == None, "users.id IS NULL") # noqa 1182 self._test(User.id != None, "users.id IS NOT NULL") # noqa 1183 self._test(~(User.id == None), "users.id IS NOT NULL") # noqa 1184 self._test(~(User.id != None), "users.id IS NULL") # noqa 1185 self._test(None == User.id, "users.id IS NULL") # noqa 1186 self._test(~(None == User.id), "users.id IS NOT NULL") # noqa 1187 1188 def test_m2o_compare_to_null(self): 1189 Address = self.classes.Address 1190 self._test(Address.user == None, "addresses.user_id IS NULL") # noqa 1191 self._test( 1192 ~(Address.user == None), "addresses.user_id IS NOT NULL" # noqa 1193 ) 1194 self._test( 1195 ~(Address.user != None), "addresses.user_id IS NULL" # noqa 1196 ) 1197 self._test(None == Address.user, "addresses.user_id IS NULL") # noqa 1198 self._test( 1199 ~(None == Address.user), "addresses.user_id IS NOT NULL" # noqa 1200 ) 1201 1202 def test_o2m_compare_to_null_orm_adapt(self): 1203 User, Address = self.classes.User, self.classes.Address 1204 self._test_filter_aliases( 1205 User.id == None, # noqa 1206 "users_1.id IS NULL", 1207 Address, 1208 Address.user, 1209 ), 1210 self._test_filter_aliases( 1211 User.id != None, # noqa 1212 "users_1.id IS NOT NULL", 1213 Address, 1214 Address.user, 1215 ), 1216 self._test_filter_aliases( 1217 ~(User.id == None), # noqa 1218 "users_1.id IS NOT NULL", 1219 Address, 1220 Address.user, 1221 ), 1222 self._test_filter_aliases( 1223 ~(User.id != None), # noqa 1224 "users_1.id IS NULL", 1225 Address, 1226 Address.user, 1227 ), 1228 1229 def test_m2o_compare_to_null_orm_adapt(self): 1230 User, Address = self.classes.User, self.classes.Address 1231 self._test_filter_aliases( 1232 Address.user == None, # noqa 1233 "addresses_1.user_id IS NULL", 1234 User, 1235 User.addresses, 1236 ), 1237 self._test_filter_aliases( 1238 Address.user != None, # noqa 1239 "addresses_1.user_id IS NOT NULL", 1240 User, 1241 User.addresses, 1242 ), 1243 self._test_filter_aliases( 1244 ~(Address.user == None), # noqa 1245 "addresses_1.user_id IS NOT NULL", 1246 User, 1247 User.addresses, 1248 ), 1249 self._test_filter_aliases( 1250 ~(Address.user != None), # noqa 1251 "addresses_1.user_id IS NULL", 1252 User, 1253 User.addresses, 1254 ), 1255 1256 def test_o2m_compare_to_null_aliased(self): 1257 User = self.classes.User 1258 u1 = aliased(User) 1259 self._test(u1.id == None, "users_1.id IS NULL") # noqa 1260 self._test(u1.id != None, "users_1.id IS NOT NULL") # noqa 1261 self._test(~(u1.id == None), "users_1.id IS NOT NULL") # noqa 1262 self._test(~(u1.id != None), "users_1.id IS NULL") # noqa 1263 1264 def test_m2o_compare_to_null_aliased(self): 1265 Address = self.classes.Address 1266 a1 = aliased(Address) 1267 self._test(a1.user == None, "addresses_1.user_id IS NULL") # noqa 1268 self._test( 1269 ~(a1.user == None), "addresses_1.user_id IS NOT NULL" # noqa 1270 ) 1271 self._test(a1.user != None, "addresses_1.user_id IS NOT NULL") # noqa 1272 self._test(~(a1.user != None), "addresses_1.user_id IS NULL") # noqa 1273 1274 def test_relationship_unimplemented(self): 1275 User = self.classes.User 1276 for op in [ 1277 User.addresses.like, 1278 User.addresses.ilike, 1279 User.addresses.__le__, 1280 User.addresses.__gt__, 1281 ]: 1282 assert_raises(NotImplementedError, op, "x") 1283 1284 def test_o2m_any(self): 1285 User, Address = self.classes.User, self.classes.Address 1286 self._test( 1287 User.addresses.any(Address.id == 17), 1288 "EXISTS (SELECT 1 FROM addresses " 1289 "WHERE users.id = addresses.user_id AND addresses.id = :id_1)", 1290 entity=User, 1291 ) 1292 1293 def test_o2m_any_aliased(self): 1294 User, Address = self.classes.User, self.classes.Address 1295 u1 = aliased(User) 1296 a1 = aliased(Address) 1297 self._test( 1298 u1.addresses.of_type(a1).any(a1.id == 17), 1299 "EXISTS (SELECT 1 FROM addresses AS addresses_1 " 1300 "WHERE users_1.id = addresses_1.user_id AND " 1301 "addresses_1.id = :id_1)", 1302 entity=u1, 1303 ) 1304 1305 def test_o2m_any_orm_adapt(self): 1306 User, Address = self.classes.User, self.classes.Address 1307 self._test_filter_aliases( 1308 User.addresses.any(Address.id == 17), 1309 "EXISTS (SELECT 1 FROM addresses " 1310 "WHERE users_1.id = addresses.user_id AND addresses.id = :id_1)", 1311 Address, 1312 Address.user, 1313 ) 1314 1315 def test_m2o_compare_instance(self): 1316 User, Address = self.classes.User, self.classes.Address 1317 u7 = User(id=5) 1318 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 1319 u7.id = 7 1320 1321 self._test(Address.user == u7, ":param_1 = addresses.user_id") 1322 1323 def test_m2o_compare_instance_negated(self): 1324 User, Address = self.classes.User, self.classes.Address 1325 u7 = User(id=5) 1326 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 1327 u7.id = 7 1328 1329 self._test( 1330 Address.user != u7, 1331 "addresses.user_id != :user_id_1 OR addresses.user_id IS NULL", 1332 checkparams={"user_id_1": 7}, 1333 ) 1334 1335 def test_m2o_compare_instance_orm_adapt(self): 1336 User, Address = self.classes.User, self.classes.Address 1337 u7 = User(id=5) 1338 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 1339 u7.id = 7 1340 1341 self._test_filter_aliases( 1342 Address.user == u7, 1343 ":param_1 = addresses_1.user_id", 1344 User, 1345 User.addresses, 1346 checkparams={"param_1": 7}, 1347 ) 1348 1349 def test_m2o_compare_instance_negated_warn_on_none(self): 1350 User, Address = self.classes.User, self.classes.Address 1351 1352 u7_transient = User(id=None) 1353 1354 with expect_warnings("Got None for value of column users.id; "): 1355 self._test_filter_aliases( 1356 Address.user != u7_transient, 1357 "addresses_1.user_id != :user_id_1 " 1358 "OR addresses_1.user_id IS NULL", 1359 User, 1360 User.addresses, 1361 checkparams={"user_id_1": None}, 1362 ) 1363 1364 def test_m2o_compare_instance_negated_orm_adapt(self): 1365 User, Address = self.classes.User, self.classes.Address 1366 u7 = User(id=5) 1367 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 1368 u7.id = 7 1369 1370 u7_transient = User(id=7) 1371 1372 self._test_filter_aliases( 1373 Address.user != u7, 1374 "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL", 1375 User, 1376 User.addresses, 1377 checkparams={"user_id_1": 7}, 1378 ) 1379 1380 self._test_filter_aliases( 1381 ~(Address.user == u7), 1382 ":param_1 != addresses_1.user_id", 1383 User, 1384 User.addresses, 1385 checkparams={"param_1": 7}, 1386 ) 1387 1388 self._test_filter_aliases( 1389 ~(Address.user != u7), 1390 "NOT (addresses_1.user_id != :user_id_1 " 1391 "OR addresses_1.user_id IS NULL)", 1392 User, 1393 User.addresses, 1394 checkparams={"user_id_1": 7}, 1395 ) 1396 1397 self._test_filter_aliases( 1398 Address.user != u7_transient, 1399 "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL", 1400 User, 1401 User.addresses, 1402 checkparams={"user_id_1": 7}, 1403 ) 1404 1405 self._test_filter_aliases( 1406 ~(Address.user == u7_transient), 1407 ":param_1 != addresses_1.user_id", 1408 User, 1409 User.addresses, 1410 checkparams={"param_1": 7}, 1411 ) 1412 1413 self._test_filter_aliases( 1414 ~(Address.user != u7_transient), 1415 "NOT (addresses_1.user_id != :user_id_1 " 1416 "OR addresses_1.user_id IS NULL)", 1417 User, 1418 User.addresses, 1419 checkparams={"user_id_1": 7}, 1420 ) 1421 1422 def test_m2o_compare_instance_aliased(self): 1423 User, Address = self.classes.User, self.classes.Address 1424 u7 = User(id=5) 1425 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 1426 u7.id = 7 1427 1428 u7_transient = User(id=7) 1429 1430 a1 = aliased(Address) 1431 self._test( 1432 a1.user == u7, 1433 ":param_1 = addresses_1.user_id", 1434 checkparams={"param_1": 7}, 1435 ) 1436 1437 self._test( 1438 a1.user != u7, 1439 "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL", 1440 checkparams={"user_id_1": 7}, 1441 ) 1442 1443 a1 = aliased(Address) 1444 self._test( 1445 a1.user == u7_transient, 1446 ":param_1 = addresses_1.user_id", 1447 checkparams={"param_1": 7}, 1448 ) 1449 1450 self._test( 1451 a1.user != u7_transient, 1452 "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL", 1453 checkparams={"user_id_1": 7}, 1454 ) 1455 1456 def test_selfref_relationship(self): 1457 1458 Node = self.classes.Node 1459 1460 nalias = aliased(Node) 1461 1462 # auto self-referential aliasing 1463 self._test( 1464 Node.children.any(Node.data == "n1"), 1465 "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " 1466 "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)", 1467 entity=Node, 1468 checkparams={"data_1": "n1"}, 1469 ) 1470 1471 # needs autoaliasing 1472 self._test( 1473 Node.children == None, # noqa 1474 "NOT (EXISTS (SELECT 1 FROM nodes AS nodes_1 " 1475 "WHERE nodes.id = nodes_1.parent_id))", 1476 entity=Node, 1477 checkparams={}, 1478 ) 1479 1480 self._test( 1481 Node.parent == None, # noqa 1482 "nodes.parent_id IS NULL", 1483 checkparams={}, 1484 ) 1485 1486 self._test( 1487 nalias.parent == None, # noqa 1488 "nodes_1.parent_id IS NULL", 1489 checkparams={}, 1490 ) 1491 1492 self._test( 1493 nalias.parent != None, # noqa 1494 "nodes_1.parent_id IS NOT NULL", 1495 checkparams={}, 1496 ) 1497 1498 self._test( 1499 nalias.children == None, # noqa 1500 "NOT (EXISTS (" 1501 "SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))", 1502 entity=nalias, 1503 checkparams={}, 1504 ) 1505 1506 self._test( 1507 nalias.children.any(Node.data == "some data"), 1508 "EXISTS (SELECT 1 FROM nodes WHERE " 1509 "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)", 1510 entity=nalias, 1511 checkparams={"data_1": "some data"}, 1512 ) 1513 1514 # this fails because self-referential any() is auto-aliasing; 1515 # the fact that we use "nalias" here means we get two aliases. 1516 # self._test( 1517 # Node.children.any(nalias.data == 'some data'), 1518 # "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " 1519 # "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)", 1520 # entity=Node 1521 # ) 1522 1523 self._test( 1524 nalias.parent.has(Node.data == "some data"), 1525 "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id " 1526 "AND nodes.data = :data_1)", 1527 entity=nalias, 1528 checkparams={"data_1": "some data"}, 1529 ) 1530 1531 self._test( 1532 Node.parent.has(Node.data == "some data"), 1533 "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " 1534 "nodes_1.id = nodes.parent_id AND nodes_1.data = :data_1)", 1535 entity=Node, 1536 checkparams={"data_1": "some data"}, 1537 ) 1538 1539 self._test( 1540 Node.parent == Node(id=7), 1541 ":param_1 = nodes.parent_id", 1542 checkparams={"param_1": 7}, 1543 ) 1544 1545 self._test( 1546 nalias.parent == Node(id=7), 1547 ":param_1 = nodes_1.parent_id", 1548 checkparams={"param_1": 7}, 1549 ) 1550 1551 self._test( 1552 nalias.parent != Node(id=7), 1553 "nodes_1.parent_id != :parent_id_1 " 1554 "OR nodes_1.parent_id IS NULL", 1555 checkparams={"parent_id_1": 7}, 1556 ) 1557 1558 self._test( 1559 nalias.parent != Node(id=7), 1560 "nodes_1.parent_id != :parent_id_1 " 1561 "OR nodes_1.parent_id IS NULL", 1562 checkparams={"parent_id_1": 7}, 1563 ) 1564 1565 self._test( 1566 nalias.children.contains(Node(id=7, parent_id=12)), 1567 "nodes_1.id = :param_1", 1568 checkparams={"param_1": 12}, 1569 ) 1570 1571 def test_multilevel_any(self): 1572 User, Address, Dingaling = ( 1573 self.classes.User, 1574 self.classes.Address, 1575 self.classes.Dingaling, 1576 ) 1577 sess = Session() 1578 1579 q = sess.query(User).filter( 1580 User.addresses.any( 1581 and_(Address.id == Dingaling.address_id, Dingaling.data == "x") 1582 ) 1583 ) 1584 # new since #2746 - correlate_except() now takes context into account 1585 # so its usage in any() is not as disrupting. 1586 self.assert_compile( 1587 q, 1588 "SELECT users.id AS users_id, users.name AS users_name " 1589 "FROM users " 1590 "WHERE EXISTS (SELECT 1 " 1591 "FROM addresses, dingalings " 1592 "WHERE users.id = addresses.user_id AND " 1593 "addresses.id = dingalings.address_id AND " 1594 "dingalings.data = :data_1)", 1595 ) 1596 1597 def test_op(self): 1598 User = self.classes.User 1599 1600 self._test(User.name.op("ilike")("17"), "users.name ilike :name_1") 1601 1602 def test_in(self): 1603 User = self.classes.User 1604 1605 self._test(User.id.in_(["a", "b"]), "users.id IN (:id_1, :id_2)") 1606 1607 def test_in_on_relationship_not_supported(self): 1608 User, Address = self.classes.User, self.classes.Address 1609 1610 assert_raises(NotImplementedError, Address.user.in_, [User(id=5)]) 1611 1612 def test_neg(self): 1613 User = self.classes.User 1614 1615 self._test(-User.id, "-users.id") 1616 self._test(User.id + -User.id, "users.id + -users.id") 1617 1618 def test_between(self): 1619 User = self.classes.User 1620 1621 self._test( 1622 User.id.between("a", "b"), "users.id BETWEEN :id_1 AND :id_2" 1623 ) 1624 1625 def test_collate(self): 1626 User = self.classes.User 1627 1628 self._test(collate(User.id, "utf8_bin"), "users.id COLLATE utf8_bin") 1629 1630 self._test(User.id.collate("utf8_bin"), "users.id COLLATE utf8_bin") 1631 1632 def test_selfref_between(self): 1633 User = self.classes.User 1634 1635 ualias = aliased(User) 1636 self._test( 1637 User.id.between(ualias.id, ualias.id), 1638 "users.id BETWEEN users_1.id AND users_1.id", 1639 ) 1640 self._test( 1641 ualias.id.between(User.id, User.id), 1642 "users_1.id BETWEEN users.id AND users.id", 1643 ) 1644 1645 def test_clauses(self): 1646 User, Address = self.classes.User, self.classes.Address 1647 1648 for (expr, compare) in ( 1649 (func.max(User.id), "max(users.id)"), 1650 (User.id.desc(), "users.id DESC"), 1651 ( 1652 between(5, User.id, Address.id), 1653 ":param_1 BETWEEN users.id AND addresses.id", 1654 ), 1655 # this one would require adding compile() to 1656 # InstrumentedScalarAttribute. do we want this ? 1657 # (User.id, "users.id") 1658 ): 1659 c = expr.compile(dialect=default.DefaultDialect()) 1660 assert str(c) == compare, "%s != %s" % (str(c), compare) 1661 1662 1663class ExpressionTest(QueryTest, AssertsCompiledSQL): 1664 __dialect__ = "default" 1665 1666 def test_deferred_instances(self): 1667 User, addresses, Address = ( 1668 self.classes.User, 1669 self.tables.addresses, 1670 self.classes.Address, 1671 ) 1672 1673 session = create_session() 1674 s = ( 1675 session.query(User) 1676 .filter( 1677 and_( 1678 addresses.c.email_address == bindparam("emailad"), 1679 Address.user_id == User.id, 1680 ) 1681 ) 1682 .statement 1683 ) 1684 1685 result = list( 1686 session.query(User).instances(s.execute(emailad="jack@bean.com")) 1687 ) 1688 eq_([User(id=7)], result) 1689 1690 def test_aliased_sql_construct(self): 1691 User, Address = self.classes.User, self.classes.Address 1692 1693 j = join(User, Address) 1694 a1 = aliased(j) 1695 self.assert_compile( 1696 a1.select(), 1697 "SELECT anon_1.users_id, anon_1.users_name, anon_1.addresses_id, " 1698 "anon_1.addresses_user_id, anon_1.addresses_email_address " 1699 "FROM (SELECT users.id AS users_id, users.name AS users_name, " 1700 "addresses.id AS addresses_id, addresses.user_id AS " 1701 "addresses_user_id, addresses.email_address AS " 1702 "addresses_email_address FROM users JOIN addresses " 1703 "ON users.id = addresses.user_id) AS anon_1", 1704 ) 1705 1706 def test_aliased_sql_construct_raises_adapt_on_names(self): 1707 User, Address = self.classes.User, self.classes.Address 1708 1709 j = join(User, Address) 1710 assert_raises_message( 1711 sa_exc.ArgumentError, 1712 "adapt_on_names only applies to ORM elements", 1713 aliased, 1714 j, 1715 adapt_on_names=True, 1716 ) 1717 1718 def test_scalar_subquery_compile_whereclause(self): 1719 User = self.classes.User 1720 Address = self.classes.Address 1721 1722 session = create_session() 1723 1724 q = session.query(User.id).filter(User.id == 7) 1725 1726 q = session.query(Address).filter(Address.user_id == q) 1727 assert isinstance(q._criterion.right, expression.ColumnElement) 1728 self.assert_compile( 1729 q, 1730 "SELECT addresses.id AS addresses_id, addresses.user_id " 1731 "AS addresses_user_id, addresses.email_address AS " 1732 "addresses_email_address FROM addresses WHERE " 1733 "addresses.user_id = (SELECT users.id AS users_id " 1734 "FROM users WHERE users.id = :id_1)", 1735 ) 1736 1737 def test_subquery_no_eagerloads(self): 1738 User = self.classes.User 1739 s = Session() 1740 1741 self.assert_compile( 1742 s.query(User).options(joinedload(User.addresses)).subquery(), 1743 "SELECT users.id, users.name FROM users", 1744 ) 1745 1746 def test_exists_no_eagerloads(self): 1747 User = self.classes.User 1748 s = Session() 1749 1750 self.assert_compile( 1751 s.query( 1752 s.query(User).options(joinedload(User.addresses)).exists() 1753 ), 1754 "SELECT EXISTS (SELECT 1 FROM users) AS anon_1", 1755 ) 1756 1757 def test_named_subquery(self): 1758 User = self.classes.User 1759 1760 session = create_session() 1761 a1 = session.query(User.id).filter(User.id == 7).subquery("foo1") 1762 a2 = session.query(User.id).filter(User.id == 7).subquery(name="foo2") 1763 a3 = session.query(User.id).filter(User.id == 7).subquery() 1764 1765 eq_(a1.name, "foo1") 1766 eq_(a2.name, "foo2") 1767 eq_(a3.name, "%%(%d anon)s" % id(a3)) 1768 1769 def test_labeled_subquery(self): 1770 User = self.classes.User 1771 1772 session = create_session() 1773 a1 = ( 1774 session.query(User.id) 1775 .filter(User.id == 7) 1776 .subquery(with_labels=True) 1777 ) 1778 assert a1.c.users_id is not None 1779 1780 def test_reduced_subquery(self): 1781 User = self.classes.User 1782 ua = aliased(User) 1783 1784 session = create_session() 1785 a1 = ( 1786 session.query(User.id, ua.id, ua.name) 1787 .filter(User.id == ua.id) 1788 .subquery(reduce_columns=True) 1789 ) 1790 self.assert_compile( 1791 a1, 1792 "SELECT users.id, users_1.name FROM " 1793 "users, users AS users_1 " 1794 "WHERE users.id = users_1.id", 1795 ) 1796 1797 def test_label(self): 1798 User = self.classes.User 1799 1800 session = create_session() 1801 1802 q = session.query(User.id).filter(User.id == 7).label("foo") 1803 self.assert_compile( 1804 session.query(q), 1805 "SELECT (SELECT users.id FROM users " 1806 "WHERE users.id = :id_1) AS foo", 1807 ) 1808 1809 def test_as_scalar(self): 1810 User = self.classes.User 1811 1812 session = create_session() 1813 1814 q = session.query(User.id).filter(User.id == 7).as_scalar() 1815 1816 self.assert_compile( 1817 session.query(User).filter(User.id.in_(q)), 1818 "SELECT users.id AS users_id, users.name " 1819 "AS users_name FROM users WHERE users.id " 1820 "IN (SELECT users.id FROM users WHERE " 1821 "users.id = :id_1)", 1822 ) 1823 1824 def test_param_transfer(self): 1825 User = self.classes.User 1826 1827 session = create_session() 1828 1829 q = ( 1830 session.query(User.id) 1831 .filter(User.id == bindparam("foo")) 1832 .params(foo=7) 1833 .subquery() 1834 ) 1835 1836 q = session.query(User).filter(User.id.in_(q)) 1837 1838 eq_(User(id=7), q.one()) 1839 1840 def test_in(self): 1841 User, Address = self.classes.User, self.classes.Address 1842 1843 session = create_session() 1844 s = ( 1845 session.query(User.id) 1846 .join(User.addresses) 1847 .group_by(User.id) 1848 .having(func.count(Address.id) > 2) 1849 ) 1850 eq_(session.query(User).filter(User.id.in_(s)).all(), [User(id=8)]) 1851 1852 def test_union(self): 1853 User = self.classes.User 1854 1855 s = create_session() 1856 1857 q1 = s.query(User).filter(User.name == "ed").with_labels() 1858 q2 = s.query(User).filter(User.name == "fred").with_labels() 1859 eq_( 1860 s.query(User) 1861 .from_statement(union(q1, q2).order_by("users_name")) 1862 .all(), 1863 [User(name="ed"), User(name="fred")], 1864 ) 1865 1866 def test_select(self): 1867 User = self.classes.User 1868 1869 s = create_session() 1870 1871 # this is actually not legal on most DBs since the subquery has no 1872 # alias 1873 q1 = s.query(User).filter(User.name == "ed") 1874 1875 self.assert_compile( 1876 select([q1]), 1877 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1878 "users.name AS users_name FROM users WHERE users.name = :name_1)", 1879 ) 1880 1881 def test_join(self): 1882 User, Address = self.classes.User, self.classes.Address 1883 1884 s = create_session() 1885 1886 # TODO: do we want aliased() to detect a query and convert to 1887 # subquery() automatically ? 1888 q1 = s.query(Address).filter(Address.email_address == "jack@bean.com") 1889 adalias = aliased(Address, q1.subquery()) 1890 eq_( 1891 s.query(User, adalias) 1892 .join(adalias, User.id == adalias.user_id) 1893 .all(), 1894 [ 1895 ( 1896 User(id=7, name="jack"), 1897 Address(email_address="jack@bean.com", user_id=7, id=1), 1898 ) 1899 ], 1900 ) 1901 1902 def test_group_by_plain(self): 1903 User = self.classes.User 1904 s = create_session() 1905 1906 q1 = s.query(User.id, User.name).group_by(User.name) 1907 self.assert_compile( 1908 select([q1]), 1909 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1910 "users.name AS users_name FROM users GROUP BY users.name)", 1911 ) 1912 1913 def test_group_by_append(self): 1914 User = self.classes.User 1915 s = create_session() 1916 1917 q1 = s.query(User.id, User.name).group_by(User.name) 1918 1919 # test append something to group_by 1920 self.assert_compile( 1921 select([q1.group_by(User.id)]), 1922 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1923 "users.name AS users_name FROM users " 1924 "GROUP BY users.name, users.id)", 1925 ) 1926 1927 def test_group_by_cancellation(self): 1928 User = self.classes.User 1929 s = create_session() 1930 1931 q1 = s.query(User.id, User.name).group_by(User.name) 1932 # test cancellation by using None, replacement with something else 1933 self.assert_compile( 1934 select([q1.group_by(None).group_by(User.id)]), 1935 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1936 "users.name AS users_name FROM users GROUP BY users.id)", 1937 ) 1938 1939 # test cancellation by using None, replacement with nothing 1940 self.assert_compile( 1941 select([q1.group_by(None)]), 1942 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1943 "users.name AS users_name FROM users)", 1944 ) 1945 1946 def test_group_by_cancelled_still_present(self): 1947 User = self.classes.User 1948 s = create_session() 1949 1950 q1 = s.query(User.id, User.name).group_by(User.name).group_by(None) 1951 1952 q1._no_criterion_assertion("foo") 1953 1954 def test_order_by_plain(self): 1955 User = self.classes.User 1956 s = create_session() 1957 1958 q1 = s.query(User.id, User.name).order_by(User.name) 1959 self.assert_compile( 1960 select([q1]), 1961 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1962 "users.name AS users_name FROM users ORDER BY users.name)", 1963 ) 1964 1965 def test_order_by_append(self): 1966 User = self.classes.User 1967 s = create_session() 1968 1969 q1 = s.query(User.id, User.name).order_by(User.name) 1970 1971 # test append something to order_by 1972 self.assert_compile( 1973 select([q1.order_by(User.id)]), 1974 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1975 "users.name AS users_name FROM users " 1976 "ORDER BY users.name, users.id)", 1977 ) 1978 1979 def test_order_by_cancellation(self): 1980 User = self.classes.User 1981 s = create_session() 1982 1983 q1 = s.query(User.id, User.name).order_by(User.name) 1984 # test cancellation by using None, replacement with something else 1985 self.assert_compile( 1986 select([q1.order_by(None).order_by(User.id)]), 1987 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1988 "users.name AS users_name FROM users ORDER BY users.id)", 1989 ) 1990 1991 # test cancellation by using None, replacement with nothing 1992 self.assert_compile( 1993 select([q1.order_by(None)]), 1994 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1995 "users.name AS users_name FROM users)", 1996 ) 1997 1998 def test_order_by_cancellation_false(self): 1999 User = self.classes.User 2000 s = create_session() 2001 2002 q1 = s.query(User.id, User.name).order_by(User.name) 2003 # test cancellation by using None, replacement with something else 2004 self.assert_compile( 2005 select([q1.order_by(False).order_by(User.id)]), 2006 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 2007 "users.name AS users_name FROM users ORDER BY users.id)", 2008 ) 2009 2010 # test cancellation by using None, replacement with nothing 2011 self.assert_compile( 2012 select([q1.order_by(False)]), 2013 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 2014 "users.name AS users_name FROM users)", 2015 ) 2016 2017 def test_order_by_cancelled_allows_assertions(self): 2018 User = self.classes.User 2019 s = create_session() 2020 2021 q1 = s.query(User.id, User.name).order_by(User.name).order_by(None) 2022 2023 q1._no_criterion_assertion("foo") 2024 2025 def test_legacy_order_by_cancelled_allows_assertions(self): 2026 User = self.classes.User 2027 s = create_session() 2028 2029 q1 = s.query(User.id, User.name).order_by(User.name).order_by(False) 2030 2031 q1._no_criterion_assertion("foo") 2032 2033 2034class ColumnPropertyTest(_fixtures.FixtureTest, AssertsCompiledSQL): 2035 __dialect__ = "default" 2036 run_setup_mappers = "each" 2037 2038 def _fixture(self, label=True, polymorphic=False): 2039 User, Address = self.classes("User", "Address") 2040 users, addresses = self.tables("users", "addresses") 2041 stmt = ( 2042 select([func.max(addresses.c.email_address)]) 2043 .where(addresses.c.user_id == users.c.id) 2044 .correlate(users) 2045 ) 2046 if label: 2047 stmt = stmt.label("email_ad") 2048 2049 mapper( 2050 User, 2051 users, 2052 properties={"ead": column_property(stmt)}, 2053 with_polymorphic="*" if polymorphic else None, 2054 ) 2055 mapper(Address, addresses) 2056 2057 def _func_fixture(self, label=False): 2058 User = self.classes.User 2059 users = self.tables.users 2060 2061 if label: 2062 mapper( 2063 User, 2064 users, 2065 properties={ 2066 "foobar": column_property( 2067 func.foob(users.c.name).label(None) 2068 ) 2069 }, 2070 ) 2071 else: 2072 mapper( 2073 User, 2074 users, 2075 properties={ 2076 "foobar": column_property(func.foob(users.c.name)) 2077 }, 2078 ) 2079 2080 def test_anon_label_function_auto(self): 2081 self._func_fixture() 2082 User = self.classes.User 2083 2084 s = Session() 2085 2086 u1 = aliased(User) 2087 self.assert_compile( 2088 s.query(User.foobar, u1.foobar), 2089 "SELECT foob(users.name) AS foob_1, foob(users_1.name) AS foob_2 " 2090 "FROM users, users AS users_1", 2091 ) 2092 2093 def test_anon_label_function_manual(self): 2094 self._func_fixture(label=True) 2095 User = self.classes.User 2096 2097 s = Session() 2098 2099 u1 = aliased(User) 2100 self.assert_compile( 2101 s.query(User.foobar, u1.foobar), 2102 "SELECT foob(users.name) AS foob_1, foob(users_1.name) AS foob_2 " 2103 "FROM users, users AS users_1", 2104 ) 2105 2106 def test_anon_label_ad_hoc_labeling(self): 2107 self._func_fixture() 2108 User = self.classes.User 2109 2110 s = Session() 2111 2112 u1 = aliased(User) 2113 self.assert_compile( 2114 s.query(User.foobar.label("x"), u1.foobar.label("y")), 2115 "SELECT foob(users.name) AS x, foob(users_1.name) AS y " 2116 "FROM users, users AS users_1", 2117 ) 2118 2119 def test_order_by_column_prop_string(self): 2120 User, Address = self.classes("User", "Address") 2121 self._fixture(label=True) 2122 2123 s = Session() 2124 q = s.query(User).order_by("email_ad") 2125 self.assert_compile( 2126 q, 2127 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2128 "FROM addresses " 2129 "WHERE addresses.user_id = users.id) AS email_ad, " 2130 "users.id AS users_id, users.name AS users_name " 2131 "FROM users ORDER BY email_ad", 2132 ) 2133 2134 def test_order_by_column_prop_aliased_string(self): 2135 User, Address = self.classes("User", "Address") 2136 self._fixture(label=True) 2137 2138 s = Session() 2139 ua = aliased(User) 2140 q = s.query(ua).order_by("email_ad") 2141 2142 def go(): 2143 self.assert_compile( 2144 q, 2145 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2146 "FROM addresses WHERE addresses.user_id = users_1.id) " 2147 "AS anon_1, users_1.id AS users_1_id, " 2148 "users_1.name AS users_1_name FROM users AS users_1 " 2149 "ORDER BY email_ad", 2150 ) 2151 2152 assert_warnings( 2153 go, ["Can't resolve label reference 'email_ad'"], regex=True 2154 ) 2155 2156 def test_order_by_column_labeled_prop_attr_aliased_one(self): 2157 User = self.classes.User 2158 self._fixture(label=True) 2159 2160 ua = aliased(User) 2161 s = Session() 2162 q = s.query(ua).order_by(ua.ead) 2163 self.assert_compile( 2164 q, 2165 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2166 "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 2167 "users_1.id AS users_1_id, users_1.name AS users_1_name " 2168 "FROM users AS users_1 ORDER BY anon_1", 2169 ) 2170 2171 def test_order_by_column_labeled_prop_attr_aliased_two(self): 2172 User = self.classes.User 2173 self._fixture(label=True) 2174 2175 ua = aliased(User) 2176 s = Session() 2177 q = s.query(ua.ead).order_by(ua.ead) 2178 self.assert_compile( 2179 q, 2180 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2181 "FROM addresses, " 2182 "users AS users_1 WHERE addresses.user_id = users_1.id) " 2183 "AS anon_1 ORDER BY anon_1", 2184 ) 2185 2186 # we're also testing that the state of "ua" is OK after the 2187 # previous call, so the batching into one test is intentional 2188 q = s.query(ua).order_by(ua.ead) 2189 self.assert_compile( 2190 q, 2191 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2192 "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 2193 "users_1.id AS users_1_id, users_1.name AS users_1_name " 2194 "FROM users AS users_1 ORDER BY anon_1", 2195 ) 2196 2197 def test_order_by_column_labeled_prop_attr_aliased_three(self): 2198 User = self.classes.User 2199 self._fixture(label=True) 2200 2201 ua = aliased(User) 2202 s = Session() 2203 q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead) 2204 self.assert_compile( 2205 q, 2206 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2207 "FROM addresses, users WHERE addresses.user_id = users.id) " 2208 "AS email_ad, (SELECT max(addresses.email_address) AS max_1 " 2209 "FROM addresses, users AS users_1 WHERE addresses.user_id = " 2210 "users_1.id) AS anon_1 ORDER BY email_ad, anon_1", 2211 ) 2212 2213 q = s.query(User, ua).order_by(User.ead, ua.ead) 2214 self.assert_compile( 2215 q, 2216 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2217 "FROM addresses WHERE addresses.user_id = users.id) AS " 2218 "email_ad, users.id AS users_id, users.name AS users_name, " 2219 "(SELECT max(addresses.email_address) AS max_1 FROM addresses " 2220 "WHERE addresses.user_id = users_1.id) AS anon_1, users_1.id " 2221 "AS users_1_id, users_1.name AS users_1_name FROM users, " 2222 "users AS users_1 ORDER BY email_ad, anon_1", 2223 ) 2224 2225 def test_order_by_column_labeled_prop_attr_aliased_four(self): 2226 User = self.classes.User 2227 self._fixture(label=True, polymorphic=True) 2228 2229 ua = aliased(User) 2230 s = Session() 2231 q = s.query(ua, User.id).order_by(ua.ead) 2232 self.assert_compile( 2233 q, 2234 "SELECT (SELECT max(addresses.email_address) AS max_1 FROM " 2235 "addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 2236 "users_1.id AS users_1_id, users_1.name AS users_1_name, " 2237 "users.id AS users_id FROM users AS users_1, " 2238 "users ORDER BY anon_1", 2239 ) 2240 2241 def test_order_by_column_unlabeled_prop_attr_aliased_one(self): 2242 User = self.classes.User 2243 self._fixture(label=False) 2244 2245 ua = aliased(User) 2246 s = Session() 2247 q = s.query(ua).order_by(ua.ead) 2248 self.assert_compile( 2249 q, 2250 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2251 "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 2252 "users_1.id AS users_1_id, users_1.name AS users_1_name " 2253 "FROM users AS users_1 ORDER BY anon_1", 2254 ) 2255 2256 def test_order_by_column_unlabeled_prop_attr_aliased_two(self): 2257 User = self.classes.User 2258 self._fixture(label=False) 2259 2260 ua = aliased(User) 2261 s = Session() 2262 q = s.query(ua.ead).order_by(ua.ead) 2263 self.assert_compile( 2264 q, 2265 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2266 "FROM addresses, " 2267 "users AS users_1 WHERE addresses.user_id = users_1.id) " 2268 "AS anon_1 ORDER BY anon_1", 2269 ) 2270 2271 # we're also testing that the state of "ua" is OK after the 2272 # previous call, so the batching into one test is intentional 2273 q = s.query(ua).order_by(ua.ead) 2274 self.assert_compile( 2275 q, 2276 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2277 "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 2278 "users_1.id AS users_1_id, users_1.name AS users_1_name " 2279 "FROM users AS users_1 ORDER BY anon_1", 2280 ) 2281 2282 def test_order_by_column_unlabeled_prop_attr_aliased_three(self): 2283 User = self.classes.User 2284 self._fixture(label=False) 2285 2286 ua = aliased(User) 2287 s = Session() 2288 q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead) 2289 self.assert_compile( 2290 q, 2291 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2292 "FROM addresses, users WHERE addresses.user_id = users.id) " 2293 "AS anon_1, (SELECT max(addresses.email_address) AS max_1 " 2294 "FROM addresses, users AS users_1 " 2295 "WHERE addresses.user_id = users_1.id) AS anon_2 " 2296 "ORDER BY anon_1, anon_2", 2297 ) 2298 2299 q = s.query(User, ua).order_by(User.ead, ua.ead) 2300 self.assert_compile( 2301 q, 2302 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2303 "FROM addresses WHERE addresses.user_id = users.id) AS " 2304 "anon_1, users.id AS users_id, users.name AS users_name, " 2305 "(SELECT max(addresses.email_address) AS max_1 FROM addresses " 2306 "WHERE addresses.user_id = users_1.id) AS anon_2, users_1.id " 2307 "AS users_1_id, users_1.name AS users_1_name FROM users, " 2308 "users AS users_1 ORDER BY anon_1, anon_2", 2309 ) 2310 2311 def test_order_by_column_prop_attr(self): 2312 User, Address = self.classes("User", "Address") 2313 self._fixture(label=True) 2314 2315 s = Session() 2316 q = s.query(User).order_by(User.ead) 2317 # this one is a bit of a surprise; this is compiler 2318 # label-order-by logic kicking in, but won't work in more 2319 # complex cases. 2320 self.assert_compile( 2321 q, 2322 "SELECT (SELECT max(addresses.email_address) AS max_1 " 2323 "FROM addresses " 2324 "WHERE addresses.user_id = users.id) AS email_ad, " 2325 "users.id AS users_id, users.name AS users_name " 2326 "FROM users ORDER BY email_ad", 2327 ) 2328 2329 def test_order_by_column_prop_attr_non_present(self): 2330 User, Address = self.classes("User", "Address") 2331 self._fixture(label=True) 2332 2333 s = Session() 2334 q = s.query(User).options(defer(User.ead)).order_by(User.ead) 2335 self.assert_compile( 2336 q, 2337 "SELECT users.id AS users_id, users.name AS users_name " 2338 "FROM users ORDER BY " 2339 "(SELECT max(addresses.email_address) AS max_1 " 2340 "FROM addresses " 2341 "WHERE addresses.user_id = users.id)", 2342 ) 2343 2344 2345class ComparatorTest(QueryTest): 2346 def test_clause_element_query_resolve(self): 2347 from sqlalchemy.orm.properties import ColumnProperty 2348 2349 User = self.classes.User 2350 2351 class Comparator(ColumnProperty.Comparator): 2352 def __init__(self, expr): 2353 self.expr = expr 2354 2355 def __clause_element__(self): 2356 return self.expr 2357 2358 sess = Session() 2359 eq_( 2360 sess.query(Comparator(User.id)) 2361 .order_by(Comparator(User.id)) 2362 .all(), 2363 [(7,), (8,), (9,), (10,)], 2364 ) 2365 2366 2367# more slice tests are available in test/orm/generative.py 2368class SliceTest(QueryTest): 2369 def test_first(self): 2370 User = self.classes.User 2371 2372 assert User(id=7) == create_session().query(User).first() 2373 2374 assert ( 2375 create_session().query(User).filter(User.id == 27).first() is None 2376 ) 2377 2378 def test_limit_offset_applies(self): 2379 """Test that the expected LIMIT/OFFSET is applied for slices. 2380 2381 The LIMIT/OFFSET syntax differs slightly on all databases, and 2382 query[x:y] executes immediately, so we are asserting against 2383 SQL strings using sqlite's syntax. 2384 2385 """ 2386 2387 User = self.classes.User 2388 2389 sess = create_session() 2390 q = sess.query(User).order_by(User.id) 2391 2392 self.assert_sql( 2393 testing.db, 2394 lambda: q[10:20], 2395 [ 2396 ( 2397 "SELECT users.id AS users_id, users.name " 2398 "AS users_name FROM users ORDER BY users.id " 2399 "LIMIT :param_1 OFFSET :param_2", 2400 {"param_1": 10, "param_2": 10}, 2401 ) 2402 ], 2403 ) 2404 2405 self.assert_sql( 2406 testing.db, 2407 lambda: q[:20], 2408 [ 2409 ( 2410 "SELECT users.id AS users_id, users.name " 2411 "AS users_name FROM users ORDER BY users.id " 2412 "LIMIT :param_1", 2413 {"param_1": 20}, 2414 ) 2415 ], 2416 ) 2417 2418 self.assert_sql( 2419 testing.db, 2420 lambda: q[5:], 2421 [ 2422 ( 2423 "SELECT users.id AS users_id, users.name " 2424 "AS users_name FROM users ORDER BY users.id " 2425 "LIMIT -1 OFFSET :param_1", 2426 {"param_1": 5}, 2427 ) 2428 ], 2429 ) 2430 2431 self.assert_sql(testing.db, lambda: q[2:2], []) 2432 2433 self.assert_sql(testing.db, lambda: q[-2:-5], []) 2434 2435 self.assert_sql( 2436 testing.db, 2437 lambda: q[-5:-2], 2438 [ 2439 ( 2440 "SELECT users.id AS users_id, users.name AS users_name " 2441 "FROM users ORDER BY users.id", 2442 {}, 2443 ) 2444 ], 2445 ) 2446 2447 self.assert_sql( 2448 testing.db, 2449 lambda: q[-5:], 2450 [ 2451 ( 2452 "SELECT users.id AS users_id, users.name AS users_name " 2453 "FROM users ORDER BY users.id", 2454 {}, 2455 ) 2456 ], 2457 ) 2458 2459 self.assert_sql( 2460 testing.db, 2461 lambda: q[:], 2462 [ 2463 ( 2464 "SELECT users.id AS users_id, users.name AS users_name " 2465 "FROM users ORDER BY users.id", 2466 {}, 2467 ) 2468 ], 2469 ) 2470 2471 2472class FilterTest(QueryTest, AssertsCompiledSQL): 2473 __dialect__ = "default" 2474 2475 def test_basic(self): 2476 User = self.classes.User 2477 2478 users = create_session().query(User).all() 2479 eq_([User(id=7), User(id=8), User(id=9), User(id=10)], users) 2480 2481 @testing.requires.offset 2482 def test_limit_offset(self): 2483 User = self.classes.User 2484 2485 sess = create_session() 2486 2487 assert [User(id=8), User(id=9)] == sess.query(User).order_by( 2488 User.id 2489 ).limit(2).offset(1).all() 2490 2491 assert [User(id=8), User(id=9)] == list( 2492 sess.query(User).order_by(User.id)[1:3] 2493 ) 2494 2495 assert User(id=8) == sess.query(User).order_by(User.id)[1] 2496 2497 assert [] == sess.query(User).order_by(User.id)[3:3] 2498 assert [] == sess.query(User).order_by(User.id)[0:0] 2499 2500 @testing.requires.bound_limit_offset 2501 def test_select_with_bindparam_offset_limit(self): 2502 """Does a query allow bindparam for the limit?""" 2503 User = self.classes.User 2504 sess = create_session() 2505 q1 = ( 2506 sess.query(self.classes.User) 2507 .order_by(self.classes.User.id) 2508 .limit(bindparam("n")) 2509 ) 2510 2511 for n in range(1, 4): 2512 result = q1.params(n=n).all() 2513 eq_(len(result), n) 2514 2515 eq_( 2516 sess.query(User) 2517 .order_by(User.id) 2518 .limit(bindparam("limit")) 2519 .offset(bindparam("offset")) 2520 .params(limit=2, offset=1) 2521 .all(), 2522 [User(id=8), User(id=9)], 2523 ) 2524 2525 @testing.fails_on("mysql", "doesn't like CAST in the limit clause") 2526 @testing.requires.bound_limit_offset 2527 def test_select_with_bindparam_offset_limit_w_cast(self): 2528 User = self.classes.User 2529 sess = create_session() 2530 q1 = ( 2531 sess.query(self.classes.User) 2532 .order_by(self.classes.User.id) 2533 .limit(bindparam("n")) 2534 ) 2535 eq_( 2536 list( 2537 sess.query(User) 2538 .params(a=1, b=3) 2539 .order_by(User.id)[ 2540 cast(bindparam("a"), Integer) : cast( 2541 bindparam("b"), Integer 2542 ) 2543 ] 2544 ), 2545 [User(id=8), User(id=9)], 2546 ) 2547 2548 @testing.requires.boolean_col_expressions 2549 def test_exists(self): 2550 User = self.classes.User 2551 2552 sess = create_session(testing.db) 2553 2554 assert sess.query(exists().where(User.id == 9)).scalar() 2555 assert not sess.query(exists().where(User.id == 29)).scalar() 2556 2557 def test_one_filter(self): 2558 User = self.classes.User 2559 2560 assert [User(id=8), User(id=9)] == create_session().query(User).filter( 2561 User.name.endswith("ed") 2562 ).all() 2563 2564 def test_contains(self): 2565 """test comparing a collection to an object instance.""" 2566 2567 User, Address = self.classes.User, self.classes.Address 2568 2569 sess = create_session() 2570 address = sess.query(Address).get(3) 2571 assert [User(id=8)] == sess.query(User).filter( 2572 User.addresses.contains(address) 2573 ).all() 2574 2575 try: 2576 sess.query(User).filter(User.addresses == address) 2577 assert False 2578 except sa_exc.InvalidRequestError: 2579 assert True 2580 2581 assert [User(id=10)] == sess.query(User).filter( 2582 User.addresses == None 2583 ).all() # noqa 2584 2585 try: 2586 assert [User(id=7), User(id=9), User(id=10)] == sess.query( 2587 User 2588 ).filter(User.addresses != address).all() 2589 assert False 2590 except sa_exc.InvalidRequestError: 2591 assert True 2592 2593 # assert [User(id=7), User(id=9), User(id=10)] == 2594 # sess.query(User).filter(User.addresses!=address).all() 2595 2596 def test_clause_element_ok(self): 2597 User = self.classes.User 2598 s = Session() 2599 self.assert_compile( 2600 s.query(User).filter(User.addresses), 2601 "SELECT users.id AS users_id, users.name AS users_name " 2602 "FROM users, addresses WHERE users.id = addresses.user_id", 2603 ) 2604 2605 def test_unique_binds_join_cond(self): 2606 """test that binds used when the lazyclause is used in criterion are 2607 unique""" 2608 2609 User, Address = self.classes.User, self.classes.Address 2610 sess = Session() 2611 a1, a2 = sess.query(Address).order_by(Address.id)[0:2] 2612 self.assert_compile( 2613 sess.query(User) 2614 .filter(User.addresses.contains(a1)) 2615 .union(sess.query(User).filter(User.addresses.contains(a2))), 2616 "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS " 2617 "anon_1_users_name FROM (SELECT users.id AS users_id, " 2618 "users.name AS users_name FROM users WHERE users.id = :param_1 " 2619 "UNION SELECT users.id AS users_id, users.name AS users_name " 2620 "FROM users WHERE users.id = :param_2) AS anon_1", 2621 checkparams={"param_1": 7, "param_2": 8}, 2622 ) 2623 2624 def test_any(self): 2625 # see also HasAnyTest, a newer suite which tests these at the level of 2626 # SQL compilation 2627 User, Address = self.classes.User, self.classes.Address 2628 2629 sess = create_session() 2630 2631 assert [User(id=8), User(id=9)] == sess.query(User).filter( 2632 User.addresses.any(Address.email_address.like("%ed%")) 2633 ).all() 2634 2635 assert [User(id=8)] == sess.query(User).filter( 2636 User.addresses.any(Address.email_address.like("%ed%"), id=4) 2637 ).all() 2638 2639 assert [User(id=8)] == sess.query(User).filter( 2640 User.addresses.any(Address.email_address.like("%ed%")) 2641 ).filter(User.addresses.any(id=4)).all() 2642 2643 assert [User(id=9)] == sess.query(User).filter( 2644 User.addresses.any(email_address="fred@fred.com") 2645 ).all() 2646 2647 # test that the contents are not adapted by the aliased join 2648 assert ( 2649 [User(id=7), User(id=8)] 2650 == sess.query(User) 2651 .join("addresses", aliased=True) 2652 .filter( 2653 ~User.addresses.any(Address.email_address == "fred@fred.com") 2654 ) 2655 .all() 2656 ) 2657 2658 assert [User(id=10)] == sess.query(User).outerjoin( 2659 "addresses", aliased=True 2660 ).filter(~User.addresses.any()).all() 2661 2662 def test_any_doesnt_overcorrelate(self): 2663 # see also HasAnyTest, a newer suite which tests these at the level of 2664 # SQL compilation 2665 User, Address = self.classes.User, self.classes.Address 2666 2667 sess = create_session() 2668 2669 # test that any() doesn't overcorrelate 2670 assert ( 2671 [User(id=7), User(id=8)] 2672 == sess.query(User) 2673 .join("addresses") 2674 .filter( 2675 ~User.addresses.any(Address.email_address == "fred@fred.com") 2676 ) 2677 .all() 2678 ) 2679 2680 def test_has(self): 2681 # see also HasAnyTest, a newer suite which tests these at the level of 2682 # SQL compilation 2683 Dingaling, User, Address = ( 2684 self.classes.Dingaling, 2685 self.classes.User, 2686 self.classes.Address, 2687 ) 2688 2689 sess = create_session() 2690 assert [Address(id=5)] == sess.query(Address).filter( 2691 Address.user.has(name="fred") 2692 ).all() 2693 2694 assert ( 2695 [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] 2696 == sess.query(Address) 2697 .filter(Address.user.has(User.name.like("%ed%"))) 2698 .order_by(Address.id) 2699 .all() 2700 ) 2701 2702 assert ( 2703 [Address(id=2), Address(id=3), Address(id=4)] 2704 == sess.query(Address) 2705 .filter(Address.user.has(User.name.like("%ed%"), id=8)) 2706 .order_by(Address.id) 2707 .all() 2708 ) 2709 2710 # test has() doesn't overcorrelate 2711 assert ( 2712 [Address(id=2), Address(id=3), Address(id=4)] 2713 == sess.query(Address) 2714 .join("user") 2715 .filter(Address.user.has(User.name.like("%ed%"), id=8)) 2716 .order_by(Address.id) 2717 .all() 2718 ) 2719 2720 # test has() doesn't get subquery contents adapted by aliased join 2721 assert ( 2722 [Address(id=2), Address(id=3), Address(id=4)] 2723 == sess.query(Address) 2724 .join("user", aliased=True) 2725 .filter(Address.user.has(User.name.like("%ed%"), id=8)) 2726 .order_by(Address.id) 2727 .all() 2728 ) 2729 2730 dingaling = sess.query(Dingaling).get(2) 2731 assert [User(id=9)] == sess.query(User).filter( 2732 User.addresses.any(Address.dingaling == dingaling) 2733 ).all() 2734 2735 def test_contains_m2m(self): 2736 Item, Order = self.classes.Item, self.classes.Order 2737 2738 sess = create_session() 2739 item = sess.query(Item).get(3) 2740 2741 eq_( 2742 sess.query(Order) 2743 .filter(Order.items.contains(item)) 2744 .order_by(Order.id) 2745 .all(), 2746 [Order(id=1), Order(id=2), Order(id=3)], 2747 ) 2748 eq_( 2749 sess.query(Order) 2750 .filter(~Order.items.contains(item)) 2751 .order_by(Order.id) 2752 .all(), 2753 [Order(id=4), Order(id=5)], 2754 ) 2755 2756 item2 = sess.query(Item).get(5) 2757 eq_( 2758 sess.query(Order) 2759 .filter(Order.items.contains(item)) 2760 .filter(Order.items.contains(item2)) 2761 .all(), 2762 [Order(id=3)], 2763 ) 2764 2765 def test_comparison(self): 2766 """test scalar comparison to an object instance""" 2767 2768 Item, Order, Dingaling, User, Address = ( 2769 self.classes.Item, 2770 self.classes.Order, 2771 self.classes.Dingaling, 2772 self.classes.User, 2773 self.classes.Address, 2774 ) 2775 2776 sess = create_session() 2777 user = sess.query(User).get(8) 2778 assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query( 2779 Address 2780 ).filter(Address.user == user).all() 2781 2782 assert [Address(id=1), Address(id=5)] == sess.query(Address).filter( 2783 Address.user != user 2784 ).all() 2785 2786 # generates an IS NULL 2787 assert ( 2788 [] == sess.query(Address).filter(Address.user == None).all() 2789 ) # noqa 2790 assert [] == sess.query(Address).filter(Address.user == null()).all() 2791 2792 assert [Order(id=5)] == sess.query(Order).filter( 2793 Order.address == None 2794 ).all() # noqa 2795 2796 # o2o 2797 dingaling = sess.query(Dingaling).get(2) 2798 assert [Address(id=5)] == sess.query(Address).filter( 2799 Address.dingaling == dingaling 2800 ).all() 2801 2802 # m2m 2803 eq_( 2804 sess.query(Item) 2805 .filter(Item.keywords == None) 2806 .order_by(Item.id) # noqa 2807 .all(), 2808 [Item(id=4), Item(id=5)], 2809 ) 2810 eq_( 2811 sess.query(Item) 2812 .filter(Item.keywords != None) 2813 .order_by(Item.id) # noqa 2814 .all(), 2815 [Item(id=1), Item(id=2), Item(id=3)], 2816 ) 2817 2818 def test_filter_by(self): 2819 User, Address = self.classes.User, self.classes.Address 2820 2821 sess = create_session() 2822 user = sess.query(User).get(8) 2823 assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query( 2824 Address 2825 ).filter_by(user=user).all() 2826 2827 # many to one generates IS NULL 2828 assert [] == sess.query(Address).filter_by(user=None).all() 2829 assert [] == sess.query(Address).filter_by(user=null()).all() 2830 2831 # one to many generates WHERE NOT EXISTS 2832 assert [User(name="chuck")] == sess.query(User).filter_by( 2833 addresses=None 2834 ).all() 2835 assert [User(name="chuck")] == sess.query(User).filter_by( 2836 addresses=null() 2837 ).all() 2838 2839 def test_filter_by_tables(self): 2840 users = self.tables.users 2841 addresses = self.tables.addresses 2842 sess = create_session() 2843 self.assert_compile( 2844 sess.query(users) 2845 .filter_by(name="ed") 2846 .join(addresses, users.c.id == addresses.c.user_id) 2847 .filter_by(email_address="ed@ed.com"), 2848 "SELECT users.id AS users_id, users.name AS users_name " 2849 "FROM users JOIN addresses ON users.id = addresses.user_id " 2850 "WHERE users.name = :name_1 AND " 2851 "addresses.email_address = :email_address_1", 2852 checkparams={"email_address_1": "ed@ed.com", "name_1": "ed"}, 2853 ) 2854 2855 def test_filter_by_no_property(self): 2856 addresses = self.tables.addresses 2857 sess = create_session() 2858 assert_raises_message( 2859 sa.exc.InvalidRequestError, 2860 "Entity 'addresses' has no property 'name'", 2861 sess.query(addresses).filter_by, 2862 name="ed", 2863 ) 2864 2865 def test_none_comparison(self): 2866 Order, User, Address = ( 2867 self.classes.Order, 2868 self.classes.User, 2869 self.classes.Address, 2870 ) 2871 2872 sess = create_session() 2873 2874 # scalar 2875 eq_( 2876 [Order(description="order 5")], 2877 sess.query(Order).filter(Order.address_id == None).all(), # noqa 2878 ) 2879 eq_( 2880 [Order(description="order 5")], 2881 sess.query(Order).filter(Order.address_id == null()).all(), 2882 ) 2883 2884 # o2o 2885 eq_( 2886 [Address(id=1), Address(id=3), Address(id=4)], 2887 sess.query(Address) 2888 .filter(Address.dingaling == None) 2889 .order_by(Address.id) # noqa 2890 .all(), 2891 ) 2892 eq_( 2893 [Address(id=1), Address(id=3), Address(id=4)], 2894 sess.query(Address) 2895 .filter(Address.dingaling == null()) 2896 .order_by(Address.id) 2897 .all(), 2898 ) 2899 eq_( 2900 [Address(id=2), Address(id=5)], 2901 sess.query(Address) 2902 .filter(Address.dingaling != None) 2903 .order_by(Address.id) # noqa 2904 .all(), 2905 ) 2906 eq_( 2907 [Address(id=2), Address(id=5)], 2908 sess.query(Address) 2909 .filter(Address.dingaling != null()) 2910 .order_by(Address.id) 2911 .all(), 2912 ) 2913 2914 # m2o 2915 eq_( 2916 [Order(id=5)], 2917 sess.query(Order).filter(Order.address == None).all(), 2918 ) # noqa 2919 eq_( 2920 [Order(id=1), Order(id=2), Order(id=3), Order(id=4)], 2921 sess.query(Order) 2922 .order_by(Order.id) 2923 .filter(Order.address != None) 2924 .all(), 2925 ) # noqa 2926 2927 # o2m 2928 eq_( 2929 [User(id=10)], 2930 sess.query(User).filter(User.addresses == None).all(), 2931 ) # noqa 2932 eq_( 2933 [User(id=7), User(id=8), User(id=9)], 2934 sess.query(User) 2935 .filter(User.addresses != None) 2936 .order_by(User.id) # noqa 2937 .all(), 2938 ) 2939 2940 def test_blank_filter_by(self): 2941 User = self.classes.User 2942 2943 eq_( 2944 [(7,), (8,), (9,), (10,)], 2945 create_session() 2946 .query(User.id) 2947 .filter_by() 2948 .order_by(User.id) 2949 .all(), 2950 ) 2951 eq_( 2952 [(7,), (8,), (9,), (10,)], 2953 create_session() 2954 .query(User.id) 2955 .filter_by(**{}) 2956 .order_by(User.id) 2957 .all(), 2958 ) 2959 2960 def test_text_coerce(self): 2961 User = self.classes.User 2962 s = create_session() 2963 self.assert_compile( 2964 s.query(User).filter(text("name='ed'")), 2965 "SELECT users.id AS users_id, users.name " 2966 "AS users_name FROM users WHERE name='ed'", 2967 ) 2968 2969 2970class HasAnyTest(fixtures.DeclarativeMappedTest, AssertsCompiledSQL): 2971 __dialect__ = "default" 2972 2973 @classmethod 2974 def setup_classes(cls): 2975 Base = cls.DeclarativeBasic 2976 2977 class D(Base): 2978 __tablename__ = "d" 2979 id = Column(Integer, primary_key=True) 2980 2981 class C(Base): 2982 __tablename__ = "c" 2983 id = Column(Integer, primary_key=True) 2984 d_id = Column(ForeignKey(D.id)) 2985 2986 bs = relationship("B", back_populates="c") 2987 2988 b_d = Table( 2989 "b_d", 2990 Base.metadata, 2991 Column("bid", ForeignKey("b.id")), 2992 Column("did", ForeignKey("d.id")), 2993 ) 2994 2995 # note we are using the ForeignKey pattern identified as a bug 2996 # in [ticket:4367] 2997 class B(Base): 2998 __tablename__ = "b" 2999 id = Column(Integer, primary_key=True) 3000 c_id = Column(ForeignKey(C.id)) 3001 3002 c = relationship("C", back_populates="bs") 3003 3004 d = relationship("D", secondary=b_d) 3005 3006 class A(Base): 3007 __tablename__ = "a" 3008 id = Column(Integer, primary_key=True) 3009 b_id = Column(ForeignKey(B.id)) 3010 3011 d = relationship( 3012 "D", 3013 secondary="join(B, C)", 3014 primaryjoin="A.b_id == B.id", 3015 secondaryjoin="C.d_id == D.id", 3016 uselist=False, 3017 ) 3018 3019 def test_has_composite_secondary(self): 3020 A, D = self.classes("A", "D") 3021 s = Session() 3022 self.assert_compile( 3023 s.query(A).filter(A.d.has(D.id == 1)), 3024 "SELECT a.id AS a_id, a.b_id AS a_b_id FROM a WHERE EXISTS " 3025 "(SELECT 1 FROM d, b JOIN c ON c.id = b.c_id " 3026 "WHERE a.b_id = b.id AND c.d_id = d.id AND d.id = :id_1)", 3027 ) 3028 3029 def test_has_many_to_one(self): 3030 B, C = self.classes("B", "C") 3031 s = Session() 3032 self.assert_compile( 3033 s.query(B).filter(B.c.has(C.id == 1)), 3034 "SELECT b.id AS b_id, b.c_id AS b_c_id FROM b WHERE " 3035 "EXISTS (SELECT 1 FROM c WHERE c.id = b.c_id AND c.id = :id_1)", 3036 ) 3037 3038 def test_any_many_to_many(self): 3039 B, D = self.classes("B", "D") 3040 s = Session() 3041 self.assert_compile( 3042 s.query(B).filter(B.d.any(D.id == 1)), 3043 "SELECT b.id AS b_id, b.c_id AS b_c_id FROM b WHERE " 3044 "EXISTS (SELECT 1 FROM b_d, d WHERE b.id = b_d.bid " 3045 "AND d.id = b_d.did AND d.id = :id_1)", 3046 ) 3047 3048 def test_any_one_to_many(self): 3049 B, C = self.classes("B", "C") 3050 s = Session() 3051 self.assert_compile( 3052 s.query(C).filter(C.bs.any(B.id == 1)), 3053 "SELECT c.id AS c_id, c.d_id AS c_d_id FROM c WHERE " 3054 "EXISTS (SELECT 1 FROM b WHERE c.id = b.c_id AND b.id = :id_1)", 3055 ) 3056 3057 def test_any_many_to_many_doesnt_overcorrelate(self): 3058 B, D = self.classes("B", "D") 3059 s = Session() 3060 3061 self.assert_compile( 3062 s.query(B).join(B.d).filter(B.d.any(D.id == 1)), 3063 "SELECT b.id AS b_id, b.c_id AS b_c_id FROM " 3064 "b JOIN b_d AS b_d_1 ON b.id = b_d_1.bid " 3065 "JOIN d ON d.id = b_d_1.did WHERE " 3066 "EXISTS (SELECT 1 FROM b_d, d WHERE b.id = b_d.bid " 3067 "AND d.id = b_d.did AND d.id = :id_1)", 3068 ) 3069 3070 def test_has_doesnt_overcorrelate(self): 3071 B, C = self.classes("B", "C") 3072 s = Session() 3073 3074 self.assert_compile( 3075 s.query(B).join(B.c).filter(B.c.has(C.id == 1)), 3076 "SELECT b.id AS b_id, b.c_id AS b_c_id " 3077 "FROM b JOIN c ON c.id = b.c_id " 3078 "WHERE EXISTS " 3079 "(SELECT 1 FROM c WHERE c.id = b.c_id AND c.id = :id_1)", 3080 ) 3081 3082 def test_has_doesnt_get_aliased_join_subq(self): 3083 B, C = self.classes("B", "C") 3084 s = Session() 3085 3086 self.assert_compile( 3087 s.query(B).join(B.c, aliased=True).filter(B.c.has(C.id == 1)), 3088 "SELECT b.id AS b_id, b.c_id AS b_c_id " 3089 "FROM b JOIN c AS c_1 ON c_1.id = b.c_id " 3090 "WHERE EXISTS " 3091 "(SELECT 1 FROM c WHERE c.id = b.c_id AND c.id = :id_1)", 3092 ) 3093 3094 def test_any_many_to_many_doesnt_get_aliased_join_subq(self): 3095 B, D = self.classes("B", "D") 3096 s = Session() 3097 3098 self.assert_compile( 3099 s.query(B).join(B.d, aliased=True).filter(B.d.any(D.id == 1)), 3100 "SELECT b.id AS b_id, b.c_id AS b_c_id " 3101 "FROM b JOIN b_d AS b_d_1 ON b.id = b_d_1.bid " 3102 "JOIN d AS d_1 ON d_1.id = b_d_1.did " 3103 "WHERE EXISTS " 3104 "(SELECT 1 FROM b_d, d WHERE b.id = b_d.bid " 3105 "AND d.id = b_d.did AND d.id = :id_1)", 3106 ) 3107 3108 3109class HasMapperEntitiesTest(QueryTest): 3110 def test_entity(self): 3111 User = self.classes.User 3112 s = Session() 3113 3114 q = s.query(User) 3115 3116 assert q._has_mapper_entities 3117 3118 def test_cols(self): 3119 User = self.classes.User 3120 s = Session() 3121 3122 q = s.query(User.id) 3123 3124 assert not q._has_mapper_entities 3125 3126 def test_cols_set_entities(self): 3127 User = self.classes.User 3128 s = Session() 3129 3130 q = s.query(User.id) 3131 3132 q._set_entities(User) 3133 assert q._has_mapper_entities 3134 3135 def test_entity_set_entities(self): 3136 User = self.classes.User 3137 s = Session() 3138 3139 q = s.query(User) 3140 3141 q._set_entities(User.id) 3142 assert not q._has_mapper_entities 3143 3144 3145class SetOpsTest(QueryTest, AssertsCompiledSQL): 3146 __dialect__ = "default" 3147 3148 def test_union(self): 3149 User = self.classes.User 3150 3151 s = create_session() 3152 3153 fred = s.query(User).filter(User.name == "fred") 3154 ed = s.query(User).filter(User.name == "ed") 3155 jack = s.query(User).filter(User.name == "jack") 3156 3157 eq_( 3158 fred.union(ed).order_by(User.name).all(), 3159 [User(name="ed"), User(name="fred")], 3160 ) 3161 3162 eq_( 3163 fred.union(ed, jack).order_by(User.name).all(), 3164 [User(name="ed"), User(name="fred"), User(name="jack")], 3165 ) 3166 3167 def test_statement_labels(self): 3168 """test that label conflicts don't occur with joins etc.""" 3169 3170 User, Address = self.classes.User, self.classes.Address 3171 3172 s = create_session() 3173 q1 = ( 3174 s.query(User, Address) 3175 .join(User.addresses) 3176 .filter(Address.email_address == "ed@wood.com") 3177 ) 3178 q2 = ( 3179 s.query(User, Address) 3180 .join(User.addresses) 3181 .filter(Address.email_address == "jack@bean.com") 3182 ) 3183 q3 = q1.union(q2).order_by(User.name) 3184 3185 eq_( 3186 q3.all(), 3187 [ 3188 (User(name="ed"), Address(email_address="ed@wood.com")), 3189 (User(name="jack"), Address(email_address="jack@bean.com")), 3190 ], 3191 ) 3192 3193 def test_union_literal_expressions_compile(self): 3194 """test that column expressions translate during 3195 the _from_statement() portion of union(), others""" 3196 3197 User = self.classes.User 3198 3199 s = Session() 3200 q1 = s.query(User, literal("x")) 3201 q2 = s.query(User, literal_column("'y'")) 3202 q3 = q1.union(q2) 3203 3204 self.assert_compile( 3205 q3, 3206 "SELECT anon_1.users_id AS anon_1_users_id, " 3207 "anon_1.users_name AS anon_1_users_name, " 3208 "anon_1.param_1 AS anon_1_param_1 " 3209 "FROM (SELECT users.id AS users_id, users.name AS " 3210 "users_name, :param_1 AS param_1 " 3211 "FROM users UNION SELECT users.id AS users_id, " 3212 "users.name AS users_name, 'y' FROM users) AS anon_1", 3213 ) 3214 3215 def test_union_literal_expressions_results(self): 3216 User = self.classes.User 3217 3218 s = Session() 3219 3220 q1 = s.query(User, literal("x")) 3221 q2 = s.query(User, literal_column("'y'")) 3222 q3 = q1.union(q2) 3223 3224 q4 = s.query(User, literal_column("'x'").label("foo")) 3225 q5 = s.query(User, literal("y")) 3226 q6 = q4.union(q5) 3227 3228 eq_([x["name"] for x in q6.column_descriptions], ["User", "foo"]) 3229 3230 for q in ( 3231 q3.order_by(User.id, text("anon_1_param_1")), 3232 q6.order_by(User.id, "foo"), 3233 ): 3234 eq_( 3235 q.all(), 3236 [ 3237 (User(id=7, name="jack"), "x"), 3238 (User(id=7, name="jack"), "y"), 3239 (User(id=8, name="ed"), "x"), 3240 (User(id=8, name="ed"), "y"), 3241 (User(id=9, name="fred"), "x"), 3242 (User(id=9, name="fred"), "y"), 3243 (User(id=10, name="chuck"), "x"), 3244 (User(id=10, name="chuck"), "y"), 3245 ], 3246 ) 3247 3248 def test_union_labeled_anonymous_columns(self): 3249 User = self.classes.User 3250 3251 s = Session() 3252 3253 c1, c2 = column("c1"), column("c2") 3254 q1 = s.query(User, c1.label("foo"), c1.label("bar")) 3255 q2 = s.query(User, c1.label("foo"), c2.label("bar")) 3256 q3 = q1.union(q2) 3257 3258 eq_( 3259 [x["name"] for x in q3.column_descriptions], ["User", "foo", "bar"] 3260 ) 3261 3262 self.assert_compile( 3263 q3, 3264 "SELECT anon_1.users_id AS anon_1_users_id, " 3265 "anon_1.users_name AS anon_1_users_name, " 3266 "anon_1.foo AS anon_1_foo, anon_1.bar AS anon_1_bar " 3267 "FROM (SELECT users.id AS users_id, users.name AS users_name, " 3268 "c1 AS foo, c1 AS bar FROM users UNION SELECT users.id AS " 3269 "users_id, users.name AS users_name, c1 AS foo, c2 AS bar " 3270 "FROM users) AS anon_1", 3271 ) 3272 3273 def test_order_by_anonymous_col(self): 3274 User = self.classes.User 3275 3276 s = Session() 3277 3278 c1, c2 = column("c1"), column("c2") 3279 f = c1.label("foo") 3280 q1 = s.query(User, f, c2.label("bar")) 3281 q2 = s.query(User, c1.label("foo"), c2.label("bar")) 3282 q3 = q1.union(q2) 3283 3284 self.assert_compile( 3285 q3.order_by(c1), 3286 "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS " 3287 "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS " 3288 "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS " 3289 "users_name, c1 AS foo, c2 AS bar " 3290 "FROM users UNION SELECT users.id " 3291 "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar " 3292 "FROM users) AS anon_1 ORDER BY anon_1.foo", 3293 ) 3294 3295 self.assert_compile( 3296 q3.order_by(f), 3297 "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS " 3298 "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS " 3299 "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS " 3300 "users_name, c1 AS foo, c2 AS bar " 3301 "FROM users UNION SELECT users.id " 3302 "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar " 3303 "FROM users) AS anon_1 ORDER BY anon_1.foo", 3304 ) 3305 3306 def test_union_mapped_colnames_preserved_across_subquery(self): 3307 User = self.classes.User 3308 3309 s = Session() 3310 q1 = s.query(User.name) 3311 q2 = s.query(User.name) 3312 3313 # the label names in the subquery are the typical anonymized ones 3314 self.assert_compile( 3315 q1.union(q2), 3316 "SELECT anon_1.users_name AS anon_1_users_name " 3317 "FROM (SELECT users.name AS users_name FROM users " 3318 "UNION SELECT users.name AS users_name FROM users) AS anon_1", 3319 ) 3320 3321 # but in the returned named tuples, 3322 # due to [ticket:1942], this should be 'name', not 'users_name' 3323 eq_([x["name"] for x in q1.union(q2).column_descriptions], ["name"]) 3324 3325 @testing.requires.intersect 3326 def test_intersect(self): 3327 User = self.classes.User 3328 3329 s = create_session() 3330 3331 fred = s.query(User).filter(User.name == "fred") 3332 ed = s.query(User).filter(User.name == "ed") 3333 jack = s.query(User).filter(User.name == "jack") 3334 eq_(fred.intersect(ed, jack).all(), []) 3335 3336 eq_(fred.union(ed).intersect(ed.union(jack)).all(), [User(name="ed")]) 3337 3338 def test_eager_load(self): 3339 User, Address = self.classes.User, self.classes.Address 3340 3341 s = create_session() 3342 3343 fred = s.query(User).filter(User.name == "fred") 3344 ed = s.query(User).filter(User.name == "ed") 3345 3346 def go(): 3347 eq_( 3348 fred.union(ed) 3349 .order_by(User.name) 3350 .options(joinedload(User.addresses)) 3351 .all(), 3352 [ 3353 User( 3354 name="ed", addresses=[Address(), Address(), Address()] 3355 ), 3356 User(name="fred", addresses=[Address()]), 3357 ], 3358 ) 3359 3360 self.assert_sql_count(testing.db, go, 1) 3361 3362 3363class AggregateTest(QueryTest): 3364 def test_sum(self): 3365 Order = self.classes.Order 3366 3367 sess = create_session() 3368 orders = sess.query(Order).filter(Order.id.in_([2, 3, 4])) 3369 eq_( 3370 next(orders.values(func.sum(Order.user_id * Order.address_id))), 3371 (79,), 3372 ) 3373 eq_(orders.value(func.sum(Order.user_id * Order.address_id)), 79) 3374 3375 def test_apply(self): 3376 Order = self.classes.Order 3377 3378 sess = create_session() 3379 assert sess.query(func.sum(Order.user_id * Order.address_id)).filter( 3380 Order.id.in_([2, 3, 4]) 3381 ).one() == (79,) 3382 3383 def test_having(self): 3384 User, Address = self.classes.User, self.classes.Address 3385 3386 sess = create_session() 3387 assert ( 3388 [User(name="ed", id=8)] 3389 == sess.query(User) 3390 .order_by(User.id) 3391 .group_by(User) 3392 .join("addresses") 3393 .having(func.count(Address.id) > 2) 3394 .all() 3395 ) 3396 3397 assert ( 3398 [User(name="jack", id=7), User(name="fred", id=9)] 3399 == sess.query(User) 3400 .order_by(User.id) 3401 .group_by(User) 3402 .join("addresses") 3403 .having(func.count(Address.id) < 2) 3404 .all() 3405 ) 3406 3407 3408class ExistsTest(QueryTest, AssertsCompiledSQL): 3409 __dialect__ = "default" 3410 3411 def test_exists(self): 3412 User = self.classes.User 3413 sess = create_session() 3414 3415 q1 = sess.query(User) 3416 self.assert_compile( 3417 sess.query(q1.exists()), 3418 "SELECT EXISTS (" "SELECT 1 FROM users" ") AS anon_1", 3419 ) 3420 3421 q2 = sess.query(User).filter(User.name == "fred") 3422 self.assert_compile( 3423 sess.query(q2.exists()), 3424 "SELECT EXISTS (" 3425 "SELECT 1 FROM users WHERE users.name = :name_1" 3426 ") AS anon_1", 3427 ) 3428 3429 def test_exists_col_warning(self): 3430 User = self.classes.User 3431 Address = self.classes.Address 3432 sess = create_session() 3433 3434 q1 = sess.query(User, Address).filter(User.id == Address.user_id) 3435 self.assert_compile( 3436 sess.query(q1.exists()), 3437 "SELECT EXISTS (" 3438 "SELECT 1 FROM users, addresses " 3439 "WHERE users.id = addresses.user_id" 3440 ") AS anon_1", 3441 ) 3442 3443 def test_exists_w_select_from(self): 3444 User = self.classes.User 3445 sess = create_session() 3446 3447 q1 = sess.query().select_from(User).exists() 3448 self.assert_compile( 3449 sess.query(q1), "SELECT EXISTS (SELECT 1 FROM users) AS anon_1" 3450 ) 3451 3452 3453class CountTest(QueryTest): 3454 def test_basic(self): 3455 users, User = self.tables.users, self.classes.User 3456 3457 s = create_session() 3458 3459 eq_(s.query(User).count(), 4) 3460 3461 eq_(s.query(User).filter(users.c.name.endswith("ed")).count(), 2) 3462 3463 def test_count_char(self): 3464 User = self.classes.User 3465 s = create_session() 3466 # '*' is favored here as the most common character, 3467 # it is reported that Informix doesn't like count(1), 3468 # rumors about Oracle preferring count(1) don't appear 3469 # to be well founded. 3470 self.assert_sql_execution( 3471 testing.db, 3472 s.query(User).count, 3473 CompiledSQL( 3474 "SELECT count(*) AS count_1 FROM " 3475 "(SELECT users.id AS users_id, users.name " 3476 "AS users_name FROM users) AS anon_1", 3477 {}, 3478 ), 3479 ) 3480 3481 def test_multiple_entity(self): 3482 User, Address = self.classes.User, self.classes.Address 3483 3484 s = create_session() 3485 q = s.query(User, Address) 3486 eq_(q.count(), 20) # cartesian product 3487 3488 q = s.query(User, Address).join(User.addresses) 3489 eq_(q.count(), 5) 3490 3491 def test_nested(self): 3492 User, Address = self.classes.User, self.classes.Address 3493 3494 s = create_session() 3495 q = s.query(User, Address).limit(2) 3496 eq_(q.count(), 2) 3497 3498 q = s.query(User, Address).limit(100) 3499 eq_(q.count(), 20) 3500 3501 q = s.query(User, Address).join(User.addresses).limit(100) 3502 eq_(q.count(), 5) 3503 3504 def test_cols(self): 3505 """test that column-based queries always nest.""" 3506 3507 User, Address = self.classes.User, self.classes.Address 3508 3509 s = create_session() 3510 3511 q = s.query(func.count(distinct(User.name))) 3512 eq_(q.count(), 1) 3513 3514 q = s.query(func.count(distinct(User.name))).distinct() 3515 eq_(q.count(), 1) 3516 3517 q = s.query(User.name) 3518 eq_(q.count(), 4) 3519 3520 q = s.query(User.name, Address) 3521 eq_(q.count(), 20) 3522 3523 q = s.query(Address.user_id) 3524 eq_(q.count(), 5) 3525 eq_(q.distinct().count(), 3) 3526 3527 3528class DistinctTest(QueryTest, AssertsCompiledSQL): 3529 __dialect__ = "default" 3530 3531 def test_basic(self): 3532 User = self.classes.User 3533 3534 eq_( 3535 [User(id=7), User(id=8), User(id=9), User(id=10)], 3536 create_session().query(User).order_by(User.id).distinct().all(), 3537 ) 3538 eq_( 3539 [User(id=7), User(id=9), User(id=8), User(id=10)], 3540 create_session() 3541 .query(User) 3542 .distinct() 3543 .order_by(desc(User.name)) 3544 .all(), 3545 ) 3546 3547 def test_columns_augmented_roundtrip_one(self): 3548 User, Address = self.classes.User, self.classes.Address 3549 3550 sess = create_session() 3551 q = ( 3552 sess.query(User) 3553 .join("addresses") 3554 .distinct() 3555 .order_by(desc(Address.email_address)) 3556 ) 3557 3558 eq_([User(id=7), User(id=9), User(id=8)], q.all()) 3559 3560 def test_columns_augmented_roundtrip_two(self): 3561 User, Address = self.classes.User, self.classes.Address 3562 3563 sess = create_session() 3564 3565 # test that it works on embedded joinedload/LIMIT subquery 3566 q = ( 3567 sess.query(User) 3568 .join("addresses") 3569 .distinct() 3570 .options(joinedload("addresses")) 3571 .order_by(desc(Address.email_address)) 3572 .limit(2) 3573 ) 3574 3575 def go(): 3576 assert [ 3577 User(id=7, addresses=[Address(id=1)]), 3578 User(id=9, addresses=[Address(id=5)]), 3579 ] == q.all() 3580 3581 self.assert_sql_count(testing.db, go, 1) 3582 3583 def test_columns_augmented_roundtrip_three(self): 3584 User, Address = self.classes.User, self.classes.Address 3585 3586 sess = create_session() 3587 3588 q = ( 3589 sess.query(User.id, User.name.label("foo"), Address.id) 3590 .filter(User.name == "jack") 3591 .distinct() 3592 .order_by(User.id, User.name, Address.email_address) 3593 ) 3594 3595 # even though columns are added, they aren't in the result 3596 eq_( 3597 q.all(), 3598 [ 3599 (7, "jack", 3), 3600 (7, "jack", 4), 3601 (7, "jack", 2), 3602 (7, "jack", 5), 3603 (7, "jack", 1), 3604 ], 3605 ) 3606 for row in q: 3607 eq_(row.keys(), ["id", "foo", "id"]) 3608 3609 def test_columns_augmented_sql_one(self): 3610 User, Address = self.classes.User, self.classes.Address 3611 3612 sess = create_session() 3613 3614 q = ( 3615 sess.query(User.id, User.name.label("foo"), Address.id) 3616 .distinct() 3617 .order_by(User.id, User.name, Address.email_address) 3618 ) 3619 3620 # Address.email_address is added because of DISTINCT, 3621 # however User.id, User.name are not b.c. they're already there, 3622 # even though User.name is labeled 3623 self.assert_compile( 3624 q, 3625 "SELECT DISTINCT users.id AS users_id, users.name AS foo, " 3626 "addresses.id AS addresses_id, " 3627 "addresses.email_address AS addresses_email_address FROM users, " 3628 "addresses ORDER BY users.id, users.name, addresses.email_address", 3629 ) 3630 3631 def test_columns_augmented_sql_two(self): 3632 User, Address = self.classes.User, self.classes.Address 3633 3634 sess = create_session() 3635 3636 q = ( 3637 sess.query(User) 3638 .options(joinedload(User.addresses)) 3639 .distinct() 3640 .order_by(User.name, Address.email_address) 3641 .limit(5) 3642 ) 3643 3644 # addresses.email_address is added to inner query so that 3645 # it is available in ORDER BY 3646 self.assert_compile( 3647 q, 3648 "SELECT anon_1.users_id AS anon_1_users_id, " 3649 "anon_1.users_name AS anon_1_users_name, " 3650 "anon_1.addresses_email_address AS " 3651 "anon_1_addresses_email_address, " 3652 "addresses_1.id AS addresses_1_id, " 3653 "addresses_1.user_id AS addresses_1_user_id, " 3654 "addresses_1.email_address AS addresses_1_email_address " 3655 "FROM (SELECT DISTINCT users.id AS users_id, " 3656 "users.name AS users_name, " 3657 "addresses.email_address AS addresses_email_address " 3658 "FROM users, addresses " 3659 "ORDER BY users.name, addresses.email_address " 3660 "LIMIT :param_1) AS anon_1 LEFT OUTER JOIN " 3661 "addresses AS addresses_1 " 3662 "ON anon_1.users_id = addresses_1.user_id " 3663 "ORDER BY anon_1.users_name, " 3664 "anon_1.addresses_email_address, addresses_1.id", 3665 ) 3666 3667 def test_columns_augmented_sql_three(self): 3668 User, Address = self.classes.User, self.classes.Address 3669 3670 sess = create_session() 3671 3672 q = ( 3673 sess.query(User.id, User.name.label("foo"), Address.id) 3674 .distinct(User.name) 3675 .order_by(User.id, User.name, Address.email_address) 3676 ) 3677 3678 # no columns are added when DISTINCT ON is used 3679 self.assert_compile( 3680 q, 3681 "SELECT DISTINCT ON (users.name) users.id AS users_id, " 3682 "users.name AS foo, addresses.id AS addresses_id FROM users, " 3683 "addresses ORDER BY users.id, users.name, addresses.email_address", 3684 dialect="postgresql", 3685 ) 3686 3687 def test_columns_augmented_sql_four(self): 3688 User, Address = self.classes.User, self.classes.Address 3689 3690 sess = create_session() 3691 3692 q = ( 3693 sess.query(User) 3694 .join("addresses") 3695 .distinct(Address.email_address) 3696 .options(joinedload("addresses")) 3697 .order_by(desc(Address.email_address)) 3698 .limit(2) 3699 ) 3700 3701 # but for the subquery / eager load case, we still need to make 3702 # the inner columns available for the ORDER BY even though its 3703 # a DISTINCT ON 3704 self.assert_compile( 3705 q, 3706 "SELECT anon_1.users_id AS anon_1_users_id, " 3707 "anon_1.users_name AS anon_1_users_name, " 3708 "anon_1.addresses_email_address AS " 3709 "anon_1_addresses_email_address, " 3710 "addresses_1.id AS addresses_1_id, " 3711 "addresses_1.user_id AS addresses_1_user_id, " 3712 "addresses_1.email_address AS addresses_1_email_address " 3713 "FROM (SELECT DISTINCT ON (addresses.email_address) " 3714 "users.id AS users_id, users.name AS users_name, " 3715 "addresses.email_address AS addresses_email_address " 3716 "FROM users JOIN addresses ON users.id = addresses.user_id " 3717 "ORDER BY addresses.email_address DESC " 3718 "LIMIT %(param_1)s) AS anon_1 " 3719 "LEFT OUTER JOIN addresses AS addresses_1 " 3720 "ON anon_1.users_id = addresses_1.user_id " 3721 "ORDER BY anon_1.addresses_email_address DESC, addresses_1.id", 3722 dialect="postgresql", 3723 ) 3724 3725 3726class PrefixWithTest(QueryTest, AssertsCompiledSQL): 3727 def test_one_prefix(self): 3728 User = self.classes.User 3729 sess = create_session() 3730 query = sess.query(User.name).prefix_with("PREFIX_1") 3731 expected = "SELECT PREFIX_1 " "users.name AS users_name FROM users" 3732 self.assert_compile(query, expected, dialect=default.DefaultDialect()) 3733 3734 def test_many_prefixes(self): 3735 User = self.classes.User 3736 sess = create_session() 3737 query = sess.query(User.name).prefix_with("PREFIX_1", "PREFIX_2") 3738 expected = ( 3739 "SELECT PREFIX_1 PREFIX_2 " "users.name AS users_name FROM users" 3740 ) 3741 self.assert_compile(query, expected, dialect=default.DefaultDialect()) 3742 3743 def test_chained_prefixes(self): 3744 User = self.classes.User 3745 sess = create_session() 3746 query = ( 3747 sess.query(User.name) 3748 .prefix_with("PREFIX_1") 3749 .prefix_with("PREFIX_2", "PREFIX_3") 3750 ) 3751 expected = ( 3752 "SELECT PREFIX_1 PREFIX_2 PREFIX_3 " 3753 "users.name AS users_name FROM users" 3754 ) 3755 self.assert_compile(query, expected, dialect=default.DefaultDialect()) 3756 3757 3758class YieldTest(_fixtures.FixtureTest): 3759 run_setup_mappers = "each" 3760 run_inserts = "each" 3761 3762 def _eagerload_mappings(self, addresses_lazy=True, user_lazy=True): 3763 User, Address = self.classes("User", "Address") 3764 users, addresses = self.tables("users", "addresses") 3765 mapper( 3766 User, 3767 users, 3768 properties={ 3769 "addresses": relationship( 3770 Address, 3771 lazy=addresses_lazy, 3772 backref=backref("user", lazy=user_lazy), 3773 ) 3774 }, 3775 ) 3776 mapper(Address, addresses) 3777 3778 def test_basic(self): 3779 self._eagerload_mappings() 3780 3781 User = self.classes.User 3782 3783 sess = create_session() 3784 q = iter( 3785 sess.query(User) 3786 .yield_per(1) 3787 .from_statement(text("select * from users")) 3788 ) 3789 3790 ret = [] 3791 eq_(len(sess.identity_map), 0) 3792 ret.append(next(q)) 3793 ret.append(next(q)) 3794 eq_(len(sess.identity_map), 2) 3795 ret.append(next(q)) 3796 ret.append(next(q)) 3797 eq_(len(sess.identity_map), 4) 3798 try: 3799 next(q) 3800 assert False 3801 except StopIteration: 3802 pass 3803 3804 def test_yield_per_and_execution_options(self): 3805 self._eagerload_mappings() 3806 3807 User = self.classes.User 3808 3809 sess = create_session() 3810 q = sess.query(User).yield_per(15) 3811 q = q.execution_options(foo="bar") 3812 assert q._yield_per 3813 eq_( 3814 q._execution_options, 3815 {"stream_results": True, "foo": "bar", "max_row_buffer": 15}, 3816 ) 3817 3818 def test_no_joinedload_opt(self): 3819 self._eagerload_mappings() 3820 3821 User = self.classes.User 3822 sess = create_session() 3823 q = sess.query(User).options(joinedload("addresses")).yield_per(1) 3824 assert_raises_message( 3825 sa_exc.InvalidRequestError, 3826 "The yield_per Query option is currently not compatible with " 3827 "joined collection eager loading. Please specify ", 3828 q.all, 3829 ) 3830 3831 def test_no_subqueryload_opt(self): 3832 self._eagerload_mappings() 3833 3834 User = self.classes.User 3835 sess = create_session() 3836 q = sess.query(User).options(subqueryload("addresses")).yield_per(1) 3837 assert_raises_message( 3838 sa_exc.InvalidRequestError, 3839 "The yield_per Query option is currently not compatible with " 3840 "subquery eager loading. Please specify ", 3841 q.all, 3842 ) 3843 3844 def test_no_subqueryload_mapping(self): 3845 self._eagerload_mappings(addresses_lazy="subquery") 3846 3847 User = self.classes.User 3848 sess = create_session() 3849 q = sess.query(User).yield_per(1) 3850 assert_raises_message( 3851 sa_exc.InvalidRequestError, 3852 "The yield_per Query option is currently not compatible with " 3853 "subquery eager loading. Please specify ", 3854 q.all, 3855 ) 3856 3857 def test_joinedload_m2o_ok(self): 3858 self._eagerload_mappings(user_lazy="joined") 3859 Address = self.classes.Address 3860 sess = create_session() 3861 q = sess.query(Address).yield_per(1) 3862 q.all() 3863 3864 def test_eagerload_opt_disable(self): 3865 self._eagerload_mappings() 3866 3867 User = self.classes.User 3868 sess = create_session() 3869 q = ( 3870 sess.query(User) 3871 .options(subqueryload("addresses")) 3872 .enable_eagerloads(False) 3873 .yield_per(1) 3874 ) 3875 q.all() 3876 3877 q = ( 3878 sess.query(User) 3879 .options(joinedload("addresses")) 3880 .enable_eagerloads(False) 3881 .yield_per(1) 3882 ) 3883 q.all() 3884 3885 def test_m2o_joinedload_not_others(self): 3886 self._eagerload_mappings(addresses_lazy="joined") 3887 Address = self.classes.Address 3888 sess = create_session() 3889 q = ( 3890 sess.query(Address) 3891 .options(lazyload("*"), joinedload("user")) 3892 .yield_per(1) 3893 .filter_by(id=1) 3894 ) 3895 3896 def go(): 3897 result = q.all() 3898 assert result[0].user 3899 3900 self.assert_sql_count(testing.db, go, 1) 3901 3902 3903class HintsTest(QueryTest, AssertsCompiledSQL): 3904 __dialect__ = "default" 3905 3906 def test_hints(self): 3907 User = self.classes.User 3908 3909 from sqlalchemy.dialects import mysql 3910 3911 dialect = mysql.dialect() 3912 3913 sess = create_session() 3914 3915 self.assert_compile( 3916 sess.query(User).with_hint( 3917 User, "USE INDEX (col1_index,col2_index)" 3918 ), 3919 "SELECT users.id AS users_id, users.name AS users_name " 3920 "FROM users USE INDEX (col1_index,col2_index)", 3921 dialect=dialect, 3922 ) 3923 3924 self.assert_compile( 3925 sess.query(User).with_hint( 3926 User, "WITH INDEX col1_index", "sybase" 3927 ), 3928 "SELECT users.id AS users_id, users.name AS users_name " 3929 "FROM users", 3930 dialect=dialect, 3931 ) 3932 3933 ualias = aliased(User) 3934 self.assert_compile( 3935 sess.query(User, ualias) 3936 .with_hint(ualias, "USE INDEX (col1_index,col2_index)") 3937 .join(ualias, ualias.id > User.id), 3938 "SELECT users.id AS users_id, users.name AS users_name, " 3939 "users_1.id AS users_1_id, users_1.name AS users_1_name " 3940 "FROM users INNER JOIN users AS users_1 " 3941 "USE INDEX (col1_index,col2_index) " 3942 "ON users_1.id > users.id", 3943 dialect=dialect, 3944 ) 3945 3946 def test_statement_hints(self): 3947 User = self.classes.User 3948 3949 sess = create_session() 3950 stmt = ( 3951 sess.query(User) 3952 .with_statement_hint("test hint one") 3953 .with_statement_hint("test hint two") 3954 .with_statement_hint("test hint three", "postgresql") 3955 ) 3956 3957 self.assert_compile( 3958 stmt, 3959 "SELECT users.id AS users_id, users.name AS users_name " 3960 "FROM users test hint one test hint two", 3961 ) 3962 3963 self.assert_compile( 3964 stmt, 3965 "SELECT users.id AS users_id, users.name AS users_name " 3966 "FROM users test hint one test hint two test hint three", 3967 dialect="postgresql", 3968 ) 3969 3970 3971class TextTest(QueryTest, AssertsCompiledSQL): 3972 __dialect__ = "default" 3973 3974 def test_fulltext(self): 3975 User = self.classes.User 3976 3977 with expect_warnings("Textual SQL"): 3978 eq_( 3979 create_session() 3980 .query(User) 3981 .from_statement("select * from users order by id") 3982 .all(), 3983 [User(id=7), User(id=8), User(id=9), User(id=10)], 3984 ) 3985 3986 eq_( 3987 create_session() 3988 .query(User) 3989 .from_statement(text("select * from users order by id")) 3990 .first(), 3991 User(id=7), 3992 ) 3993 eq_( 3994 create_session() 3995 .query(User) 3996 .from_statement( 3997 text("select * from users where name='nonexistent'") 3998 ) 3999 .first(), 4000 None, 4001 ) 4002 4003 def test_fragment(self): 4004 User = self.classes.User 4005 4006 with expect_warnings("Textual SQL expression"): 4007 eq_( 4008 create_session().query(User).filter("id in (8, 9)").all(), 4009 [User(id=8), User(id=9)], 4010 ) 4011 4012 eq_( 4013 create_session() 4014 .query(User) 4015 .filter("name='fred'") 4016 .filter("id=9") 4017 .all(), 4018 [User(id=9)], 4019 ) 4020 eq_( 4021 create_session() 4022 .query(User) 4023 .filter("name='fred'") 4024 .filter(User.id == 9) 4025 .all(), 4026 [User(id=9)], 4027 ) 4028 4029 def test_binds_coerce(self): 4030 User = self.classes.User 4031 4032 with expect_warnings("Textual SQL expression"): 4033 eq_( 4034 create_session() 4035 .query(User) 4036 .filter("id in (:id1, :id2)") 4037 .params(id1=8, id2=9) 4038 .all(), 4039 [User(id=8), User(id=9)], 4040 ) 4041 4042 def test_as_column(self): 4043 User = self.classes.User 4044 4045 s = create_session() 4046 assert_raises( 4047 sa_exc.InvalidRequestError, s.query, User.id, text("users.name") 4048 ) 4049 4050 eq_( 4051 s.query(User.id, "name").order_by(User.id).all(), 4052 [(7, "jack"), (8, "ed"), (9, "fred"), (10, "chuck")], 4053 ) 4054 4055 def test_via_select(self): 4056 User = self.classes.User 4057 s = create_session() 4058 eq_( 4059 s.query(User) 4060 .from_statement( 4061 select([column("id"), column("name")]) 4062 .select_from(table("users")) 4063 .order_by("id") 4064 ) 4065 .all(), 4066 [User(id=7), User(id=8), User(id=9), User(id=10)], 4067 ) 4068 4069 def test_via_textasfrom_from_statement(self): 4070 User = self.classes.User 4071 s = create_session() 4072 4073 eq_( 4074 s.query(User) 4075 .from_statement( 4076 text("select * from users order by id").columns( 4077 id=Integer, name=String 4078 ) 4079 ) 4080 .all(), 4081 [User(id=7), User(id=8), User(id=9), User(id=10)], 4082 ) 4083 4084 def test_via_textasfrom_use_mapped_columns(self): 4085 User = self.classes.User 4086 s = create_session() 4087 4088 eq_( 4089 s.query(User) 4090 .from_statement( 4091 text("select * from users order by id").columns( 4092 User.id, User.name 4093 ) 4094 ) 4095 .all(), 4096 [User(id=7), User(id=8), User(id=9), User(id=10)], 4097 ) 4098 4099 def test_via_textasfrom_select_from(self): 4100 User = self.classes.User 4101 s = create_session() 4102 4103 eq_( 4104 s.query(User) 4105 .select_from( 4106 text("select * from users").columns(id=Integer, name=String) 4107 ) 4108 .order_by(User.id) 4109 .all(), 4110 [User(id=7), User(id=8), User(id=9), User(id=10)], 4111 ) 4112 4113 def test_group_by_accepts_text(self): 4114 User = self.classes.User 4115 s = create_session() 4116 4117 q = s.query(User).group_by(text("name")) 4118 self.assert_compile( 4119 q, 4120 "SELECT users.id AS users_id, users.name AS users_name " 4121 "FROM users GROUP BY name", 4122 ) 4123 4124 def test_orm_columns_accepts_text(self): 4125 from sqlalchemy.orm.base import _orm_columns 4126 4127 t = text("x") 4128 eq_(_orm_columns(t), [t]) 4129 4130 def test_order_by_w_eager_one(self): 4131 User = self.classes.User 4132 s = create_session() 4133 4134 # from 1.0.0 thru 1.0.2, the "name" symbol here was considered 4135 # to be part of the things we need to ORDER BY and it was being 4136 # placed into the inner query's columns clause, as part of 4137 # query._compound_eager_statement where we add unwrap_order_by() 4138 # to the columns clause. However, as #3392 illustrates, unlocatable 4139 # string expressions like "name desc" will only fail in this scenario, 4140 # so in general the changing of the query structure with string labels 4141 # is dangerous. 4142 # 4143 # the queries here are again "invalid" from a SQL perspective, as the 4144 # "name" field isn't matched up to anything. 4145 # 4146 with expect_warnings("Can't resolve label reference 'name';"): 4147 self.assert_compile( 4148 s.query(User) 4149 .options(joinedload("addresses")) 4150 .order_by(desc("name")) 4151 .limit(1), 4152 "SELECT anon_1.users_id AS anon_1_users_id, " 4153 "anon_1.users_name AS anon_1_users_name, " 4154 "addresses_1.id AS addresses_1_id, " 4155 "addresses_1.user_id AS addresses_1_user_id, " 4156 "addresses_1.email_address AS addresses_1_email_address " 4157 "FROM (SELECT users.id AS users_id, users.name AS users_name " 4158 "FROM users ORDER BY users.name " 4159 "DESC LIMIT :param_1) AS anon_1 " 4160 "LEFT OUTER JOIN addresses AS addresses_1 " 4161 "ON anon_1.users_id = addresses_1.user_id " 4162 "ORDER BY name DESC, addresses_1.id", 4163 ) 4164 4165 def test_order_by_w_eager_two(self): 4166 User = self.classes.User 4167 s = create_session() 4168 4169 with expect_warnings("Can't resolve label reference 'name';"): 4170 self.assert_compile( 4171 s.query(User) 4172 .options(joinedload("addresses")) 4173 .order_by("name") 4174 .limit(1), 4175 "SELECT anon_1.users_id AS anon_1_users_id, " 4176 "anon_1.users_name AS anon_1_users_name, " 4177 "addresses_1.id AS addresses_1_id, " 4178 "addresses_1.user_id AS addresses_1_user_id, " 4179 "addresses_1.email_address AS addresses_1_email_address " 4180 "FROM (SELECT users.id AS users_id, users.name AS users_name " 4181 "FROM users ORDER BY users.name " 4182 "LIMIT :param_1) AS anon_1 " 4183 "LEFT OUTER JOIN addresses AS addresses_1 " 4184 "ON anon_1.users_id = addresses_1.user_id " 4185 "ORDER BY name, addresses_1.id", 4186 ) 4187 4188 def test_order_by_w_eager_three(self): 4189 User = self.classes.User 4190 s = create_session() 4191 4192 self.assert_compile( 4193 s.query(User) 4194 .options(joinedload("addresses")) 4195 .order_by("users_name") 4196 .limit(1), 4197 "SELECT anon_1.users_id AS anon_1_users_id, " 4198 "anon_1.users_name AS anon_1_users_name, " 4199 "addresses_1.id AS addresses_1_id, " 4200 "addresses_1.user_id AS addresses_1_user_id, " 4201 "addresses_1.email_address AS addresses_1_email_address " 4202 "FROM (SELECT users.id AS users_id, users.name AS users_name " 4203 "FROM users ORDER BY users.name " 4204 "LIMIT :param_1) AS anon_1 " 4205 "LEFT OUTER JOIN addresses AS addresses_1 " 4206 "ON anon_1.users_id = addresses_1.user_id " 4207 "ORDER BY anon_1.users_name, addresses_1.id", 4208 ) 4209 4210 # however! this works (again?) 4211 eq_( 4212 s.query(User) 4213 .options(joinedload("addresses")) 4214 .order_by("users_name") 4215 .first(), 4216 User(name="chuck", addresses=[]), 4217 ) 4218 4219 def test_order_by_w_eager_four(self): 4220 User = self.classes.User 4221 Address = self.classes.Address 4222 s = create_session() 4223 4224 self.assert_compile( 4225 s.query(User) 4226 .options(joinedload("addresses")) 4227 .order_by(desc("users_name")) 4228 .limit(1), 4229 "SELECT anon_1.users_id AS anon_1_users_id, " 4230 "anon_1.users_name AS anon_1_users_name, " 4231 "addresses_1.id AS addresses_1_id, " 4232 "addresses_1.user_id AS addresses_1_user_id, " 4233 "addresses_1.email_address AS addresses_1_email_address " 4234 "FROM (SELECT users.id AS users_id, users.name AS users_name " 4235 "FROM users ORDER BY users.name DESC " 4236 "LIMIT :param_1) AS anon_1 " 4237 "LEFT OUTER JOIN addresses AS addresses_1 " 4238 "ON anon_1.users_id = addresses_1.user_id " 4239 "ORDER BY anon_1.users_name DESC, addresses_1.id", 4240 ) 4241 4242 # however! this works (again?) 4243 eq_( 4244 s.query(User) 4245 .options(joinedload("addresses")) 4246 .order_by(desc("users_name")) 4247 .first(), 4248 User(name="jack", addresses=[Address()]), 4249 ) 4250 4251 def test_order_by_w_eager_five(self): 4252 """essentially the same as test_eager_relations -> test_limit_3, 4253 but test for textual label elements that are freeform. 4254 this is again #3392.""" 4255 4256 User = self.classes.User 4257 Address = self.classes.Address 4258 Order = self.classes.Order 4259 4260 sess = create_session() 4261 4262 q = sess.query(User, Address.email_address.label("email_address")) 4263 4264 result = ( 4265 q.join("addresses") 4266 .options(joinedload(User.orders)) 4267 .order_by("email_address desc") 4268 .limit(1) 4269 .offset(0) 4270 ) 4271 with expect_warnings( 4272 "Can't resolve label reference 'email_address desc'" 4273 ): 4274 eq_( 4275 [ 4276 ( 4277 User( 4278 id=7, 4279 orders=[Order(id=1), Order(id=3), Order(id=5)], 4280 addresses=[Address(id=1)], 4281 ), 4282 "jack@bean.com", 4283 ) 4284 ], 4285 result.all(), 4286 ) 4287 4288 4289class TextWarningTest(QueryTest, AssertsCompiledSQL): 4290 def _test(self, fn, arg, offending_clause, expected): 4291 assert_raises_message( 4292 sa.exc.SAWarning, 4293 r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be " 4294 r"explicitly declared (?:with|as) text\(%(stmt)r\)" 4295 % {"stmt": util.ellipses_string(offending_clause)}, 4296 fn, 4297 arg, 4298 ) 4299 4300 with expect_warnings("Textual "): 4301 stmt = fn(arg) 4302 self.assert_compile(stmt, expected) 4303 4304 def test_filter(self): 4305 User = self.classes.User 4306 self._test( 4307 Session().query(User.id).filter, 4308 "myid == 5", 4309 "myid == 5", 4310 "SELECT users.id AS users_id FROM users WHERE myid == 5", 4311 ) 4312 4313 def test_having(self): 4314 User = self.classes.User 4315 self._test( 4316 Session().query(User.id).having, 4317 "myid == 5", 4318 "myid == 5", 4319 "SELECT users.id AS users_id FROM users HAVING myid == 5", 4320 ) 4321 4322 def test_from_statement(self): 4323 User = self.classes.User 4324 self._test( 4325 Session().query(User.id).from_statement, 4326 "select id from user", 4327 "select id from user", 4328 "select id from user", 4329 ) 4330 4331 4332class ParentTest(QueryTest, AssertsCompiledSQL): 4333 __dialect__ = "default" 4334 4335 def test_o2m(self): 4336 User, orders, Order = ( 4337 self.classes.User, 4338 self.tables.orders, 4339 self.classes.Order, 4340 ) 4341 4342 sess = create_session() 4343 q = sess.query(User) 4344 4345 u1 = q.filter_by(name="jack").one() 4346 4347 # test auto-lookup of property 4348 o = sess.query(Order).with_parent(u1).all() 4349 assert [ 4350 Order(description="order 1"), 4351 Order(description="order 3"), 4352 Order(description="order 5"), 4353 ] == o 4354 4355 # test with explicit property 4356 o = sess.query(Order).with_parent(u1, property="orders").all() 4357 assert [ 4358 Order(description="order 1"), 4359 Order(description="order 3"), 4360 Order(description="order 5"), 4361 ] == o 4362 4363 o = sess.query(Order).with_parent(u1, property=User.orders).all() 4364 assert [ 4365 Order(description="order 1"), 4366 Order(description="order 3"), 4367 Order(description="order 5"), 4368 ] == o 4369 4370 o = sess.query(Order).filter(with_parent(u1, User.orders)).all() 4371 assert [ 4372 Order(description="order 1"), 4373 Order(description="order 3"), 4374 Order(description="order 5"), 4375 ] == o 4376 4377 # test generative criterion 4378 o = sess.query(Order).with_parent(u1).filter(orders.c.id > 2).all() 4379 assert [ 4380 Order(description="order 3"), 4381 Order(description="order 5"), 4382 ] == o 4383 4384 # test against None for parent? this can't be done with the current 4385 # API since we don't know what mapper to use 4386 # assert 4387 # sess.query(Order).with_parent(None, property='addresses').all() 4388 # == [Order(description="order 5")] 4389 4390 def test_select_from(self): 4391 User, Address = self.classes.User, self.classes.Address 4392 4393 sess = create_session() 4394 u1 = sess.query(User).get(7) 4395 q = sess.query(Address).select_from(Address).with_parent(u1) 4396 self.assert_compile( 4397 q, 4398 "SELECT addresses.id AS addresses_id, " 4399 "addresses.user_id AS addresses_user_id, " 4400 "addresses.email_address AS addresses_email_address " 4401 "FROM addresses WHERE :param_1 = addresses.user_id", 4402 {"param_1": 7}, 4403 ) 4404 4405 def test_from_entity_standalone_fn(self): 4406 User, Address = self.classes.User, self.classes.Address 4407 4408 sess = create_session() 4409 u1 = sess.query(User).get(7) 4410 q = sess.query(User, Address).filter( 4411 with_parent(u1, "addresses", from_entity=Address) 4412 ) 4413 self.assert_compile( 4414 q, 4415 "SELECT users.id AS users_id, users.name AS users_name, " 4416 "addresses.id AS addresses_id, addresses.user_id " 4417 "AS addresses_user_id, " 4418 "addresses.email_address AS addresses_email_address " 4419 "FROM users, addresses " 4420 "WHERE :param_1 = addresses.user_id", 4421 {"param_1": 7}, 4422 ) 4423 4424 def test_from_entity_query_entity(self): 4425 User, Address = self.classes.User, self.classes.Address 4426 4427 sess = create_session() 4428 u1 = sess.query(User).get(7) 4429 q = sess.query(User, Address).with_parent( 4430 u1, "addresses", from_entity=Address 4431 ) 4432 self.assert_compile( 4433 q, 4434 "SELECT users.id AS users_id, users.name AS users_name, " 4435 "addresses.id AS addresses_id, addresses.user_id " 4436 "AS addresses_user_id, " 4437 "addresses.email_address AS addresses_email_address " 4438 "FROM users, addresses " 4439 "WHERE :param_1 = addresses.user_id", 4440 {"param_1": 7}, 4441 ) 4442 4443 def test_select_from_alias(self): 4444 User, Address = self.classes.User, self.classes.Address 4445 4446 sess = create_session() 4447 u1 = sess.query(User).get(7) 4448 a1 = aliased(Address) 4449 q = sess.query(a1).with_parent(u1) 4450 self.assert_compile( 4451 q, 4452 "SELECT addresses_1.id AS addresses_1_id, " 4453 "addresses_1.user_id AS addresses_1_user_id, " 4454 "addresses_1.email_address AS addresses_1_email_address " 4455 "FROM addresses AS addresses_1 " 4456 "WHERE :param_1 = addresses_1.user_id", 4457 {"param_1": 7}, 4458 ) 4459 4460 def test_select_from_alias_explicit_prop(self): 4461 User, Address = self.classes.User, self.classes.Address 4462 4463 sess = create_session() 4464 u1 = sess.query(User).get(7) 4465 a1 = aliased(Address) 4466 q = sess.query(a1).with_parent(u1, "addresses") 4467 self.assert_compile( 4468 q, 4469 "SELECT addresses_1.id AS addresses_1_id, " 4470 "addresses_1.user_id AS addresses_1_user_id, " 4471 "addresses_1.email_address AS addresses_1_email_address " 4472 "FROM addresses AS addresses_1 " 4473 "WHERE :param_1 = addresses_1.user_id", 4474 {"param_1": 7}, 4475 ) 4476 4477 def test_noparent(self): 4478 Item, User = self.classes.Item, self.classes.User 4479 4480 sess = create_session() 4481 q = sess.query(User) 4482 4483 u1 = q.filter_by(name="jack").one() 4484 4485 try: 4486 q = sess.query(Item).with_parent(u1) 4487 assert False 4488 except sa_exc.InvalidRequestError as e: 4489 assert ( 4490 str(e) == "Could not locate a property which relates " 4491 "instances of class 'Item' to instances of class 'User'" 4492 ) 4493 4494 def test_m2m(self): 4495 Item, Keyword = self.classes.Item, self.classes.Keyword 4496 4497 sess = create_session() 4498 i1 = sess.query(Item).filter_by(id=2).one() 4499 k = sess.query(Keyword).with_parent(i1).all() 4500 assert [ 4501 Keyword(name="red"), 4502 Keyword(name="small"), 4503 Keyword(name="square"), 4504 ] == k 4505 4506 def test_with_transient(self): 4507 User, Order = self.classes.User, self.classes.Order 4508 4509 sess = Session() 4510 4511 q = sess.query(User) 4512 u1 = q.filter_by(name="jack").one() 4513 utrans = User(id=u1.id) 4514 o = sess.query(Order).with_parent(utrans, "orders") 4515 eq_( 4516 [ 4517 Order(description="order 1"), 4518 Order(description="order 3"), 4519 Order(description="order 5"), 4520 ], 4521 o.all(), 4522 ) 4523 4524 o = sess.query(Order).filter(with_parent(utrans, "orders")) 4525 eq_( 4526 [ 4527 Order(description="order 1"), 4528 Order(description="order 3"), 4529 Order(description="order 5"), 4530 ], 4531 o.all(), 4532 ) 4533 4534 def test_with_pending_autoflush(self): 4535 Order, User = self.classes.Order, self.classes.User 4536 4537 sess = Session() 4538 4539 o1 = sess.query(Order).first() 4540 opending = Order(id=20, user_id=o1.user_id) 4541 sess.add(opending) 4542 eq_( 4543 sess.query(User).with_parent(opending, "user").one(), 4544 User(id=o1.user_id), 4545 ) 4546 eq_( 4547 sess.query(User).filter(with_parent(opending, "user")).one(), 4548 User(id=o1.user_id), 4549 ) 4550 4551 def test_with_pending_no_autoflush(self): 4552 Order, User = self.classes.Order, self.classes.User 4553 4554 sess = Session(autoflush=False) 4555 4556 o1 = sess.query(Order).first() 4557 opending = Order(user_id=o1.user_id) 4558 sess.add(opending) 4559 eq_( 4560 sess.query(User).with_parent(opending, "user").one(), 4561 User(id=o1.user_id), 4562 ) 4563 4564 def test_unique_binds_union(self): 4565 """bindparams used in the 'parent' query are unique""" 4566 User, Address = self.classes.User, self.classes.Address 4567 4568 sess = Session() 4569 u1, u2 = sess.query(User).order_by(User.id)[0:2] 4570 4571 q1 = sess.query(Address).with_parent(u1, "addresses") 4572 q2 = sess.query(Address).with_parent(u2, "addresses") 4573 4574 self.assert_compile( 4575 q1.union(q2), 4576 "SELECT anon_1.addresses_id AS anon_1_addresses_id, " 4577 "anon_1.addresses_user_id AS anon_1_addresses_user_id, " 4578 "anon_1.addresses_email_address AS " 4579 "anon_1_addresses_email_address FROM (SELECT addresses.id AS " 4580 "addresses_id, addresses.user_id AS addresses_user_id, " 4581 "addresses.email_address AS addresses_email_address FROM " 4582 "addresses WHERE :param_1 = addresses.user_id UNION SELECT " 4583 "addresses.id AS addresses_id, addresses.user_id AS " 4584 "addresses_user_id, addresses.email_address " 4585 "AS addresses_email_address " 4586 "FROM addresses WHERE :param_2 = addresses.user_id) AS anon_1", 4587 checkparams={"param_1": 7, "param_2": 8}, 4588 ) 4589 4590 def test_unique_binds_or(self): 4591 User, Address = self.classes.User, self.classes.Address 4592 4593 sess = Session() 4594 u1, u2 = sess.query(User).order_by(User.id)[0:2] 4595 4596 self.assert_compile( 4597 sess.query(Address).filter( 4598 or_(with_parent(u1, "addresses"), with_parent(u2, "addresses")) 4599 ), 4600 "SELECT addresses.id AS addresses_id, addresses.user_id AS " 4601 "addresses_user_id, addresses.email_address AS " 4602 "addresses_email_address FROM addresses WHERE " 4603 ":param_1 = addresses.user_id OR :param_2 = addresses.user_id", 4604 checkparams={"param_1": 7, "param_2": 8}, 4605 ) 4606 4607 4608class WithTransientOnNone(_fixtures.FixtureTest, AssertsCompiledSQL): 4609 run_inserts = None 4610 __dialect__ = "default" 4611 4612 def _fixture1(self): 4613 User, Address = self.classes.User, self.classes.Address 4614 users, addresses = self.tables.users, self.tables.addresses 4615 4616 mapper(User, users) 4617 mapper( 4618 Address, 4619 addresses, 4620 properties={ 4621 "user": relationship(User), 4622 "special_user": relationship( 4623 User, 4624 primaryjoin=and_( 4625 users.c.id == addresses.c.user_id, 4626 users.c.name == addresses.c.email_address, 4627 ), 4628 ), 4629 }, 4630 ) 4631 4632 def test_filter_with_transient_assume_pk(self): 4633 self._fixture1() 4634 User, Address = self.classes.User, self.classes.Address 4635 4636 sess = Session() 4637 4638 q = sess.query(Address).filter(Address.user == User()) 4639 with expect_warnings("Got None for value of column "): 4640 self.assert_compile( 4641 q, 4642 "SELECT addresses.id AS addresses_id, " 4643 "addresses.user_id AS addresses_user_id, " 4644 "addresses.email_address AS addresses_email_address " 4645 "FROM addresses WHERE :param_1 = addresses.user_id", 4646 checkparams={"param_1": None}, 4647 ) 4648 4649 def test_filter_with_transient_warn_for_none_against_non_pk(self): 4650 self._fixture1() 4651 User, Address = self.classes.User, self.classes.Address 4652 4653 s = Session() 4654 q = s.query(Address).filter(Address.special_user == User()) 4655 with expect_warnings("Got None for value of column"): 4656 4657 self.assert_compile( 4658 q, 4659 "SELECT addresses.id AS addresses_id, " 4660 "addresses.user_id AS addresses_user_id, " 4661 "addresses.email_address AS addresses_email_address " 4662 "FROM addresses WHERE :param_1 = addresses.user_id " 4663 "AND :param_2 = addresses.email_address", 4664 checkparams={"param_1": None, "param_2": None}, 4665 ) 4666 4667 def test_with_parent_with_transient_assume_pk(self): 4668 self._fixture1() 4669 User, Address = self.classes.User, self.classes.Address 4670 4671 sess = Session() 4672 4673 q = sess.query(User).with_parent(Address(), "user") 4674 with expect_warnings("Got None for value of column"): 4675 self.assert_compile( 4676 q, 4677 "SELECT users.id AS users_id, users.name AS users_name " 4678 "FROM users WHERE users.id = :param_1", 4679 checkparams={"param_1": None}, 4680 ) 4681 4682 def test_with_parent_with_transient_warn_for_none_against_non_pk(self): 4683 self._fixture1() 4684 User, Address = self.classes.User, self.classes.Address 4685 4686 s = Session() 4687 q = s.query(User).with_parent(Address(), "special_user") 4688 with expect_warnings("Got None for value of column"): 4689 4690 self.assert_compile( 4691 q, 4692 "SELECT users.id AS users_id, users.name AS users_name " 4693 "FROM users WHERE users.id = :param_1 " 4694 "AND users.name = :param_2", 4695 checkparams={"param_1": None, "param_2": None}, 4696 ) 4697 4698 def test_negated_contains_or_equals_plain_m2o(self): 4699 self._fixture1() 4700 User, Address = self.classes.User, self.classes.Address 4701 4702 s = Session() 4703 q = s.query(Address).filter(Address.user != User()) 4704 with expect_warnings("Got None for value of column"): 4705 self.assert_compile( 4706 q, 4707 "SELECT addresses.id AS addresses_id, " 4708 "addresses.user_id AS addresses_user_id, " 4709 "addresses.email_address AS addresses_email_address " 4710 "FROM addresses " 4711 "WHERE addresses.user_id != :user_id_1 " 4712 "OR addresses.user_id IS NULL", 4713 checkparams={"user_id_1": None}, 4714 ) 4715 4716 def test_negated_contains_or_equals_complex_rel(self): 4717 self._fixture1() 4718 User, Address = self.classes.User, self.classes.Address 4719 4720 s = Session() 4721 4722 # this one does *not* warn because we do the criteria 4723 # without deferral 4724 q = s.query(Address).filter(Address.special_user != User()) 4725 self.assert_compile( 4726 q, 4727 "SELECT addresses.id AS addresses_id, " 4728 "addresses.user_id AS addresses_user_id, " 4729 "addresses.email_address AS addresses_email_address " 4730 "FROM addresses " 4731 "WHERE NOT (EXISTS (SELECT 1 " 4732 "FROM users " 4733 "WHERE users.id = addresses.user_id AND " 4734 "users.name = addresses.email_address AND users.id IS NULL))", 4735 checkparams={}, 4736 ) 4737 4738 4739class SynonymTest(QueryTest, AssertsCompiledSQL): 4740 __dialect__ = "default" 4741 4742 @classmethod 4743 def setup_mappers(cls): 4744 ( 4745 users, 4746 Keyword, 4747 items, 4748 order_items, 4749 orders, 4750 Item, 4751 User, 4752 Address, 4753 keywords, 4754 Order, 4755 item_keywords, 4756 addresses, 4757 ) = ( 4758 cls.tables.users, 4759 cls.classes.Keyword, 4760 cls.tables.items, 4761 cls.tables.order_items, 4762 cls.tables.orders, 4763 cls.classes.Item, 4764 cls.classes.User, 4765 cls.classes.Address, 4766 cls.tables.keywords, 4767 cls.classes.Order, 4768 cls.tables.item_keywords, 4769 cls.tables.addresses, 4770 ) 4771 4772 mapper( 4773 User, 4774 users, 4775 properties={ 4776 "name_syn": synonym("name"), 4777 "addresses": relationship(Address), 4778 "orders": relationship( 4779 Order, backref="user", order_by=orders.c.id 4780 ), # o2m, m2o 4781 "orders_syn": synonym("orders"), 4782 "orders_syn_2": synonym("orders_syn"), 4783 }, 4784 ) 4785 mapper(Address, addresses) 4786 mapper( 4787 Order, 4788 orders, 4789 properties={ 4790 "items": relationship(Item, secondary=order_items), # m2m 4791 "address": relationship(Address), # m2o 4792 "items_syn": synonym("items"), 4793 }, 4794 ) 4795 mapper( 4796 Item, 4797 items, 4798 properties={ 4799 "keywords": relationship( 4800 Keyword, secondary=item_keywords 4801 ) # m2m 4802 }, 4803 ) 4804 mapper(Keyword, keywords) 4805 4806 def test_options(self): 4807 User, Order = self.classes.User, self.classes.Order 4808 4809 s = create_session() 4810 4811 def go(): 4812 result = ( 4813 s.query(User) 4814 .filter_by(name="jack") 4815 .options(joinedload(User.orders_syn)) 4816 .all() 4817 ) 4818 eq_( 4819 result, 4820 [ 4821 User( 4822 id=7, 4823 name="jack", 4824 orders=[ 4825 Order(description="order 1"), 4826 Order(description="order 3"), 4827 Order(description="order 5"), 4828 ], 4829 ) 4830 ], 4831 ) 4832 4833 self.assert_sql_count(testing.db, go, 1) 4834 4835 def test_options_syn_of_syn(self): 4836 User, Order = self.classes.User, self.classes.Order 4837 4838 s = create_session() 4839 4840 def go(): 4841 result = ( 4842 s.query(User) 4843 .filter_by(name="jack") 4844 .options(joinedload(User.orders_syn_2)) 4845 .all() 4846 ) 4847 eq_( 4848 result, 4849 [ 4850 User( 4851 id=7, 4852 name="jack", 4853 orders=[ 4854 Order(description="order 1"), 4855 Order(description="order 3"), 4856 Order(description="order 5"), 4857 ], 4858 ) 4859 ], 4860 ) 4861 4862 self.assert_sql_count(testing.db, go, 1) 4863 4864 def test_options_syn_of_syn_string(self): 4865 User, Order = self.classes.User, self.classes.Order 4866 4867 s = create_session() 4868 4869 def go(): 4870 result = ( 4871 s.query(User) 4872 .filter_by(name="jack") 4873 .options(joinedload("orders_syn_2")) 4874 .all() 4875 ) 4876 eq_( 4877 result, 4878 [ 4879 User( 4880 id=7, 4881 name="jack", 4882 orders=[ 4883 Order(description="order 1"), 4884 Order(description="order 3"), 4885 Order(description="order 5"), 4886 ], 4887 ) 4888 ], 4889 ) 4890 4891 self.assert_sql_count(testing.db, go, 1) 4892 4893 def test_joins(self): 4894 User, Order = self.classes.User, self.classes.Order 4895 4896 for j in ( 4897 ["orders", "items"], 4898 ["orders_syn", "items"], 4899 [User.orders_syn, Order.items], 4900 ["orders_syn_2", "items"], 4901 [User.orders_syn_2, "items"], 4902 ["orders", "items_syn"], 4903 ["orders_syn", "items_syn"], 4904 ["orders_syn_2", "items_syn"], 4905 ): 4906 result = ( 4907 create_session().query(User).join(*j).filter_by(id=3).all() 4908 ) 4909 assert [User(id=7, name="jack"), User(id=9, name="fred")] == result 4910 4911 def test_with_parent(self): 4912 Order, User = self.classes.Order, self.classes.User 4913 4914 for nameprop, orderprop in ( 4915 ("name", "orders"), 4916 ("name_syn", "orders"), 4917 ("name", "orders_syn"), 4918 ("name", "orders_syn_2"), 4919 ("name_syn", "orders_syn"), 4920 ("name_syn", "orders_syn_2"), 4921 ): 4922 sess = create_session() 4923 q = sess.query(User) 4924 4925 u1 = q.filter_by(**{nameprop: "jack"}).one() 4926 4927 o = sess.query(Order).with_parent(u1, property=orderprop).all() 4928 assert [ 4929 Order(description="order 1"), 4930 Order(description="order 3"), 4931 Order(description="order 5"), 4932 ] == o 4933 4934 def test_froms_aliased_col(self): 4935 Address, User = self.classes.Address, self.classes.User 4936 4937 sess = create_session() 4938 ua = aliased(User) 4939 4940 q = sess.query(ua.name_syn).join(Address, ua.id == Address.user_id) 4941 self.assert_compile( 4942 q, 4943 "SELECT users_1.name AS users_1_name FROM " 4944 "users AS users_1 JOIN addresses " 4945 "ON users_1.id = addresses.user_id", 4946 ) 4947 4948 4949class ImmediateTest(_fixtures.FixtureTest): 4950 run_inserts = "once" 4951 run_deletes = None 4952 4953 @classmethod 4954 def setup_mappers(cls): 4955 Address, addresses, users, User = ( 4956 cls.classes.Address, 4957 cls.tables.addresses, 4958 cls.tables.users, 4959 cls.classes.User, 4960 ) 4961 4962 mapper(Address, addresses) 4963 4964 mapper(User, users, properties=dict(addresses=relationship(Address))) 4965 4966 def test_one(self): 4967 User, Address = self.classes.User, self.classes.Address 4968 4969 sess = create_session() 4970 4971 assert_raises_message( 4972 sa.orm.exc.NoResultFound, 4973 r"No row was found for one\(\)", 4974 sess.query(User).filter(User.id == 99).one, 4975 ) 4976 4977 eq_(sess.query(User).filter(User.id == 7).one().id, 7) 4978 4979 assert_raises_message( 4980 sa.orm.exc.MultipleResultsFound, 4981 r"Multiple rows were found for one\(\)", 4982 sess.query(User).one, 4983 ) 4984 4985 assert_raises( 4986 sa.orm.exc.NoResultFound, 4987 sess.query(User.id, User.name).filter(User.id == 99).one, 4988 ) 4989 4990 eq_( 4991 sess.query(User.id, User.name).filter(User.id == 7).one(), 4992 (7, "jack"), 4993 ) 4994 4995 assert_raises( 4996 sa.orm.exc.MultipleResultsFound, sess.query(User.id, User.name).one 4997 ) 4998 4999 assert_raises( 5000 sa.orm.exc.NoResultFound, 5001 ( 5002 sess.query(User, Address) 5003 .join(User.addresses) 5004 .filter(Address.id == 99) 5005 ).one, 5006 ) 5007 5008 eq_( 5009 ( 5010 sess.query(User, Address) 5011 .join(User.addresses) 5012 .filter(Address.id == 4) 5013 ).one(), 5014 (User(id=8), Address(id=4)), 5015 ) 5016 5017 assert_raises( 5018 sa.orm.exc.MultipleResultsFound, 5019 sess.query(User, Address).join(User.addresses).one, 5020 ) 5021 5022 # this result returns multiple rows, the first 5023 # two rows being the same. but uniquing is 5024 # not applied for a column based result. 5025 assert_raises( 5026 sa.orm.exc.MultipleResultsFound, 5027 sess.query(User.id) 5028 .join(User.addresses) 5029 .filter(User.id.in_([8, 9])) 5030 .order_by(User.id) 5031 .one, 5032 ) 5033 5034 # test that a join which ultimately returns 5035 # multiple identities across many rows still 5036 # raises, even though the first two rows are of 5037 # the same identity and unique filtering 5038 # is applied ([ticket:1688]) 5039 assert_raises( 5040 sa.orm.exc.MultipleResultsFound, 5041 sess.query(User) 5042 .join(User.addresses) 5043 .filter(User.id.in_([8, 9])) 5044 .order_by(User.id) 5045 .one, 5046 ) 5047 5048 def test_one_or_none(self): 5049 User, Address = self.classes.User, self.classes.Address 5050 5051 sess = create_session() 5052 5053 eq_(sess.query(User).filter(User.id == 99).one_or_none(), None) 5054 5055 eq_(sess.query(User).filter(User.id == 7).one_or_none().id, 7) 5056 5057 assert_raises_message( 5058 sa.orm.exc.MultipleResultsFound, 5059 r"Multiple rows were found for one_or_none\(\)", 5060 sess.query(User).one_or_none, 5061 ) 5062 5063 eq_( 5064 sess.query(User.id, User.name).filter(User.id == 99).one_or_none(), 5065 None, 5066 ) 5067 5068 eq_( 5069 sess.query(User.id, User.name).filter(User.id == 7).one_or_none(), 5070 (7, "jack"), 5071 ) 5072 5073 assert_raises( 5074 sa.orm.exc.MultipleResultsFound, 5075 sess.query(User.id, User.name).one_or_none, 5076 ) 5077 5078 eq_( 5079 ( 5080 sess.query(User, Address) 5081 .join(User.addresses) 5082 .filter(Address.id == 99) 5083 ).one_or_none(), 5084 None, 5085 ) 5086 5087 eq_( 5088 ( 5089 sess.query(User, Address) 5090 .join(User.addresses) 5091 .filter(Address.id == 4) 5092 ).one_or_none(), 5093 (User(id=8), Address(id=4)), 5094 ) 5095 5096 assert_raises( 5097 sa.orm.exc.MultipleResultsFound, 5098 sess.query(User, Address).join(User.addresses).one_or_none, 5099 ) 5100 5101 # this result returns multiple rows, the first 5102 # two rows being the same. but uniquing is 5103 # not applied for a column based result. 5104 assert_raises( 5105 sa.orm.exc.MultipleResultsFound, 5106 sess.query(User.id) 5107 .join(User.addresses) 5108 .filter(User.id.in_([8, 9])) 5109 .order_by(User.id) 5110 .one_or_none, 5111 ) 5112 5113 # test that a join which ultimately returns 5114 # multiple identities across many rows still 5115 # raises, even though the first two rows are of 5116 # the same identity and unique filtering 5117 # is applied ([ticket:1688]) 5118 assert_raises( 5119 sa.orm.exc.MultipleResultsFound, 5120 sess.query(User) 5121 .join(User.addresses) 5122 .filter(User.id.in_([8, 9])) 5123 .order_by(User.id) 5124 .one_or_none, 5125 ) 5126 5127 @testing.future 5128 def test_getslice(self): 5129 assert False 5130 5131 def test_scalar(self): 5132 User = self.classes.User 5133 5134 sess = create_session() 5135 5136 eq_(sess.query(User.id).filter_by(id=7).scalar(), 7) 5137 eq_(sess.query(User.id, User.name).filter_by(id=7).scalar(), 7) 5138 eq_(sess.query(User.id).filter_by(id=0).scalar(), None) 5139 eq_( 5140 sess.query(User).filter_by(id=7).scalar(), 5141 sess.query(User).filter_by(id=7).one(), 5142 ) 5143 5144 assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).scalar) 5145 assert_raises( 5146 sa.orm.exc.MultipleResultsFound, 5147 sess.query(User.id, User.name).scalar, 5148 ) 5149 5150 def test_value(self): 5151 User = self.classes.User 5152 5153 sess = create_session() 5154 5155 eq_(sess.query(User).filter_by(id=7).value(User.id), 7) 5156 eq_(sess.query(User.id, User.name).filter_by(id=7).value(User.id), 7) 5157 eq_(sess.query(User).filter_by(id=0).value(User.id), None) 5158 5159 sess.bind = testing.db 5160 eq_(sess.query().value(sa.literal_column("1").label("x")), 1) 5161 5162 5163class ExecutionOptionsTest(QueryTest): 5164 def test_option_building(self): 5165 User = self.classes.User 5166 5167 sess = create_session(bind=testing.db, autocommit=False) 5168 5169 q1 = sess.query(User) 5170 assert q1._execution_options == dict() 5171 q2 = q1.execution_options(foo="bar", stream_results=True) 5172 # q1's options should be unchanged. 5173 assert q1._execution_options == dict() 5174 # q2 should have them set. 5175 assert q2._execution_options == dict(foo="bar", stream_results=True) 5176 q3 = q2.execution_options(foo="not bar", answer=42) 5177 assert q2._execution_options == dict(foo="bar", stream_results=True) 5178 5179 q3_options = dict(foo="not bar", stream_results=True, answer=42) 5180 assert q3._execution_options == q3_options 5181 5182 def test_options_in_connection(self): 5183 User = self.classes.User 5184 5185 execution_options = dict(foo="bar", stream_results=True) 5186 5187 class TQuery(Query): 5188 def instances(self, result, ctx): 5189 try: 5190 eq_( 5191 result.connection._execution_options, execution_options 5192 ) 5193 finally: 5194 result.close() 5195 return iter([]) 5196 5197 sess = create_session( 5198 bind=testing.db, autocommit=False, query_cls=TQuery 5199 ) 5200 q1 = sess.query(User).execution_options(**execution_options) 5201 q1.all() 5202 5203 5204class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL): 5205 """test standalone booleans being wrapped in an AsBoolean, as well 5206 as true/false compilation.""" 5207 5208 def _dialect(self, native_boolean): 5209 d = default.DefaultDialect() 5210 d.supports_native_boolean = native_boolean 5211 return d 5212 5213 def test_one(self): 5214 s = Session() 5215 c = column("x", Boolean) 5216 self.assert_compile( 5217 s.query(c).filter(c), 5218 "SELECT x WHERE x", 5219 dialect=self._dialect(True), 5220 ) 5221 5222 def test_two(self): 5223 s = Session() 5224 c = column("x", Boolean) 5225 self.assert_compile( 5226 s.query(c).filter(c), 5227 "SELECT x WHERE x = 1", 5228 dialect=self._dialect(False), 5229 ) 5230 5231 def test_three(self): 5232 s = Session() 5233 c = column("x", Boolean) 5234 self.assert_compile( 5235 s.query(c).filter(~c), 5236 "SELECT x WHERE x = 0", 5237 dialect=self._dialect(False), 5238 ) 5239 5240 def test_four(self): 5241 s = Session() 5242 c = column("x", Boolean) 5243 self.assert_compile( 5244 s.query(c).filter(~c), 5245 "SELECT x WHERE NOT x", 5246 dialect=self._dialect(True), 5247 ) 5248 5249 def test_five(self): 5250 s = Session() 5251 c = column("x", Boolean) 5252 self.assert_compile( 5253 s.query(c).having(c), 5254 "SELECT x HAVING x = 1", 5255 dialect=self._dialect(False), 5256 ) 5257 5258 5259class SessionBindTest(QueryTest): 5260 @contextlib.contextmanager 5261 def _assert_bind_args(self, session): 5262 get_bind = mock.Mock(side_effect=session.get_bind) 5263 with mock.patch.object(session, "get_bind", get_bind): 5264 yield 5265 for call_ in get_bind.mock_calls: 5266 is_(call_[1][0], inspect(self.classes.User)) 5267 is_not_(call_[2]["clause"], None) 5268 5269 def test_single_entity_q(self): 5270 User = self.classes.User 5271 session = Session() 5272 with self._assert_bind_args(session): 5273 session.query(User).all() 5274 5275 def test_sql_expr_entity_q(self): 5276 User = self.classes.User 5277 session = Session() 5278 with self._assert_bind_args(session): 5279 session.query(User.id).all() 5280 5281 def test_count(self): 5282 User = self.classes.User 5283 session = Session() 5284 with self._assert_bind_args(session): 5285 session.query(User).count() 5286 5287 def test_aggregate_fn(self): 5288 User = self.classes.User 5289 session = Session() 5290 with self._assert_bind_args(session): 5291 session.query(func.max(User.name)).all() 5292 5293 def test_bulk_update_no_sync(self): 5294 User = self.classes.User 5295 session = Session() 5296 with self._assert_bind_args(session): 5297 session.query(User).filter(User.id == 15).update( 5298 {"name": "foob"}, synchronize_session=False 5299 ) 5300 5301 def test_bulk_delete_no_sync(self): 5302 User = self.classes.User 5303 session = Session() 5304 with self._assert_bind_args(session): 5305 session.query(User).filter(User.id == 15).delete( 5306 synchronize_session=False 5307 ) 5308 5309 def test_bulk_update_fetch_sync(self): 5310 User = self.classes.User 5311 session = Session() 5312 with self._assert_bind_args(session): 5313 session.query(User).filter(User.id == 15).update( 5314 {"name": "foob"}, synchronize_session="fetch" 5315 ) 5316 5317 def test_bulk_delete_fetch_sync(self): 5318 User = self.classes.User 5319 session = Session() 5320 with self._assert_bind_args(session): 5321 session.query(User).filter(User.id == 15).delete( 5322 synchronize_session="fetch" 5323 ) 5324 5325 def test_column_property(self): 5326 User = self.classes.User 5327 5328 mapper = inspect(User) 5329 mapper.add_property( 5330 "score", 5331 column_property(func.coalesce(self.tables.users.c.name, None)), 5332 ) 5333 session = Session() 5334 with self._assert_bind_args(session): 5335 session.query(func.max(User.score)).scalar() 5336 5337 @testing.requires.nested_aggregates 5338 def test_column_property_select(self): 5339 User = self.classes.User 5340 Address = self.classes.Address 5341 5342 mapper = inspect(User) 5343 mapper.add_property( 5344 "score", 5345 column_property( 5346 select([func.sum(Address.id)]) 5347 .where(Address.user_id == User.id) 5348 .as_scalar() 5349 ), 5350 ) 5351 session = Session() 5352 5353 with self._assert_bind_args(session): 5354 session.query(func.max(User.score)).scalar() 5355 5356 5357class QueryClsTest(QueryTest): 5358 def _fn_fixture(self): 5359 def query(*arg, **kw): 5360 return Query(*arg, **kw) 5361 5362 return query 5363 5364 def _subclass_fixture(self): 5365 class MyQuery(Query): 5366 pass 5367 5368 return MyQuery 5369 5370 def _callable_fixture(self): 5371 class MyQueryFactory(object): 5372 def __call__(self, *arg, **kw): 5373 return Query(*arg, **kw) 5374 5375 return MyQueryFactory() 5376 5377 def _plain_fixture(self): 5378 return Query 5379 5380 def _test_get(self, fixture): 5381 User = self.classes.User 5382 5383 s = Session(query_cls=fixture()) 5384 5385 assert s.query(User).get(19) is None 5386 u = s.query(User).get(7) 5387 u2 = s.query(User).get(7) 5388 assert u is u2 5389 5390 def _test_o2m_lazyload(self, fixture): 5391 User, Address = self.classes("User", "Address") 5392 5393 s = Session(query_cls=fixture()) 5394 5395 u1 = s.query(User).filter(User.id == 7).first() 5396 eq_(u1.addresses, [Address(id=1)]) 5397 5398 def _test_m2o_lazyload(self, fixture): 5399 User, Address = self.classes("User", "Address") 5400 5401 s = Session(query_cls=fixture()) 5402 5403 a1 = s.query(Address).filter(Address.id == 1).first() 5404 eq_(a1.user, User(id=7)) 5405 5406 def _test_expr(self, fixture): 5407 User, Address = self.classes("User", "Address") 5408 5409 s = Session(query_cls=fixture()) 5410 5411 q = s.query(func.max(User.id).label("max")) 5412 eq_(q.scalar(), 10) 5413 5414 def _test_expr_undocumented_query_constructor(self, fixture): 5415 # see #4269. not documented but already out there. 5416 User, Address = self.classes("User", "Address") 5417 5418 s = Session(query_cls=fixture()) 5419 5420 q = Query(func.max(User.id).label("max")).with_session(s) 5421 eq_(q.scalar(), 10) 5422 5423 def test_plain_get(self): 5424 self._test_get(self._plain_fixture) 5425 5426 def test_callable_get(self): 5427 self._test_get(self._callable_fixture) 5428 5429 def test_subclass_get(self): 5430 self._test_get(self._subclass_fixture) 5431 5432 def test_fn_get(self): 5433 self._test_get(self._fn_fixture) 5434 5435 def test_plain_expr(self): 5436 self._test_expr(self._plain_fixture) 5437 5438 def test_callable_expr(self): 5439 self._test_expr(self._callable_fixture) 5440 5441 def test_subclass_expr(self): 5442 self._test_expr(self._subclass_fixture) 5443 5444 def test_fn_expr(self): 5445 self._test_expr(self._fn_fixture) 5446 5447 def test_plain_expr_undocumented_query_constructor(self): 5448 self._test_expr_undocumented_query_constructor(self._plain_fixture) 5449 5450 def test_callable_expr_undocumented_query_constructor(self): 5451 self._test_expr_undocumented_query_constructor(self._callable_fixture) 5452 5453 def test_subclass_expr_undocumented_query_constructor(self): 5454 self._test_expr_undocumented_query_constructor(self._subclass_fixture) 5455 5456 def test_fn_expr_undocumented_query_constructor(self): 5457 self._test_expr_undocumented_query_constructor(self._fn_fixture) 5458 5459 def test_callable_o2m_lazyload(self): 5460 self._test_o2m_lazyload(self._callable_fixture) 5461 5462 def test_subclass_o2m_lazyload(self): 5463 self._test_o2m_lazyload(self._subclass_fixture) 5464 5465 def test_fn_o2m_lazyload(self): 5466 self._test_o2m_lazyload(self._fn_fixture) 5467 5468 def test_callable_m2o_lazyload(self): 5469 self._test_m2o_lazyload(self._callable_fixture) 5470 5471 def test_subclass_m2o_lazyload(self): 5472 self._test_m2o_lazyload(self._subclass_fixture) 5473 5474 def test_fn_m2o_lazyload(self): 5475 self._test_m2o_lazyload(self._fn_fixture) 5476