1from sqlalchemy import ( 2 testing, null, exists, text, union, literal, literal_column, func, between, 3 Unicode, desc, and_, bindparam, select, distinct, or_, collate, insert, 4 Integer, String, Boolean, exc as sa_exc, util, cast) 5from sqlalchemy.sql import operators, expression 6from sqlalchemy import column, table 7from sqlalchemy.engine import default 8from sqlalchemy.orm import ( 9 attributes, mapper, relationship, create_session, synonym, Session, 10 aliased, column_property, joinedload_all, joinedload, Query, Bundle, 11 subqueryload, backref, lazyload, defer) 12from sqlalchemy.testing.assertsql import CompiledSQL 13from sqlalchemy.testing.schema import Table, Column 14import sqlalchemy as sa 15from sqlalchemy.testing.assertions import ( 16 eq_, assert_raises, assert_raises_message, expect_warnings) 17from sqlalchemy.testing import fixtures, AssertsCompiledSQL, assert_warnings 18from test.orm import _fixtures 19from sqlalchemy.orm.util import join, with_parent 20import contextlib 21from sqlalchemy.testing import mock, is_, is_not_ 22from sqlalchemy import inspect 23 24 25class QueryTest(_fixtures.FixtureTest): 26 run_setup_mappers = 'once' 27 run_inserts = 'once' 28 run_deletes = None 29 30 @classmethod 31 def setup_mappers(cls): 32 cls._setup_stock_mapping() 33 34 35class MiscTest(QueryTest): 36 run_create_tables = None 37 run_inserts = None 38 39 def test_with_session(self): 40 User = self.classes.User 41 s1 = Session() 42 s2 = Session() 43 q1 = s1.query(User) 44 q2 = q1.with_session(s2) 45 assert q2.session is s2 46 assert q1.session is s1 47 48 49class RowTupleTest(QueryTest): 50 run_setup_mappers = None 51 52 def test_custom_names(self): 53 User, users = self.classes.User, self.tables.users 54 55 mapper(User, users, properties={'uname': users.c.name}) 56 57 row = create_session().query(User.id, User.uname).\ 58 filter(User.id == 7).first() 59 assert row.id == 7 60 assert row.uname == 'jack' 61 62 def test_column_metadata(self): 63 users, Address, addresses, User = (self.tables.users, 64 self.classes.Address, 65 self.tables.addresses, 66 self.classes.User) 67 68 mapper(User, users) 69 mapper(Address, addresses) 70 sess = create_session() 71 user_alias = aliased(User) 72 user_alias_id_label = user_alias.id.label('foo') 73 address_alias = aliased(Address, name='aalias') 74 fn = func.count(User.id) 75 name_label = User.name.label('uname') 76 bundle = Bundle('b1', User.id, User.name) 77 cte = sess.query(User.id).cte() 78 for q, asserted in [ 79 ( 80 sess.query(User), 81 [ 82 { 83 'name': 'User', 'type': User, 'aliased': False, 84 'expr': User, 'entity': User}] 85 ), 86 ( 87 sess.query(User.id, User), 88 [ 89 { 90 'name': 'id', 'type': users.c.id.type, 91 'aliased': False, 'expr': User.id, 'entity': User}, 92 { 93 'name': 'User', 'type': User, 'aliased': False, 94 'expr': User, 'entity': User} 95 ] 96 ), 97 ( 98 sess.query(User.id, user_alias), 99 [ 100 { 101 'name': 'id', 'type': users.c.id.type, 102 'aliased': False, 'expr': User.id, 'entity': User}, 103 { 104 'name': None, 'type': User, 'aliased': True, 105 'expr': user_alias, 'entity': user_alias} 106 ] 107 ), 108 ( 109 sess.query(user_alias.id), 110 [ 111 { 112 'name': 'id', 'type': users.c.id.type, 113 'aliased': True, 'expr': user_alias.id, 114 'entity': user_alias}, 115 ] 116 ), 117 ( 118 sess.query(user_alias_id_label), 119 [ 120 { 121 'name': 'foo', 'type': users.c.id.type, 122 'aliased': True, 'expr': user_alias_id_label, 123 'entity': user_alias}, 124 ] 125 ), 126 ( 127 sess.query(address_alias), 128 [ 129 { 130 'name': 'aalias', 'type': Address, 'aliased': True, 131 'expr': address_alias, 'entity': address_alias} 132 ] 133 ), 134 ( 135 sess.query(name_label, fn), 136 [ 137 { 138 'name': 'uname', 'type': users.c.name.type, 139 'aliased': False, 'expr': name_label, 'entity': User}, 140 { 141 'name': None, 'type': fn.type, 'aliased': False, 142 'expr': fn, 'entity': User}, 143 ] 144 ), 145 ( 146 sess.query(cte), 147 [ 148 { 149 'aliased': False, 150 'expr': cte.c.id, 'type': cte.c.id.type, 151 'name': 'id', 'entity': None 152 }] 153 ), 154 ( 155 sess.query(users), 156 [ 157 {'aliased': False, 158 'expr': users.c.id, 'type': users.c.id.type, 159 'name': 'id', 'entity': None}, 160 {'aliased': False, 161 'expr': users.c.name, 'type': users.c.name.type, 162 'name': 'name', 'entity': None} 163 ] 164 ), 165 ( 166 sess.query(users.c.name), 167 [{ 168 "name": "name", "type": users.c.name.type, 169 "aliased": False, "expr": users.c.name, "entity": None 170 }] 171 ), 172 ( 173 sess.query(bundle), 174 [ 175 { 176 'aliased': False, 177 'expr': bundle, 178 'type': Bundle, 179 'name': 'b1', 'entity': User 180 } 181 ] 182 ) 183 ]: 184 eq_( 185 q.column_descriptions, 186 asserted 187 ) 188 189 def test_unhashable_type(self): 190 from sqlalchemy.types import TypeDecorator, Integer 191 from sqlalchemy.sql import type_coerce 192 193 class MyType(TypeDecorator): 194 impl = Integer 195 hashable = False 196 197 def process_result_value(self, value, dialect): 198 return [value] 199 200 User, users = self.classes.User, self.tables.users 201 202 mapper(User, users) 203 204 s = Session() 205 q = s.query(User, type_coerce(users.c.id, MyType).label('foo')).\ 206 filter(User.id == 7) 207 row = q.first() 208 eq_( 209 row, (User(id=7), [7]) 210 ) 211 212 213class RawSelectTest(QueryTest, AssertsCompiledSQL): 214 __dialect__ = 'default' 215 216 def test_select_from_entity(self): 217 User = self.classes.User 218 219 self.assert_compile( 220 select(['*']).select_from(User), 221 "SELECT * FROM users" 222 ) 223 224 def test_where_relationship(self): 225 User = self.classes.User 226 227 self.assert_compile( 228 select([User]).where(User.addresses), 229 "SELECT users.id, users.name FROM users, addresses " 230 "WHERE users.id = addresses.user_id" 231 ) 232 233 def test_where_m2m_relationship(self): 234 Item = self.classes.Item 235 236 self.assert_compile( 237 select([Item]).where(Item.keywords), 238 "SELECT items.id, items.description FROM items, " 239 "item_keywords AS item_keywords_1, keywords " 240 "WHERE items.id = item_keywords_1.item_id " 241 "AND keywords.id = item_keywords_1.keyword_id" 242 ) 243 244 def test_inline_select_from_entity(self): 245 User = self.classes.User 246 247 self.assert_compile( 248 select(['*'], from_obj=User), 249 "SELECT * FROM users" 250 ) 251 252 def test_select_from_aliased_entity(self): 253 User = self.classes.User 254 ua = aliased(User, name="ua") 255 self.assert_compile( 256 select(['*']).select_from(ua), 257 "SELECT * FROM users AS ua" 258 ) 259 260 def test_correlate_entity(self): 261 User = self.classes.User 262 Address = self.classes.Address 263 264 self.assert_compile( 265 select( 266 [ 267 User.name, Address.id, 268 select([func.count(Address.id)]). 269 where(User.id == Address.user_id). 270 correlate(User).as_scalar()]), 271 "SELECT users.name, addresses.id, " 272 "(SELECT count(addresses.id) AS count_1 " 273 "FROM addresses WHERE users.id = addresses.user_id) AS anon_1 " 274 "FROM users, addresses" 275 ) 276 277 def test_correlate_aliased_entity(self): 278 User = self.classes.User 279 Address = self.classes.Address 280 uu = aliased(User, name="uu") 281 282 self.assert_compile( 283 select( 284 [ 285 uu.name, Address.id, 286 select([func.count(Address.id)]). 287 where(uu.id == Address.user_id). 288 correlate(uu).as_scalar()]), 289 # for a long time, "uu.id = address.user_id" was reversed; 290 # this was resolved as of #2872 and had to do with 291 # InstrumentedAttribute.__eq__() taking precedence over 292 # QueryableAttribute.__eq__() 293 "SELECT uu.name, addresses.id, " 294 "(SELECT count(addresses.id) AS count_1 " 295 "FROM addresses WHERE uu.id = addresses.user_id) AS anon_1 " 296 "FROM users AS uu, addresses" 297 ) 298 299 def test_columns_clause_entity(self): 300 User = self.classes.User 301 302 self.assert_compile( 303 select([User]), 304 "SELECT users.id, users.name FROM users" 305 ) 306 307 def test_columns_clause_columns(self): 308 User = self.classes.User 309 310 self.assert_compile( 311 select([User.id, User.name]), 312 "SELECT users.id, users.name FROM users" 313 ) 314 315 def test_columns_clause_aliased_columns(self): 316 User = self.classes.User 317 ua = aliased(User, name='ua') 318 self.assert_compile( 319 select([ua.id, ua.name]), 320 "SELECT ua.id, ua.name FROM users AS ua" 321 ) 322 323 def test_columns_clause_aliased_entity(self): 324 User = self.classes.User 325 ua = aliased(User, name='ua') 326 self.assert_compile( 327 select([ua]), 328 "SELECT ua.id, ua.name FROM users AS ua" 329 ) 330 331 def test_core_join(self): 332 User = self.classes.User 333 Address = self.classes.Address 334 from sqlalchemy.sql import join 335 self.assert_compile( 336 select([User]).select_from(join(User, Address)), 337 "SELECT users.id, users.name FROM users " 338 "JOIN addresses ON users.id = addresses.user_id" 339 ) 340 341 def test_insert_from_query(self): 342 User = self.classes.User 343 Address = self.classes.Address 344 345 s = Session() 346 q = s.query(User.id, User.name).filter_by(name='ed') 347 self.assert_compile( 348 insert(Address).from_select(('id', 'email_address'), q), 349 "INSERT INTO addresses (id, email_address) " 350 "SELECT users.id AS users_id, users.name AS users_name " 351 "FROM users WHERE users.name = :name_1" 352 ) 353 354 def test_insert_from_query_col_attr(self): 355 User = self.classes.User 356 Address = self.classes.Address 357 358 s = Session() 359 q = s.query(User.id, User.name).filter_by(name='ed') 360 self.assert_compile( 361 insert(Address).from_select( 362 (Address.id, Address.email_address), q), 363 "INSERT INTO addresses (id, email_address) " 364 "SELECT users.id AS users_id, users.name AS users_name " 365 "FROM users WHERE users.name = :name_1" 366 ) 367 368 def test_update_from_entity(self): 369 from sqlalchemy.sql import update 370 User = self.classes.User 371 self.assert_compile( 372 update(User), 373 "UPDATE users SET id=:id, name=:name" 374 ) 375 376 self.assert_compile( 377 update(User).values(name='ed').where(User.id == 5), 378 "UPDATE users SET name=:name WHERE users.id = :id_1", 379 checkparams={"id_1": 5, "name": "ed"} 380 ) 381 382 def test_delete_from_entity(self): 383 from sqlalchemy.sql import delete 384 User = self.classes.User 385 self.assert_compile( 386 delete(User), 387 "DELETE FROM users" 388 ) 389 390 self.assert_compile( 391 delete(User).where(User.id == 5), 392 "DELETE FROM users WHERE users.id = :id_1", 393 checkparams={"id_1": 5} 394 ) 395 396 def test_insert_from_entity(self): 397 from sqlalchemy.sql import insert 398 User = self.classes.User 399 self.assert_compile( 400 insert(User), 401 "INSERT INTO users (id, name) VALUES (:id, :name)" 402 ) 403 404 self.assert_compile( 405 insert(User).values(name="ed"), 406 "INSERT INTO users (name) VALUES (:name)", 407 checkparams={"name": "ed"} 408 ) 409 410 def test_col_prop_builtin_function(self): 411 class Foo(object): 412 pass 413 414 mapper( 415 Foo, self.tables.users, properties={ 416 'foob': column_property( 417 func.coalesce(self.tables.users.c.name)) 418 }) 419 420 self.assert_compile( 421 select([Foo]).where(Foo.foob == 'somename').order_by(Foo.foob), 422 "SELECT users.id, users.name FROM users " 423 "WHERE coalesce(users.name) = :param_1 " 424 "ORDER BY coalesce(users.name)" 425 ) 426 427 428class GetTest(QueryTest): 429 def test_get(self): 430 User = self.classes.User 431 432 s = create_session() 433 assert s.query(User).get(19) is None 434 u = s.query(User).get(7) 435 u2 = s.query(User).get(7) 436 assert u is u2 437 s.expunge_all() 438 u2 = s.query(User).get(7) 439 assert u is not u2 440 441 def test_get_composite_pk_no_result(self): 442 CompositePk = self.classes.CompositePk 443 444 s = Session() 445 assert s.query(CompositePk).get((100, 100)) is None 446 447 def test_get_composite_pk_result(self): 448 CompositePk = self.classes.CompositePk 449 450 s = Session() 451 one_two = s.query(CompositePk).get((1, 2)) 452 assert one_two.i == 1 453 assert one_two.j == 2 454 assert one_two.k == 3 455 456 def test_get_too_few_params(self): 457 CompositePk = self.classes.CompositePk 458 459 s = Session() 460 q = s.query(CompositePk) 461 assert_raises(sa_exc.InvalidRequestError, q.get, 7) 462 463 def test_get_too_few_params_tuple(self): 464 CompositePk = self.classes.CompositePk 465 466 s = Session() 467 q = s.query(CompositePk) 468 assert_raises(sa_exc.InvalidRequestError, q.get, (7,)) 469 470 def test_get_too_many_params(self): 471 CompositePk = self.classes.CompositePk 472 473 s = Session() 474 q = s.query(CompositePk) 475 assert_raises(sa_exc.InvalidRequestError, q.get, (7, 10, 100)) 476 477 def test_get_against_col(self): 478 User = self.classes.User 479 480 s = Session() 481 q = s.query(User.id) 482 assert_raises(sa_exc.InvalidRequestError, q.get, (5, )) 483 484 def test_get_null_pk(self): 485 """test that a mapping which can have None in a 486 PK (i.e. map to an outerjoin) works with get().""" 487 488 users, addresses = self.tables.users, self.tables.addresses 489 490 s = users.outerjoin(addresses) 491 492 class UserThing(fixtures.ComparableEntity): 493 pass 494 495 mapper( 496 UserThing, s, properties={ 497 'id': (users.c.id, addresses.c.user_id), 498 'address_id': addresses.c.id, 499 }) 500 sess = create_session() 501 u10 = sess.query(UserThing).get((10, None)) 502 eq_(u10, UserThing(id=10)) 503 504 def test_no_criterion(self): 505 """test that get()/load() does not use preexisting filter/etc. 506 criterion""" 507 508 User, Address = self.classes.User, self.classes.Address 509 510 s = create_session() 511 512 q = s.query(User).join('addresses').filter(Address.user_id == 8) 513 assert_raises(sa_exc.InvalidRequestError, q.get, 7) 514 assert_raises( 515 sa_exc.InvalidRequestError, 516 s.query(User).filter(User.id == 7).get, 19) 517 518 # order_by()/get() doesn't raise 519 s.query(User).order_by(User.id).get(8) 520 521 def test_no_criterion_when_already_loaded(self): 522 """test that get()/load() does not use preexisting filter/etc. 523 criterion, even when we're only using the identity map.""" 524 525 User, Address = self.classes.User, self.classes.Address 526 527 s = create_session() 528 529 s.query(User).get(7) 530 531 q = s.query(User).join('addresses').filter(Address.user_id == 8) 532 assert_raises(sa_exc.InvalidRequestError, q.get, 7) 533 534 def test_unique_param_names(self): 535 users = self.tables.users 536 537 class SomeUser(object): 538 pass 539 s = users.select(users.c.id != 12).alias('users') 540 m = mapper(SomeUser, s) 541 assert s.primary_key == m.primary_key 542 543 sess = create_session() 544 assert sess.query(SomeUser).get(7).name == 'jack' 545 546 def test_load(self): 547 User, Address = self.classes.User, self.classes.Address 548 549 s = create_session() 550 551 assert s.query(User).populate_existing().get(19) is None 552 553 u = s.query(User).populate_existing().get(7) 554 u2 = s.query(User).populate_existing().get(7) 555 assert u is u2 556 s.expunge_all() 557 u2 = s.query(User).populate_existing().get(7) 558 assert u is not u2 559 560 u2.name = 'some name' 561 a = Address(email_address='some other name') 562 u2.addresses.append(a) 563 assert u2 in s.dirty 564 assert a in u2.addresses 565 566 s.query(User).populate_existing().get(7) 567 assert u2 not in s.dirty 568 assert u2.name == 'jack' 569 assert a not in u2.addresses 570 571 @testing.provide_metadata 572 @testing.requires.unicode_connections 573 def test_unicode(self): 574 """test that Query.get properly sets up the type for the bind 575 parameter. using unicode would normally fail on postgresql, mysql and 576 oracle unless it is converted to an encoded string""" 577 578 metadata = self.metadata 579 table = Table( 580 'unicode_data', metadata, 581 Column( 582 'id', Unicode(40), primary_key=True, 583 test_needs_autoincrement=True), 584 Column('data', Unicode(40))) 585 metadata.create_all() 586 ustring = util.b('petit voix m\xe2\x80\x99a').decode('utf-8') 587 588 table.insert().execute(id=ustring, data=ustring) 589 590 class LocalFoo(self.classes.Base): 591 pass 592 mapper(LocalFoo, table) 593 eq_( 594 create_session().query(LocalFoo).get(ustring), 595 LocalFoo(id=ustring, data=ustring)) 596 597 def test_populate_existing(self): 598 User, Address = self.classes.User, self.classes.Address 599 600 s = create_session() 601 602 userlist = s.query(User).all() 603 604 u = userlist[0] 605 u.name = 'foo' 606 a = Address(name='ed') 607 u.addresses.append(a) 608 609 self.assert_(a in u.addresses) 610 611 s.query(User).populate_existing().all() 612 613 self.assert_(u not in s.dirty) 614 615 self.assert_(u.name == 'jack') 616 617 self.assert_(a not in u.addresses) 618 619 u.addresses[0].email_address = 'lala' 620 u.orders[1].items[2].description = 'item 12' 621 # test that lazy load doesn't change child items 622 s.query(User).populate_existing().all() 623 assert u.addresses[0].email_address == 'lala' 624 assert u.orders[1].items[2].description == 'item 12' 625 626 # eager load does 627 s.query(User). \ 628 options(joinedload('addresses'), joinedload_all('orders.items')). \ 629 populate_existing().all() 630 assert u.addresses[0].email_address == 'jack@bean.com' 631 assert u.orders[1].items[2].description == 'item 5' 632 633 634class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL): 635 def test_no_limit_offset(self): 636 User = self.classes.User 637 638 s = create_session() 639 640 for q in ( 641 s.query(User).limit(2), 642 s.query(User).offset(2), 643 s.query(User).limit(2).offset(2) 644 ): 645 assert_raises(sa_exc.InvalidRequestError, q.join, "addresses") 646 647 assert_raises( 648 sa_exc.InvalidRequestError, q.filter, User.name == 'ed') 649 650 assert_raises(sa_exc.InvalidRequestError, q.filter_by, name='ed') 651 652 assert_raises(sa_exc.InvalidRequestError, q.order_by, 'foo') 653 654 assert_raises(sa_exc.InvalidRequestError, q.group_by, 'foo') 655 656 assert_raises(sa_exc.InvalidRequestError, q.having, 'foo') 657 658 q.enable_assertions(False).join("addresses") 659 q.enable_assertions(False).filter(User.name == 'ed') 660 q.enable_assertions(False).order_by('foo') 661 q.enable_assertions(False).group_by('foo') 662 663 def test_no_from(self): 664 users, User = self.tables.users, self.classes.User 665 666 s = create_session() 667 668 q = s.query(User).select_from(users) 669 assert_raises(sa_exc.InvalidRequestError, q.select_from, users) 670 671 q = s.query(User).join('addresses') 672 assert_raises(sa_exc.InvalidRequestError, q.select_from, users) 673 674 q = s.query(User).order_by(User.id) 675 assert_raises(sa_exc.InvalidRequestError, q.select_from, users) 676 677 assert_raises(sa_exc.InvalidRequestError, q.select_from, users) 678 679 q.enable_assertions(False).select_from(users) 680 681 # this is fine, however 682 q.from_self() 683 684 def test_invalid_select_from(self): 685 User = self.classes.User 686 687 s = create_session() 688 q = s.query(User) 689 assert_raises(sa_exc.ArgumentError, q.select_from, User.id == 5) 690 assert_raises(sa_exc.ArgumentError, q.select_from, User.id) 691 692 def test_invalid_from_statement(self): 693 User, addresses, users = (self.classes.User, 694 self.tables.addresses, 695 self.tables.users) 696 697 s = create_session() 698 q = s.query(User) 699 assert_raises(sa_exc.ArgumentError, q.from_statement, User.id == 5) 700 assert_raises( 701 sa_exc.ArgumentError, q.from_statement, users.join(addresses)) 702 703 def test_invalid_column(self): 704 User = self.classes.User 705 706 s = create_session() 707 q = s.query(User) 708 assert_raises(sa_exc.InvalidRequestError, q.add_column, object()) 709 710 def test_invalid_column_tuple(self): 711 User = self.classes.User 712 713 s = create_session() 714 q = s.query(User) 715 assert_raises(sa_exc.InvalidRequestError, q.add_column, (1, 1)) 716 717 def test_distinct(self): 718 """test that a distinct() call is not valid before 'clauseelement' 719 conditions.""" 720 721 User = self.classes.User 722 723 s = create_session() 724 q = s.query(User).distinct() 725 assert_raises(sa_exc.InvalidRequestError, q.select_from, User) 726 assert_raises( 727 sa_exc.InvalidRequestError, q.from_statement, 728 text("select * from table")) 729 assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User) 730 731 def test_order_by(self): 732 """test that an order_by() call is not valid before 'clauseelement' 733 conditions.""" 734 735 User = self.classes.User 736 737 s = create_session() 738 q = s.query(User).order_by(User.id) 739 assert_raises(sa_exc.InvalidRequestError, q.select_from, User) 740 assert_raises( 741 sa_exc.InvalidRequestError, q.from_statement, 742 text("select * from table")) 743 assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User) 744 745 def test_mapper_zero(self): 746 User, Address = self.classes.User, self.classes.Address 747 748 s = create_session() 749 750 q = s.query(User, Address) 751 assert_raises(sa_exc.InvalidRequestError, q.get, 5) 752 753 def test_from_statement(self): 754 User = self.classes.User 755 756 s = create_session() 757 758 for meth, arg, kw in [ 759 (Query.filter, (User.id == 5,), {}), 760 (Query.filter_by, (), {'id': 5}), 761 (Query.limit, (5, ), {}), 762 (Query.group_by, (User.name,), {}), 763 (Query.order_by, (User.name,), {}) 764 ]: 765 q = s.query(User) 766 q = meth(q, *arg, **kw) 767 assert_raises( 768 sa_exc.InvalidRequestError, 769 q.from_statement, text("x") 770 ) 771 772 q = s.query(User) 773 q = q.from_statement(text("x")) 774 assert_raises( 775 sa_exc.InvalidRequestError, 776 meth, q, *arg, **kw 777 ) 778 779 780class OperatorTest(QueryTest, AssertsCompiledSQL): 781 """test sql.Comparator implementation for MapperProperties""" 782 783 __dialect__ = 'default' 784 785 def _test(self, clause, expected, entity=None, checkparams=None): 786 dialect = default.DefaultDialect() 787 if entity is not None: 788 # specify a lead entity, so that when we are testing 789 # correlation, the correlation actually happens 790 sess = Session() 791 lead = sess.query(entity) 792 context = lead._compile_context() 793 context.statement.use_labels = True 794 lead = context.statement.compile(dialect=dialect) 795 expected = (str(lead) + " WHERE " + expected).replace("\n", "") 796 clause = sess.query(entity).filter(clause) 797 self.assert_compile(clause, expected, checkparams=checkparams) 798 799 def _test_filter_aliases( 800 self, 801 clause, expected, from_, onclause, checkparams=None): 802 dialect = default.DefaultDialect() 803 sess = Session() 804 lead = sess.query(from_).join(onclause, aliased=True) 805 full = lead.filter(clause) 806 context = lead._compile_context() 807 context.statement.use_labels = True 808 lead = context.statement.compile(dialect=dialect) 809 expected = (str(lead) + " WHERE " + expected).replace("\n", "") 810 811 self.assert_compile(full, expected, checkparams=checkparams) 812 813 def test_arithmetic(self): 814 User = self.classes.User 815 816 create_session().query(User) 817 for (py_op, sql_op) in ((operators.add, '+'), (operators.mul, '*'), 818 (operators.sub, '-'), 819 (operators.truediv, '/'), 820 (operators.div, '/'), 821 ): 822 for (lhs, rhs, res) in ( 823 (5, User.id, ':id_1 %s users.id'), 824 (5, literal(6), ':param_1 %s :param_2'), 825 (User.id, 5, 'users.id %s :id_1'), 826 (User.id, literal('b'), 'users.id %s :param_1'), 827 (User.id, User.id, 'users.id %s users.id'), 828 (literal(5), 'b', ':param_1 %s :param_2'), 829 (literal(5), User.id, ':param_1 %s users.id'), 830 (literal(5), literal(6), ':param_1 %s :param_2'), 831 ): 832 self._test(py_op(lhs, rhs), res % sql_op) 833 834 def test_comparison(self): 835 User = self.classes.User 836 837 create_session().query(User) 838 ualias = aliased(User) 839 840 for (py_op, fwd_op, rev_op) in ((operators.lt, '<', '>'), 841 (operators.gt, '>', '<'), 842 (operators.eq, '=', '='), 843 (operators.ne, '!=', '!='), 844 (operators.le, '<=', '>='), 845 (operators.ge, '>=', '<=')): 846 for (lhs, rhs, l_sql, r_sql) in ( 847 ('a', User.id, ':id_1', 'users.id'), 848 ('a', literal('b'), ':param_2', ':param_1'), # note swap! 849 (User.id, 'b', 'users.id', ':id_1'), 850 (User.id, literal('b'), 'users.id', ':param_1'), 851 (User.id, User.id, 'users.id', 'users.id'), 852 (literal('a'), 'b', ':param_1', ':param_2'), 853 (literal('a'), User.id, ':param_1', 'users.id'), 854 (literal('a'), literal('b'), ':param_1', ':param_2'), 855 (ualias.id, literal('b'), 'users_1.id', ':param_1'), 856 (User.id, ualias.name, 'users.id', 'users_1.name'), 857 (User.name, ualias.name, 'users.name', 'users_1.name'), 858 (ualias.name, User.name, 'users_1.name', 'users.name'), 859 ): 860 861 # the compiled clause should match either (e.g.): 862 # 'a' < 'b' -or- 'b' > 'a'. 863 compiled = str(py_op(lhs, rhs).compile( 864 dialect=default.DefaultDialect())) 865 fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql) 866 rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql) 867 868 self.assert_(compiled == fwd_sql or compiled == rev_sql, 869 "\n'" + compiled + "'\n does not match\n'" + 870 fwd_sql + "'\n or\n'" + rev_sql + "'") 871 872 def test_o2m_compare_to_null(self): 873 User = self.classes.User 874 875 self._test(User.id == None, "users.id IS NULL") 876 self._test(User.id != None, "users.id IS NOT NULL") 877 self._test(~(User.id == None), "users.id IS NOT NULL") 878 self._test(~(User.id != None), "users.id IS NULL") 879 self._test(None == User.id, "users.id IS NULL") 880 self._test(~(None == User.id), "users.id IS NOT NULL") 881 882 def test_m2o_compare_to_null(self): 883 Address = self.classes.Address 884 self._test(Address.user == None, "addresses.user_id IS NULL") 885 self._test(~(Address.user == None), "addresses.user_id IS NOT NULL") 886 self._test(~(Address.user != None), "addresses.user_id IS NULL") 887 self._test(None == Address.user, "addresses.user_id IS NULL") 888 self._test(~(None == Address.user), "addresses.user_id IS NOT NULL") 889 890 def test_o2m_compare_to_null_orm_adapt(self): 891 User, Address = self.classes.User, self.classes.Address 892 self._test_filter_aliases( 893 User.id == None, 894 "users_1.id IS NULL", Address, Address.user), 895 self._test_filter_aliases( 896 User.id != None, 897 "users_1.id IS NOT NULL", Address, Address.user), 898 self._test_filter_aliases( 899 ~(User.id == None), 900 "users_1.id IS NOT NULL", Address, Address.user), 901 self._test_filter_aliases( 902 ~(User.id != None), 903 "users_1.id IS NULL", Address, Address.user), 904 905 def test_m2o_compare_to_null_orm_adapt(self): 906 User, Address = self.classes.User, self.classes.Address 907 self._test_filter_aliases( 908 Address.user == None, 909 "addresses_1.user_id IS NULL", User, User.addresses), 910 self._test_filter_aliases( 911 Address.user != None, 912 "addresses_1.user_id IS NOT NULL", User, User.addresses), 913 self._test_filter_aliases( 914 ~(Address.user == None), 915 "addresses_1.user_id IS NOT NULL", User, User.addresses), 916 self._test_filter_aliases( 917 ~(Address.user != None), 918 "addresses_1.user_id IS NULL", User, User.addresses), 919 920 def test_o2m_compare_to_null_aliased(self): 921 User = self.classes.User 922 u1 = aliased(User) 923 self._test(u1.id == None, "users_1.id IS NULL") 924 self._test(u1.id != None, "users_1.id IS NOT NULL") 925 self._test(~(u1.id == None), "users_1.id IS NOT NULL") 926 self._test(~(u1.id != None), "users_1.id IS NULL") 927 928 def test_m2o_compare_to_null_aliased(self): 929 Address = self.classes.Address 930 a1 = aliased(Address) 931 self._test(a1.user == None, "addresses_1.user_id IS NULL") 932 self._test(~(a1.user == None), "addresses_1.user_id IS NOT NULL") 933 self._test(a1.user != None, "addresses_1.user_id IS NOT NULL") 934 self._test(~(a1.user != None), "addresses_1.user_id IS NULL") 935 936 def test_relationship_unimplemented(self): 937 User = self.classes.User 938 for op in [ 939 User.addresses.like, 940 User.addresses.ilike, 941 User.addresses.__le__, 942 User.addresses.__gt__, 943 ]: 944 assert_raises(NotImplementedError, op, "x") 945 946 def test_o2m_any(self): 947 User, Address = self.classes.User, self.classes.Address 948 self._test( 949 User.addresses.any(Address.id == 17), 950 "EXISTS (SELECT 1 FROM addresses " 951 "WHERE users.id = addresses.user_id AND addresses.id = :id_1)", 952 entity=User 953 ) 954 955 def test_o2m_any_aliased(self): 956 User, Address = self.classes.User, self.classes.Address 957 u1 = aliased(User) 958 a1 = aliased(Address) 959 self._test( 960 u1.addresses.of_type(a1).any(a1.id == 17), 961 "EXISTS (SELECT 1 FROM addresses AS addresses_1 " 962 "WHERE users_1.id = addresses_1.user_id AND " 963 "addresses_1.id = :id_1)", 964 entity=u1 965 ) 966 967 def test_o2m_any_orm_adapt(self): 968 User, Address = self.classes.User, self.classes.Address 969 self._test_filter_aliases( 970 User.addresses.any(Address.id == 17), 971 "EXISTS (SELECT 1 FROM addresses " 972 "WHERE users_1.id = addresses.user_id AND addresses.id = :id_1)", 973 Address, Address.user 974 ) 975 976 def test_m2o_compare_instance(self): 977 User, Address = self.classes.User, self.classes.Address 978 u7 = User(id=5) 979 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 980 u7.id = 7 981 982 self._test(Address.user == u7, ":param_1 = addresses.user_id") 983 984 def test_m2o_compare_instance_negated(self): 985 User, Address = self.classes.User, self.classes.Address 986 u7 = User(id=5) 987 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 988 u7.id = 7 989 990 self._test( 991 Address.user != u7, 992 "addresses.user_id != :user_id_1 OR addresses.user_id IS NULL", 993 checkparams={'user_id_1': 7}) 994 995 def test_m2o_compare_instance_orm_adapt(self): 996 User, Address = self.classes.User, self.classes.Address 997 u7 = User(id=5) 998 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 999 u7.id = 7 1000 1001 self._test_filter_aliases( 1002 Address.user == u7, 1003 ":param_1 = addresses_1.user_id", User, User.addresses, 1004 checkparams={'param_1': 7} 1005 ) 1006 1007 def test_m2o_compare_instance_negated_warn_on_none(self): 1008 User, Address = self.classes.User, self.classes.Address 1009 1010 u7_transient = User(id=None) 1011 1012 with expect_warnings("Got None for value of column users.id; "): 1013 self._test_filter_aliases( 1014 Address.user != u7_transient, 1015 "addresses_1.user_id != :user_id_1 " 1016 "OR addresses_1.user_id IS NULL", 1017 User, User.addresses, 1018 checkparams={'user_id_1': None} 1019 ) 1020 1021 def test_m2o_compare_instance_negated_orm_adapt(self): 1022 User, Address = self.classes.User, self.classes.Address 1023 u7 = User(id=5) 1024 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 1025 u7.id = 7 1026 1027 u7_transient = User(id=7) 1028 1029 self._test_filter_aliases( 1030 Address.user != u7, 1031 "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL", 1032 User, User.addresses, 1033 checkparams={'user_id_1': 7} 1034 ) 1035 1036 self._test_filter_aliases( 1037 ~(Address.user == u7), ":param_1 != addresses_1.user_id", 1038 User, User.addresses, 1039 checkparams={'param_1': 7} 1040 ) 1041 1042 self._test_filter_aliases( 1043 ~(Address.user != u7), 1044 "NOT (addresses_1.user_id != :user_id_1 " 1045 "OR addresses_1.user_id IS NULL)", User, User.addresses, 1046 checkparams={'user_id_1': 7} 1047 ) 1048 1049 self._test_filter_aliases( 1050 Address.user != u7_transient, 1051 "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL", 1052 User, User.addresses, 1053 checkparams={'user_id_1': 7} 1054 ) 1055 1056 self._test_filter_aliases( 1057 ~(Address.user == u7_transient), ":param_1 != addresses_1.user_id", 1058 User, User.addresses, 1059 checkparams={'param_1': 7} 1060 ) 1061 1062 self._test_filter_aliases( 1063 ~(Address.user != u7_transient), 1064 "NOT (addresses_1.user_id != :user_id_1 " 1065 "OR addresses_1.user_id IS NULL)", User, User.addresses, 1066 checkparams={'user_id_1': 7} 1067 ) 1068 1069 def test_m2o_compare_instance_aliased(self): 1070 User, Address = self.classes.User, self.classes.Address 1071 u7 = User(id=5) 1072 attributes.instance_state(u7)._commit_all(attributes.instance_dict(u7)) 1073 u7.id = 7 1074 1075 u7_transient = User(id=7) 1076 1077 a1 = aliased(Address) 1078 self._test( 1079 a1.user == u7, 1080 ":param_1 = addresses_1.user_id", 1081 checkparams={'param_1': 7}) 1082 1083 self._test( 1084 a1.user != u7, 1085 "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL", 1086 checkparams={'user_id_1': 7}) 1087 1088 a1 = aliased(Address) 1089 self._test( 1090 a1.user == u7_transient, 1091 ":param_1 = addresses_1.user_id", 1092 checkparams={'param_1': 7}) 1093 1094 self._test( 1095 a1.user != u7_transient, 1096 "addresses_1.user_id != :user_id_1 OR addresses_1.user_id IS NULL", 1097 checkparams={'user_id_1': 7}) 1098 1099 def test_selfref_relationship(self): 1100 1101 Node = self.classes.Node 1102 1103 nalias = aliased(Node) 1104 1105 # auto self-referential aliasing 1106 self._test( 1107 Node.children.any(Node.data == 'n1'), 1108 "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " 1109 "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)", 1110 entity=Node, 1111 checkparams={'data_1': 'n1'} 1112 ) 1113 1114 # needs autoaliasing 1115 self._test( 1116 Node.children == None, 1117 "NOT (EXISTS (SELECT 1 FROM nodes AS nodes_1 " 1118 "WHERE nodes.id = nodes_1.parent_id))", 1119 entity=Node, 1120 checkparams={} 1121 ) 1122 1123 self._test( 1124 Node.parent == None, 1125 "nodes.parent_id IS NULL", 1126 checkparams={} 1127 ) 1128 1129 self._test( 1130 nalias.parent == None, 1131 "nodes_1.parent_id IS NULL", 1132 checkparams={} 1133 ) 1134 1135 self._test( 1136 nalias.parent != None, 1137 "nodes_1.parent_id IS NOT NULL", 1138 checkparams={} 1139 ) 1140 1141 self._test( 1142 nalias.children == None, 1143 "NOT (EXISTS (" 1144 "SELECT 1 FROM nodes WHERE nodes_1.id = nodes.parent_id))", 1145 entity=nalias, 1146 checkparams={} 1147 ) 1148 1149 self._test( 1150 nalias.children.any(Node.data == 'some data'), 1151 "EXISTS (SELECT 1 FROM nodes WHERE " 1152 "nodes_1.id = nodes.parent_id AND nodes.data = :data_1)", 1153 entity=nalias, 1154 checkparams={'data_1': 'some data'} 1155 ) 1156 1157 # this fails because self-referential any() is auto-aliasing; 1158 # the fact that we use "nalias" here means we get two aliases. 1159 #self._test( 1160 # Node.children.any(nalias.data == 'some data'), 1161 # "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " 1162 # "nodes.id = nodes_1.parent_id AND nodes_1.data = :data_1)", 1163 # entity=Node 1164 # ) 1165 1166 self._test( 1167 nalias.parent.has(Node.data == 'some data'), 1168 "EXISTS (SELECT 1 FROM nodes WHERE nodes.id = nodes_1.parent_id " 1169 "AND nodes.data = :data_1)", 1170 entity=nalias, 1171 checkparams={'data_1': 'some data'} 1172 ) 1173 1174 self._test( 1175 Node.parent.has(Node.data == 'some data'), 1176 "EXISTS (SELECT 1 FROM nodes AS nodes_1 WHERE " 1177 "nodes_1.id = nodes.parent_id AND nodes_1.data = :data_1)", 1178 entity=Node, 1179 checkparams={'data_1': 'some data'} 1180 ) 1181 1182 self._test( 1183 Node.parent == Node(id=7), 1184 ":param_1 = nodes.parent_id", 1185 checkparams={"param_1": 7} 1186 ) 1187 1188 self._test( 1189 nalias.parent == Node(id=7), 1190 ":param_1 = nodes_1.parent_id", 1191 checkparams={"param_1": 7} 1192 ) 1193 1194 self._test( 1195 nalias.parent != Node(id=7), 1196 'nodes_1.parent_id != :parent_id_1 ' 1197 'OR nodes_1.parent_id IS NULL', 1198 checkparams={"parent_id_1": 7} 1199 ) 1200 1201 self._test( 1202 nalias.parent != Node(id=7), 1203 'nodes_1.parent_id != :parent_id_1 ' 1204 'OR nodes_1.parent_id IS NULL', 1205 checkparams={"parent_id_1": 7} 1206 ) 1207 1208 self._test( 1209 nalias.children.contains(Node(id=7, parent_id=12)), 1210 "nodes_1.id = :param_1", 1211 checkparams={"param_1": 12} 1212 ) 1213 1214 def test_multilevel_any(self): 1215 User, Address, Dingaling = \ 1216 self.classes.User, self.classes.Address, self.classes.Dingaling 1217 sess = Session() 1218 1219 q = sess.query(User).filter( 1220 User.addresses.any( 1221 and_(Address.id == Dingaling.address_id, 1222 Dingaling.data == 'x'))) 1223 # new since #2746 - correlate_except() now takes context into account 1224 # so its usage in any() is not as disrupting. 1225 self.assert_compile( 1226 q, 1227 "SELECT users.id AS users_id, users.name AS users_name " 1228 "FROM users " 1229 "WHERE EXISTS (SELECT 1 " 1230 "FROM addresses, dingalings " 1231 "WHERE users.id = addresses.user_id AND " 1232 "addresses.id = dingalings.address_id AND " 1233 "dingalings.data = :data_1)" 1234 ) 1235 1236 def test_op(self): 1237 User = self.classes.User 1238 1239 self._test(User.name.op('ilike')('17'), "users.name ilike :name_1") 1240 1241 def test_in(self): 1242 User = self.classes.User 1243 1244 self._test(User.id.in_(['a', 'b']), "users.id IN (:id_1, :id_2)") 1245 1246 def test_in_on_relationship_not_supported(self): 1247 User, Address = self.classes.User, self.classes.Address 1248 1249 assert_raises(NotImplementedError, Address.user.in_, [User(id=5)]) 1250 1251 def test_neg(self): 1252 User = self.classes.User 1253 1254 self._test(-User.id, "-users.id") 1255 self._test(User.id + -User.id, "users.id + -users.id") 1256 1257 def test_between(self): 1258 User = self.classes.User 1259 1260 self._test( 1261 User.id.between('a', 'b'), "users.id BETWEEN :id_1 AND :id_2") 1262 1263 def test_collate(self): 1264 User = self.classes.User 1265 1266 self._test(collate(User.id, 'binary'), "users.id COLLATE binary") 1267 1268 self._test(User.id.collate('binary'), "users.id COLLATE binary") 1269 1270 def test_selfref_between(self): 1271 User = self.classes.User 1272 1273 ualias = aliased(User) 1274 self._test( 1275 User.id.between(ualias.id, ualias.id), 1276 "users.id BETWEEN users_1.id AND users_1.id") 1277 self._test( 1278 ualias.id.between(User.id, User.id), 1279 "users_1.id BETWEEN users.id AND users.id") 1280 1281 def test_clauses(self): 1282 User, Address = self.classes.User, self.classes.Address 1283 1284 for (expr, compare) in ( 1285 (func.max(User.id), "max(users.id)"), 1286 (User.id.desc(), "users.id DESC"), 1287 (between(5, User.id, Address.id), 1288 ":param_1 BETWEEN users.id AND addresses.id"), 1289 # this one would require adding compile() to 1290 # InstrumentedScalarAttribute. do we want this ? 1291 # (User.id, "users.id") 1292 ): 1293 c = expr.compile(dialect=default.DefaultDialect()) 1294 assert str(c) == compare, "%s != %s" % (str(c), compare) 1295 1296 1297class ExpressionTest(QueryTest, AssertsCompiledSQL): 1298 __dialect__ = 'default' 1299 1300 def test_deferred_instances(self): 1301 User, addresses, Address = (self.classes.User, 1302 self.tables.addresses, 1303 self.classes.Address) 1304 1305 session = create_session() 1306 s = session.query(User).filter( 1307 and_(addresses.c.email_address == bindparam('emailad'), 1308 Address.user_id == User.id)).statement 1309 1310 l = list( 1311 session.query(User).instances(s.execute(emailad='jack@bean.com'))) 1312 eq_([User(id=7)], l) 1313 1314 def test_aliased_sql_construct(self): 1315 User, Address = self.classes.User, self.classes.Address 1316 1317 j = join(User, Address) 1318 a1 = aliased(j) 1319 self.assert_compile( 1320 a1.select(), 1321 "SELECT anon_1.users_id, anon_1.users_name, anon_1.addresses_id, " 1322 "anon_1.addresses_user_id, anon_1.addresses_email_address " 1323 "FROM (SELECT users.id AS users_id, users.name AS users_name, " 1324 "addresses.id AS addresses_id, addresses.user_id AS " 1325 "addresses_user_id, addresses.email_address AS " 1326 "addresses_email_address FROM users JOIN addresses " 1327 "ON users.id = addresses.user_id) AS anon_1" 1328 ) 1329 1330 def test_aliased_sql_construct_raises_adapt_on_names(self): 1331 User, Address = self.classes.User, self.classes.Address 1332 1333 j = join(User, Address) 1334 assert_raises_message( 1335 sa_exc.ArgumentError, 1336 "adapt_on_names only applies to ORM elements", 1337 aliased, j, adapt_on_names=True 1338 ) 1339 1340 def test_scalar_subquery_compile_whereclause(self): 1341 User = self.classes.User 1342 Address = self.classes.Address 1343 1344 session = create_session() 1345 1346 q = session.query(User.id).filter(User.id == 7) 1347 1348 q = session.query(Address).filter(Address.user_id == q) 1349 assert isinstance(q._criterion.right, expression.ColumnElement) 1350 self.assert_compile( 1351 q, 1352 "SELECT addresses.id AS addresses_id, addresses.user_id " 1353 "AS addresses_user_id, addresses.email_address AS " 1354 "addresses_email_address FROM addresses WHERE " 1355 "addresses.user_id = (SELECT users.id AS users_id " 1356 "FROM users WHERE users.id = :id_1)" 1357 ) 1358 1359 def test_named_subquery(self): 1360 User = self.classes.User 1361 1362 session = create_session() 1363 a1 = session.query(User.id).filter(User.id == 7).subquery('foo1') 1364 a2 = session.query(User.id).filter(User.id == 7).subquery(name='foo2') 1365 a3 = session.query(User.id).filter(User.id == 7).subquery() 1366 1367 eq_(a1.name, 'foo1') 1368 eq_(a2.name, 'foo2') 1369 eq_(a3.name, '%%(%d anon)s' % id(a3)) 1370 1371 def test_labeled_subquery(self): 1372 User = self.classes.User 1373 1374 session = create_session() 1375 a1 = session.query(User.id).filter(User.id == 7). \ 1376 subquery(with_labels=True) 1377 assert a1.c.users_id is not None 1378 1379 def test_reduced_subquery(self): 1380 User = self.classes.User 1381 ua = aliased(User) 1382 1383 session = create_session() 1384 a1 = session.query(User.id, ua.id, ua.name).\ 1385 filter(User.id == ua.id).subquery(reduce_columns=True) 1386 self.assert_compile(a1, 1387 "SELECT users.id, users_1.name FROM " 1388 "users, users AS users_1 WHERE users.id = users_1.id") 1389 1390 def test_label(self): 1391 User = self.classes.User 1392 1393 session = create_session() 1394 1395 q = session.query(User.id).filter(User.id == 7).label('foo') 1396 self.assert_compile( 1397 session.query(q), 1398 "SELECT (SELECT users.id FROM users WHERE users.id = :id_1) AS foo" 1399 ) 1400 1401 def test_as_scalar(self): 1402 User = self.classes.User 1403 1404 session = create_session() 1405 1406 q = session.query(User.id).filter(User.id == 7).as_scalar() 1407 1408 self.assert_compile(session.query(User).filter(User.id.in_(q)), 1409 'SELECT users.id AS users_id, users.name ' 1410 'AS users_name FROM users WHERE users.id ' 1411 'IN (SELECT users.id FROM users WHERE ' 1412 'users.id = :id_1)') 1413 1414 def test_param_transfer(self): 1415 User = self.classes.User 1416 1417 session = create_session() 1418 1419 q = session.query(User.id).filter(User.id == bindparam('foo')).\ 1420 params(foo=7).subquery() 1421 1422 q = session.query(User).filter(User.id.in_(q)) 1423 1424 eq_(User(id=7), q.one()) 1425 1426 def test_in(self): 1427 User, Address = self.classes.User, self.classes.Address 1428 1429 session = create_session() 1430 s = session.query(User.id).join(User.addresses).group_by(User.id).\ 1431 having(func.count(Address.id) > 2) 1432 eq_(session.query(User).filter(User.id.in_(s)).all(), [User(id=8)]) 1433 1434 def test_union(self): 1435 User = self.classes.User 1436 1437 s = create_session() 1438 1439 q1 = s.query(User).filter(User.name == 'ed').with_labels() 1440 q2 = s.query(User).filter(User.name == 'fred').with_labels() 1441 eq_( 1442 s.query(User).from_statement(union(q1, q2). 1443 order_by('users_name')).all(), [User(name='ed'), User(name='fred')] 1444 ) 1445 1446 def test_select(self): 1447 User = self.classes.User 1448 1449 s = create_session() 1450 1451 # this is actually not legal on most DBs since the subquery has no 1452 # alias 1453 q1 = s.query(User).filter(User.name == 'ed') 1454 1455 self.assert_compile( 1456 select([q1]), 1457 "SELECT users_id, users_name FROM (SELECT users.id AS users_id, " 1458 "users.name AS users_name FROM users WHERE users.name = :name_1)" 1459 ) 1460 1461 def test_join(self): 1462 User, Address = self.classes.User, self.classes.Address 1463 1464 s = create_session() 1465 1466 # TODO: do we want aliased() to detect a query and convert to 1467 # subquery() automatically ? 1468 q1 = s.query(Address).filter(Address.email_address == 'jack@bean.com') 1469 adalias = aliased(Address, q1.subquery()) 1470 eq_( 1471 s.query(User, adalias).join(adalias, User.id == adalias.user_id). 1472 all(), 1473 [ 1474 ( 1475 User(id=7, name='jack'), 1476 Address(email_address='jack@bean.com', user_id=7, id=1))]) 1477 1478 1479class ColumnPropertyTest(_fixtures.FixtureTest, AssertsCompiledSQL): 1480 __dialect__ = 'default' 1481 run_setup_mappers = 'each' 1482 1483 def _fixture(self, label=True, polymorphic=False): 1484 User, Address = self.classes("User", "Address") 1485 users, addresses = self.tables("users", "addresses") 1486 stmt = select([func.max(addresses.c.email_address)]).\ 1487 where(addresses.c.user_id == users.c.id).\ 1488 correlate(users) 1489 if label: 1490 stmt = stmt.label("email_ad") 1491 1492 mapper(User, users, properties={ 1493 "ead": column_property(stmt) 1494 }, with_polymorphic="*" if polymorphic else None) 1495 mapper(Address, addresses) 1496 1497 def _func_fixture(self, label=False): 1498 User = self.classes.User 1499 users = self.tables.users 1500 1501 if label: 1502 mapper(User, users, properties={ 1503 "foobar": column_property( 1504 func.foob(users.c.name).label(None) 1505 ) 1506 }) 1507 else: 1508 mapper(User, users, properties={ 1509 "foobar": column_property( 1510 func.foob(users.c.name) 1511 ) 1512 }) 1513 1514 def test_anon_label_function_auto(self): 1515 self._func_fixture() 1516 User = self.classes.User 1517 1518 s = Session() 1519 1520 u1 = aliased(User) 1521 self.assert_compile( 1522 s.query(User.foobar, u1.foobar), 1523 "SELECT foob(users.name) AS foob_1, foob(users_1.name) AS foob_2 " 1524 "FROM users, users AS users_1" 1525 ) 1526 1527 def test_anon_label_function_manual(self): 1528 self._func_fixture(label=True) 1529 User = self.classes.User 1530 1531 s = Session() 1532 1533 u1 = aliased(User) 1534 self.assert_compile( 1535 s.query(User.foobar, u1.foobar), 1536 "SELECT foob(users.name) AS foob_1, foob(users_1.name) AS foob_2 " 1537 "FROM users, users AS users_1" 1538 ) 1539 1540 def test_anon_label_ad_hoc_labeling(self): 1541 self._func_fixture() 1542 User = self.classes.User 1543 1544 s = Session() 1545 1546 u1 = aliased(User) 1547 self.assert_compile( 1548 s.query(User.foobar.label('x'), u1.foobar.label('y')), 1549 "SELECT foob(users.name) AS x, foob(users_1.name) AS y " 1550 "FROM users, users AS users_1" 1551 ) 1552 1553 1554 def test_order_by_column_prop_string(self): 1555 User, Address = self.classes("User", "Address") 1556 self._fixture(label=True) 1557 1558 s = Session() 1559 q = s.query(User).order_by("email_ad") 1560 self.assert_compile( 1561 q, 1562 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1563 "FROM addresses " 1564 "WHERE addresses.user_id = users.id) AS email_ad, " 1565 "users.id AS users_id, users.name AS users_name " 1566 "FROM users ORDER BY email_ad" 1567 ) 1568 1569 def test_order_by_column_prop_aliased_string(self): 1570 User, Address = self.classes("User", "Address") 1571 self._fixture(label=True) 1572 1573 s = Session() 1574 ua = aliased(User) 1575 q = s.query(ua).order_by("email_ad") 1576 1577 def go(): 1578 self.assert_compile( 1579 q, 1580 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1581 "FROM addresses WHERE addresses.user_id = users_1.id) " 1582 "AS anon_1, users_1.id AS users_1_id, " 1583 "users_1.name AS users_1_name FROM users AS users_1 " 1584 "ORDER BY email_ad" 1585 ) 1586 assert_warnings( 1587 go, 1588 ["Can't resolve label reference 'email_ad'"], regex=True) 1589 1590 def test_order_by_column_labeled_prop_attr_aliased_one(self): 1591 User = self.classes.User 1592 self._fixture(label=True) 1593 1594 ua = aliased(User) 1595 s = Session() 1596 q = s.query(ua).order_by(ua.ead) 1597 self.assert_compile( 1598 q, 1599 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1600 "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 1601 "users_1.id AS users_1_id, users_1.name AS users_1_name " 1602 "FROM users AS users_1 ORDER BY anon_1" 1603 ) 1604 1605 def test_order_by_column_labeled_prop_attr_aliased_two(self): 1606 User = self.classes.User 1607 self._fixture(label=True) 1608 1609 ua = aliased(User) 1610 s = Session() 1611 q = s.query(ua.ead).order_by(ua.ead) 1612 self.assert_compile( 1613 q, 1614 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1615 "FROM addresses, " 1616 "users AS users_1 WHERE addresses.user_id = users_1.id) " 1617 "AS anon_1 ORDER BY anon_1" 1618 ) 1619 1620 # we're also testing that the state of "ua" is OK after the 1621 # previous call, so the batching into one test is intentional 1622 q = s.query(ua).order_by(ua.ead) 1623 self.assert_compile( 1624 q, 1625 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1626 "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 1627 "users_1.id AS users_1_id, users_1.name AS users_1_name " 1628 "FROM users AS users_1 ORDER BY anon_1" 1629 ) 1630 1631 def test_order_by_column_labeled_prop_attr_aliased_three(self): 1632 User = self.classes.User 1633 self._fixture(label=True) 1634 1635 ua = aliased(User) 1636 s = Session() 1637 q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead) 1638 self.assert_compile( 1639 q, 1640 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1641 "FROM addresses, users WHERE addresses.user_id = users.id) " 1642 "AS email_ad, (SELECT max(addresses.email_address) AS max_1 " 1643 "FROM addresses, users AS users_1 WHERE addresses.user_id = " 1644 "users_1.id) AS anon_1 ORDER BY email_ad, anon_1" 1645 ) 1646 1647 q = s.query(User, ua).order_by(User.ead, ua.ead) 1648 self.assert_compile( 1649 q, 1650 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1651 "FROM addresses WHERE addresses.user_id = users.id) AS " 1652 "email_ad, users.id AS users_id, users.name AS users_name, " 1653 "(SELECT max(addresses.email_address) AS max_1 FROM addresses " 1654 "WHERE addresses.user_id = users_1.id) AS anon_1, users_1.id " 1655 "AS users_1_id, users_1.name AS users_1_name FROM users, " 1656 "users AS users_1 ORDER BY email_ad, anon_1" 1657 ) 1658 1659 def test_order_by_column_labeled_prop_attr_aliased_four(self): 1660 User = self.classes.User 1661 self._fixture(label=True, polymorphic=True) 1662 1663 ua = aliased(User) 1664 s = Session() 1665 q = s.query(ua, User.id).order_by(ua.ead) 1666 self.assert_compile( 1667 q, 1668 "SELECT (SELECT max(addresses.email_address) AS max_1 FROM " 1669 "addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 1670 "users_1.id AS users_1_id, users_1.name AS users_1_name, " 1671 "users.id AS users_id FROM users AS users_1, users ORDER BY anon_1" 1672 ) 1673 1674 1675 def test_order_by_column_unlabeled_prop_attr_aliased_one(self): 1676 User = self.classes.User 1677 self._fixture(label=False) 1678 1679 ua = aliased(User) 1680 s = Session() 1681 q = s.query(ua).order_by(ua.ead) 1682 self.assert_compile( 1683 q, 1684 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1685 "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 1686 "users_1.id AS users_1_id, users_1.name AS users_1_name " 1687 "FROM users AS users_1 ORDER BY anon_1" 1688 ) 1689 1690 def test_order_by_column_unlabeled_prop_attr_aliased_two(self): 1691 User = self.classes.User 1692 self._fixture(label=False) 1693 1694 ua = aliased(User) 1695 s = Session() 1696 q = s.query(ua.ead).order_by(ua.ead) 1697 self.assert_compile( 1698 q, 1699 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1700 "FROM addresses, " 1701 "users AS users_1 WHERE addresses.user_id = users_1.id) " 1702 "AS anon_1 ORDER BY anon_1" 1703 ) 1704 1705 # we're also testing that the state of "ua" is OK after the 1706 # previous call, so the batching into one test is intentional 1707 q = s.query(ua).order_by(ua.ead) 1708 self.assert_compile( 1709 q, 1710 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1711 "FROM addresses WHERE addresses.user_id = users_1.id) AS anon_1, " 1712 "users_1.id AS users_1_id, users_1.name AS users_1_name " 1713 "FROM users AS users_1 ORDER BY anon_1" 1714 ) 1715 1716 def test_order_by_column_unlabeled_prop_attr_aliased_three(self): 1717 User = self.classes.User 1718 self._fixture(label=False) 1719 1720 ua = aliased(User) 1721 s = Session() 1722 q = s.query(User.ead, ua.ead).order_by(User.ead, ua.ead) 1723 self.assert_compile( 1724 q, 1725 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1726 "FROM addresses, users WHERE addresses.user_id = users.id) " 1727 "AS anon_1, (SELECT max(addresses.email_address) AS max_1 " 1728 "FROM addresses, users AS users_1 " 1729 "WHERE addresses.user_id = users_1.id) AS anon_2 " 1730 "ORDER BY anon_1, anon_2" 1731 ) 1732 1733 q = s.query(User, ua).order_by(User.ead, ua.ead) 1734 self.assert_compile( 1735 q, 1736 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1737 "FROM addresses WHERE addresses.user_id = users.id) AS " 1738 "anon_1, users.id AS users_id, users.name AS users_name, " 1739 "(SELECT max(addresses.email_address) AS max_1 FROM addresses " 1740 "WHERE addresses.user_id = users_1.id) AS anon_2, users_1.id " 1741 "AS users_1_id, users_1.name AS users_1_name FROM users, " 1742 "users AS users_1 ORDER BY anon_1, anon_2" 1743 ) 1744 1745 def test_order_by_column_prop_attr(self): 1746 User, Address = self.classes("User", "Address") 1747 self._fixture(label=True) 1748 1749 s = Session() 1750 q = s.query(User).order_by(User.ead) 1751 # this one is a bit of a surprise; this is compiler 1752 # label-order-by logic kicking in, but won't work in more 1753 # complex cases. 1754 self.assert_compile( 1755 q, 1756 "SELECT (SELECT max(addresses.email_address) AS max_1 " 1757 "FROM addresses " 1758 "WHERE addresses.user_id = users.id) AS email_ad, " 1759 "users.id AS users_id, users.name AS users_name " 1760 "FROM users ORDER BY email_ad" 1761 ) 1762 1763 def test_order_by_column_prop_attr_non_present(self): 1764 User, Address = self.classes("User", "Address") 1765 self._fixture(label=True) 1766 1767 s = Session() 1768 q = s.query(User).options(defer(User.ead)).order_by(User.ead) 1769 self.assert_compile( 1770 q, 1771 "SELECT users.id AS users_id, users.name AS users_name " 1772 "FROM users ORDER BY (SELECT max(addresses.email_address) AS max_1 " 1773 "FROM addresses " 1774 "WHERE addresses.user_id = users.id)" 1775 ) 1776 1777 1778class ComparatorTest(QueryTest): 1779 def test_clause_element_query_resolve(self): 1780 from sqlalchemy.orm.properties import ColumnProperty 1781 User = self.classes.User 1782 1783 class Comparator(ColumnProperty.Comparator): 1784 def __init__(self, expr): 1785 self.expr = expr 1786 1787 def __clause_element__(self): 1788 return self.expr 1789 1790 sess = Session() 1791 eq_( 1792 sess.query(Comparator(User.id)).order_by(Comparator(User.id)).all(), 1793 [(7, ), (8, ), (9, ), (10, )] 1794 ) 1795 1796 1797# more slice tests are available in test/orm/generative.py 1798class SliceTest(QueryTest): 1799 def test_first(self): 1800 User = self.classes.User 1801 1802 assert User(id=7) == create_session().query(User).first() 1803 1804 assert create_session().query(User).filter(User.id == 27). \ 1805 first() is None 1806 1807 def test_limit_offset_applies(self): 1808 """Test that the expected LIMIT/OFFSET is applied for slices. 1809 1810 The LIMIT/OFFSET syntax differs slightly on all databases, and 1811 query[x:y] executes immediately, so we are asserting against 1812 SQL strings using sqlite's syntax. 1813 1814 """ 1815 1816 User = self.classes.User 1817 1818 sess = create_session() 1819 q = sess.query(User) 1820 1821 self.assert_sql( 1822 testing.db, lambda: q[10:20], [ 1823 ( 1824 "SELECT users.id AS users_id, users.name " 1825 "AS users_name FROM users LIMIT :param_1 OFFSET :param_2", 1826 {'param_1': 10, 'param_2': 10})]) 1827 1828 self.assert_sql( 1829 testing.db, lambda: q[:20], [ 1830 ( 1831 "SELECT users.id AS users_id, users.name " 1832 "AS users_name FROM users LIMIT :param_1", 1833 {'param_1': 20})]) 1834 1835 self.assert_sql( 1836 testing.db, lambda: q[5:], [ 1837 ( 1838 "SELECT users.id AS users_id, users.name " 1839 "AS users_name FROM users LIMIT -1 OFFSET :param_1", 1840 {'param_1': 5})]) 1841 1842 self.assert_sql(testing.db, lambda: q[2:2], []) 1843 1844 self.assert_sql(testing.db, lambda: q[-2:-5], []) 1845 1846 self.assert_sql( 1847 testing.db, lambda: q[-5:-2], [ 1848 ( 1849 "SELECT users.id AS users_id, users.name AS users_name " 1850 "FROM users", {})]) 1851 1852 self.assert_sql( 1853 testing.db, lambda: q[-5:], [ 1854 ( 1855 "SELECT users.id AS users_id, users.name AS users_name " 1856 "FROM users", {})]) 1857 1858 self.assert_sql( 1859 testing.db, lambda: q[:], [ 1860 ( 1861 "SELECT users.id AS users_id, users.name AS users_name " 1862 "FROM users", {})]) 1863 1864 1865class FilterTest(QueryTest, AssertsCompiledSQL): 1866 __dialect__ = 'default' 1867 1868 def test_basic(self): 1869 User = self.classes.User 1870 1871 users = create_session().query(User).all() 1872 eq_([User(id=7), User(id=8), User(id=9), User(id=10)], users) 1873 1874 @testing.requires.offset 1875 def test_limit_offset(self): 1876 User = self.classes.User 1877 1878 sess = create_session() 1879 1880 assert [User(id=8), User(id=9)] == \ 1881 sess.query(User).order_by(User.id).limit(2).offset(1).all() 1882 1883 assert [User(id=8), User(id=9)] == \ 1884 list(sess.query(User).order_by(User.id)[1:3]) 1885 1886 assert User(id=8) == sess.query(User).order_by(User.id)[1] 1887 1888 assert [] == sess.query(User).order_by(User.id)[3:3] 1889 assert [] == sess.query(User).order_by(User.id)[0:0] 1890 1891 @testing.requires.bound_limit_offset 1892 def test_select_with_bindparam_offset_limit(self): 1893 """Does a query allow bindparam for the limit?""" 1894 User = self.classes.User 1895 sess = create_session() 1896 q1 = sess.query(self.classes.User).\ 1897 order_by(self.classes.User.id).limit(bindparam('n')) 1898 1899 for n in range(1, 4): 1900 result = q1.params(n=n).all() 1901 eq_(len(result), n) 1902 1903 eq_( 1904 sess.query(User).order_by(User.id).limit(bindparam('limit')). 1905 offset(bindparam('offset')).params(limit=2, offset=1).all(), 1906 [User(id=8), User(id=9)] 1907 ) 1908 1909 @testing.fails_on("mysql", "doesn't like CAST in the limit clause") 1910 @testing.requires.bound_limit_offset 1911 def test_select_with_bindparam_offset_limit_w_cast(self): 1912 User = self.classes.User 1913 sess = create_session() 1914 q1 = sess.query(self.classes.User).\ 1915 order_by(self.classes.User.id).limit(bindparam('n')) 1916 eq_( 1917 list( 1918 sess.query(User).params(a=1, b=3).order_by(User.id) 1919 [cast(bindparam('a'), Integer):cast(bindparam('b'), Integer)]), 1920 [User(id=8), User(id=9)] 1921 ) 1922 1923 @testing.requires.boolean_col_expressions 1924 def test_exists(self): 1925 User = self.classes.User 1926 1927 sess = create_session(testing.db) 1928 1929 assert sess.query(exists().where(User.id == 9)).scalar() 1930 assert not sess.query(exists().where(User.id == 29)).scalar() 1931 1932 def test_one_filter(self): 1933 User = self.classes.User 1934 1935 assert [User(id=8), User(id=9)] == \ 1936 create_session().query(User).filter(User.name.endswith('ed')).all() 1937 1938 def test_contains(self): 1939 """test comparing a collection to an object instance.""" 1940 1941 User, Address = self.classes.User, self.classes.Address 1942 1943 sess = create_session() 1944 address = sess.query(Address).get(3) 1945 assert [User(id=8)] == \ 1946 sess.query(User).filter(User.addresses.contains(address)).all() 1947 1948 try: 1949 sess.query(User).filter(User.addresses == address) 1950 assert False 1951 except sa_exc.InvalidRequestError: 1952 assert True 1953 1954 assert [User(id=10)] == \ 1955 sess.query(User).filter(User.addresses == None).all() 1956 1957 try: 1958 assert [User(id=7), User(id=9), User(id=10)] == \ 1959 sess.query(User).filter(User.addresses != address).all() 1960 assert False 1961 except sa_exc.InvalidRequestError: 1962 assert True 1963 1964 # assert [User(id=7), User(id=9), User(id=10)] == 1965 # sess.query(User).filter(User.addresses!=address).all() 1966 1967 def test_clause_element_ok(self): 1968 User = self.classes.User 1969 s = Session() 1970 self.assert_compile( 1971 s.query(User).filter(User.addresses), 1972 "SELECT users.id AS users_id, users.name AS users_name " 1973 "FROM users, addresses WHERE users.id = addresses.user_id" 1974 ) 1975 1976 def test_unique_binds_join_cond(self): 1977 """test that binds used when the lazyclause is used in criterion are 1978 unique""" 1979 1980 User, Address = self.classes.User, self.classes.Address 1981 sess = Session() 1982 a1, a2 = sess.query(Address).order_by(Address.id)[0:2] 1983 self.assert_compile( 1984 sess.query(User).filter(User.addresses.contains(a1)).union( 1985 sess.query(User).filter(User.addresses.contains(a2)) 1986 ), 1987 "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS " 1988 "anon_1_users_name FROM (SELECT users.id AS users_id, " 1989 "users.name AS users_name FROM users WHERE users.id = :param_1 " 1990 "UNION SELECT users.id AS users_id, users.name AS users_name " 1991 "FROM users WHERE users.id = :param_2) AS anon_1", 1992 checkparams={'param_1': 7, 'param_2': 8} 1993 ) 1994 1995 def test_any(self): 1996 User, Address = self.classes.User, self.classes.Address 1997 1998 sess = create_session() 1999 2000 assert [User(id=8), User(id=9)] == \ 2001 sess.query(User). \ 2002 filter( 2003 User.addresses.any(Address.email_address.like('%ed%'))).all() 2004 2005 assert [User(id=8)] == \ 2006 sess.query(User). \ 2007 filter( 2008 User.addresses.any( 2009 Address.email_address.like('%ed%'), id=4)).all() 2010 2011 assert [User(id=8)] == \ 2012 sess.query(User). \ 2013 filter(User.addresses.any(Address.email_address.like('%ed%'))).\ 2014 filter(User.addresses.any(id=4)).all() 2015 2016 assert [User(id=9)] == \ 2017 sess.query(User). \ 2018 filter(User.addresses.any(email_address='fred@fred.com')).all() 2019 2020 # test that any() doesn't overcorrelate 2021 assert [User(id=7), User(id=8)] == \ 2022 sess.query(User).join("addresses"). \ 2023 filter( 2024 ~User.addresses.any( 2025 Address.email_address == 'fred@fred.com')).all() 2026 2027 # test that the contents are not adapted by the aliased join 2028 assert [User(id=7), User(id=8)] == \ 2029 sess.query(User).join("addresses", aliased=True). \ 2030 filter( 2031 ~User.addresses.any( 2032 Address.email_address == 'fred@fred.com')).all() 2033 2034 assert [User(id=10)] == \ 2035 sess.query(User).outerjoin("addresses", aliased=True). \ 2036 filter(~User.addresses.any()).all() 2037 2038 def test_has(self): 2039 Dingaling, User, Address = ( 2040 self.classes.Dingaling, self.classes.User, self.classes.Address) 2041 2042 sess = create_session() 2043 assert [Address(id=5)] == \ 2044 sess.query(Address).filter(Address.user.has(name='fred')).all() 2045 2046 assert [Address(id=2), Address(id=3), Address(id=4), Address(id=5)] \ 2047 == sess.query(Address). \ 2048 filter(Address.user.has(User.name.like('%ed%'))). \ 2049 order_by(Address.id).all() 2050 2051 assert [Address(id=2), Address(id=3), Address(id=4)] == \ 2052 sess.query(Address). \ 2053 filter(Address.user.has(User.name.like('%ed%'), id=8)). \ 2054 order_by(Address.id).all() 2055 2056 # test has() doesn't overcorrelate 2057 assert [Address(id=2), Address(id=3), Address(id=4)] == \ 2058 sess.query(Address).join("user"). \ 2059 filter(Address.user.has(User.name.like('%ed%'), id=8)). \ 2060 order_by(Address.id).all() 2061 2062 # test has() doesn't get subquery contents adapted by aliased join 2063 assert [Address(id=2), Address(id=3), Address(id=4)] == \ 2064 sess.query(Address).join("user", aliased=True). \ 2065 filter(Address.user.has(User.name.like('%ed%'), id=8)). \ 2066 order_by(Address.id).all() 2067 2068 dingaling = sess.query(Dingaling).get(2) 2069 assert [User(id=9)] == \ 2070 sess.query(User). \ 2071 filter(User.addresses.any(Address.dingaling == dingaling)).all() 2072 2073 def test_contains_m2m(self): 2074 Item, Order = self.classes.Item, self.classes.Order 2075 2076 sess = create_session() 2077 item = sess.query(Item).get(3) 2078 2079 eq_( 2080 sess.query(Order).filter(Order.items.contains(item)). 2081 order_by(Order.id).all(), 2082 [Order(id=1), Order(id=2), Order(id=3)] 2083 ) 2084 eq_( 2085 sess.query(Order).filter(~Order.items.contains(item)). 2086 order_by(Order.id).all(), 2087 [Order(id=4), Order(id=5)] 2088 ) 2089 2090 item2 = sess.query(Item).get(5) 2091 eq_( 2092 sess.query(Order).filter(Order.items.contains(item)). 2093 filter(Order.items.contains(item2)).all(), 2094 [Order(id=3)] 2095 ) 2096 2097 def test_comparison(self): 2098 """test scalar comparison to an object instance""" 2099 2100 Item, Order, Dingaling, User, Address = ( 2101 self.classes.Item, self.classes.Order, self.classes.Dingaling, 2102 self.classes.User, self.classes.Address) 2103 2104 sess = create_session() 2105 user = sess.query(User).get(8) 2106 assert [Address(id=2), Address(id=3), Address(id=4)] == \ 2107 sess.query(Address).filter(Address.user == user).all() 2108 2109 assert [Address(id=1), Address(id=5)] == \ 2110 sess.query(Address).filter(Address.user != user).all() 2111 2112 # generates an IS NULL 2113 assert [] == sess.query(Address).filter(Address.user == None).all() 2114 assert [] == sess.query(Address).filter(Address.user == null()).all() 2115 2116 assert [Order(id=5)] == \ 2117 sess.query(Order).filter(Order.address == None).all() 2118 2119 # o2o 2120 dingaling = sess.query(Dingaling).get(2) 2121 assert [Address(id=5)] == \ 2122 sess.query(Address).filter(Address.dingaling == dingaling).all() 2123 2124 # m2m 2125 eq_( 2126 sess.query(Item).filter(Item.keywords == None). 2127 order_by(Item.id).all(), [Item(id=4), Item(id=5)]) 2128 eq_( 2129 sess.query(Item).filter(Item.keywords != None). 2130 order_by(Item.id).all(), [Item(id=1), Item(id=2), Item(id=3)]) 2131 2132 def test_filter_by(self): 2133 User, Address = self.classes.User, self.classes.Address 2134 2135 sess = create_session() 2136 user = sess.query(User).get(8) 2137 assert [Address(id=2), Address(id=3), Address(id=4)] == \ 2138 sess.query(Address).filter_by(user=user).all() 2139 2140 # many to one generates IS NULL 2141 assert [] == sess.query(Address).filter_by(user=None).all() 2142 assert [] == sess.query(Address).filter_by(user=null()).all() 2143 2144 # one to many generates WHERE NOT EXISTS 2145 assert [User(name='chuck')] == \ 2146 sess.query(User).filter_by(addresses=None).all() 2147 assert [User(name='chuck')] == \ 2148 sess.query(User).filter_by(addresses=null()).all() 2149 2150 2151 def test_filter_by_tables(self): 2152 users = self.tables.users 2153 addresses = self.tables.addresses 2154 sess = create_session() 2155 self.assert_compile( 2156 sess.query(users).filter_by(name='ed'). 2157 join(addresses, users.c.id == addresses.c.user_id). 2158 filter_by(email_address='ed@ed.com'), 2159 "SELECT users.id AS users_id, users.name AS users_name " 2160 "FROM users JOIN addresses ON users.id = addresses.user_id " 2161 "WHERE users.name = :name_1 AND " 2162 "addresses.email_address = :email_address_1", 2163 checkparams={'email_address_1': 'ed@ed.com', 'name_1': 'ed'} 2164 ) 2165 2166 def test_filter_by_no_property(self): 2167 addresses = self.tables.addresses 2168 sess = create_session() 2169 assert_raises_message( 2170 sa.exc.InvalidRequestError, 2171 "Entity 'addresses' has no property 'name'", 2172 sess.query(addresses).filter_by, name='ed' 2173 ) 2174 2175 def test_none_comparison(self): 2176 Order, User, Address = ( 2177 self.classes.Order, self.classes.User, self.classes.Address) 2178 2179 sess = create_session() 2180 2181 # scalar 2182 eq_( 2183 [Order(description="order 5")], 2184 sess.query(Order).filter(Order.address_id == None).all() 2185 ) 2186 eq_( 2187 [Order(description="order 5")], 2188 sess.query(Order).filter(Order.address_id == null()).all() 2189 ) 2190 2191 # o2o 2192 eq_( 2193 [Address(id=1), Address(id=3), Address(id=4)], 2194 sess.query(Address).filter(Address.dingaling == None). 2195 order_by(Address.id).all()) 2196 eq_( 2197 [Address(id=1), Address(id=3), Address(id=4)], 2198 sess.query(Address).filter(Address.dingaling == null()). 2199 order_by(Address.id).all()) 2200 eq_( 2201 [Address(id=2), Address(id=5)], 2202 sess.query(Address).filter(Address.dingaling != None). 2203 order_by(Address.id).all()) 2204 eq_( 2205 [Address(id=2), Address(id=5)], 2206 sess.query(Address).filter(Address.dingaling != null()). 2207 order_by(Address.id).all()) 2208 2209 # m2o 2210 eq_( 2211 [Order(id=5)], 2212 sess.query(Order).filter(Order.address == None).all()) 2213 eq_( 2214 [Order(id=1), Order(id=2), Order(id=3), Order(id=4)], 2215 sess.query(Order).order_by(Order.id). 2216 filter(Order.address != None).all()) 2217 2218 # o2m 2219 eq_( 2220 [User(id=10)], 2221 sess.query(User).filter(User.addresses == None).all()) 2222 eq_( 2223 [User(id=7), User(id=8), User(id=9)], 2224 sess.query(User).filter(User.addresses != None). 2225 order_by(User.id).all()) 2226 2227 def test_blank_filter_by(self): 2228 User = self.classes.User 2229 2230 eq_( 2231 [(7,), (8,), (9,), (10,)], 2232 create_session().query(User.id).filter_by().order_by(User.id).all() 2233 ) 2234 eq_( 2235 [(7,), (8,), (9,), (10,)], 2236 create_session().query(User.id).filter_by(**{}). 2237 order_by(User.id).all() 2238 ) 2239 2240 def test_text_coerce(self): 2241 User = self.classes.User 2242 s = create_session() 2243 self.assert_compile( 2244 s.query(User).filter(text("name='ed'")), 2245 "SELECT users.id AS users_id, users.name " 2246 "AS users_name FROM users WHERE name='ed'" 2247 ) 2248 2249 2250class SetOpsTest(QueryTest, AssertsCompiledSQL): 2251 __dialect__ = 'default' 2252 2253 def test_union(self): 2254 User = self.classes.User 2255 2256 s = create_session() 2257 2258 fred = s.query(User).filter(User.name == 'fred') 2259 ed = s.query(User).filter(User.name == 'ed') 2260 jack = s.query(User).filter(User.name == 'jack') 2261 2262 eq_( 2263 fred.union(ed).order_by(User.name).all(), 2264 [User(name='ed'), User(name='fred')] 2265 ) 2266 2267 eq_( 2268 fred.union(ed, jack).order_by(User.name).all(), 2269 [User(name='ed'), User(name='fred'), User(name='jack')] 2270 ) 2271 2272 def test_statement_labels(self): 2273 """test that label conflicts don't occur with joins etc.""" 2274 2275 User, Address = self.classes.User, self.classes.Address 2276 2277 s = create_session() 2278 q1 = s.query(User, Address).join(User.addresses).\ 2279 filter(Address.email_address == "ed@wood.com") 2280 q2 = s.query(User, Address).join(User.addresses).\ 2281 filter(Address.email_address == "jack@bean.com") 2282 q3 = q1.union(q2).order_by(User.name) 2283 2284 eq_( 2285 q3.all(), 2286 [ 2287 (User(name='ed'), Address(email_address="ed@wood.com")), 2288 (User(name='jack'), Address(email_address="jack@bean.com")), 2289 ] 2290 ) 2291 2292 def test_union_literal_expressions_compile(self): 2293 """test that column expressions translate during 2294 the _from_statement() portion of union(), others""" 2295 2296 User = self.classes.User 2297 2298 s = Session() 2299 q1 = s.query(User, literal("x")) 2300 q2 = s.query(User, literal_column("'y'")) 2301 q3 = q1.union(q2) 2302 2303 self.assert_compile( 2304 q3, 2305 "SELECT anon_1.users_id AS anon_1_users_id, " 2306 "anon_1.users_name AS anon_1_users_name, " 2307 "anon_1.param_1 AS anon_1_param_1 " 2308 "FROM (SELECT users.id AS users_id, users.name AS " 2309 "users_name, :param_1 AS param_1 " 2310 "FROM users UNION SELECT users.id AS users_id, " 2311 "users.name AS users_name, 'y' FROM users) AS anon_1" 2312 ) 2313 2314 def test_union_literal_expressions_results(self): 2315 User = self.classes.User 2316 2317 s = Session() 2318 2319 q1 = s.query(User, literal("x")) 2320 q2 = s.query(User, literal_column("'y'")) 2321 q3 = q1.union(q2) 2322 2323 q4 = s.query(User, literal_column("'x'").label('foo')) 2324 q5 = s.query(User, literal("y")) 2325 q6 = q4.union(q5) 2326 2327 eq_( 2328 [x['name'] for x in q6.column_descriptions], 2329 ['User', 'foo'] 2330 ) 2331 2332 for q in ( 2333 q3.order_by(User.id, text("anon_1_param_1")), 2334 q6.order_by(User.id, "foo")): 2335 eq_( 2336 q.all(), 2337 [ 2338 (User(id=7, name='jack'), 'x'), 2339 (User(id=7, name='jack'), 'y'), 2340 (User(id=8, name='ed'), 'x'), 2341 (User(id=8, name='ed'), 'y'), 2342 (User(id=9, name='fred'), 'x'), 2343 (User(id=9, name='fred'), 'y'), 2344 (User(id=10, name='chuck'), 'x'), 2345 (User(id=10, name='chuck'), 'y') 2346 ] 2347 ) 2348 2349 def test_union_labeled_anonymous_columns(self): 2350 User = self.classes.User 2351 2352 s = Session() 2353 2354 c1, c2 = column('c1'), column('c2') 2355 q1 = s.query(User, c1.label('foo'), c1.label('bar')) 2356 q2 = s.query(User, c1.label('foo'), c2.label('bar')) 2357 q3 = q1.union(q2) 2358 2359 eq_( 2360 [x['name'] for x in q3.column_descriptions], 2361 ['User', 'foo', 'bar'] 2362 ) 2363 2364 self.assert_compile( 2365 q3, 2366 "SELECT anon_1.users_id AS anon_1_users_id, " 2367 "anon_1.users_name AS anon_1_users_name, " 2368 "anon_1.foo AS anon_1_foo, anon_1.bar AS anon_1_bar " 2369 "FROM (SELECT users.id AS users_id, users.name AS users_name, " 2370 "c1 AS foo, c1 AS bar FROM users UNION SELECT users.id AS " 2371 "users_id, users.name AS users_name, c1 AS foo, c2 AS bar " 2372 "FROM users) AS anon_1" 2373 ) 2374 2375 def test_order_by_anonymous_col(self): 2376 User = self.classes.User 2377 2378 s = Session() 2379 2380 c1, c2 = column('c1'), column('c2') 2381 f = c1.label('foo') 2382 q1 = s.query(User, f, c2.label('bar')) 2383 q2 = s.query(User, c1.label('foo'), c2.label('bar')) 2384 q3 = q1.union(q2) 2385 2386 self.assert_compile( 2387 q3.order_by(c1), 2388 "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS " 2389 "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS " 2390 "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS " 2391 "users_name, c1 AS foo, c2 AS bar " 2392 "FROM users UNION SELECT users.id " 2393 "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar " 2394 "FROM users) AS anon_1 ORDER BY anon_1.foo" 2395 ) 2396 2397 self.assert_compile( 2398 q3.order_by(f), 2399 "SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name AS " 2400 "anon_1_users_name, anon_1.foo AS anon_1_foo, anon_1.bar AS " 2401 "anon_1_bar FROM (SELECT users.id AS users_id, users.name AS " 2402 "users_name, c1 AS foo, c2 AS bar " 2403 "FROM users UNION SELECT users.id " 2404 "AS users_id, users.name AS users_name, c1 AS foo, c2 AS bar " 2405 "FROM users) AS anon_1 ORDER BY anon_1.foo" 2406 ) 2407 2408 def test_union_mapped_colnames_preserved_across_subquery(self): 2409 User = self.classes.User 2410 2411 s = Session() 2412 q1 = s.query(User.name) 2413 q2 = s.query(User.name) 2414 2415 # the label names in the subquery are the typical anonymized ones 2416 self.assert_compile( 2417 q1.union(q2), 2418 "SELECT anon_1.users_name AS anon_1_users_name " 2419 "FROM (SELECT users.name AS users_name FROM users " 2420 "UNION SELECT users.name AS users_name FROM users) AS anon_1" 2421 ) 2422 2423 # but in the returned named tuples, 2424 # due to [ticket:1942], this should be 'name', not 'users_name' 2425 eq_( 2426 [x['name'] for x in q1.union(q2).column_descriptions], 2427 ['name'] 2428 ) 2429 2430 @testing.requires.intersect 2431 def test_intersect(self): 2432 User = self.classes.User 2433 2434 s = create_session() 2435 2436 fred = s.query(User).filter(User.name == 'fred') 2437 ed = s.query(User).filter(User.name == 'ed') 2438 jack = s.query(User).filter(User.name == 'jack') 2439 eq_(fred.intersect(ed, jack).all(), []) 2440 2441 eq_(fred.union(ed).intersect(ed.union(jack)).all(), [User(name='ed')]) 2442 2443 def test_eager_load(self): 2444 User, Address = self.classes.User, self.classes.Address 2445 2446 s = create_session() 2447 2448 fred = s.query(User).filter(User.name == 'fred') 2449 ed = s.query(User).filter(User.name == 'ed') 2450 2451 def go(): 2452 eq_( 2453 fred.union(ed).order_by(User.name). 2454 options(joinedload(User.addresses)).all(), [ 2455 User( 2456 name='ed', addresses=[Address(), Address(), 2457 Address()]), 2458 User(name='fred', addresses=[Address()])] 2459 ) 2460 self.assert_sql_count(testing.db, go, 1) 2461 2462 2463class AggregateTest(QueryTest): 2464 2465 def test_sum(self): 2466 Order = self.classes.Order 2467 2468 sess = create_session() 2469 orders = sess.query(Order).filter(Order.id.in_([2, 3, 4])) 2470 eq_( 2471 next(orders.values(func.sum(Order.user_id * Order.address_id))), 2472 (79,)) 2473 eq_(orders.value(func.sum(Order.user_id * Order.address_id)), 79) 2474 2475 def test_apply(self): 2476 Order = self.classes.Order 2477 2478 sess = create_session() 2479 assert sess.query(func.sum(Order.user_id * Order.address_id)). \ 2480 filter(Order.id.in_([2, 3, 4])).one() == (79,) 2481 2482 def test_having(self): 2483 User, Address = self.classes.User, self.classes.Address 2484 2485 sess = create_session() 2486 assert [User(name='ed', id=8)] == \ 2487 sess.query(User).order_by(User.id).group_by(User). \ 2488 join('addresses').having(func.count(Address.id) > 2).all() 2489 2490 assert [User(name='jack', id=7), User(name='fred', id=9)] == \ 2491 sess.query(User).order_by(User.id).group_by(User). \ 2492 join('addresses').having(func.count(Address.id) < 2).all() 2493 2494 2495class ExistsTest(QueryTest, AssertsCompiledSQL): 2496 __dialect__ = 'default' 2497 2498 def test_exists(self): 2499 User = self.classes.User 2500 sess = create_session() 2501 2502 q1 = sess.query(User) 2503 self.assert_compile( 2504 sess.query(q1.exists()), 2505 'SELECT EXISTS (' 2506 'SELECT 1 FROM users' 2507 ') AS anon_1' 2508 ) 2509 2510 q2 = sess.query(User).filter(User.name == 'fred') 2511 self.assert_compile( 2512 sess.query(q2.exists()), 2513 'SELECT EXISTS (' 2514 'SELECT 1 FROM users WHERE users.name = :name_1' 2515 ') AS anon_1' 2516 ) 2517 2518 def test_exists_col_warning(self): 2519 User = self.classes.User 2520 Address = self.classes.Address 2521 sess = create_session() 2522 2523 q1 = sess.query(User, Address).filter(User.id == Address.user_id) 2524 self.assert_compile( 2525 sess.query(q1.exists()), 2526 'SELECT EXISTS (' 2527 'SELECT 1 FROM users, addresses ' 2528 'WHERE users.id = addresses.user_id' 2529 ') AS anon_1' 2530 ) 2531 2532 def test_exists_w_select_from(self): 2533 User = self.classes.User 2534 sess = create_session() 2535 2536 q1 = sess.query().select_from(User).exists() 2537 self.assert_compile( 2538 sess.query(q1), 2539 'SELECT EXISTS (SELECT 1 FROM users) AS anon_1' 2540 ) 2541 2542 2543class CountTest(QueryTest): 2544 def test_basic(self): 2545 users, User = self.tables.users, self.classes.User 2546 2547 s = create_session() 2548 2549 eq_(s.query(User).count(), 4) 2550 2551 eq_(s.query(User).filter(users.c.name.endswith('ed')).count(), 2) 2552 2553 def test_count_char(self): 2554 User = self.classes.User 2555 s = create_session() 2556 # '*' is favored here as the most common character, 2557 # it is reported that Informix doesn't like count(1), 2558 # rumors about Oracle preferring count(1) don't appear 2559 # to be well founded. 2560 self.assert_sql_execution( 2561 testing.db, s.query(User).count, CompiledSQL( 2562 "SELECT count(*) AS count_1 FROM " 2563 "(SELECT users.id AS users_id, users.name " 2564 "AS users_name FROM users) AS anon_1", {} 2565 ) 2566 ) 2567 2568 def test_multiple_entity(self): 2569 User, Address = self.classes.User, self.classes.Address 2570 2571 s = create_session() 2572 q = s.query(User, Address) 2573 eq_(q.count(), 20) # cartesian product 2574 2575 q = s.query(User, Address).join(User.addresses) 2576 eq_(q.count(), 5) 2577 2578 def test_nested(self): 2579 User, Address = self.classes.User, self.classes.Address 2580 2581 s = create_session() 2582 q = s.query(User, Address).limit(2) 2583 eq_(q.count(), 2) 2584 2585 q = s.query(User, Address).limit(100) 2586 eq_(q.count(), 20) 2587 2588 q = s.query(User, Address).join(User.addresses).limit(100) 2589 eq_(q.count(), 5) 2590 2591 def test_cols(self): 2592 """test that column-based queries always nest.""" 2593 2594 User, Address = self.classes.User, self.classes.Address 2595 2596 s = create_session() 2597 2598 q = s.query(func.count(distinct(User.name))) 2599 eq_(q.count(), 1) 2600 2601 q = s.query(func.count(distinct(User.name))).distinct() 2602 eq_(q.count(), 1) 2603 2604 q = s.query(User.name) 2605 eq_(q.count(), 4) 2606 2607 q = s.query(User.name, Address) 2608 eq_(q.count(), 20) 2609 2610 q = s.query(Address.user_id) 2611 eq_(q.count(), 5) 2612 eq_(q.distinct().count(), 3) 2613 2614 2615class DistinctTest(QueryTest): 2616 def test_basic(self): 2617 User = self.classes.User 2618 2619 eq_( 2620 [User(id=7), User(id=8), User(id=9), User(id=10)], 2621 create_session().query(User).order_by(User.id).distinct().all() 2622 ) 2623 eq_( 2624 [User(id=7), User(id=9), User(id=8), User(id=10)], 2625 create_session().query(User).distinct(). 2626 order_by(desc(User.name)).all() 2627 ) 2628 2629 def test_joined(self): 2630 """test that orderbys from a joined table get placed into the columns 2631 clause when DISTINCT is used""" 2632 2633 User, Address = self.classes.User, self.classes.Address 2634 2635 sess = create_session() 2636 q = sess.query(User).join('addresses').distinct(). \ 2637 order_by(desc(Address.email_address)) 2638 2639 assert [User(id=7), User(id=9), User(id=8)] == q.all() 2640 2641 sess.expunge_all() 2642 2643 # test that it works on embedded joinedload/LIMIT subquery 2644 q = sess.query(User).join('addresses').distinct(). \ 2645 options(joinedload('addresses')).\ 2646 order_by(desc(Address.email_address)).limit(2) 2647 2648 def go(): 2649 assert [ 2650 User(id=7, addresses=[ 2651 Address(id=1) 2652 ]), 2653 User(id=9, addresses=[ 2654 Address(id=5) 2655 ]), 2656 ] == q.all() 2657 self.assert_sql_count(testing.db, go, 1) 2658 2659 2660class PrefixWithTest(QueryTest, AssertsCompiledSQL): 2661 2662 def test_one_prefix(self): 2663 User = self.classes.User 2664 sess = create_session() 2665 query = sess.query(User.name)\ 2666 .prefix_with('PREFIX_1') 2667 expected = "SELECT PREFIX_1 "\ 2668 "users.name AS users_name FROM users" 2669 self.assert_compile(query, expected, dialect=default.DefaultDialect()) 2670 2671 def test_many_prefixes(self): 2672 User = self.classes.User 2673 sess = create_session() 2674 query = sess.query(User.name).prefix_with('PREFIX_1', 'PREFIX_2') 2675 expected = "SELECT PREFIX_1 PREFIX_2 "\ 2676 "users.name AS users_name FROM users" 2677 self.assert_compile(query, expected, dialect=default.DefaultDialect()) 2678 2679 def test_chained_prefixes(self): 2680 User = self.classes.User 2681 sess = create_session() 2682 query = sess.query(User.name)\ 2683 .prefix_with('PREFIX_1')\ 2684 .prefix_with('PREFIX_2', 'PREFIX_3') 2685 expected = "SELECT PREFIX_1 PREFIX_2 PREFIX_3 "\ 2686 "users.name AS users_name FROM users" 2687 self.assert_compile(query, expected, dialect=default.DefaultDialect()) 2688 2689 2690class YieldTest(_fixtures.FixtureTest): 2691 run_setup_mappers = 'each' 2692 run_inserts = 'each' 2693 2694 def _eagerload_mappings(self, addresses_lazy=True, user_lazy=True): 2695 User, Address = self.classes("User", "Address") 2696 users, addresses = self.tables("users", "addresses") 2697 mapper(User, users, properties={ 2698 "addresses": relationship( 2699 Address, lazy=addresses_lazy, 2700 backref=backref("user", lazy=user_lazy) 2701 ) 2702 }) 2703 mapper(Address, addresses) 2704 2705 def test_basic(self): 2706 self._eagerload_mappings() 2707 2708 User = self.classes.User 2709 2710 sess = create_session() 2711 q = iter( 2712 sess.query(User).yield_per(1).from_statement( 2713 text("select * from users"))) 2714 2715 ret = [] 2716 eq_(len(sess.identity_map), 0) 2717 ret.append(next(q)) 2718 ret.append(next(q)) 2719 eq_(len(sess.identity_map), 2) 2720 ret.append(next(q)) 2721 ret.append(next(q)) 2722 eq_(len(sess.identity_map), 4) 2723 try: 2724 next(q) 2725 assert False 2726 except StopIteration: 2727 pass 2728 2729 def test_yield_per_and_execution_options(self): 2730 self._eagerload_mappings() 2731 2732 User = self.classes.User 2733 2734 sess = create_session() 2735 q = sess.query(User).yield_per(15) 2736 q = q.execution_options(foo='bar') 2737 assert q._yield_per 2738 eq_( 2739 q._execution_options, 2740 {"stream_results": True, "foo": "bar", "max_row_buffer": 15}) 2741 2742 def test_no_joinedload_opt(self): 2743 self._eagerload_mappings() 2744 2745 User = self.classes.User 2746 sess = create_session() 2747 q = sess.query(User).options(joinedload("addresses")).yield_per(1) 2748 assert_raises_message( 2749 sa_exc.InvalidRequestError, 2750 "The yield_per Query option is currently not compatible with " 2751 "joined collection eager loading. Please specify ", 2752 q.all 2753 ) 2754 2755 def test_no_subqueryload_opt(self): 2756 self._eagerload_mappings() 2757 2758 User = self.classes.User 2759 sess = create_session() 2760 q = sess.query(User).options(subqueryload("addresses")).yield_per(1) 2761 assert_raises_message( 2762 sa_exc.InvalidRequestError, 2763 "The yield_per Query option is currently not compatible with " 2764 "subquery eager loading. Please specify ", 2765 q.all 2766 ) 2767 2768 def test_no_subqueryload_mapping(self): 2769 self._eagerload_mappings(addresses_lazy="subquery") 2770 2771 User = self.classes.User 2772 sess = create_session() 2773 q = sess.query(User).yield_per(1) 2774 assert_raises_message( 2775 sa_exc.InvalidRequestError, 2776 "The yield_per Query option is currently not compatible with " 2777 "subquery eager loading. Please specify ", 2778 q.all 2779 ) 2780 2781 def test_joinedload_m2o_ok(self): 2782 self._eagerload_mappings(user_lazy="joined") 2783 Address = self.classes.Address 2784 sess = create_session() 2785 q = sess.query(Address).yield_per(1) 2786 q.all() 2787 2788 def test_eagerload_opt_disable(self): 2789 self._eagerload_mappings() 2790 2791 User = self.classes.User 2792 sess = create_session() 2793 q = sess.query(User).options(subqueryload("addresses")).\ 2794 enable_eagerloads(False).yield_per(1) 2795 q.all() 2796 2797 q = sess.query(User).options(joinedload("addresses")).\ 2798 enable_eagerloads(False).yield_per(1) 2799 q.all() 2800 2801 def test_m2o_joinedload_not_others(self): 2802 self._eagerload_mappings(addresses_lazy="joined") 2803 Address = self.classes.Address 2804 sess = create_session() 2805 q = sess.query(Address).options( 2806 lazyload('*'), joinedload("user")).yield_per(1).filter_by(id=1) 2807 2808 def go(): 2809 result = q.all() 2810 assert result[0].user 2811 self.assert_sql_count(testing.db, go, 1) 2812 2813 2814class HintsTest(QueryTest, AssertsCompiledSQL): 2815 __dialect__ = 'default' 2816 2817 def test_hints(self): 2818 User = self.classes.User 2819 2820 from sqlalchemy.dialects import mysql 2821 dialect = mysql.dialect() 2822 2823 sess = create_session() 2824 2825 self.assert_compile( 2826 sess.query(User).with_hint( 2827 User, 'USE INDEX (col1_index,col2_index)'), 2828 "SELECT users.id AS users_id, users.name AS users_name " 2829 "FROM users USE INDEX (col1_index,col2_index)", 2830 dialect=dialect 2831 ) 2832 2833 self.assert_compile( 2834 sess.query(User).with_hint( 2835 User, 'WITH INDEX col1_index', 'sybase'), 2836 "SELECT users.id AS users_id, users.name AS users_name " 2837 "FROM users", dialect=dialect 2838 ) 2839 2840 ualias = aliased(User) 2841 self.assert_compile( 2842 sess.query(User, ualias).with_hint( 2843 ualias, 'USE INDEX (col1_index,col2_index)'). 2844 join(ualias, ualias.id > User.id), 2845 "SELECT users.id AS users_id, users.name AS users_name, " 2846 "users_1.id AS users_1_id, users_1.name AS users_1_name " 2847 "FROM users INNER JOIN users AS users_1 " 2848 "USE INDEX (col1_index,col2_index) " 2849 "ON users_1.id > users.id", dialect=dialect 2850 ) 2851 2852 def test_statement_hints(self): 2853 User = self.classes.User 2854 2855 sess = create_session() 2856 stmt = sess.query(User).\ 2857 with_statement_hint("test hint one").\ 2858 with_statement_hint("test hint two").\ 2859 with_statement_hint("test hint three", "postgresql") 2860 2861 self.assert_compile( 2862 stmt, 2863 "SELECT users.id AS users_id, users.name AS users_name " 2864 "FROM users test hint one test hint two", 2865 ) 2866 2867 self.assert_compile( 2868 stmt, 2869 "SELECT users.id AS users_id, users.name AS users_name " 2870 "FROM users test hint one test hint two test hint three", 2871 dialect='postgresql' 2872 ) 2873 2874 2875class TextTest(QueryTest, AssertsCompiledSQL): 2876 __dialect__ = 'default' 2877 2878 def test_fulltext(self): 2879 User = self.classes.User 2880 2881 with expect_warnings("Textual SQL"): 2882 eq_( 2883 create_session().query(User). 2884 from_statement("select * from users order by id").all(), 2885 [User(id=7), User(id=8), User(id=9), User(id=10)] 2886 ) 2887 2888 eq_( 2889 create_session().query(User).from_statement( 2890 text("select * from users order by id")).first(), User(id=7) 2891 ) 2892 eq_( 2893 create_session().query(User).from_statement( 2894 text("select * from users where name='nonexistent'")).first(), 2895 None) 2896 2897 def test_fragment(self): 2898 User = self.classes.User 2899 2900 with expect_warnings("Textual SQL expression"): 2901 eq_( 2902 create_session().query(User).filter("id in (8, 9)").all(), 2903 [User(id=8), User(id=9)] 2904 2905 ) 2906 2907 eq_( 2908 create_session().query(User).filter("name='fred'"). 2909 filter("id=9").all(), [User(id=9)] 2910 ) 2911 eq_( 2912 create_session().query(User).filter("name='fred'"). 2913 filter(User.id == 9).all(), [User(id=9)] 2914 ) 2915 2916 def test_binds_coerce(self): 2917 User = self.classes.User 2918 2919 with expect_warnings("Textual SQL expression"): 2920 eq_( 2921 create_session().query(User).filter("id in (:id1, :id2)"). 2922 params(id1=8, id2=9).all(), [User(id=8), User(id=9)] 2923 ) 2924 2925 def test_as_column(self): 2926 User = self.classes.User 2927 2928 s = create_session() 2929 assert_raises( 2930 sa_exc.InvalidRequestError, s.query, 2931 User.id, text("users.name")) 2932 2933 eq_( 2934 s.query(User.id, "name").order_by(User.id).all(), 2935 [(7, 'jack'), (8, 'ed'), (9, 'fred'), (10, 'chuck')]) 2936 2937 def test_via_select(self): 2938 User = self.classes.User 2939 s = create_session() 2940 eq_( 2941 s.query(User).from_statement( 2942 select([column('id'), column('name')]). 2943 select_from(table('users')).order_by('id'), 2944 ).all(), 2945 [User(id=7), User(id=8), User(id=9), User(id=10)] 2946 ) 2947 2948 def test_via_textasfrom_from_statement(self): 2949 User = self.classes.User 2950 s = create_session() 2951 2952 eq_( 2953 s.query(User).from_statement( 2954 text("select * from users order by id"). 2955 columns(id=Integer, name=String)).all(), 2956 [User(id=7), User(id=8), User(id=9), User(id=10)] 2957 ) 2958 2959 def test_via_textasfrom_use_mapped_columns(self): 2960 User = self.classes.User 2961 s = create_session() 2962 2963 eq_( 2964 s.query(User).from_statement( 2965 text("select * from users order by id"). 2966 columns(User.id, User.name)).all(), 2967 [User(id=7), User(id=8), User(id=9), User(id=10)] 2968 ) 2969 2970 def test_via_textasfrom_select_from(self): 2971 User = self.classes.User 2972 s = create_session() 2973 2974 eq_( 2975 s.query(User).select_from( 2976 text("select * from users").columns(id=Integer, name=String) 2977 ).order_by(User.id).all(), 2978 [User(id=7), User(id=8), User(id=9), User(id=10)] 2979 ) 2980 2981 def test_group_by_accepts_text(self): 2982 User = self.classes.User 2983 s = create_session() 2984 2985 q = s.query(User).group_by(text("name")) 2986 self.assert_compile( 2987 q, 2988 "SELECT users.id AS users_id, users.name AS users_name " 2989 "FROM users GROUP BY name" 2990 ) 2991 2992 def test_orm_columns_accepts_text(self): 2993 from sqlalchemy.orm.base import _orm_columns 2994 t = text("x") 2995 eq_( 2996 _orm_columns(t), 2997 [t] 2998 ) 2999 3000 def test_order_by_w_eager_one(self): 3001 User = self.classes.User 3002 s = create_session() 3003 3004 # from 1.0.0 thru 1.0.2, the "name" symbol here was considered 3005 # to be part of the things we need to ORDER BY and it was being 3006 # placed into the inner query's columns clause, as part of 3007 # query._compound_eager_statement where we add unwrap_order_by() 3008 # to the columns clause. However, as #3392 illustrates, unlocatable 3009 # string expressions like "name desc" will only fail in this scenario, 3010 # so in general the changing of the query structure with string labels 3011 # is dangerous. 3012 # 3013 # the queries here are again "invalid" from a SQL perspective, as the 3014 # "name" field isn't matched up to anything. 3015 # 3016 with expect_warnings("Can't resolve label reference 'name';"): 3017 self.assert_compile( 3018 s.query(User).options(joinedload("addresses")). 3019 order_by(desc("name")).limit(1), 3020 "SELECT anon_1.users_id AS anon_1_users_id, " 3021 "anon_1.users_name AS anon_1_users_name, " 3022 "addresses_1.id AS addresses_1_id, " 3023 "addresses_1.user_id AS addresses_1_user_id, " 3024 "addresses_1.email_address AS addresses_1_email_address " 3025 "FROM (SELECT users.id AS users_id, users.name AS users_name " 3026 "FROM users ORDER BY users.name " 3027 "DESC LIMIT :param_1) AS anon_1 " 3028 "LEFT OUTER JOIN addresses AS addresses_1 " 3029 "ON anon_1.users_id = addresses_1.user_id " 3030 "ORDER BY name DESC, addresses_1.id" 3031 ) 3032 3033 def test_order_by_w_eager_two(self): 3034 User = self.classes.User 3035 s = create_session() 3036 3037 with expect_warnings("Can't resolve label reference 'name';"): 3038 self.assert_compile( 3039 s.query(User).options(joinedload("addresses")). 3040 order_by("name").limit(1), 3041 "SELECT anon_1.users_id AS anon_1_users_id, " 3042 "anon_1.users_name AS anon_1_users_name, " 3043 "addresses_1.id AS addresses_1_id, " 3044 "addresses_1.user_id AS addresses_1_user_id, " 3045 "addresses_1.email_address AS addresses_1_email_address " 3046 "FROM (SELECT users.id AS users_id, users.name AS users_name " 3047 "FROM users ORDER BY users.name " 3048 "LIMIT :param_1) AS anon_1 " 3049 "LEFT OUTER JOIN addresses AS addresses_1 " 3050 "ON anon_1.users_id = addresses_1.user_id " 3051 "ORDER BY name, addresses_1.id" 3052 ) 3053 3054 def test_order_by_w_eager_three(self): 3055 User = self.classes.User 3056 s = create_session() 3057 3058 self.assert_compile( 3059 s.query(User).options(joinedload("addresses")). 3060 order_by("users_name").limit(1), 3061 "SELECT anon_1.users_id AS anon_1_users_id, " 3062 "anon_1.users_name AS anon_1_users_name, " 3063 "addresses_1.id AS addresses_1_id, " 3064 "addresses_1.user_id AS addresses_1_user_id, " 3065 "addresses_1.email_address AS addresses_1_email_address " 3066 "FROM (SELECT users.id AS users_id, users.name AS users_name " 3067 "FROM users ORDER BY users.name " 3068 "LIMIT :param_1) AS anon_1 " 3069 "LEFT OUTER JOIN addresses AS addresses_1 " 3070 "ON anon_1.users_id = addresses_1.user_id " 3071 "ORDER BY anon_1.users_name, addresses_1.id" 3072 ) 3073 3074 # however! this works (again?) 3075 eq_( 3076 s.query(User).options(joinedload("addresses")). 3077 order_by("users_name").first(), 3078 User(name='chuck', addresses=[]) 3079 ) 3080 3081 def test_order_by_w_eager_four(self): 3082 User = self.classes.User 3083 Address = self.classes.Address 3084 s = create_session() 3085 3086 self.assert_compile( 3087 s.query(User).options(joinedload("addresses")). 3088 order_by(desc("users_name")).limit(1), 3089 "SELECT anon_1.users_id AS anon_1_users_id, " 3090 "anon_1.users_name AS anon_1_users_name, " 3091 "addresses_1.id AS addresses_1_id, " 3092 "addresses_1.user_id AS addresses_1_user_id, " 3093 "addresses_1.email_address AS addresses_1_email_address " 3094 "FROM (SELECT users.id AS users_id, users.name AS users_name " 3095 "FROM users ORDER BY users.name DESC " 3096 "LIMIT :param_1) AS anon_1 " 3097 "LEFT OUTER JOIN addresses AS addresses_1 " 3098 "ON anon_1.users_id = addresses_1.user_id " 3099 "ORDER BY anon_1.users_name DESC, addresses_1.id" 3100 ) 3101 3102 # however! this works (again?) 3103 eq_( 3104 s.query(User).options(joinedload("addresses")). 3105 order_by(desc("users_name")).first(), 3106 User(name='jack', addresses=[Address()]) 3107 ) 3108 3109 def test_order_by_w_eager_five(self): 3110 """essentially the same as test_eager_relations -> test_limit_3, 3111 but test for textual label elements that are freeform. 3112 this is again #3392.""" 3113 3114 User = self.classes.User 3115 Address = self.classes.Address 3116 Order = self.classes.Order 3117 3118 sess = create_session() 3119 3120 q = sess.query(User, Address.email_address.label('email_address')) 3121 3122 l = q.join('addresses').options(joinedload(User.orders)).\ 3123 order_by( 3124 "email_address desc").limit(1).offset(0) 3125 with expect_warnings( 3126 "Can't resolve label reference 'email_address desc'"): 3127 eq_( 3128 [ 3129 (User( 3130 id=7, 3131 orders=[Order(id=1), Order(id=3), Order(id=5)], 3132 addresses=[Address(id=1)] 3133 ), 'jack@bean.com') 3134 ], 3135 l.all()) 3136 3137 3138class TextWarningTest(QueryTest, AssertsCompiledSQL): 3139 def _test(self, fn, arg, offending_clause, expected): 3140 assert_raises_message( 3141 sa.exc.SAWarning, 3142 r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be " 3143 r"explicitly declared (?:with|as) text\(%(stmt)r\)" % { 3144 "stmt": util.ellipses_string(offending_clause), 3145 }, 3146 fn, arg 3147 ) 3148 3149 with expect_warnings("Textual "): 3150 stmt = fn(arg) 3151 self.assert_compile(stmt, expected) 3152 3153 def test_filter(self): 3154 User = self.classes.User 3155 self._test( 3156 Session().query(User.id).filter, "myid == 5", "myid == 5", 3157 "SELECT users.id AS users_id FROM users WHERE myid == 5" 3158 ) 3159 3160 def test_having(self): 3161 User = self.classes.User 3162 self._test( 3163 Session().query(User.id).having, "myid == 5", "myid == 5", 3164 "SELECT users.id AS users_id FROM users HAVING myid == 5" 3165 ) 3166 3167 def test_from_statement(self): 3168 User = self.classes.User 3169 self._test( 3170 Session().query(User.id).from_statement, 3171 "select id from user", 3172 "select id from user", 3173 "select id from user", 3174 ) 3175 3176 3177class ParentTest(QueryTest, AssertsCompiledSQL): 3178 __dialect__ = 'default' 3179 3180 def test_o2m(self): 3181 User, orders, Order = ( 3182 self.classes.User, self.tables.orders, self.classes.Order) 3183 3184 sess = create_session() 3185 q = sess.query(User) 3186 3187 u1 = q.filter_by(name='jack').one() 3188 3189 # test auto-lookup of property 3190 o = sess.query(Order).with_parent(u1).all() 3191 assert [Order(description="order 1"), Order(description="order 3"), 3192 Order(description="order 5")] == o 3193 3194 # test with explicit property 3195 o = sess.query(Order).with_parent(u1, property='orders').all() 3196 assert [Order(description="order 1"), Order(description="order 3"), 3197 Order(description="order 5")] == o 3198 3199 o = sess.query(Order).with_parent(u1, property=User.orders).all() 3200 assert [Order(description="order 1"), Order(description="order 3"), 3201 Order(description="order 5")] == o 3202 3203 o = sess.query(Order).filter(with_parent(u1, User.orders)).all() 3204 assert [ 3205 Order(description="order 1"), Order(description="order 3"), 3206 Order(description="order 5")] == o 3207 3208 # test generative criterion 3209 o = sess.query(Order).with_parent(u1).filter(orders.c.id > 2).all() 3210 assert [ 3211 Order(description="order 3"), Order(description="order 5")] == o 3212 3213 # test against None for parent? this can't be done with the current 3214 # API since we don't know what mapper to use 3215 # assert 3216 # sess.query(Order).with_parent(None, property='addresses').all() 3217 # == [Order(description="order 5")] 3218 3219 def test_select_from(self): 3220 User, Address = self.classes.User, self.classes.Address 3221 3222 sess = create_session() 3223 u1 = sess.query(User).get(7) 3224 q = sess.query(Address).select_from(Address).with_parent(u1) 3225 self.assert_compile( 3226 q, 3227 "SELECT addresses.id AS addresses_id, " 3228 "addresses.user_id AS addresses_user_id, " 3229 "addresses.email_address AS addresses_email_address " 3230 "FROM addresses WHERE :param_1 = addresses.user_id", 3231 {'param_1': 7} 3232 ) 3233 3234 @testing.fails("issue #3607") 3235 def test_select_from_alias(self): 3236 User, Address = self.classes.User, self.classes.Address 3237 3238 sess = create_session() 3239 u1 = sess.query(User).get(7) 3240 a1 = aliased(Address) 3241 q = sess.query(a1).with_parent(u1) 3242 self.assert_compile( 3243 q, 3244 "SELECT addresses_1.id AS addresses_1_id, " 3245 "addresses_1.user_id AS addresses_1_user_id, " 3246 "addresses_1.email_address AS addresses_1_email_address " 3247 "FROM addresses AS addresses_1 " 3248 "WHERE :param_1 = addresses_1.user_id", 3249 {'param_1': 7} 3250 ) 3251 3252 def test_noparent(self): 3253 Item, User = self.classes.Item, self.classes.User 3254 3255 sess = create_session() 3256 q = sess.query(User) 3257 3258 u1 = q.filter_by(name='jack').one() 3259 3260 try: 3261 q = sess.query(Item).with_parent(u1) 3262 assert False 3263 except sa_exc.InvalidRequestError as e: 3264 assert str(e) \ 3265 == "Could not locate a property which relates "\ 3266 "instances of class 'Item' to instances of class 'User'" 3267 3268 def test_m2m(self): 3269 Item, Keyword = self.classes.Item, self.classes.Keyword 3270 3271 sess = create_session() 3272 i1 = sess.query(Item).filter_by(id=2).one() 3273 k = sess.query(Keyword).with_parent(i1).all() 3274 assert [ 3275 Keyword(name='red'), Keyword(name='small'), 3276 Keyword(name='square')] == k 3277 3278 def test_with_transient(self): 3279 User, Order = self.classes.User, self.classes.Order 3280 3281 sess = Session() 3282 3283 q = sess.query(User) 3284 u1 = q.filter_by(name='jack').one() 3285 utrans = User(id=u1.id) 3286 o = sess.query(Order).with_parent(utrans, 'orders') 3287 eq_( 3288 [ 3289 Order(description="order 1"), Order(description="order 3"), 3290 Order(description="order 5")], 3291 o.all() 3292 ) 3293 3294 o = sess.query(Order).filter(with_parent(utrans, 'orders')) 3295 eq_( 3296 [ 3297 Order(description="order 1"), Order(description="order 3"), 3298 Order(description="order 5")], 3299 o.all() 3300 ) 3301 3302 3303 def test_with_pending_autoflush(self): 3304 Order, User = self.classes.Order, self.classes.User 3305 3306 sess = Session() 3307 3308 o1 = sess.query(Order).first() 3309 opending = Order(id=20, user_id=o1.user_id) 3310 sess.add(opending) 3311 eq_( 3312 sess.query(User).with_parent(opending, 'user').one(), 3313 User(id=o1.user_id) 3314 ) 3315 eq_( 3316 sess.query(User).filter(with_parent(opending, 'user')).one(), 3317 User(id=o1.user_id) 3318 ) 3319 3320 def test_with_pending_no_autoflush(self): 3321 Order, User = self.classes.Order, self.classes.User 3322 3323 sess = Session(autoflush=False) 3324 3325 o1 = sess.query(Order).first() 3326 opending = Order(user_id=o1.user_id) 3327 sess.add(opending) 3328 eq_( 3329 sess.query(User).with_parent(opending, 'user').one(), 3330 User(id=o1.user_id) 3331 ) 3332 3333 def test_unique_binds_union(self): 3334 """bindparams used in the 'parent' query are unique""" 3335 User, Address = self.classes.User, self.classes.Address 3336 3337 sess = Session() 3338 u1, u2 = sess.query(User).order_by(User.id)[0:2] 3339 3340 q1 = sess.query(Address).with_parent(u1, 'addresses') 3341 q2 = sess.query(Address).with_parent(u2, 'addresses') 3342 3343 self.assert_compile( 3344 q1.union(q2), 3345 "SELECT anon_1.addresses_id AS anon_1_addresses_id, " 3346 "anon_1.addresses_user_id AS anon_1_addresses_user_id, " 3347 "anon_1.addresses_email_address AS " 3348 "anon_1_addresses_email_address FROM (SELECT addresses.id AS " 3349 "addresses_id, addresses.user_id AS addresses_user_id, " 3350 "addresses.email_address AS addresses_email_address FROM " 3351 "addresses WHERE :param_1 = addresses.user_id UNION SELECT " 3352 "addresses.id AS addresses_id, addresses.user_id AS " 3353 "addresses_user_id, addresses.email_address " 3354 "AS addresses_email_address " 3355 "FROM addresses WHERE :param_2 = addresses.user_id) AS anon_1", 3356 checkparams={'param_1': 7, 'param_2': 8}, 3357 ) 3358 3359 def test_unique_binds_or(self): 3360 User, Address = self.classes.User, self.classes.Address 3361 3362 sess = Session() 3363 u1, u2 = sess.query(User).order_by(User.id)[0:2] 3364 3365 self.assert_compile( 3366 sess.query(Address).filter( 3367 or_(with_parent(u1, 'addresses'), with_parent(u2, 'addresses')) 3368 ), 3369 "SELECT addresses.id AS addresses_id, addresses.user_id AS " 3370 "addresses_user_id, addresses.email_address AS " 3371 "addresses_email_address FROM addresses WHERE " 3372 ":param_1 = addresses.user_id OR :param_2 = addresses.user_id", 3373 checkparams={'param_1': 7, 'param_2': 8}, 3374 ) 3375 3376 3377class WithTransientOnNone(_fixtures.FixtureTest, AssertsCompiledSQL): 3378 run_inserts = None 3379 __dialect__ = 'default' 3380 3381 def _fixture1(self): 3382 User, Address = self.classes.User, self.classes.Address 3383 users, addresses = self.tables.users, self.tables.addresses 3384 3385 mapper(User, users) 3386 mapper(Address, addresses, properties={ 3387 'user': relationship(User), 3388 'special_user': relationship( 3389 User, primaryjoin=and_( 3390 users.c.id == addresses.c.user_id, 3391 users.c.name == addresses.c.email_address)) 3392 }) 3393 3394 def test_filter_with_transient_assume_pk(self): 3395 self._fixture1() 3396 User, Address = self.classes.User, self.classes.Address 3397 3398 sess = Session() 3399 3400 q = sess.query(Address).filter(Address.user == User()) 3401 with expect_warnings("Got None for value of column "): 3402 self.assert_compile( 3403 q, 3404 "SELECT addresses.id AS addresses_id, " 3405 "addresses.user_id AS addresses_user_id, " 3406 "addresses.email_address AS addresses_email_address " 3407 "FROM addresses WHERE :param_1 = addresses.user_id", 3408 checkparams={'param_1': None} 3409 ) 3410 3411 def test_filter_with_transient_warn_for_none_against_non_pk(self): 3412 self._fixture1() 3413 User, Address = self.classes.User, self.classes.Address 3414 3415 s = Session() 3416 q = s.query(Address).filter(Address.special_user == User()) 3417 with expect_warnings("Got None for value of column"): 3418 3419 self.assert_compile( 3420 q, 3421 "SELECT addresses.id AS addresses_id, " 3422 "addresses.user_id AS addresses_user_id, " 3423 "addresses.email_address AS addresses_email_address " 3424 "FROM addresses WHERE :param_1 = addresses.user_id " 3425 "AND :param_2 = addresses.email_address", 3426 checkparams={"param_1": None, "param_2": None} 3427 ) 3428 3429 def test_with_parent_with_transient_assume_pk(self): 3430 self._fixture1() 3431 User, Address = self.classes.User, self.classes.Address 3432 3433 sess = Session() 3434 3435 q = sess.query(User).with_parent(Address(), "user") 3436 with expect_warnings("Got None for value of column"): 3437 self.assert_compile( 3438 q, 3439 "SELECT users.id AS users_id, users.name AS users_name " 3440 "FROM users WHERE users.id = :param_1", 3441 checkparams={'param_1': None} 3442 ) 3443 3444 def test_with_parent_with_transient_warn_for_none_against_non_pk(self): 3445 self._fixture1() 3446 User, Address = self.classes.User, self.classes.Address 3447 3448 s = Session() 3449 q = s.query(User).with_parent(Address(), "special_user") 3450 with expect_warnings("Got None for value of column"): 3451 3452 self.assert_compile( 3453 q, 3454 "SELECT users.id AS users_id, users.name AS users_name " 3455 "FROM users WHERE users.id = :param_1 " 3456 "AND users.name = :param_2", 3457 checkparams={"param_1": None, "param_2": None} 3458 ) 3459 3460 def test_negated_contains_or_equals_plain_m2o(self): 3461 self._fixture1() 3462 User, Address = self.classes.User, self.classes.Address 3463 3464 s = Session() 3465 q = s.query(Address).filter(Address.user != User()) 3466 with expect_warnings("Got None for value of column"): 3467 self.assert_compile( 3468 q, 3469 3470 "SELECT addresses.id AS addresses_id, " 3471 "addresses.user_id AS addresses_user_id, " 3472 "addresses.email_address AS addresses_email_address " 3473 "FROM addresses " 3474 "WHERE addresses.user_id != :user_id_1 " 3475 "OR addresses.user_id IS NULL", 3476 checkparams={'user_id_1': None} 3477 ) 3478 3479 def test_negated_contains_or_equals_complex_rel(self): 3480 self._fixture1() 3481 User, Address = self.classes.User, self.classes.Address 3482 3483 s = Session() 3484 3485 # this one does *not* warn because we do the criteria 3486 # without deferral 3487 q = s.query(Address).filter(Address.special_user != User()) 3488 self.assert_compile( 3489 q, 3490 "SELECT addresses.id AS addresses_id, " 3491 "addresses.user_id AS addresses_user_id, " 3492 "addresses.email_address AS addresses_email_address " 3493 "FROM addresses " 3494 "WHERE NOT (EXISTS (SELECT 1 " 3495 "FROM users " 3496 "WHERE users.id = addresses.user_id AND " 3497 "users.name = addresses.email_address AND users.id IS NULL))", 3498 checkparams={} 3499 ) 3500 3501 3502class SynonymTest(QueryTest, AssertsCompiledSQL): 3503 __dialect__ = 'default' 3504 3505 @classmethod 3506 def setup_mappers(cls): 3507 users, Keyword, items, order_items, orders, Item, User, \ 3508 Address, keywords, Order, item_keywords, addresses = \ 3509 cls.tables.users, cls.classes.Keyword, cls.tables.items, \ 3510 cls.tables.order_items, cls.tables.orders, \ 3511 cls.classes.Item, cls.classes.User, cls.classes.Address, \ 3512 cls.tables.keywords, cls.classes.Order, \ 3513 cls.tables.item_keywords, cls.tables.addresses 3514 3515 mapper(User, users, properties={ 3516 'name_syn': synonym('name'), 3517 'addresses': relationship(Address), 3518 'orders': relationship( 3519 Order, backref='user', order_by=orders.c.id), # o2m, m2o 3520 'orders_syn': synonym('orders'), 3521 'orders_syn_2': synonym('orders_syn') 3522 }) 3523 mapper(Address, addresses) 3524 mapper(Order, orders, properties={ 3525 'items': relationship(Item, secondary=order_items), # m2m 3526 'address': relationship(Address), # m2o 3527 'items_syn': synonym('items') 3528 }) 3529 mapper(Item, items, properties={ 3530 'keywords': relationship(Keyword, secondary=item_keywords) # m2m 3531 }) 3532 mapper(Keyword, keywords) 3533 3534 def test_options(self): 3535 User, Order = self.classes.User, self.classes.Order 3536 3537 s = create_session() 3538 3539 def go(): 3540 result = s.query(User).filter_by(name='jack').\ 3541 options(joinedload(User.orders_syn)).all() 3542 eq_(result, [ 3543 User(id=7, name='jack', orders=[ 3544 Order(description='order 1'), 3545 Order(description='order 3'), 3546 Order(description='order 5') 3547 ]) 3548 ]) 3549 self.assert_sql_count(testing.db, go, 1) 3550 3551 def test_options_syn_of_syn(self): 3552 User, Order = self.classes.User, self.classes.Order 3553 3554 s = create_session() 3555 3556 def go(): 3557 result = s.query(User).filter_by(name='jack').\ 3558 options(joinedload(User.orders_syn_2)).all() 3559 eq_(result, [ 3560 User(id=7, name='jack', orders=[ 3561 Order(description='order 1'), 3562 Order(description='order 3'), 3563 Order(description='order 5') 3564 ]) 3565 ]) 3566 self.assert_sql_count(testing.db, go, 1) 3567 3568 def test_options_syn_of_syn_string(self): 3569 User, Order = self.classes.User, self.classes.Order 3570 3571 s = create_session() 3572 3573 def go(): 3574 result = s.query(User).filter_by(name='jack').\ 3575 options(joinedload('orders_syn_2')).all() 3576 eq_(result, [ 3577 User(id=7, name='jack', orders=[ 3578 Order(description='order 1'), 3579 Order(description='order 3'), 3580 Order(description='order 5') 3581 ]) 3582 ]) 3583 self.assert_sql_count(testing.db, go, 1) 3584 3585 def test_joins(self): 3586 User, Order = self.classes.User, self.classes.Order 3587 3588 for j in ( 3589 ['orders', 'items'], 3590 ['orders_syn', 'items'], 3591 [User.orders_syn, Order.items], 3592 ['orders_syn_2', 'items'], 3593 [User.orders_syn_2, 'items'], 3594 ['orders', 'items_syn'], 3595 ['orders_syn', 'items_syn'], 3596 ['orders_syn_2', 'items_syn'], 3597 ): 3598 result = create_session().query(User).join(*j).filter_by(id=3). \ 3599 all() 3600 assert [User(id=7, name='jack'), User(id=9, name='fred')] == result 3601 3602 def test_with_parent(self): 3603 Order, User = self.classes.Order, self.classes.User 3604 3605 for nameprop, orderprop in ( 3606 ('name', 'orders'), 3607 ('name_syn', 'orders'), 3608 ('name', 'orders_syn'), 3609 ('name', 'orders_syn_2'), 3610 ('name_syn', 'orders_syn'), 3611 ('name_syn', 'orders_syn_2'), 3612 ): 3613 sess = create_session() 3614 q = sess.query(User) 3615 3616 u1 = q.filter_by(**{nameprop: 'jack'}).one() 3617 3618 o = sess.query(Order).with_parent(u1, property=orderprop).all() 3619 assert [ 3620 Order(description="order 1"), Order(description="order 3"), 3621 Order(description="order 5")] == o 3622 3623 def test_froms_aliased_col(self): 3624 Address, User = self.classes.Address, self.classes.User 3625 3626 sess = create_session() 3627 ua = aliased(User) 3628 3629 q = sess.query(ua.name_syn).join( 3630 Address, ua.id == Address.user_id) 3631 self.assert_compile( 3632 q, 3633 "SELECT users_1.name AS users_1_name FROM " 3634 "users AS users_1 JOIN addresses ON users_1.id = addresses.user_id" 3635 ) 3636 3637 3638class ImmediateTest(_fixtures.FixtureTest): 3639 run_inserts = 'once' 3640 run_deletes = None 3641 3642 @classmethod 3643 def setup_mappers(cls): 3644 Address, addresses, users, User = (cls.classes.Address, 3645 cls.tables.addresses, 3646 cls.tables.users, 3647 cls.classes.User) 3648 3649 mapper(Address, addresses) 3650 3651 mapper(User, users, properties=dict( 3652 addresses=relationship(Address))) 3653 3654 def test_one(self): 3655 User, Address = self.classes.User, self.classes.Address 3656 3657 sess = create_session() 3658 3659 assert_raises( 3660 sa.orm.exc.NoResultFound, 3661 sess.query(User).filter(User.id == 99).one) 3662 3663 eq_(sess.query(User).filter(User.id == 7).one().id, 7) 3664 3665 assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).one) 3666 3667 assert_raises( 3668 sa.orm.exc.NoResultFound, 3669 sess.query(User.id, User.name).filter(User.id == 99).one) 3670 3671 eq_(sess.query(User.id, User.name).filter(User.id == 7).one(), 3672 (7, 'jack')) 3673 3674 assert_raises( 3675 sa.orm.exc.MultipleResultsFound, 3676 sess.query(User.id, User.name).one) 3677 3678 assert_raises( 3679 sa.orm.exc.NoResultFound, 3680 (sess.query(User, Address).join(User.addresses). 3681 filter(Address.id == 99)).one) 3682 3683 eq_((sess.query(User, Address). 3684 join(User.addresses). 3685 filter(Address.id == 4)).one(), 3686 (User(id=8), Address(id=4))) 3687 3688 assert_raises( 3689 sa.orm.exc.MultipleResultsFound, 3690 sess.query(User, Address).join(User.addresses).one) 3691 3692 # this result returns multiple rows, the first 3693 # two rows being the same. but uniquing is 3694 # not applied for a column based result. 3695 assert_raises( 3696 sa.orm.exc.MultipleResultsFound, 3697 sess.query(User.id).join(User.addresses). 3698 filter(User.id.in_([8, 9])).order_by(User.id).one) 3699 3700 # test that a join which ultimately returns 3701 # multiple identities across many rows still 3702 # raises, even though the first two rows are of 3703 # the same identity and unique filtering 3704 # is applied ([ticket:1688]) 3705 assert_raises( 3706 sa.orm.exc.MultipleResultsFound, 3707 sess.query(User).join(User.addresses).filter(User.id.in_([8, 9])). 3708 order_by(User.id).one) 3709 3710 def test_one_or_none(self): 3711 User, Address = self.classes.User, self.classes.Address 3712 3713 sess = create_session() 3714 3715 eq_(sess.query(User).filter(User.id == 99).one_or_none(), None) 3716 3717 eq_(sess.query(User).filter(User.id == 7).one_or_none().id, 7) 3718 3719 assert_raises_message( 3720 sa.orm.exc.MultipleResultsFound, 3721 "Multiple rows were found for one_or_none\(\)", 3722 sess.query(User).one_or_none) 3723 3724 eq_(sess.query(User.id, User.name).filter(User.id == 99).one_or_none(), None) 3725 3726 eq_(sess.query(User.id, User.name).filter(User.id == 7).one_or_none(), 3727 (7, 'jack')) 3728 3729 assert_raises( 3730 sa.orm.exc.MultipleResultsFound, 3731 sess.query(User.id, User.name).one_or_none) 3732 3733 eq_( 3734 (sess.query(User, Address).join(User.addresses). 3735 filter(Address.id == 99)).one_or_none(), None) 3736 3737 eq_((sess.query(User, Address). 3738 join(User.addresses). 3739 filter(Address.id == 4)).one_or_none(), 3740 (User(id=8), Address(id=4))) 3741 3742 assert_raises( 3743 sa.orm.exc.MultipleResultsFound, 3744 sess.query(User, Address).join(User.addresses).one_or_none) 3745 3746 # this result returns multiple rows, the first 3747 # two rows being the same. but uniquing is 3748 # not applied for a column based result. 3749 assert_raises( 3750 sa.orm.exc.MultipleResultsFound, 3751 sess.query(User.id).join(User.addresses). 3752 filter(User.id.in_([8, 9])).order_by(User.id).one_or_none) 3753 3754 # test that a join which ultimately returns 3755 # multiple identities across many rows still 3756 # raises, even though the first two rows are of 3757 # the same identity and unique filtering 3758 # is applied ([ticket:1688]) 3759 assert_raises( 3760 sa.orm.exc.MultipleResultsFound, 3761 sess.query(User).join(User.addresses).filter(User.id.in_([8, 9])). 3762 order_by(User.id).one_or_none) 3763 3764 @testing.future 3765 def test_getslice(self): 3766 assert False 3767 3768 def test_scalar(self): 3769 User = self.classes.User 3770 3771 sess = create_session() 3772 3773 eq_(sess.query(User.id).filter_by(id=7).scalar(), 7) 3774 eq_(sess.query(User.id, User.name).filter_by(id=7).scalar(), 7) 3775 eq_(sess.query(User.id).filter_by(id=0).scalar(), None) 3776 eq_(sess.query(User).filter_by(id=7).scalar(), 3777 sess.query(User).filter_by(id=7).one()) 3778 3779 assert_raises(sa.orm.exc.MultipleResultsFound, sess.query(User).scalar) 3780 assert_raises( 3781 sa.orm.exc.MultipleResultsFound, 3782 sess.query(User.id, User.name).scalar) 3783 3784 def test_value(self): 3785 User = self.classes.User 3786 3787 sess = create_session() 3788 3789 eq_(sess.query(User).filter_by(id=7).value(User.id), 7) 3790 eq_(sess.query(User.id, User.name).filter_by(id=7).value(User.id), 7) 3791 eq_(sess.query(User).filter_by(id=0).value(User.id), None) 3792 3793 sess.bind = testing.db 3794 eq_(sess.query().value(sa.literal_column('1').label('x')), 1) 3795 3796 3797class ExecutionOptionsTest(QueryTest): 3798 3799 def test_option_building(self): 3800 User = self.classes.User 3801 3802 sess = create_session(bind=testing.db, autocommit=False) 3803 3804 q1 = sess.query(User) 3805 assert q1._execution_options == dict() 3806 q2 = q1.execution_options(foo='bar', stream_results=True) 3807 # q1's options should be unchanged. 3808 assert q1._execution_options == dict() 3809 # q2 should have them set. 3810 assert q2._execution_options == dict(foo='bar', stream_results=True) 3811 q3 = q2.execution_options(foo='not bar', answer=42) 3812 assert q2._execution_options == dict(foo='bar', stream_results=True) 3813 3814 q3_options = dict(foo='not bar', stream_results=True, answer=42) 3815 assert q3._execution_options == q3_options 3816 3817 def test_options_in_connection(self): 3818 User = self.classes.User 3819 3820 execution_options = dict(foo='bar', stream_results=True) 3821 3822 class TQuery(Query): 3823 def instances(self, result, ctx): 3824 try: 3825 eq_( 3826 result.connection._execution_options, 3827 execution_options) 3828 finally: 3829 result.close() 3830 return iter([]) 3831 3832 sess = create_session( 3833 bind=testing.db, autocommit=False, query_cls=TQuery) 3834 q1 = sess.query(User).execution_options(**execution_options) 3835 q1.all() 3836 3837 3838class BooleanEvalTest(fixtures.TestBase, testing.AssertsCompiledSQL): 3839 """test standalone booleans being wrapped in an AsBoolean, as well 3840 as true/false compilation.""" 3841 3842 def _dialect(self, native_boolean): 3843 d = default.DefaultDialect() 3844 d.supports_native_boolean = native_boolean 3845 return d 3846 3847 def test_one(self): 3848 s = Session() 3849 c = column('x', Boolean) 3850 self.assert_compile( 3851 s.query(c).filter(c), 3852 "SELECT x WHERE x", 3853 dialect=self._dialect(True) 3854 ) 3855 3856 def test_two(self): 3857 s = Session() 3858 c = column('x', Boolean) 3859 self.assert_compile( 3860 s.query(c).filter(c), 3861 "SELECT x WHERE x = 1", 3862 dialect=self._dialect(False) 3863 ) 3864 3865 def test_three(self): 3866 s = Session() 3867 c = column('x', Boolean) 3868 self.assert_compile( 3869 s.query(c).filter(~c), 3870 "SELECT x WHERE x = 0", 3871 dialect=self._dialect(False) 3872 ) 3873 3874 def test_four(self): 3875 s = Session() 3876 c = column('x', Boolean) 3877 self.assert_compile( 3878 s.query(c).filter(~c), 3879 "SELECT x WHERE NOT x", 3880 dialect=self._dialect(True) 3881 ) 3882 3883 def test_five(self): 3884 s = Session() 3885 c = column('x', Boolean) 3886 self.assert_compile( 3887 s.query(c).having(c), 3888 "SELECT x HAVING x = 1", 3889 dialect=self._dialect(False) 3890 ) 3891 3892 3893class SessionBindTest(QueryTest): 3894 3895 @contextlib.contextmanager 3896 def _assert_bind_args(self, session): 3897 get_bind = mock.Mock(side_effect=session.get_bind) 3898 with mock.patch.object(session, "get_bind", get_bind): 3899 yield 3900 for call_ in get_bind.mock_calls: 3901 is_(call_[1][0], inspect(self.classes.User)) 3902 is_not_(call_[2]['clause'], None) 3903 3904 def test_single_entity_q(self): 3905 User = self.classes.User 3906 session = Session() 3907 with self._assert_bind_args(session): 3908 session.query(User).all() 3909 3910 def test_sql_expr_entity_q(self): 3911 User = self.classes.User 3912 session = Session() 3913 with self._assert_bind_args(session): 3914 session.query(User.id).all() 3915 3916 def test_count(self): 3917 User = self.classes.User 3918 session = Session() 3919 with self._assert_bind_args(session): 3920 session.query(User).count() 3921 3922 def test_aggregate_fn(self): 3923 User = self.classes.User 3924 session = Session() 3925 with self._assert_bind_args(session): 3926 session.query(func.max(User.name)).all() 3927 3928 def test_bulk_update_no_sync(self): 3929 User = self.classes.User 3930 session = Session() 3931 with self._assert_bind_args(session): 3932 session.query(User).filter(User.id == 15).update( 3933 {"name": "foob"}, synchronize_session=False) 3934 3935 def test_bulk_delete_no_sync(self): 3936 User = self.classes.User 3937 session = Session() 3938 with self._assert_bind_args(session): 3939 session.query(User).filter(User.id == 15).delete( 3940 synchronize_session=False) 3941 3942 def test_bulk_update_fetch_sync(self): 3943 User = self.classes.User 3944 session = Session() 3945 with self._assert_bind_args(session): 3946 session.query(User).filter(User.id == 15).update( 3947 {"name": "foob"}, synchronize_session='fetch') 3948 3949 def test_bulk_delete_fetch_sync(self): 3950 User = self.classes.User 3951 session = Session() 3952 with self._assert_bind_args(session): 3953 session.query(User).filter(User.id == 15).delete( 3954 synchronize_session='fetch') 3955 3956 def test_column_property(self): 3957 User = self.classes.User 3958 3959 mapper = inspect(User) 3960 mapper.add_property( 3961 "score", 3962 column_property(func.coalesce(self.tables.users.c.name, None))) 3963 session = Session() 3964 with self._assert_bind_args(session): 3965 session.query(func.max(User.score)).scalar() 3966 3967 def test_column_property_select(self): 3968 User = self.classes.User 3969 Address = self.classes.Address 3970 3971 mapper = inspect(User) 3972 mapper.add_property( 3973 "score", 3974 column_property( 3975 select([func.sum(Address.id)]). 3976 where(Address.user_id == User.id).as_scalar() 3977 ) 3978 ) 3979 session = Session() 3980 3981 with self._assert_bind_args(session): 3982 session.query(func.max(User.score)).scalar() 3983 3984