1from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \ 2 in_, not_in_, is_, ne_, le_ 3from sqlalchemy import testing 4from sqlalchemy.testing import fixtures, engines 5from sqlalchemy import util 6from sqlalchemy import ( 7 exc, sql, func, select, String, Integer, MetaData, ForeignKey, 8 VARCHAR, INT, CHAR, text, type_coerce, literal_column, 9 TypeDecorator, table, column, literal) 10from sqlalchemy.engine import result as _result 11from sqlalchemy.testing.schema import Table, Column 12import operator 13from sqlalchemy.testing import assertions 14from sqlalchemy import exc as sa_exc 15from sqlalchemy.testing.mock import patch, Mock 16from contextlib import contextmanager 17from sqlalchemy.engine import default 18 19 20class ResultProxyTest(fixtures.TablesTest): 21 __backend__ = True 22 23 @classmethod 24 def define_tables(cls, metadata): 25 Table( 26 'users', metadata, 27 Column( 28 'user_id', INT, primary_key=True, 29 test_needs_autoincrement=True), 30 Column('user_name', VARCHAR(20)), 31 test_needs_acid=True 32 ) 33 Table( 34 'addresses', metadata, 35 Column( 36 'address_id', Integer, primary_key=True, 37 test_needs_autoincrement=True), 38 Column('user_id', Integer, ForeignKey('users.user_id')), 39 Column('address', String(30)), 40 test_needs_acid=True 41 ) 42 43 Table( 44 'users2', metadata, 45 Column('user_id', INT, primary_key=True), 46 Column('user_name', VARCHAR(20)), 47 test_needs_acid=True 48 ) 49 50 def test_row_iteration(self): 51 users = self.tables.users 52 53 users.insert().execute( 54 {'user_id': 7, 'user_name': 'jack'}, 55 {'user_id': 8, 'user_name': 'ed'}, 56 {'user_id': 9, 'user_name': 'fred'}, 57 ) 58 r = users.select().execute() 59 rows = [] 60 for row in r: 61 rows.append(row) 62 eq_(len(rows), 3) 63 64 @testing.requires.subqueries 65 def test_anonymous_rows(self): 66 users = self.tables.users 67 68 users.insert().execute( 69 {'user_id': 7, 'user_name': 'jack'}, 70 {'user_id': 8, 'user_name': 'ed'}, 71 {'user_id': 9, 
'user_name': 'fred'}, 72 ) 73 74 sel = select([users.c.user_id]).where(users.c.user_name == 'jack'). \ 75 as_scalar() 76 for row in select([sel + 1, sel + 3], bind=users.bind).execute(): 77 eq_(row['anon_1'], 8) 78 eq_(row['anon_2'], 10) 79 80 def test_row_comparison(self): 81 users = self.tables.users 82 83 users.insert().execute(user_id=7, user_name='jack') 84 rp = users.select().execute().first() 85 86 eq_(rp, rp) 87 is_(not(rp != rp), True) 88 89 equal = (7, 'jack') 90 91 eq_(rp, equal) 92 eq_(equal, rp) 93 is_((not (rp != equal)), True) 94 is_(not (equal != equal), True) 95 96 def endless(): 97 while True: 98 yield 1 99 ne_(rp, endless()) 100 ne_(endless(), rp) 101 102 # test that everything compares the same 103 # as it would against a tuple 104 for compare in [False, 8, endless(), 'xyz', (7, 'jack')]: 105 for op in [ 106 operator.eq, operator.ne, operator.gt, 107 operator.lt, operator.ge, operator.le 108 ]: 109 110 try: 111 control = op(equal, compare) 112 except TypeError: 113 # Py3K raises TypeError for some invalid comparisons 114 assert_raises(TypeError, op, rp, compare) 115 else: 116 eq_(control, op(rp, compare)) 117 118 try: 119 control = op(compare, equal) 120 except TypeError: 121 # Py3K raises TypeError for some invalid comparisons 122 assert_raises(TypeError, op, compare, rp) 123 else: 124 eq_(control, op(compare, rp)) 125 126 @testing.provide_metadata 127 def test_column_label_overlap_fallback(self): 128 content = Table( 129 'content', self.metadata, 130 Column('type', String(30)), 131 ) 132 bar = Table( 133 'bar', self.metadata, 134 Column('content_type', String(30)) 135 ) 136 self.metadata.create_all(testing.db) 137 testing.db.execute(content.insert().values(type="t1")) 138 139 row = testing.db.execute(content.select(use_labels=True)).first() 140 in_(content.c.type, row) 141 not_in_(bar.c.content_type, row) 142 in_(sql.column('content_type'), row) 143 144 row = testing.db.execute( 145 select([content.c.type.label("content_type")])).first() 146 
in_(content.c.type, row) 147 148 not_in_(bar.c.content_type, row) 149 150 in_(sql.column('content_type'), row) 151 152 row = testing.db.execute(select([func.now().label("content_type")])). \ 153 first() 154 not_in_(content.c.type, row) 155 156 not_in_(bar.c.content_type, row) 157 158 in_(sql.column('content_type'), row) 159 160 def test_pickled_rows(self): 161 users = self.tables.users 162 addresses = self.tables.addresses 163 164 users.insert().execute( 165 {'user_id': 7, 'user_name': 'jack'}, 166 {'user_id': 8, 'user_name': 'ed'}, 167 {'user_id': 9, 'user_name': 'fred'}, 168 ) 169 170 for pickle in False, True: 171 for use_labels in False, True: 172 result = users.select(use_labels=use_labels).order_by( 173 users.c.user_id).execute().fetchall() 174 175 if pickle: 176 result = util.pickle.loads(util.pickle.dumps(result)) 177 178 eq_( 179 result, 180 [(7, "jack"), (8, "ed"), (9, "fred")] 181 ) 182 if use_labels: 183 eq_(result[0]['users_user_id'], 7) 184 eq_( 185 list(result[0].keys()), 186 ["users_user_id", "users_user_name"]) 187 else: 188 eq_(result[0]['user_id'], 7) 189 eq_(list(result[0].keys()), ["user_id", "user_name"]) 190 191 eq_(result[0][0], 7) 192 eq_(result[0][users.c.user_id], 7) 193 eq_(result[0][users.c.user_name], 'jack') 194 195 if not pickle or use_labels: 196 assert_raises( 197 exc.NoSuchColumnError, 198 lambda: result[0][addresses.c.user_id]) 199 else: 200 # test with a different table. name resolution is 201 # causing 'user_id' to match when use_labels wasn't used. 
202 eq_(result[0][addresses.c.user_id], 7) 203 204 assert_raises( 205 exc.NoSuchColumnError, lambda: result[0]['fake key']) 206 assert_raises( 207 exc.NoSuchColumnError, 208 lambda: result[0][addresses.c.address_id]) 209 210 def test_column_error_printing(self): 211 result = testing.db.execute(select([1])) 212 row = result.first() 213 214 class unprintable(object): 215 216 def __str__(self): 217 raise ValueError("nope") 218 219 msg = r"Could not locate column in row for column '%s'" 220 221 for accessor, repl in [ 222 ("x", "x"), 223 (Column("q", Integer), "q"), 224 (Column("q", Integer) + 12, r"q \+ :q_1"), 225 (unprintable(), "unprintable element.*"), 226 ]: 227 assert_raises_message( 228 exc.NoSuchColumnError, 229 msg % repl, 230 result._getter, accessor 231 ) 232 233 is_(result._getter(accessor, False), None) 234 235 assert_raises_message( 236 exc.NoSuchColumnError, 237 msg % repl, 238 lambda: row[accessor] 239 ) 240 241 def test_fetchmany(self): 242 users = self.tables.users 243 244 users.insert().execute(user_id=7, user_name='jack') 245 users.insert().execute(user_id=8, user_name='ed') 246 users.insert().execute(user_id=9, user_name='fred') 247 r = users.select().execute() 248 rows = [] 249 for row in r.fetchmany(size=2): 250 rows.append(row) 251 eq_(len(rows), 2) 252 253 def test_column_slices(self): 254 users = self.tables.users 255 addresses = self.tables.addresses 256 257 users.insert().execute(user_id=1, user_name='john') 258 users.insert().execute(user_id=2, user_name='jack') 259 addresses.insert().execute( 260 address_id=1, user_id=2, address='foo@bar.com') 261 262 r = text( 263 "select * from addresses", bind=testing.db).execute().first() 264 eq_(r[0:1], (1,)) 265 eq_(r[1:], (2, 'foo@bar.com')) 266 eq_(r[:-1], (1, 2)) 267 268 def test_column_accessor_basic_compiled(self): 269 users = self.tables.users 270 271 users.insert().execute( 272 dict(user_id=1, user_name='john'), 273 dict(user_id=2, user_name='jack') 274 ) 275 276 r = 
users.select(users.c.user_id == 2).execute().first() 277 eq_(r.user_id, 2) 278 eq_(r['user_id'], 2) 279 eq_(r[users.c.user_id], 2) 280 281 eq_(r.user_name, 'jack') 282 eq_(r['user_name'], 'jack') 283 eq_(r[users.c.user_name], 'jack') 284 285 def test_column_accessor_basic_text(self): 286 users = self.tables.users 287 288 users.insert().execute( 289 dict(user_id=1, user_name='john'), 290 dict(user_id=2, user_name='jack') 291 ) 292 r = testing.db.execute( 293 text("select * from users where user_id=2")).first() 294 295 eq_(r.user_id, 2) 296 eq_(r['user_id'], 2) 297 eq_(r[users.c.user_id], 2) 298 299 eq_(r.user_name, 'jack') 300 eq_(r['user_name'], 'jack') 301 eq_(r[users.c.user_name], 'jack') 302 303 def test_column_accessor_textual_select(self): 304 users = self.tables.users 305 306 users.insert().execute( 307 dict(user_id=1, user_name='john'), 308 dict(user_id=2, user_name='jack') 309 ) 310 # this will create column() objects inside 311 # the select(), these need to match on name anyway 312 r = testing.db.execute( 313 select([ 314 column('user_id'), column('user_name') 315 ]).select_from(table('users')). 
316 where(text('user_id=2')) 317 ).first() 318 319 eq_(r.user_id, 2) 320 eq_(r['user_id'], 2) 321 eq_(r[users.c.user_id], 2) 322 323 eq_(r.user_name, 'jack') 324 eq_(r['user_name'], 'jack') 325 eq_(r[users.c.user_name], 'jack') 326 327 def test_column_accessor_dotted_union(self): 328 users = self.tables.users 329 330 users.insert().execute( 331 dict(user_id=1, user_name='john'), 332 ) 333 334 # test a little sqlite < 3.10.0 weirdness - with the UNION, 335 # cols come back as "users.user_id" in cursor.description 336 r = testing.db.execute( 337 text( 338 "select users.user_id, users.user_name " 339 "from users " 340 "UNION select users.user_id, " 341 "users.user_name from users" 342 ) 343 ).first() 344 eq_(r['user_id'], 1) 345 eq_(r['user_name'], "john") 346 eq_(list(r.keys()), ["user_id", "user_name"]) 347 348 def test_column_accessor_sqlite_raw(self): 349 users = self.tables.users 350 351 users.insert().execute( 352 dict(user_id=1, user_name='john'), 353 ) 354 355 r = text( 356 "select users.user_id, users.user_name " 357 "from users " 358 "UNION select users.user_id, " 359 "users.user_name from users", 360 bind=testing.db).execution_options(sqlite_raw_colnames=True). 
\ 361 execute().first() 362 363 if testing.against("sqlite < 3.10.0"): 364 not_in_('user_id', r) 365 not_in_('user_name', r) 366 eq_(r['users.user_id'], 1) 367 eq_(r['users.user_name'], "john") 368 369 eq_(list(r.keys()), ["users.user_id", "users.user_name"]) 370 else: 371 not_in_('users.user_id', r) 372 not_in_('users.user_name', r) 373 eq_(r['user_id'], 1) 374 eq_(r['user_name'], "john") 375 376 eq_(list(r.keys()), ["user_id", "user_name"]) 377 378 def test_column_accessor_sqlite_translated(self): 379 users = self.tables.users 380 381 users.insert().execute( 382 dict(user_id=1, user_name='john'), 383 ) 384 385 r = text( 386 "select users.user_id, users.user_name " 387 "from users " 388 "UNION select users.user_id, " 389 "users.user_name from users", 390 bind=testing.db).execute().first() 391 eq_(r['user_id'], 1) 392 eq_(r['user_name'], "john") 393 394 if testing.against("sqlite < 3.10.0"): 395 eq_(r['users.user_id'], 1) 396 eq_(r['users.user_name'], "john") 397 else: 398 not_in_('users.user_id', r) 399 not_in_('users.user_name', r) 400 401 eq_(list(r.keys()), ["user_id", "user_name"]) 402 403 def test_column_accessor_labels_w_dots(self): 404 users = self.tables.users 405 406 users.insert().execute( 407 dict(user_id=1, user_name='john'), 408 ) 409 # test using literal tablename.colname 410 r = text( 411 'select users.user_id AS "users.user_id", ' 412 'users.user_name AS "users.user_name" ' 413 'from users', bind=testing.db).\ 414 execution_options(sqlite_raw_colnames=True).execute().first() 415 eq_(r['users.user_id'], 1) 416 eq_(r['users.user_name'], "john") 417 not_in_("user_name", r) 418 eq_(list(r.keys()), ["users.user_id", "users.user_name"]) 419 420 def test_column_accessor_unary(self): 421 users = self.tables.users 422 423 users.insert().execute( 424 dict(user_id=1, user_name='john'), 425 ) 426 427 # unary expressions 428 r = select([users.c.user_name.distinct()]).order_by( 429 users.c.user_name).execute().first() 430 eq_(r[users.c.user_name], 'john') 431 
eq_(r.user_name, 'john') 432 433 def test_column_accessor_err(self): 434 r = testing.db.execute(select([1])).first() 435 assert_raises_message( 436 AttributeError, 437 "Could not locate column in row for column 'foo'", 438 getattr, r, "foo" 439 ) 440 assert_raises_message( 441 KeyError, 442 "Could not locate column in row for column 'foo'", 443 lambda: r['foo'] 444 ) 445 446 def test_graceful_fetch_on_non_rows(self): 447 """test that calling fetchone() etc. on a result that doesn't 448 return rows fails gracefully. 449 450 """ 451 452 # these proxies don't work with no cursor.description present. 453 # so they don't apply to this test at the moment. 454 # result.FullyBufferedResultProxy, 455 # result.BufferedRowResultProxy, 456 # result.BufferedColumnResultProxy 457 458 users = self.tables.users 459 460 conn = testing.db.connect() 461 for meth in [ 462 lambda r: r.fetchone(), 463 lambda r: r.fetchall(), 464 lambda r: r.first(), 465 lambda r: r.scalar(), 466 lambda r: r.fetchmany(), 467 lambda r: r._getter('user'), 468 lambda r: r._has_key('user'), 469 ]: 470 trans = conn.begin() 471 result = conn.execute(users.insert(), user_id=1) 472 assert_raises_message( 473 exc.ResourceClosedError, 474 "This result object does not return rows. 
" 475 "It has been closed automatically.", 476 meth, result, 477 ) 478 trans.rollback() 479 480 def test_fetchone_til_end(self): 481 result = testing.db.execute("select * from users") 482 eq_(result.fetchone(), None) 483 eq_(result.fetchone(), None) 484 eq_(result.fetchone(), None) 485 result.close() 486 assert_raises_message( 487 exc.ResourceClosedError, 488 "This result object is closed.", 489 result.fetchone 490 ) 491 492 def test_connectionless_autoclose_rows_exhausted(self): 493 users = self.tables.users 494 users.insert().execute( 495 dict(user_id=1, user_name='john'), 496 ) 497 498 result = testing.db.execute("select * from users") 499 connection = result.connection 500 assert not connection.closed 501 eq_(result.fetchone(), (1, 'john')) 502 assert not connection.closed 503 eq_(result.fetchone(), None) 504 assert connection.closed 505 506 @testing.requires.returning 507 def test_connectionless_autoclose_crud_rows_exhausted(self): 508 users = self.tables.users 509 stmt = users.insert().values(user_id=1, user_name='john').\ 510 returning(users.c.user_id) 511 result = testing.db.execute(stmt) 512 connection = result.connection 513 assert not connection.closed 514 eq_(result.fetchone(), (1, )) 515 assert not connection.closed 516 eq_(result.fetchone(), None) 517 assert connection.closed 518 519 def test_connectionless_autoclose_no_rows(self): 520 result = testing.db.execute("select * from users") 521 connection = result.connection 522 assert not connection.closed 523 eq_(result.fetchone(), None) 524 assert connection.closed 525 526 def test_connectionless_autoclose_no_metadata(self): 527 result = testing.db.execute("update users set user_id=5") 528 connection = result.connection 529 assert connection.closed 530 assert_raises_message( 531 exc.ResourceClosedError, 532 "This result object does not return rows.", 533 result.fetchone 534 ) 535 536 def test_row_case_sensitive(self): 537 row = testing.db.execute( 538 select([ 539 
literal_column("1").label("case_insensitive"), 540 literal_column("2").label("CaseSensitive") 541 ]) 542 ).first() 543 544 eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) 545 546 in_("case_insensitive", row._keymap) 547 in_("CaseSensitive", row._keymap) 548 not_in_("casesensitive", row._keymap) 549 550 eq_(row["case_insensitive"], 1) 551 eq_(row["CaseSensitive"], 2) 552 553 assert_raises( 554 KeyError, 555 lambda: row["Case_insensitive"] 556 ) 557 assert_raises( 558 KeyError, 559 lambda: row["casesensitive"] 560 ) 561 562 def test_row_case_sensitive_unoptimized(self): 563 ins_db = engines.testing_engine(options={"case_sensitive": True}) 564 row = ins_db.execute( 565 select([ 566 literal_column("1").label("case_insensitive"), 567 literal_column("2").label("CaseSensitive"), 568 text("3 AS screw_up_the_cols") 569 ]) 570 ).first() 571 572 eq_( 573 list(row.keys()), 574 ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) 575 576 in_("case_insensitive", row._keymap) 577 in_("CaseSensitive", row._keymap) 578 not_in_("casesensitive", row._keymap) 579 580 eq_(row["case_insensitive"], 1) 581 eq_(row["CaseSensitive"], 2) 582 eq_(row["screw_up_the_cols"], 3) 583 584 assert_raises(KeyError, lambda: row["Case_insensitive"]) 585 assert_raises(KeyError, lambda: row["casesensitive"]) 586 assert_raises(KeyError, lambda: row["screw_UP_the_cols"]) 587 588 def test_row_case_insensitive(self): 589 ins_db = engines.testing_engine(options={"case_sensitive": False}) 590 row = ins_db.execute( 591 select([ 592 literal_column("1").label("case_insensitive"), 593 literal_column("2").label("CaseSensitive") 594 ]) 595 ).first() 596 597 eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"]) 598 599 in_("case_insensitive", row._keymap) 600 in_("CaseSensitive", row._keymap) 601 in_("casesensitive", row._keymap) 602 603 eq_(row["case_insensitive"], 1) 604 eq_(row["CaseSensitive"], 2) 605 eq_(row["Case_insensitive"], 1) 606 eq_(row["casesensitive"], 2) 607 608 def 
test_row_case_insensitive_unoptimized(self): 609 ins_db = engines.testing_engine(options={"case_sensitive": False}) 610 row = ins_db.execute( 611 select([ 612 literal_column("1").label("case_insensitive"), 613 literal_column("2").label("CaseSensitive"), 614 text("3 AS screw_up_the_cols") 615 ]) 616 ).first() 617 618 eq_( 619 list(row.keys()), 620 ["case_insensitive", "CaseSensitive", "screw_up_the_cols"]) 621 622 in_("case_insensitive", row._keymap) 623 in_("CaseSensitive", row._keymap) 624 in_("casesensitive", row._keymap) 625 626 eq_(row["case_insensitive"], 1) 627 eq_(row["CaseSensitive"], 2) 628 eq_(row["screw_up_the_cols"], 3) 629 eq_(row["Case_insensitive"], 1) 630 eq_(row["casesensitive"], 2) 631 eq_(row["screw_UP_the_cols"], 3) 632 633 def test_row_as_args(self): 634 users = self.tables.users 635 636 users.insert().execute(user_id=1, user_name='john') 637 r = users.select(users.c.user_id == 1).execute().first() 638 users.delete().execute() 639 users.insert().execute(r) 640 eq_(users.select().execute().fetchall(), [(1, 'john')]) 641 642 def test_result_as_args(self): 643 users = self.tables.users 644 users2 = self.tables.users2 645 646 users.insert().execute([ 647 dict(user_id=1, user_name='john'), 648 dict(user_id=2, user_name='ed')]) 649 r = users.select().execute() 650 users2.insert().execute(list(r)) 651 eq_( 652 users2.select().order_by(users2.c.user_id).execute().fetchall(), 653 [(1, 'john'), (2, 'ed')] 654 ) 655 656 users2.delete().execute() 657 r = users.select().execute() 658 users2.insert().execute(*list(r)) 659 eq_( 660 users2.select().order_by(users2.c.user_id).execute().fetchall(), 661 [(1, 'john'), (2, 'ed')] 662 ) 663 664 @testing.requires.duplicate_names_in_cursor_description 665 def test_ambiguous_column(self): 666 users = self.tables.users 667 addresses = self.tables.addresses 668 669 users.insert().execute(user_id=1, user_name='john') 670 result = users.outerjoin(addresses).select().execute() 671 r = result.first() 672 673 
assert_raises_message( 674 exc.InvalidRequestError, 675 "Ambiguous column name", 676 lambda: r['user_id'] 677 ) 678 679 # pure positional targeting; users.c.user_id 680 # and addresses.c.user_id are known! 681 # works as of 1.1 issue #3501 682 eq_(r[users.c.user_id], 1) 683 eq_(r[addresses.c.user_id], None) 684 685 # try to trick it - fake_table isn't in the result! 686 # we get the correct error 687 fake_table = Table('fake', MetaData(), Column('user_id', Integer)) 688 assert_raises_message( 689 exc.InvalidRequestError, 690 "Could not locate column in row for column 'fake.user_id'", 691 lambda: r[fake_table.c.user_id] 692 ) 693 694 r = util.pickle.loads(util.pickle.dumps(r)) 695 assert_raises_message( 696 exc.InvalidRequestError, 697 "Ambiguous column name", 698 lambda: r['user_id'] 699 ) 700 701 result = users.outerjoin(addresses).select().execute() 702 result = _result.BufferedColumnResultProxy(result.context) 703 r = result.first() 704 assert isinstance(r, _result.BufferedColumnRow) 705 assert_raises_message( 706 exc.InvalidRequestError, 707 "Ambiguous column name", 708 lambda: r['user_id'] 709 ) 710 711 @testing.requires.duplicate_names_in_cursor_description 712 def test_ambiguous_column_by_col(self): 713 users = self.tables.users 714 715 users.insert().execute(user_id=1, user_name='john') 716 ua = users.alias() 717 u2 = users.alias() 718 result = select([users.c.user_id, ua.c.user_id]).execute() 719 row = result.first() 720 721 # as of 1.1 issue #3501, we use pure positional 722 # targeting for the column objects here 723 eq_(row[users.c.user_id], 1) 724 725 eq_(row[ua.c.user_id], 1) 726 727 # this now works as of 1.1 issue #3501; 728 # previously this was stuck on "ambiguous column name" 729 assert_raises_message( 730 exc.InvalidRequestError, 731 "Could not locate column in row", 732 lambda: row[u2.c.user_id] 733 ) 734 735 @testing.requires.duplicate_names_in_cursor_description 736 def test_ambiguous_column_case_sensitive(self): 737 eng = 
engines.testing_engine(options=dict(case_sensitive=False)) 738 739 row = eng.execute(select([ 740 literal_column('1').label('SOMECOL'), 741 literal_column('1').label('SOMECOL'), 742 ])).first() 743 744 assert_raises_message( 745 exc.InvalidRequestError, 746 "Ambiguous column name", 747 lambda: row['somecol'] 748 ) 749 750 @testing.requires.duplicate_names_in_cursor_description 751 def test_ambiguous_column_contains(self): 752 users = self.tables.users 753 addresses = self.tables.addresses 754 755 # ticket 2702. in 0.7 we'd get True, False. 756 # in 0.8, both columns are present so it's True; 757 # but when they're fetched you'll get the ambiguous error. 758 users.insert().execute(user_id=1, user_name='john') 759 result = select([users.c.user_id, addresses.c.user_id]).\ 760 select_from(users.outerjoin(addresses)).execute() 761 row = result.first() 762 763 eq_( 764 set([users.c.user_id in row, addresses.c.user_id in row]), 765 set([True]) 766 ) 767 768 def test_ambiguous_column_by_col_plus_label(self): 769 users = self.tables.users 770 771 users.insert().execute(user_id=1, user_name='john') 772 result = select( 773 [users.c.user_id, 774 type_coerce(users.c.user_id, Integer).label('foo')]).execute() 775 row = result.first() 776 eq_( 777 row[users.c.user_id], 1 778 ) 779 eq_( 780 row[1], 1 781 ) 782 783 def test_fetch_partial_result_map(self): 784 users = self.tables.users 785 786 users.insert().execute(user_id=7, user_name='ed') 787 788 t = text("select * from users").columns( 789 user_name=String() 790 ) 791 eq_( 792 testing.db.execute(t).fetchall(), [(7, 'ed')] 793 ) 794 795 def test_fetch_unordered_result_map(self): 796 users = self.tables.users 797 798 users.insert().execute(user_id=7, user_name='ed') 799 800 class Goofy1(TypeDecorator): 801 impl = String 802 803 def process_result_value(self, value, dialect): 804 return value + "a" 805 806 class Goofy2(TypeDecorator): 807 impl = String 808 809 def process_result_value(self, value, dialect): 810 return value + "b" 
811 812 class Goofy3(TypeDecorator): 813 impl = String 814 815 def process_result_value(self, value, dialect): 816 return value + "c" 817 818 t = text( 819 "select user_name as a, user_name as b, " 820 "user_name as c from users").columns( 821 a=Goofy1(), b=Goofy2(), c=Goofy3() 822 ) 823 eq_( 824 testing.db.execute(t).fetchall(), [ 825 ('eda', 'edb', 'edc') 826 ] 827 ) 828 829 @testing.requires.subqueries 830 def test_column_label_targeting(self): 831 users = self.tables.users 832 833 users.insert().execute(user_id=7, user_name='ed') 834 835 for s in ( 836 users.select().alias('foo'), 837 users.select().alias(users.name), 838 ): 839 row = s.select(use_labels=True).execute().first() 840 eq_(row[s.c.user_id], 7) 841 eq_(row[s.c.user_name], 'ed') 842 843 def test_keys(self): 844 users = self.tables.users 845 846 users.insert().execute(user_id=1, user_name='foo') 847 result = users.select().execute() 848 eq_( 849 result.keys(), 850 ['user_id', 'user_name'] 851 ) 852 row = result.first() 853 eq_( 854 row.keys(), 855 ['user_id', 'user_name'] 856 ) 857 858 def test_keys_anon_labels(self): 859 """test [ticket:3483]""" 860 861 users = self.tables.users 862 863 users.insert().execute(user_id=1, user_name='foo') 864 result = testing.db.execute( 865 select([ 866 users.c.user_id, 867 users.c.user_name.label(None), 868 func.count(literal_column('1'))]). 
869 group_by(users.c.user_id, users.c.user_name) 870 ) 871 872 eq_( 873 result.keys(), 874 ['user_id', 'user_name_1', 'count_1'] 875 ) 876 row = result.first() 877 eq_( 878 row.keys(), 879 ['user_id', 'user_name_1', 'count_1'] 880 ) 881 882 def test_items(self): 883 users = self.tables.users 884 885 users.insert().execute(user_id=1, user_name='foo') 886 r = users.select().execute().first() 887 eq_( 888 [(x[0].lower(), x[1]) for x in list(r.items())], 889 [('user_id', 1), ('user_name', 'foo')]) 890 891 def test_len(self): 892 users = self.tables.users 893 894 users.insert().execute(user_id=1, user_name='foo') 895 r = users.select().execute().first() 896 eq_(len(r), 2) 897 898 r = testing.db.execute('select user_name, user_id from users'). \ 899 first() 900 eq_(len(r), 2) 901 r = testing.db.execute('select user_name from users').first() 902 eq_(len(r), 1) 903 904 def test_sorting_in_python(self): 905 users = self.tables.users 906 907 users.insert().execute( 908 dict(user_id=1, user_name='foo'), 909 dict(user_id=2, user_name='bar'), 910 dict(user_id=3, user_name='def'), 911 ) 912 913 rows = users.select().order_by(users.c.user_name).execute().fetchall() 914 915 eq_(rows, [(2, 'bar'), (3, 'def'), (1, 'foo')]) 916 917 eq_(sorted(rows), [(1, 'foo'), (2, 'bar'), (3, 'def')]) 918 919 def test_column_order_with_simple_query(self): 920 # should return values in column definition order 921 users = self.tables.users 922 923 users.insert().execute(user_id=1, user_name='foo') 924 r = users.select(users.c.user_id == 1).execute().first() 925 eq_(r[0], 1) 926 eq_(r[1], 'foo') 927 eq_([x.lower() for x in list(r.keys())], ['user_id', 'user_name']) 928 eq_(list(r.values()), [1, 'foo']) 929 930 def test_column_order_with_text_query(self): 931 # should return values in query order 932 users = self.tables.users 933 934 users.insert().execute(user_id=1, user_name='foo') 935 r = testing.db.execute('select user_name, user_id from users'). 
\ 936 first() 937 eq_(r[0], 'foo') 938 eq_(r[1], 1) 939 eq_([x.lower() for x in list(r.keys())], ['user_name', 'user_id']) 940 eq_(list(r.values()), ['foo', 1]) 941 942 @testing.crashes('oracle', 'FIXME: unknown, varify not fails_on()') 943 @testing.crashes('firebird', 'An identifier must begin with a letter') 944 @testing.provide_metadata 945 def test_column_accessor_shadow(self): 946 shadowed = Table( 947 'test_shadowed', self.metadata, 948 Column('shadow_id', INT, primary_key=True), 949 Column('shadow_name', VARCHAR(20)), 950 Column('parent', VARCHAR(20)), 951 Column('row', VARCHAR(40)), 952 Column('_parent', VARCHAR(20)), 953 Column('_row', VARCHAR(20)), 954 ) 955 self.metadata.create_all() 956 shadowed.insert().execute( 957 shadow_id=1, shadow_name='The Shadow', parent='The Light', 958 row='Without light there is no shadow', 959 _parent='Hidden parent', _row='Hidden row') 960 r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() 961 962 eq_(r.shadow_id, 1) 963 eq_(r['shadow_id'], 1) 964 eq_(r[shadowed.c.shadow_id], 1) 965 966 eq_(r.shadow_name, 'The Shadow') 967 eq_(r['shadow_name'], 'The Shadow') 968 eq_(r[shadowed.c.shadow_name], 'The Shadow') 969 970 eq_(r.parent, 'The Light') 971 eq_(r['parent'], 'The Light') 972 eq_(r[shadowed.c.parent], 'The Light') 973 974 eq_(r.row, 'Without light there is no shadow') 975 eq_(r['row'], 'Without light there is no shadow') 976 eq_(r[shadowed.c.row], 'Without light there is no shadow') 977 978 eq_(r['_parent'], 'Hidden parent') 979 eq_(r['_row'], 'Hidden row') 980 981 def test_nontuple_row(self): 982 """ensure the C version of BaseRowProxy handles 983 duck-type-dependent rows.""" 984 985 from sqlalchemy.engine import RowProxy 986 987 class MyList(object): 988 989 def __init__(self, data): 990 self.internal_list = data 991 992 def __len__(self): 993 return len(self.internal_list) 994 995 def __getitem__(self, i): 996 return list.__getitem__(self.internal_list, i) 997 998 proxy = RowProxy(object(), 
MyList(['value']), [None], { 999 'key': (None, None, 0), 0: (None, None, 0)}) 1000 eq_(list(proxy), ['value']) 1001 eq_(proxy[0], 'value') 1002 eq_(proxy['key'], 'value') 1003 1004 @testing.provide_metadata 1005 def test_no_rowcount_on_selects_inserts(self): 1006 """assert that rowcount is only called on deletes and updates. 1007 1008 This because cursor.rowcount may can be expensive on some dialects 1009 such as Firebird, however many dialects require it be called 1010 before the cursor is closed. 1011 1012 """ 1013 1014 metadata = self.metadata 1015 1016 engine = engines.testing_engine() 1017 1018 t = Table('t1', metadata, 1019 Column('data', String(10)) 1020 ) 1021 metadata.create_all(engine) 1022 1023 with patch.object( 1024 engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount: 1025 mock_rowcount.__get__ = Mock() 1026 engine.execute(t.insert(), 1027 {'data': 'd1'}, 1028 {'data': 'd2'}, 1029 {'data': 'd3'}) 1030 1031 eq_(len(mock_rowcount.__get__.mock_calls), 0) 1032 1033 eq_( 1034 engine.execute(t.select()).fetchall(), 1035 [('d1', ), ('d2', ), ('d3', )] 1036 ) 1037 eq_(len(mock_rowcount.__get__.mock_calls), 0) 1038 1039 engine.execute(t.update(), {'data': 'd4'}) 1040 1041 eq_(len(mock_rowcount.__get__.mock_calls), 1) 1042 1043 engine.execute(t.delete()) 1044 eq_(len(mock_rowcount.__get__.mock_calls), 2) 1045 1046 def test_rowproxy_is_sequence(self): 1047 import collections 1048 from sqlalchemy.engine import RowProxy 1049 1050 row = RowProxy( 1051 object(), ['value'], [None], 1052 {'key': (None, None, 0), 0: (None, None, 0)}) 1053 assert isinstance(row, collections.Sequence) 1054 1055 @testing.provide_metadata 1056 def test_rowproxy_getitem_indexes_compiled(self): 1057 values = Table('rp', self.metadata, 1058 Column('key', String(10), primary_key=True), 1059 Column('value', String(10))) 1060 values.create() 1061 1062 testing.db.execute(values.insert(), dict(key='One', value='Uno')) 1063 row = testing.db.execute(values.select()).first() 1064 
eq_(row['key'], 'One') 1065 eq_(row['value'], 'Uno') 1066 eq_(row[0], 'One') 1067 eq_(row[1], 'Uno') 1068 eq_(row[-2], 'One') 1069 eq_(row[-1], 'Uno') 1070 eq_(row[1:0:-1], ('Uno',)) 1071 1072 @testing.only_on("sqlite") 1073 def test_rowproxy_getitem_indexes_raw(self): 1074 row = testing.db.execute("select 'One' as key, 'Uno' as value").first() 1075 eq_(row['key'], 'One') 1076 eq_(row['value'], 'Uno') 1077 eq_(row[0], 'One') 1078 eq_(row[1], 'Uno') 1079 eq_(row[-2], 'One') 1080 eq_(row[-1], 'Uno') 1081 eq_(row[1:0:-1], ('Uno',)) 1082 1083 @testing.requires.cextensions 1084 def test_row_c_sequence_check(self): 1085 import csv 1086 1087 metadata = MetaData() 1088 metadata.bind = 'sqlite://' 1089 users = Table('users', metadata, 1090 Column('id', Integer, primary_key=True), 1091 Column('name', String(40)), 1092 ) 1093 users.create() 1094 1095 users.insert().execute(name='Test') 1096 row = users.select().execute().fetchone() 1097 1098 s = util.StringIO() 1099 writer = csv.writer(s) 1100 # csv performs PySequenceCheck call 1101 writer.writerow(row) 1102 assert s.getvalue().strip() == '1,Test' 1103 1104 @testing.requires.selectone 1105 def test_empty_accessors(self): 1106 statements = [ 1107 ( 1108 "select 1", 1109 [ 1110 lambda r: r.last_inserted_params(), 1111 lambda r: r.last_updated_params(), 1112 lambda r: r.prefetch_cols(), 1113 lambda r: r.postfetch_cols(), 1114 lambda r: r.inserted_primary_key 1115 ], 1116 "Statement is not a compiled expression construct." 1117 ), 1118 ( 1119 select([1]), 1120 [ 1121 lambda r: r.last_inserted_params(), 1122 lambda r: r.inserted_primary_key 1123 ], 1124 r"Statement is not an insert\(\) expression construct." 1125 ), 1126 ( 1127 select([1]), 1128 [ 1129 lambda r: r.last_updated_params(), 1130 ], 1131 r"Statement is not an update\(\) expression construct." 
            ),
            (
                select([1]),
                [
                    lambda r: r.prefetch_cols(),
                    lambda r: r.postfetch_cols()
                ],
                r"Statement is not an insert\(\) "
                r"or update\(\) expression construct."
            ),
        ]

        for stmt, meths, msg in statements:
            r = testing.db.execute(stmt)
            try:
                for meth in meths:
                    assert_raises_message(
                        sa_exc.InvalidRequestError,
                        msg,
                        meth, r
                    )

            finally:
                # always release the cursor even when an assertion fails
                r.close()


class KeyTargetingTest(fixtures.TablesTest):
    """Tests for how row attribute/key access resolves when column names
    and explicit Column key= settings overlap or conflict."""

    run_inserts = 'once'
    run_deletes = None
    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        # keyed1: DB names "a"/"c", Python keys "b"/"q" — names and keys
        # deliberately disagree so targeting precedence can be observed
        Table(
            'keyed1', metadata, Column("a", CHAR(2), key="b"),
            Column("c", CHAR(2), key="q")
        )
        Table('keyed2', metadata, Column("a", CHAR(2)), Column("b", CHAR(2)))
        Table('keyed3', metadata, Column("a", CHAR(2)), Column("d", CHAR(2)))
        Table('keyed4', metadata, Column("b", CHAR(2)), Column("q", CHAR(2)))
        Table('content', metadata, Column('t', String(30), key="type"))
        Table('bar', metadata, Column('ctype', String(30), key="content_type"))

        if testing.requires.schemas.enabled:
            # same name/key layout as keyed1, but in an alternate schema
            Table(
                'wschema', metadata,
                Column("a", CHAR(2), key="b"),
                Column("c", CHAR(2), key="q"),
                schema=testing.config.test_schema
            )

    @classmethod
    def insert_data(cls):
        # one row per table; values encode table + column for easy asserts
        cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
        cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
        cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
        cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
        cls.tables.content.insert().execute(type="t1")

        if testing.requires.schemas.enabled:
            cls.tables[
                '%s.wschema' % testing.config.test_schema].insert().execute(
                dict(b="a1", q="c1"))

    @testing.requires.schemas
    def test_keyed_accessor_wschema(self):
        keyed1 = self.tables['%s.wschema' % testing.config.test_schema]
        row = testing.db.execute(keyed1.select()).first()

        # both the Column keys (b, q) and the DB names (a, c) resolve
        eq_(row.b, "a1")
        eq_(row.q, "c1")
        eq_(row.a, "a1")
        eq_(row.c, "c1")

    def test_keyed_accessor_single(self):
        keyed1 = self.tables.keyed1
        row = testing.db.execute(keyed1.select()).first()

        # both the Column keys (b, q) and the DB names (a, c) resolve
        eq_(row.b, "a1")
        eq_(row.q, "c1")
        eq_(row.a, "a1")
        eq_(row.c, "c1")

    def test_keyed_accessor_single_labeled(self):
        keyed1 = self.tables.keyed1
        row = testing.db.execute(keyed1.select().apply_labels()).first()

        # labeled forms of both key- and name-based access resolve
        eq_(row.keyed1_b, "a1")
        eq_(row.keyed1_q, "c1")
        eq_(row.keyed1_a, "a1")
        eq_(row.keyed1_c, "c1")

    @testing.requires.duplicate_names_in_cursor_description
    def test_keyed_accessor_composite_conflict_2(self):
        keyed1 = self.tables.keyed1
        keyed2 = self.tables.keyed2

        row = testing.db.execute(select([keyed1, keyed2])).first()
        # row.b is unambiguous
        eq_(row.b, "b2")
        # row.a is ambiguous
        assert_raises_message(
            exc.InvalidRequestError,
            "Ambig",
            getattr, row, "a"
        )

    def test_keyed_accessor_composite_names_precedent(self):
        keyed1 = self.tables.keyed1
        keyed4 = self.tables.keyed4

        row = testing.db.execute(select([keyed1, keyed4])).first()
        # keyed4's real column names b/q win over keyed1's keys b/q
        eq_(row.b, "b4")
        eq_(row.q, "q4")
        eq_(row.a, "a1")
        eq_(row.c, "c1")

    @testing.requires.duplicate_names_in_cursor_description
    def test_keyed_accessor_composite_keys_precedent(self):
        keyed1 = self.tables.keyed1
        keyed3 = self.tables.keyed3

        row = testing.db.execute(select([keyed1, keyed3])).first()
        eq_(row.q, "c1")
        # keyed1.b maps to DB name "a", which collides with keyed3.a,
        # hence both accessors report ambiguity on name 'a'
        assert_raises_message(
            exc.InvalidRequestError,
            "Ambiguous column name 'a'",
            getattr, row, "b"
        )
        assert_raises_message(
            exc.InvalidRequestError,
            "Ambiguous column name 'a'",
            getattr, row, "a"
        )
        eq_(row.d, "d3")

    def test_keyed_accessor_composite_labeled(self):
        keyed1 = self.tables.keyed1
        keyed2 = self.tables.keyed2

        row = testing.db.execute(select([keyed1, keyed2]).apply_labels()). \
            first()
        # with apply_labels() every column is reachable unambiguously
        eq_(row.keyed1_b, "a1")
        eq_(row.keyed1_a, "a1")
        eq_(row.keyed1_q, "c1")
        eq_(row.keyed1_c, "c1")
        eq_(row.keyed2_a, "a2")
        eq_(row.keyed2_b, "b2")
        # labels that correspond to no column raise KeyError
        assert_raises(KeyError, lambda: row['keyed2_c'])
        assert_raises(KeyError, lambda: row['keyed2_q'])

    def test_column_label_overlap_fallback(self):
        content, bar = self.tables.content, self.tables.bar
        row = testing.db.execute(
            select([content.c.type.label("content_type")])).first()

        # the label shadows both real columns; only the lightweight
        # string-named column matches by name fallback
        not_in_(content.c.type, row)
        not_in_(bar.c.content_type, row)

        in_(sql.column('content_type'), row)

        row = testing.db.execute(select([func.now().label("content_type")])). \
            first()
        not_in_(content.c.type, row)
        not_in_(bar.c.content_type, row)
        in_(sql.column('content_type'), row)

    def test_column_label_overlap_fallback_2(self):
        content, bar = self.tables.content, self.tables.bar
        row = testing.db.execute(content.select(use_labels=True)).first()
        # with use_labels the real column is targetable; the bare
        # string column no longer matches
        in_(content.c.type, row)
        not_in_(bar.c.content_type, row)
        not_in_(sql.column('content_type'), row)

    def test_columnclause_schema_column_one(self):
        keyed2 = self.tables.keyed2

        # this is addressed by [ticket:2932]
        # ColumnClause._compare_name_for_result allows the
        # columns which the statement is against to be lightweight
        # cols, which results in a more liberal comparison scheme
        a, b = sql.column('a'), sql.column('b')
        stmt = select([a, b]).select_from(table("keyed2"))
        row = testing.db.execute(stmt).first()

        in_(keyed2.c.a, row)
        in_(keyed2.c.b, row)
        in_(a, row)
        in_(b, row)

    def test_columnclause_schema_column_two(self):
        keyed2 = self.tables.keyed2

        a, b = sql.column('a'), sql.column('b')
        stmt = select([keyed2.c.a, keyed2.c.b])
        row = testing.db.execute(stmt).first()

        # both the schema-bound columns and the lightweight ones locate
        in_(keyed2.c.a, row)
        in_(keyed2.c.b, row)
        in_(a, row)
        in_(b, row)

    def test_columnclause_schema_column_three(self):
        keyed2 = self.tables.keyed2

        # this is also addressed by [ticket:2932]

        a, b = sql.column('a'), sql.column('b')
        stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
        row = testing.db.execute(stmt).first()

        in_(keyed2.c.a, row)
        in_(keyed2.c.b, row)
        in_(a, row)
        in_(b, row)
        in_(stmt.c.a, row)
        in_(stmt.c.b, row)

    def test_columnclause_schema_column_four(self):
        keyed2 = self.tables.keyed2

        # this is also addressed by [ticket:2932]

        a, b = sql.column('keyed2_a'), sql.column('keyed2_b')
        stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
            a, b)
        row = testing.db.execute(stmt).first()

        in_(keyed2.c.a, row)
        in_(keyed2.c.b, row)
        in_(a, row)
        in_(b, row)
        in_(stmt.c.keyed2_a, row)
        in_(stmt.c.keyed2_b, row)

    def test_columnclause_schema_column_five(self):
        keyed2 = self.tables.keyed2

        # this is also addressed by [ticket:2932]

        stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
            keyed2_a=CHAR, keyed2_b=CHAR)
        row = testing.db.execute(stmt).first()

        in_(keyed2.c.a, row)
        in_(keyed2.c.b, row)
        in_(stmt.c.keyed2_a, row)
        in_(stmt.c.keyed2_b, row)


class PositionalTextTest(fixtures.TablesTest):
    """Tests for positional matching of text().columns() constructs
    against the columns of a raw textual SELECT."""

    run_inserts = 'once'
    run_deletes = None
    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        Table(
            'text1',
            metadata,
            Column("a", CHAR(2)),
            Column("b", CHAR(2)),
            Column("c", CHAR(2)),
            Column("d", CHAR(2))
        )

    @classmethod
    def insert_data(cls):
        # single row; value "x1" marks column x
        cls.tables.text1.insert().execute([
            dict(a="a1", b="b1", c="c1", d="d1"),
        ])
]) 1403 1404 def test_via_column(self): 1405 c1, c2, c3, c4 = column('q'), column('p'), column('r'), column('d') 1406 stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) 1407 1408 result = testing.db.execute(stmt) 1409 row = result.first() 1410 1411 eq_(row[c2], "b1") 1412 eq_(row[c4], "d1") 1413 eq_(row[1], "b1") 1414 eq_(row["b"], "b1") 1415 eq_(row.keys(), ["a", "b", "c", "d"]) 1416 eq_(row["r"], "c1") 1417 eq_(row["d"], "d1") 1418 1419 def test_fewer_cols_than_sql_positional(self): 1420 c1, c2 = column('q'), column('p') 1421 stmt = text("select a, b, c, d from text1").columns(c1, c2) 1422 1423 # no warning as this can be similar for non-positional 1424 result = testing.db.execute(stmt) 1425 row = result.first() 1426 1427 eq_(row[c1], "a1") 1428 eq_(row["c"], "c1") 1429 1430 def test_fewer_cols_than_sql_non_positional(self): 1431 c1, c2 = column('a'), column('p') 1432 stmt = text("select a, b, c, d from text1").columns(c2, c1, d=CHAR) 1433 1434 # no warning as this can be similar for non-positional 1435 result = testing.db.execute(stmt) 1436 row = result.first() 1437 1438 # c1 name matches, locates 1439 eq_(row[c1], "a1") 1440 eq_(row["c"], "c1") 1441 1442 # c2 name does not match, doesn't locate 1443 assert_raises_message( 1444 exc.NoSuchColumnError, 1445 "in row for column 'p'", 1446 lambda: row[c2] 1447 ) 1448 1449 def test_more_cols_than_sql(self): 1450 c1, c2, c3, c4 = column('q'), column('p'), column('r'), column('d') 1451 stmt = text("select a, b from text1").columns(c1, c2, c3, c4) 1452 1453 with assertions.expect_warnings( 1454 r"Number of columns in textual SQL \(4\) is " 1455 r"smaller than number of columns requested \(2\)"): 1456 result = testing.db.execute(stmt) 1457 1458 row = result.first() 1459 eq_(row[c2], "b1") 1460 1461 assert_raises_message( 1462 exc.NoSuchColumnError, 1463 "in row for column 'r'", 1464 lambda: row[c3] 1465 ) 1466 1467 def test_dupe_col_obj(self): 1468 c1, c2, c3 = column('q'), column('p'), column('r') 1469 
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c2) 1470 1471 assert_raises_message( 1472 exc.InvalidRequestError, 1473 "Duplicate column expression requested in " 1474 "textual SQL: <.*.ColumnClause.*; p>", 1475 testing.db.execute, stmt 1476 ) 1477 1478 def test_anon_aliased_unique(self): 1479 text1 = self.tables.text1 1480 1481 c1 = text1.c.a.label(None) 1482 c2 = text1.alias().c.c 1483 c3 = text1.alias().c.b 1484 c4 = text1.alias().c.d.label(None) 1485 1486 stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) 1487 result = testing.db.execute(stmt) 1488 row = result.first() 1489 1490 eq_(row[c1], "a1") 1491 eq_(row[c2], "b1") 1492 eq_(row[c3], "c1") 1493 eq_(row[c4], "d1") 1494 1495 # key fallback rules still match this to a column 1496 # unambiguously based on its name 1497 eq_(row[text1.c.a], "a1") 1498 1499 # key fallback rules still match this to a column 1500 # unambiguously based on its name 1501 eq_(row[text1.c.d], "d1") 1502 1503 # text1.c.b goes nowhere....because we hit key fallback 1504 # but the text1.c.b doesn't derive from text1.c.c 1505 assert_raises_message( 1506 exc.NoSuchColumnError, 1507 "Could not locate column in row for column 'text1.b'", 1508 lambda: row[text1.c.b] 1509 ) 1510 1511 def test_anon_aliased_overlapping(self): 1512 text1 = self.tables.text1 1513 1514 c1 = text1.c.a.label(None) 1515 c2 = text1.alias().c.a 1516 c3 = text1.alias().c.a.label(None) 1517 c4 = text1.c.a.label(None) 1518 1519 stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) 1520 result = testing.db.execute(stmt) 1521 row = result.first() 1522 1523 eq_(row[c1], "a1") 1524 eq_(row[c2], "b1") 1525 eq_(row[c3], "c1") 1526 eq_(row[c4], "d1") 1527 1528 # key fallback rules still match this to a column 1529 # unambiguously based on its name 1530 eq_(row[text1.c.a], "a1") 1531 1532 def test_anon_aliased_name_conflict(self): 1533 text1 = self.tables.text1 1534 1535 c1 = text1.c.a.label("a") 1536 c2 = text1.alias().c.a 1537 c3 = 
        c4 = text1.c.a.label("a")

        # all cols are named "a". if we are positional, we don't care.
        # this is new logic in 1.1
        stmt = text("select a, b as a, c as a, d as a from text1").columns(
            c1, c2, c3, c4)
        result = testing.db.execute(stmt)
        row = result.first()

        eq_(row[c1], "a1")
        eq_(row[c2], "b1")
        eq_(row[c3], "c1")
        eq_(row[c4], "d1")

        # fails, because we hit key fallback and find conflicts
        # in columns that are present
        assert_raises_message(
            exc.NoSuchColumnError,
            "Could not locate column in row for column 'text1.a'",
            lambda: row[text1.c.a]
        )


class AlternateResultProxyTest(fixtures.TablesTest):
    """Exercise the non-default ResultProxy implementations (buffered row,
    buffered column, fully buffered) against a SQLite fixture."""

    __requires__ = ('sqlite', )

    @classmethod
    def setup_bind(cls):
        cls.engine = engine = engines.testing_engine('sqlite://')
        return engine

    @classmethod
    def define_tables(cls, metadata):
        Table(
            'test', metadata,
            Column('x', Integer, primary_key=True),
            Column('y', String(50, convert_unicode='force'))
        )

    @classmethod
    def insert_data(cls):
        # rows (1, "t_1") .. (11, "t_11")
        cls.engine.execute(cls.tables.test.insert(), [
            {'x': i, 'y': "t_%d" % i} for i in range(1, 12)
        ])

    @contextmanager
    def _proxy_fixture(self, cls):
        """patch the engine's execution context so results are delivered
        through the given ResultProxy subclass for the duration."""
        self.table = self.tables.test

        class ExcCtx(default.DefaultExecutionContext):

            def get_result_proxy(self):
                return cls(self)
        self.patcher = patch.object(
            self.engine.dialect, "execution_ctx_cls", ExcCtx)
        with self.patcher:
            yield

    def _test_proxy(self, cls):
        # shared fetchone/fetchmany/fetchall/first/scalar behavior checks
        # run against each proxy implementation
        with self._proxy_fixture(cls):
            rows = []
            r = self.engine.execute(select([self.table]))
            assert isinstance(r, cls)
            for i in range(5):
                rows.append(r.fetchone())
            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])

            rows = r.fetchmany(3)
            eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])

            rows = r.fetchall()
            eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])

            r = self.engine.execute(select([self.table]))
            rows = r.fetchmany(None)
            eq_(rows[0], (1, "t_1"))
            # number of rows here could be one, or the whole thing
            assert len(rows) == 1 or len(rows) == 11

            r = self.engine.execute(select([self.table]).limit(1))
            r.fetchone()
            eq_(r.fetchone(), None)

            r = self.engine.execute(select([self.table]).limit(5))
            rows = r.fetchmany(6)
            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])

            # result keeps going just fine with blank results...
            eq_(r.fetchmany(2), [])

            eq_(r.fetchmany(2), [])

            eq_(r.fetchall(), [])

            eq_(r.fetchone(), None)

            # until we close
            r.close()

            self._assert_result_closed(r)

            # first() / scalar() auto-close the result
            r = self.engine.execute(select([self.table]).limit(5))
            eq_(r.first(), (1, "t_1"))
            self._assert_result_closed(r)

            r = self.engine.execute(select([self.table]).limit(5))
            eq_(r.scalar(), 1)
            self._assert_result_closed(r)

    def _assert_result_closed(self, r):
        # every fetch method must refuse to operate on a closed result
        assert_raises_message(
            sa_exc.ResourceClosedError,
            "object is closed",
            r.fetchone
        )

        assert_raises_message(
            sa_exc.ResourceClosedError,
            "object is closed",
            r.fetchmany, 2
        )

        assert_raises_message(
            sa_exc.ResourceClosedError,
            "object is closed",
            r.fetchall
        )

    def test_basic_plain(self):
        self._test_proxy(_result.ResultProxy)

    def test_basic_buffered_row_result_proxy(self):
        self._test_proxy(_result.BufferedRowResultProxy)

    def test_basic_fully_buffered_result_proxy(self):
        self._test_proxy(_result.FullyBufferedResultProxy)

    def test_basic_buffered_column_result_proxy(self):
        self._test_proxy(_result.BufferedColumnResultProxy)

    def test_resultprocessor_plain(self):
        self._test_result_processor(_result.ResultProxy, False)
False) 1680 1681 def test_resultprocessor_plain_cached(self): 1682 self._test_result_processor(_result.ResultProxy, True) 1683 1684 def test_resultprocessor_buffered_column(self): 1685 self._test_result_processor(_result.BufferedColumnResultProxy, False) 1686 1687 def test_resultprocessor_buffered_column_cached(self): 1688 self._test_result_processor(_result.BufferedColumnResultProxy, True) 1689 1690 def test_resultprocessor_buffered_row(self): 1691 self._test_result_processor(_result.BufferedRowResultProxy, False) 1692 1693 def test_resultprocessor_buffered_row_cached(self): 1694 self._test_result_processor(_result.BufferedRowResultProxy, True) 1695 1696 def test_resultprocessor_fully_buffered(self): 1697 self._test_result_processor(_result.FullyBufferedResultProxy, False) 1698 1699 def test_resultprocessor_fully_buffered_cached(self): 1700 self._test_result_processor(_result.FullyBufferedResultProxy, True) 1701 1702 def _test_result_processor(self, cls, use_cache): 1703 class MyType(TypeDecorator): 1704 impl = String() 1705 1706 def process_result_value(self, value, dialect): 1707 return "HI " + value 1708 1709 with self._proxy_fixture(cls): 1710 with self.engine.connect() as conn: 1711 if use_cache: 1712 cache = {} 1713 conn = conn.execution_options(compiled_cache=cache) 1714 1715 stmt = select([literal("THERE", type_=MyType())]) 1716 for i in range(2): 1717 r = conn.execute(stmt) 1718 eq_(r.scalar(), "HI THERE") 1719 1720 def test_buffered_row_growth(self): 1721 with self._proxy_fixture(_result.BufferedRowResultProxy): 1722 with self.engine.connect() as conn: 1723 conn.execute(self.table.insert(), [ 1724 {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) 1725 ]) 1726 result = conn.execute(self.table.select()) 1727 checks = { 1728 0: 5, 1: 10, 9: 20, 135: 250, 274: 500, 1729 1351: 1000 1730 } 1731 for idx, row in enumerate(result, 0): 1732 if idx in checks: 1733 eq_(result._bufsize, checks[idx]) 1734 le_( 1735 len(result._BufferedRowResultProxy__rowbuffer), 1736 
1000 1737 ) 1738 1739 def test_max_row_buffer_option(self): 1740 with self._proxy_fixture(_result.BufferedRowResultProxy): 1741 with self.engine.connect() as conn: 1742 conn.execute(self.table.insert(), [ 1743 {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) 1744 ]) 1745 result = conn.execution_options(max_row_buffer=27).execute( 1746 self.table.select() 1747 ) 1748 for idx, row in enumerate(result, 0): 1749 if idx in (16, 70, 150, 250): 1750 eq_(result._bufsize, 27) 1751 le_( 1752 len(result._BufferedRowResultProxy__rowbuffer), 1753 27 1754 ) 1755