from contextlib import contextmanager
import operator

from sqlalchemy import CHAR
from sqlalchemy import column
from sqlalchemy import exc
from sqlalchemy import exc as sa_exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import literal_column
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import type_coerce
from sqlalchemy import TypeDecorator
from sqlalchemy import util
from sqlalchemy import VARCHAR
from sqlalchemy.engine import default
from sqlalchemy.engine import result as _result
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import assertions
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import is_
from sqlalchemy.testing import le_
from sqlalchemy.testing import ne_
from sqlalchemy.testing import not_in_
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.mock import patch
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table


class ResultProxyTest(fixtures.TablesTest):
    """Round-trip tests for ResultProxy / RowProxy behavior.

    Covers row iteration, tuple-like comparison and slicing, column
    targeting by string key / Column object / attribute, pickling,
    ambiguous-column errors, case sensitivity options, and the
    auto-close semantics of connectionless execution.
    """

    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "users",
            metadata,
            Column(
                "user_id", INT, primary_key=True, test_needs_autoincrement=True
            ),
            Column("user_name", VARCHAR(20)),
            test_needs_acid=True,
        )
        Table(
            "addresses",
            metadata,
            Column(
                "address_id",
                Integer,
                primary_key=True,
                test_needs_autoincrement=True,
            ),
            Column("user_id", Integer, ForeignKey("users.user_id")),
            Column("address", String(30)),
            test_needs_acid=True,
        )

        Table(
            "users2",
            metadata,
            Column("user_id", INT, primary_key=True),
            Column("user_name", VARCHAR(20)),
            test_needs_acid=True,
        )

    def test_row_iteration(self):
        users = self.tables.users

        users.insert().execute(
            {"user_id": 7, "user_name": "jack"},
            {"user_id": 8, "user_name": "ed"},
            {"user_id": 9, "user_name": "fred"},
        )
        r = users.select().execute()
        rows = []
        for row in r:
            rows.append(row)
        eq_(len(rows), 3)

    def test_row_next(self):
        users = self.tables.users

        users.insert().execute(
            {"user_id": 7, "user_name": "jack"},
            {"user_id": 8, "user_name": "ed"},
            {"user_id": 9, "user_name": "fred"},
        )
        r = users.select().execute()
        rows = []
        while True:
            # next() with a default sentinel rather than StopIteration
            row = next(r, "foo")
            if row == "foo":
                break
            rows.append(row)
        eq_(len(rows), 3)

    @testing.requires.subqueries
    def test_anonymous_rows(self):
        users = self.tables.users

        users.insert().execute(
            {"user_id": 7, "user_name": "jack"},
            {"user_id": 8, "user_name": "ed"},
            {"user_id": 9, "user_name": "fred"},
        )

        sel = (
            select([users.c.user_id])
            .where(users.c.user_name == "jack")
            .as_scalar()
        )
        for row in select([sel + 1, sel + 3], bind=users.bind).execute():
            eq_(row["anon_1"], 8)
            eq_(row["anon_2"], 10)

    def test_row_comparison(self):
        users = self.tables.users

        users.insert().execute(user_id=7, user_name="jack")
        rp = users.select().execute().first()

        eq_(rp, rp)
        is_(not (rp != rp), True)

        equal = (7, "jack")

        eq_(rp, equal)
        eq_(equal, rp)
        is_((not (rp != equal)), True)
        is_(not (equal != equal), True)

        def endless():
            while True:
                yield 1

        ne_(rp, endless())
        ne_(endless(), rp)

        # test that everything compares the same
        # as it would against a tuple
        for compare in [False, 8, endless(), "xyz", (7, "jack")]:
            for op in [
                operator.eq,
                operator.ne,
                operator.gt,
                operator.lt,
                operator.ge,
                operator.le,
            ]:

                try:
                    control = op(equal, compare)
                except TypeError:
                    # Py3K raises TypeError for some invalid comparisons
                    assert_raises(TypeError, op, rp, compare)
                else:
                    eq_(control, op(rp, compare))

                try:
                    control = op(compare, equal)
                except TypeError:
                    # Py3K raises TypeError for some invalid comparisons
                    assert_raises(TypeError, op, compare, rp)
                else:
                    eq_(control, op(compare, rp))

    @testing.provide_metadata
    def test_column_label_overlap_fallback(self):
        content = Table("content", self.metadata, Column("type", String(30)))
        bar = Table("bar", self.metadata, Column("content_type", String(30)))
        self.metadata.create_all(testing.db)
        testing.db.execute(content.insert().values(type="t1"))

        row = testing.db.execute(content.select(use_labels=True)).first()
        in_(content.c.type, row)
        not_in_(bar.c.content_type, row)
        in_(sql.column("content_type"), row)

        row = testing.db.execute(
            select([content.c.type.label("content_type")])
        ).first()
        in_(content.c.type, row)

        not_in_(bar.c.content_type, row)

        in_(sql.column("content_type"), row)

        row = testing.db.execute(
            select([func.now().label("content_type")])
        ).first()
        not_in_(content.c.type, row)

        not_in_(bar.c.content_type, row)

        in_(sql.column("content_type"), row)

    def test_pickled_rows(self):
        users = self.tables.users
        addresses = self.tables.addresses

        users.insert().execute(
            {"user_id": 7, "user_name": "jack"},
            {"user_id": 8, "user_name": "ed"},
            {"user_id": 9, "user_name": "fred"},
        )

        for pickle in False, True:
            for use_labels in False, True:
                result = (
                    users.select(use_labels=use_labels)
                    .order_by(users.c.user_id)
                    .execute()
                    .fetchall()
                )

                if pickle:
                    result = util.pickle.loads(util.pickle.dumps(result))

                eq_(result, [(7, "jack"), (8, "ed"), (9, "fred")])
                if use_labels:
                    eq_(result[0]["users_user_id"], 7)
                    eq_(
                        list(result[0].keys()),
                        ["users_user_id", "users_user_name"],
                    )
                else:
                    eq_(result[0]["user_id"], 7)
                    eq_(list(result[0].keys()), ["user_id", "user_name"])

                eq_(result[0][0], 7)
                eq_(result[0][users.c.user_id], 7)
                eq_(result[0][users.c.user_name], "jack")

                if not pickle or use_labels:
                    assert_raises(
                        exc.NoSuchColumnError,
                        lambda: result[0][addresses.c.user_id],
                    )
                else:
                    # test with a different table.  name resolution is
                    # causing 'user_id' to match when use_labels wasn't used.
                    eq_(result[0][addresses.c.user_id], 7)

                assert_raises(
                    exc.NoSuchColumnError, lambda: result[0]["fake key"]
                )
                assert_raises(
                    exc.NoSuchColumnError,
                    lambda: result[0][addresses.c.address_id],
                )

    def test_column_error_printing(self):
        result = testing.db.execute(select([1]))
        row = result.first()

        class unprintable(object):
            def __str__(self):
                raise ValueError("nope")

        msg = r"Could not locate column in row for column '%s'"

        for accessor, repl in [
            ("x", "x"),
            (Column("q", Integer), "q"),
            (Column("q", Integer) + 12, r"q \+ :q_1"),
            (unprintable(), "unprintable element.*"),
        ]:
            assert_raises_message(
                exc.NoSuchColumnError, msg % repl, result._getter, accessor
            )

            is_(result._getter(accessor, False), None)

            assert_raises_message(
                exc.NoSuchColumnError, msg % repl, lambda: row[accessor]
            )

    def test_fetchmany(self):
        users = self.tables.users

        users.insert().execute(user_id=7, user_name="jack")
        users.insert().execute(user_id=8, user_name="ed")
        users.insert().execute(user_id=9, user_name="fred")
        r = users.select().execute()
        rows = []
        for row in r.fetchmany(size=2):
            rows.append(row)
        eq_(len(rows), 2)

    def test_column_slices(self):
        users = self.tables.users
        addresses = self.tables.addresses

        users.insert().execute(user_id=1, user_name="john")
        users.insert().execute(user_id=2, user_name="jack")
        addresses.insert().execute(
            address_id=1, user_id=2, address="foo@bar.com"
        )

        r = text("select * from addresses", bind=testing.db).execute().first()
        eq_(r[0:1], (1,))
        eq_(r[1:], (2, "foo@bar.com"))
        eq_(r[:-1], (1, 2))

    def test_column_accessor_basic_compiled(self):
        users = self.tables.users

        users.insert().execute(
            dict(user_id=1, user_name="john"),
            dict(user_id=2, user_name="jack"),
        )

        r = users.select(users.c.user_id == 2).execute().first()
        eq_(r.user_id, 2)
        eq_(r["user_id"], 2)
        eq_(r[users.c.user_id], 2)

        eq_(r.user_name, "jack")
        eq_(r["user_name"], "jack")
        eq_(r[users.c.user_name], "jack")

    def test_column_accessor_basic_text(self):
        users = self.tables.users

        users.insert().execute(
            dict(user_id=1, user_name="john"),
            dict(user_id=2, user_name="jack"),
        )
        r = testing.db.execute(
            text("select * from users where user_id=2")
        ).first()

        eq_(r.user_id, 2)
        eq_(r["user_id"], 2)
        eq_(r[users.c.user_id], 2)

        eq_(r.user_name, "jack")
        eq_(r["user_name"], "jack")
        eq_(r[users.c.user_name], "jack")

    def test_column_accessor_textual_select(self):
        users = self.tables.users

        users.insert().execute(
            dict(user_id=1, user_name="john"),
            dict(user_id=2, user_name="jack"),
        )
        # this will create column() objects inside
        # the select(), these need to match on name anyway
        r = testing.db.execute(
            select([column("user_id"), column("user_name")])
            .select_from(table("users"))
            .where(text("user_id=2"))
        ).first()

        eq_(r.user_id, 2)
        eq_(r["user_id"], 2)
        eq_(r[users.c.user_id], 2)

        eq_(r.user_name, "jack")
        eq_(r["user_name"], "jack")
        eq_(r[users.c.user_name], "jack")

    def test_column_accessor_dotted_union(self):
        users = self.tables.users

        users.insert().execute(dict(user_id=1, user_name="john"))

        # test a little sqlite < 3.10.0 weirdness - with the UNION,
        # cols come back as "users.user_id" in cursor.description
        r = testing.db.execute(
            text(
                "select users.user_id, users.user_name "
                "from users "
                "UNION select users.user_id, "
                "users.user_name from users"
            )
        ).first()
        eq_(r["user_id"], 1)
        eq_(r["user_name"], "john")
        eq_(list(r.keys()), ["user_id", "user_name"])

    def test_column_accessor_sqlite_raw(self):
        users = self.tables.users

        users.insert().execute(dict(user_id=1, user_name="john"))

        r = (
            text(
                "select users.user_id, users.user_name "
                "from users "
                "UNION select users.user_id, "
                "users.user_name from users",
                bind=testing.db,
            )
            .execution_options(sqlite_raw_colnames=True)
            .execute()
            .first()
        )

        if testing.against("sqlite < 3.10.0"):
            not_in_("user_id", r)
            not_in_("user_name", r)
            eq_(r["users.user_id"], 1)
            eq_(r["users.user_name"], "john")

            eq_(list(r.keys()), ["users.user_id", "users.user_name"])
        else:
            not_in_("users.user_id", r)
            not_in_("users.user_name", r)
            eq_(r["user_id"], 1)
            eq_(r["user_name"], "john")

            eq_(list(r.keys()), ["user_id", "user_name"])

    def test_column_accessor_sqlite_translated(self):
        users = self.tables.users

        users.insert().execute(dict(user_id=1, user_name="john"))

        r = (
            text(
                "select users.user_id, users.user_name "
                "from users "
                "UNION select users.user_id, "
                "users.user_name from users",
                bind=testing.db,
            )
            .execute()
            .first()
        )
        eq_(r["user_id"], 1)
        eq_(r["user_name"], "john")

        if testing.against("sqlite < 3.10.0"):
            eq_(r["users.user_id"], 1)
            eq_(r["users.user_name"], "john")
        else:
            not_in_("users.user_id", r)
            not_in_("users.user_name", r)

        eq_(list(r.keys()), ["user_id", "user_name"])

    def test_column_accessor_labels_w_dots(self):
        users = self.tables.users

        users.insert().execute(dict(user_id=1, user_name="john"))
        # test using literal tablename.colname
        r = (
            text(
                'select users.user_id AS "users.user_id", '
                'users.user_name AS "users.user_name" '
                "from users",
                bind=testing.db,
            )
            .execution_options(sqlite_raw_colnames=True)
            .execute()
            .first()
        )
        eq_(r["users.user_id"], 1)
        eq_(r["users.user_name"], "john")
        not_in_("user_name", r)
        eq_(list(r.keys()), ["users.user_id", "users.user_name"])

    def test_column_accessor_unary(self):
        users = self.tables.users

        users.insert().execute(dict(user_id=1, user_name="john"))

        # unary expressions
        r = (
            select([users.c.user_name.distinct()])
            .order_by(users.c.user_name)
            .execute()
            .first()
        )
        eq_(r[users.c.user_name], "john")
        eq_(r.user_name, "john")

    def test_column_accessor_err(self):
        r = testing.db.execute(select([1])).first()
        assert_raises_message(
            AttributeError,
            "Could not locate column in row for column 'foo'",
            getattr,
            r,
            "foo",
        )
        assert_raises_message(
            KeyError,
            "Could not locate column in row for column 'foo'",
            lambda: r["foo"],
        )

    def test_graceful_fetch_on_non_rows(self):
        """test that calling fetchone() etc. on a result that doesn't
        return rows fails gracefully.

        """

        # these proxies don't work with no cursor.description present.
        # so they don't apply to this test at the moment.
        # result.FullyBufferedResultProxy,
        # result.BufferedRowResultProxy,
        # result.BufferedColumnResultProxy

        users = self.tables.users

        conn = testing.db.connect()
        for meth in [
            lambda r: r.fetchone(),
            lambda r: r.fetchall(),
            lambda r: r.first(),
            lambda r: r.scalar(),
            lambda r: r.fetchmany(),
            lambda r: r._getter("user"),
            lambda r: r._has_key("user"),
        ]:
            trans = conn.begin()
            result = conn.execute(users.insert(), user_id=1)
            assert_raises_message(
                exc.ResourceClosedError,
                "This result object does not return rows. "
                "It has been closed automatically.",
                meth,
                result,
            )
            trans.rollback()

    def test_fetchone_til_end(self):
        result = testing.db.execute("select * from users")
        eq_(result.fetchone(), None)
        eq_(result.fetchone(), None)
        eq_(result.fetchone(), None)
        result.close()
        assert_raises_message(
            exc.ResourceClosedError,
            "This result object is closed.",
            result.fetchone,
        )

    def test_connectionless_autoclose_rows_exhausted(self):
        users = self.tables.users
        users.insert().execute(dict(user_id=1, user_name="john"))

        result = testing.db.execute("select * from users")
        connection = result.connection
        assert not connection.closed
        eq_(result.fetchone(), (1, "john"))
        assert not connection.closed
        eq_(result.fetchone(), None)
        # connection auto-closes once the result is exhausted
        assert connection.closed

    @testing.requires.returning
    def test_connectionless_autoclose_crud_rows_exhausted(self):
        users = self.tables.users
        stmt = (
            users.insert()
            .values(user_id=1, user_name="john")
            .returning(users.c.user_id)
        )
        result = testing.db.execute(stmt)
        connection = result.connection
        assert not connection.closed
        eq_(result.fetchone(), (1,))
        assert not connection.closed
        eq_(result.fetchone(), None)
        assert connection.closed

    def test_connectionless_autoclose_no_rows(self):
        result = testing.db.execute("select * from users")
        connection = result.connection
        assert not connection.closed
        eq_(result.fetchone(), None)
        assert connection.closed

    @testing.requires.updateable_autoincrement_pks
    def test_connectionless_autoclose_no_metadata(self):
        result = testing.db.execute("update users set user_id=5")
        connection = result.connection
        assert connection.closed
        assert_raises_message(
            exc.ResourceClosedError,
            "This result object does not return rows.",
            result.fetchone,
        )

    def test_row_case_sensitive(self):
        row = testing.db.execute(
            select(
                [
                    literal_column("1").label("case_insensitive"),
                    literal_column("2").label("CaseSensitive"),
                ]
            )
        ).first()

        eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])

        in_("case_insensitive", row._keymap)
        in_("CaseSensitive", row._keymap)
        not_in_("casesensitive", row._keymap)

        eq_(row["case_insensitive"], 1)
        eq_(row["CaseSensitive"], 2)

        assert_raises(KeyError, lambda: row["Case_insensitive"])
        assert_raises(KeyError, lambda: row["casesensitive"])

    def test_row_case_sensitive_unoptimized(self):
        ins_db = engines.testing_engine(options={"case_sensitive": True})
        row = ins_db.execute(
            select(
                [
                    literal_column("1").label("case_insensitive"),
                    literal_column("2").label("CaseSensitive"),
                    text("3 AS screw_up_the_cols"),
                ]
            )
        ).first()

        eq_(
            list(row.keys()),
            ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
        )

        in_("case_insensitive", row._keymap)
        in_("CaseSensitive", row._keymap)
        not_in_("casesensitive", row._keymap)

        eq_(row["case_insensitive"], 1)
        eq_(row["CaseSensitive"], 2)
        eq_(row["screw_up_the_cols"], 3)

        assert_raises(KeyError, lambda: row["Case_insensitive"])
        assert_raises(KeyError, lambda: row["casesensitive"])
        assert_raises(KeyError, lambda: row["screw_UP_the_cols"])

    def test_row_case_insensitive(self):
        ins_db = engines.testing_engine(options={"case_sensitive": False})
        row = ins_db.execute(
            select(
                [
                    literal_column("1").label("case_insensitive"),
                    literal_column("2").label("CaseSensitive"),
                ]
            )
        ).first()

        eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])

        in_("case_insensitive", row._keymap)
        in_("CaseSensitive", row._keymap)
        in_("casesensitive", row._keymap)

        eq_(row["case_insensitive"], 1)
        eq_(row["CaseSensitive"], 2)
        eq_(row["Case_insensitive"], 1)
        eq_(row["casesensitive"], 2)

    def test_row_case_insensitive_unoptimized(self):
        ins_db = engines.testing_engine(options={"case_sensitive": False})
        row = ins_db.execute(
            select(
                [
                    literal_column("1").label("case_insensitive"),
                    literal_column("2").label("CaseSensitive"),
                    text("3 AS screw_up_the_cols"),
                ]
            )
        ).first()

        eq_(
            list(row.keys()),
            ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
        )

        in_("case_insensitive", row._keymap)
        in_("CaseSensitive", row._keymap)
        in_("casesensitive", row._keymap)

        eq_(row["case_insensitive"], 1)
        eq_(row["CaseSensitive"], 2)
        eq_(row["screw_up_the_cols"], 3)
        eq_(row["Case_insensitive"], 1)
        eq_(row["casesensitive"], 2)
        eq_(row["screw_UP_the_cols"], 3)

    def test_row_as_args(self):
        users = self.tables.users

        users.insert().execute(user_id=1, user_name="john")
        r = users.select(users.c.user_id == 1).execute().first()
        users.delete().execute()
        # a RowProxy is accepted directly as an execute() parameter set
        users.insert().execute(r)
        eq_(users.select().execute().fetchall(), [(1, "john")])

    def test_result_as_args(self):
        users = self.tables.users
        users2 = self.tables.users2

        users.insert().execute(
            [
                dict(user_id=1, user_name="john"),
                dict(user_id=2, user_name="ed"),
            ]
        )
        r = users.select().execute()
        users2.insert().execute(list(r))
        eq_(
            users2.select().order_by(users2.c.user_id).execute().fetchall(),
            [(1, "john"), (2, "ed")],
        )

        users2.delete().execute()
        r = users.select().execute()
        users2.insert().execute(*list(r))
        eq_(
            users2.select().order_by(users2.c.user_id).execute().fetchall(),
            [(1, "john"), (2, "ed")],
        )

    @testing.requires.duplicate_names_in_cursor_description
    def test_ambiguous_column(self):
        users = self.tables.users
        addresses = self.tables.addresses

        users.insert().execute(user_id=1, user_name="john")
        result = users.outerjoin(addresses).select().execute()
        r = result.first()

        assert_raises_message(
            exc.InvalidRequestError,
            "Ambiguous column name",
            lambda: r["user_id"],
        )

        # pure positional targeting; users.c.user_id
        # and addresses.c.user_id are known!
        # works as of 1.1 issue #3501
        eq_(r[users.c.user_id], 1)
        eq_(r[addresses.c.user_id], None)

        # try to trick it - fake_table isn't in the result!
        # we get the correct error
        fake_table = Table("fake", MetaData(), Column("user_id", Integer))
        assert_raises_message(
            exc.InvalidRequestError,
            "Could not locate column in row for column 'fake.user_id'",
            lambda: r[fake_table.c.user_id],
        )

        r = util.pickle.loads(util.pickle.dumps(r))
        assert_raises_message(
            exc.InvalidRequestError,
            "Ambiguous column name",
            lambda: r["user_id"],
        )

        result = users.outerjoin(addresses).select().execute()
        result = _result.BufferedColumnResultProxy(result.context)
        r = result.first()
        assert isinstance(r, _result.BufferedColumnRow)
        assert_raises_message(
            exc.InvalidRequestError,
            "Ambiguous column name",
            lambda: r["user_id"],
        )

    @testing.requires.duplicate_names_in_cursor_description
    def test_ambiguous_column_by_col(self):
        users = self.tables.users

        users.insert().execute(user_id=1, user_name="john")
        ua = users.alias()
        u2 = users.alias()
        result = select([users.c.user_id, ua.c.user_id]).execute()
        row = result.first()

        # as of 1.1 issue #3501, we use pure positional
        # targeting for the column objects here
        eq_(row[users.c.user_id], 1)

        eq_(row[ua.c.user_id], 1)

        # this now works as of 1.1 issue #3501;
        # previously this was stuck on "ambiguous column name"
        assert_raises_message(
            exc.InvalidRequestError,
            "Could not locate column in row",
            lambda: row[u2.c.user_id],
        )

    @testing.requires.duplicate_names_in_cursor_description
    def test_ambiguous_column_case_sensitive(self):
        eng = engines.testing_engine(options=dict(case_sensitive=False))

        row = eng.execute(
            select(
                [
                    literal_column("1").label("SOMECOL"),
                    literal_column("1").label("SOMECOL"),
                ]
            )
        ).first()

        assert_raises_message(
            exc.InvalidRequestError,
            "Ambiguous column name",
            lambda: row["somecol"],
        )

    @testing.requires.duplicate_names_in_cursor_description
    def test_ambiguous_column_contains(self):
        users = self.tables.users
        addresses = self.tables.addresses

        # ticket 2702.  in 0.7 we'd get True, False.
        # in 0.8, both columns are present so it's True;
        # but when they're fetched you'll get the ambiguous error.
        users.insert().execute(user_id=1, user_name="john")
        result = (
            select([users.c.user_id, addresses.c.user_id])
            .select_from(users.outerjoin(addresses))
            .execute()
        )
        row = result.first()

        eq_(
            set([users.c.user_id in row, addresses.c.user_id in row]),
            set([True]),
        )

    def test_ambiguous_column_by_col_plus_label(self):
        users = self.tables.users

        users.insert().execute(user_id=1, user_name="john")
        result = select(
            [
                users.c.user_id,
                type_coerce(users.c.user_id, Integer).label("foo"),
            ]
        ).execute()
        row = result.first()
        eq_(row[users.c.user_id], 1)
        eq_(row[1], 1)

    def test_fetch_partial_result_map(self):
        users = self.tables.users

        users.insert().execute(user_id=7, user_name="ed")

        t = text("select * from users").columns(user_name=String())
        eq_(testing.db.execute(t).fetchall(), [(7, "ed")])

    def test_fetch_unordered_result_map(self):
        users = self.tables.users

        users.insert().execute(user_id=7, user_name="ed")

        class Goofy1(TypeDecorator):
            impl = String

            def process_result_value(self, value, dialect):
                return value + "a"

        class Goofy2(TypeDecorator):
            impl = String

            def process_result_value(self, value, dialect):
                return value + "b"

        class Goofy3(TypeDecorator):
            impl = String

            def process_result_value(self, value, dialect):
                return value + "c"

        t = text(
            "select user_name as a, user_name as b, "
            "user_name as c from users"
        ).columns(a=Goofy1(), b=Goofy2(), c=Goofy3())
        eq_(testing.db.execute(t).fetchall(), [("eda", "edb", "edc")])

    @testing.requires.subqueries
    def test_column_label_targeting(self):
        users = self.tables.users

        users.insert().execute(user_id=7, user_name="ed")

        for s in (
            users.select().alias("foo"),
            users.select().alias(users.name),
        ):
            row = s.select(use_labels=True).execute().first()
            eq_(row[s.c.user_id], 7)
            eq_(row[s.c.user_name], "ed")

    def test_keys(self):
        users = self.tables.users

        users.insert().execute(user_id=1, user_name="foo")
        result = users.select().execute()
        eq_(result.keys(), ["user_id", "user_name"])
        row = result.first()
        eq_(row.keys(), ["user_id", "user_name"])

    def test_keys_anon_labels(self):
        """test [ticket:3483]"""

        users = self.tables.users

        users.insert().execute(user_id=1, user_name="foo")
        result = testing.db.execute(
            select(
                [
                    users.c.user_id,
                    users.c.user_name.label(None),
                    func.count(literal_column("1")),
                ]
            ).group_by(users.c.user_id, users.c.user_name)
        )

        eq_(result.keys(), ["user_id", "user_name_1", "count_1"])
        row = result.first()
        eq_(row.keys(), ["user_id", "user_name_1", "count_1"])

    def test_items(self):
        users = self.tables.users

        users.insert().execute(user_id=1, user_name="foo")
        r = users.select().execute().first()
        eq_(
            [(x[0].lower(), x[1]) for x in list(r.items())],
            [("user_id", 1), ("user_name", "foo")],
        )

    def test_len(self):
        users = self.tables.users

        users.insert().execute(user_id=1, user_name="foo")
        r = users.select().execute().first()
        eq_(len(r), 2)

        r = testing.db.execute("select user_name, user_id from users").first()
        eq_(len(r), 2)
        r = testing.db.execute("select user_name from users").first()
        eq_(len(r), 1)

    def test_sorting_in_python(self):
        users = self.tables.users

        users.insert().execute(
            dict(user_id=1, user_name="foo"),
            dict(user_id=2, user_name="bar"),
            dict(user_id=3, user_name="def"),
        )

        rows = users.select().order_by(users.c.user_name).execute().fetchall()

        eq_(rows, [(2, "bar"), (3, "def"), (1, "foo")])

        eq_(sorted(rows), [(1, "foo"), (2, "bar"), (3, "def")])

    def test_column_order_with_simple_query(self):
        # should return values in column definition order
        users = self.tables.users

        users.insert().execute(user_id=1, user_name="foo")
        r = users.select(users.c.user_id == 1).execute().first()
        eq_(r[0], 1)
        eq_(r[1], "foo")
        eq_([x.lower() for x in list(r.keys())], ["user_id", "user_name"])
        eq_(list(r.values()), [1, "foo"])

    def test_column_order_with_text_query(self):
        # should return values in query order
        users = self.tables.users

        users.insert().execute(user_id=1, user_name="foo")
        r = testing.db.execute("select user_name, user_id from users").first()
        eq_(r[0], "foo")
        eq_(r[1], 1)
        eq_([x.lower() for x in list(r.keys())], ["user_name", "user_id"])
        eq_(list(r.values()), ["foo", 1])

    @testing.crashes("oracle", "FIXME: unknown, varify not fails_on()")
    @testing.crashes("firebird", "An identifier must begin with a letter")
    @testing.provide_metadata
    def test_column_accessor_shadow(self):
        # column names that shadow RowProxy methods/attributes
        # ("parent", "row", "_parent", "_row") must still be accessible
        shadowed = Table(
            "test_shadowed",
            self.metadata,
            Column("shadow_id", INT, primary_key=True),
            Column("shadow_name", VARCHAR(20)),
            Column("parent", VARCHAR(20)),
            Column("row", VARCHAR(40)),
            Column("_parent", VARCHAR(20)),
            Column("_row", VARCHAR(20)),
        )
        self.metadata.create_all()
        shadowed.insert().execute(
            shadow_id=1,
            shadow_name="The Shadow",
            parent="The Light",
            row="Without light there is no shadow",
            _parent="Hidden parent",
            _row="Hidden row",
        )
        r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()

        eq_(r.shadow_id, 1)
        eq_(r["shadow_id"], 1)
        eq_(r[shadowed.c.shadow_id], 1)

        eq_(r.shadow_name, "The Shadow")
        eq_(r["shadow_name"], "The Shadow")
        eq_(r[shadowed.c.shadow_name], "The Shadow")

        eq_(r.parent, "The Light")
        eq_(r["parent"], "The Light")
        eq_(r[shadowed.c.parent], "The Light")

        eq_(r.row, "Without light there is no shadow")
        eq_(r["row"], "Without light there is no shadow")
        eq_(r[shadowed.c.row], "Without light there is no shadow")

        eq_(r["_parent"], "Hidden parent")
        eq_(r["_row"], "Hidden row")

    def test_nontuple_row(self):
        """ensure the C version of BaseRowProxy handles
        duck-type-dependent rows."""

        from sqlalchemy.engine import RowProxy

        class MyList(object):
            def __init__(self, data):
                self.internal_list = data

            def __len__(self):
                return len(self.internal_list)

            def __getitem__(self, i):
                return list.__getitem__(self.internal_list, i)

        proxy = RowProxy(
            object(),
            MyList(["value"]),
            [None],
            {"key": (None, None, 0), 0: (None, None, 0)},
        )
        eq_(list(proxy), ["value"])
        eq_(proxy[0], "value")
        eq_(proxy["key"], "value")

    @testing.provide_metadata
    def test_no_rowcount_on_selects_inserts(self):
        """assert that rowcount is only called on deletes and updates.

        This because cursor.rowcount can be expensive on some dialects
        such as Firebird, however many dialects require it be called
        before the cursor is closed.

        """

        metadata = self.metadata

        engine = engines.testing_engine()

        t = Table("t1", metadata, Column("data", String(10)))
        metadata.create_all(engine)

        with patch.object(
            engine.dialect.execution_ctx_cls, "rowcount"
        ) as mock_rowcount:
            mock_rowcount.__get__ = Mock()
            engine.execute(
                t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
            )

            eq_(len(mock_rowcount.__get__.mock_calls), 0)

            eq_(
                engine.execute(t.select()).fetchall(),
                [("d1",), ("d2",), ("d3",)],
            )
            eq_(len(mock_rowcount.__get__.mock_calls), 0)

            engine.execute(t.update(), {"data": "d4"})

            eq_(len(mock_rowcount.__get__.mock_calls), 1)

            engine.execute(t.delete())
            eq_(len(mock_rowcount.__get__.mock_calls), 2)

    def test_rowproxy_is_sequence(self):
        from sqlalchemy.util import collections_abc
        from sqlalchemy.engine import RowProxy

        row = RowProxy(
            object(),
            ["value"],
            [None],
            {"key": (None, None, 0), 0: (None, None, 0)},
        )
        assert isinstance(row, collections_abc.Sequence)

    @testing.provide_metadata
    def test_rowproxy_getitem_indexes_compiled(self):
        values = Table(
            "rp",
            self.metadata,
            Column("key", String(10), primary_key=True),
            Column("value", String(10)),
        )
        values.create()

        testing.db.execute(values.insert(), dict(key="One", value="Uno"))
        row = testing.db.execute(values.select()).first()
        eq_(row["key"], "One")
        eq_(row["value"], "Uno")
        eq_(row[0], "One")
        eq_(row[1], "Uno")
        eq_(row[-2], "One")
        eq_(row[-1], "Uno")
        eq_(row[1:0:-1], ("Uno",))

    @testing.only_on("sqlite")
    def test_rowproxy_getitem_indexes_raw(self):
        row = testing.db.execute("select 'One' as key, 'Uno' as value").first()
        eq_(row["key"], "One")
        eq_(row["value"], "Uno")
        eq_(row[0], "One")
        eq_(row[1], "Uno")
        eq_(row[-2], "One")
        eq_(row[-1], "Uno")
        eq_(row[1:0:-1], ("Uno",))

    @testing.requires.cextensions
    def test_row_c_sequence_check(self):
        import csv

        metadata = MetaData()
        metadata.bind = "sqlite://"
        users = Table(
            "users",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("name", String(40)),
        )
        users.create()

        users.insert().execute(name="Test")
        row = users.select().execute().fetchone()

        s = util.StringIO()
        writer = csv.writer(s)
        # csv performs PySequenceCheck call
        writer.writerow(row)
        assert s.getvalue().strip() == "1,Test"

    @testing.requires.selectone
    def test_empty_accessors(self):
        statements = [
            (
                "select 1",
                [
                    lambda r: r.last_inserted_params(),
                    lambda r: r.last_updated_params(),
                    lambda r: r.prefetch_cols(),
                    lambda r: r.postfetch_cols(),
                    lambda r: r.inserted_primary_key,
                ],
                "Statement is not a compiled expression construct.",
            ),
            (
                select([1]),
                [
                    lambda r: r.last_inserted_params(),
                    lambda r: r.inserted_primary_key,
                ],
                r"Statement is not an insert\(\) expression construct.",
            ),
            (
                select([1]),
                [lambda r: r.last_updated_params()],
                r"Statement is not an update\(\) expression construct.",
            ),
            (
                select([1]),
                [lambda r: r.prefetch_cols(), lambda r: r.postfetch_cols()],
                r"Statement is not an insert\(\) "
                r"or update\(\) expression construct.",
            ),
        ]

        for stmt, meths, msg in statements:
            r = testing.db.execute(stmt)
            try:
                for meth in meths:
                    assert_raises_message(
                        sa_exc.InvalidRequestError, msg, meth, r
                    )

            finally:
                r.close()


class KeyTargetingTest(fixtures.TablesTest):
    run_inserts = "once"
    run_deletes = None
    __backend__ = True

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "keyed1",
            metadata,
            # DB-side names "a"/"c", python-side keys "b"/"q"
            Column("a", CHAR(2), key="b"),
            Column("c", CHAR(2), key="q"),
        )
        Table("keyed2", metadata, Column("a", CHAR(2)), Column("b", CHAR(2)))
        Table("keyed3", metadata, Column("a", CHAR(2)), Column("d", CHAR(2)))
        Table("keyed4", metadata, Column("b", CHAR(2)), Column("q", CHAR(2)))
        Table("content", metadata, Column("t", String(30), key="type"))
        Table("bar", metadata, Column("ctype", String(30), key="content_type"))

        if testing.requires.schemas.enabled:
            # same shape as keyed1 but in the alternate test schema
            Table(
                "wschema",
                metadata,
                Column("a", CHAR(2), key="b"),
                Column("c", CHAR(2), key="q"),
                schema=testing.config.test_schema,
            )

    @classmethod
    def insert_data(cls):
        # one row per table; insert params address columns by .key
        cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
        cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
        cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
        cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
        cls.tables.content.insert().execute(type="t1")

        if testing.requires.schemas.enabled:
            cls.tables[
                "%s.wschema" % testing.config.test_schema
            ].insert().execute(dict(b="a1", q="c1"))

    @testing.requires.schemas
    def test_keyed_accessor_wschema(self):
        """rows from a schema-qualified table are reachable both by .key
        (b, q) and by DB name (a, c)."""
        keyed1 = self.tables["%s.wschema" % testing.config.test_schema]
        row = testing.db.execute(keyed1.select()).first()

        eq_(row.b, "a1")
        eq_(row.q, "c1")
        eq_(row.a, "a1")
        eq_(row.c, "c1")

    def test_keyed_accessor_single(self):
        """single-table select: attributes resolve by .key and by name."""
        keyed1 = self.tables.keyed1
        row = testing.db.execute(keyed1.select()).first()

        eq_(row.b, "a1")
        eq_(row.q, "c1")
        eq_(row.a, "a1")
        eq_(row.c, "c1")

    def test_keyed_accessor_single_labeled(self):
        """with apply_labels(), both key-based and name-based labels
        resolve."""
        keyed1 = self.tables.keyed1
        row = testing.db.execute(keyed1.select().apply_labels()).first()

        eq_(row.keyed1_b, "a1")
        eq_(row.keyed1_q, "c1")
        eq_(row.keyed1_a, "a1")
        eq_(row.keyed1_c, "c1")

    @testing.requires.duplicate_names_in_cursor_description
    def test_keyed_accessor_composite_conflict_2(self):
        """keyed1 + keyed2 joined select: only the genuinely duplicated
        name is ambiguous."""
        keyed1 = self.tables.keyed1
        keyed2 = self.tables.keyed2

        row = testing.db.execute(select([keyed1, keyed2])).first()
        # row.b is unambiguous
        eq_(row.b, "b2")
        # row.a is ambiguous
        assert_raises_message(
            exc.InvalidRequestError, "Ambig", getattr, row, "a"
        )

    def test_keyed_accessor_composite_names_precedent(self):
        """keyed4's plain names b/q win over keyed1's keys b/q."""
        keyed1 = self.tables.keyed1
        keyed4 = self.tables.keyed4

        row = testing.db.execute(select([keyed1, keyed4])).first()
        eq_(row.b, "b4")
        eq_(row.q, "q4")
        eq_(row.a, "a1")
        eq_(row.c, "c1")

    @testing.requires.duplicate_names_in_cursor_description
    def test_keyed_accessor_composite_keys_precedent(self):
        """keyed1 + keyed3 both emit a column named 'a'; keyed1.c.b is
        itself named 'a', so both .b and .a resolve to the ambiguous
        DB name 'a'.  Only q (name 'c') and d stay unambiguous."""
        keyed1 = self.tables.keyed1
        keyed3 = self.tables.keyed3

        row = testing.db.execute(select([keyed1, keyed3])).first()
        eq_(row.q, "c1")
        assert_raises_message(
            exc.InvalidRequestError,
            "Ambiguous column name 'a'",
            getattr,
            row,
            "b",
        )
        assert_raises_message(
            exc.InvalidRequestError,
            "Ambiguous column name 'a'",
            getattr,
            row,
            "a",
        )
        eq_(row.d, "d3")

    def test_keyed_accessor_composite_labeled(self):
        """apply_labels() disambiguates the composite select; labels that
        were never generated raise KeyError."""
        keyed1 = self.tables.keyed1
        keyed2 = self.tables.keyed2

        row = testing.db.execute(
            select([keyed1, keyed2]).apply_labels()
        ).first()
        eq_(row.keyed1_b, "a1")
        eq_(row.keyed1_a, "a1")
        eq_(row.keyed1_q, "c1")
        eq_(row.keyed1_c, "c1")
        eq_(row.keyed2_a, "a2")
        eq_(row.keyed2_b, "b2")
        assert_raises(KeyError, lambda: row["keyed2_c"])
        assert_raises(KeyError, lambda: row["keyed2_q"])

    def test_column_label_overlap_fallback(self):
        """a result label that merely collides with a table column's
        name/key does not make that Column object match the row; a
        lightweight sql.column() of the same name does."""
        content, bar = self.tables.content, self.tables.bar
        row = testing.db.execute(
            select([content.c.type.label("content_type")])
        ).first()

        not_in_(content.c.type, row)
        not_in_(bar.c.content_type, row)

        in_(sql.column("content_type"), row)

        row = testing.db.execute(
            select([func.now().label("content_type")])
        ).first()
        not_in_(content.c.type, row)
        not_in_(bar.c.content_type, row)
        in_(sql.column("content_type"), row)

    def test_column_label_overlap_fallback_2(self):
        """with use_labels the real selected Column matches and the
        lookalikes do not."""
        content, bar = self.tables.content, self.tables.bar
        row = testing.db.execute(content.select(use_labels=True)).first()
        in_(content.c.type, row)
        not_in_(bar.c.content_type, row)
        not_in_(sql.column("content_type"), row)

    def test_columnclause_schema_column_one(self):
        keyed2 = self.tables.keyed2

        # this is addressed by [ticket:2932]
        # ColumnClause._compare_name_for_result allows the
        # columns which the statement is against to be lightweight
        # cols, which results in a more liberal comparison scheme
        a, b = sql.column("a"), sql.column("b")
        stmt = select([a, b]).select_from(table("keyed2"))
        row = testing.db.execute(stmt).first()

        in_(keyed2.c.a, row)
        in_(keyed2.c.b, row)
        in_(a, row)
        in_(b, row)

    def test_columnclause_schema_column_two(self):
        """the converse direction: select real Table columns, match by
        lightweight column objects."""
        keyed2 = self.tables.keyed2

        a, b = sql.column("a"), sql.column("b")
        stmt = select([keyed2.c.a, keyed2.c.b])
        row = testing.db.execute(stmt).first()

        in_(keyed2.c.a, row)
        in_(keyed2.c.b, row)
        in_(a, row)
        in_(b, row)

    def test_columnclause_schema_column_three(self):
        keyed2 = self.tables.keyed2

        # this is also addressed by [ticket:2932]

        a, b = sql.column("a"), sql.column("b")
        stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
        row = testing.db.execute(stmt).first()

        in_(keyed2.c.a, row)
        in_(keyed2.c.b, row)
        in_(a, row)
        in_(b, row)
        in_(stmt.c.a, row)
1422 in_(stmt.c.b, row) 1423 1424 def test_columnclause_schema_column_four(self): 1425 keyed2 = self.tables.keyed2 1426 1427 # this is also addressed by [ticket:2932] 1428 1429 a, b = sql.column("keyed2_a"), sql.column("keyed2_b") 1430 stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( 1431 a, b 1432 ) 1433 row = testing.db.execute(stmt).first() 1434 1435 in_(keyed2.c.a, row) 1436 in_(keyed2.c.b, row) 1437 in_(a, row) 1438 in_(b, row) 1439 in_(stmt.c.keyed2_a, row) 1440 in_(stmt.c.keyed2_b, row) 1441 1442 def test_columnclause_schema_column_five(self): 1443 keyed2 = self.tables.keyed2 1444 1445 # this is also addressed by [ticket:2932] 1446 1447 stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( 1448 keyed2_a=CHAR, keyed2_b=CHAR 1449 ) 1450 row = testing.db.execute(stmt).first() 1451 1452 in_(keyed2.c.a, row) 1453 in_(keyed2.c.b, row) 1454 in_(stmt.c.keyed2_a, row) 1455 in_(stmt.c.keyed2_b, row) 1456 1457 1458class PositionalTextTest(fixtures.TablesTest): 1459 run_inserts = "once" 1460 run_deletes = None 1461 __backend__ = True 1462 1463 @classmethod 1464 def define_tables(cls, metadata): 1465 Table( 1466 "text1", 1467 metadata, 1468 Column("a", CHAR(2)), 1469 Column("b", CHAR(2)), 1470 Column("c", CHAR(2)), 1471 Column("d", CHAR(2)), 1472 ) 1473 1474 @classmethod 1475 def insert_data(cls): 1476 cls.tables.text1.insert().execute( 1477 [dict(a="a1", b="b1", c="c1", d="d1")] 1478 ) 1479 1480 def test_via_column(self): 1481 c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d") 1482 stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) 1483 1484 result = testing.db.execute(stmt) 1485 row = result.first() 1486 1487 eq_(row[c2], "b1") 1488 eq_(row[c4], "d1") 1489 eq_(row[1], "b1") 1490 eq_(row["b"], "b1") 1491 eq_(row.keys(), ["a", "b", "c", "d"]) 1492 eq_(row["r"], "c1") 1493 eq_(row["d"], "d1") 1494 1495 def test_fewer_cols_than_sql_positional(self): 1496 c1, c2 = column("q"), column("p") 1497 stmt = 
text("select a, b, c, d from text1").columns(c1, c2) 1498 1499 # no warning as this can be similar for non-positional 1500 result = testing.db.execute(stmt) 1501 row = result.first() 1502 1503 eq_(row[c1], "a1") 1504 eq_(row["c"], "c1") 1505 1506 def test_fewer_cols_than_sql_non_positional(self): 1507 c1, c2 = column("a"), column("p") 1508 stmt = text("select a, b, c, d from text1").columns(c2, c1, d=CHAR) 1509 1510 # no warning as this can be similar for non-positional 1511 result = testing.db.execute(stmt) 1512 row = result.first() 1513 1514 # c1 name matches, locates 1515 eq_(row[c1], "a1") 1516 eq_(row["c"], "c1") 1517 1518 # c2 name does not match, doesn't locate 1519 assert_raises_message( 1520 exc.NoSuchColumnError, "in row for column 'p'", lambda: row[c2] 1521 ) 1522 1523 def test_more_cols_than_sql(self): 1524 c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d") 1525 stmt = text("select a, b from text1").columns(c1, c2, c3, c4) 1526 1527 with assertions.expect_warnings( 1528 r"Number of columns in textual SQL \(4\) is " 1529 r"smaller than number of columns requested \(2\)" 1530 ): 1531 result = testing.db.execute(stmt) 1532 1533 row = result.first() 1534 eq_(row[c2], "b1") 1535 1536 assert_raises_message( 1537 exc.NoSuchColumnError, "in row for column 'r'", lambda: row[c3] 1538 ) 1539 1540 def test_dupe_col_obj(self): 1541 c1, c2, c3 = column("q"), column("p"), column("r") 1542 stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c2) 1543 1544 assert_raises_message( 1545 exc.InvalidRequestError, 1546 "Duplicate column expression requested in " 1547 "textual SQL: <.*.ColumnClause.*; p>", 1548 testing.db.execute, 1549 stmt, 1550 ) 1551 1552 def test_anon_aliased_unique(self): 1553 text1 = self.tables.text1 1554 1555 c1 = text1.c.a.label(None) 1556 c2 = text1.alias().c.c 1557 c3 = text1.alias().c.b 1558 c4 = text1.alias().c.d.label(None) 1559 1560 stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) 1561 result = 
testing.db.execute(stmt) 1562 row = result.first() 1563 1564 eq_(row[c1], "a1") 1565 eq_(row[c2], "b1") 1566 eq_(row[c3], "c1") 1567 eq_(row[c4], "d1") 1568 1569 # key fallback rules still match this to a column 1570 # unambiguously based on its name 1571 eq_(row[text1.c.a], "a1") 1572 1573 # key fallback rules still match this to a column 1574 # unambiguously based on its name 1575 eq_(row[text1.c.d], "d1") 1576 1577 # text1.c.b goes nowhere....because we hit key fallback 1578 # but the text1.c.b doesn't derive from text1.c.c 1579 assert_raises_message( 1580 exc.NoSuchColumnError, 1581 "Could not locate column in row for column 'text1.b'", 1582 lambda: row[text1.c.b], 1583 ) 1584 1585 def test_anon_aliased_overlapping(self): 1586 text1 = self.tables.text1 1587 1588 c1 = text1.c.a.label(None) 1589 c2 = text1.alias().c.a 1590 c3 = text1.alias().c.a.label(None) 1591 c4 = text1.c.a.label(None) 1592 1593 stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) 1594 result = testing.db.execute(stmt) 1595 row = result.first() 1596 1597 eq_(row[c1], "a1") 1598 eq_(row[c2], "b1") 1599 eq_(row[c3], "c1") 1600 eq_(row[c4], "d1") 1601 1602 # key fallback rules still match this to a column 1603 # unambiguously based on its name 1604 eq_(row[text1.c.a], "a1") 1605 1606 def test_anon_aliased_name_conflict(self): 1607 text1 = self.tables.text1 1608 1609 c1 = text1.c.a.label("a") 1610 c2 = text1.alias().c.a 1611 c3 = text1.alias().c.a.label("a") 1612 c4 = text1.c.a.label("a") 1613 1614 # all cols are named "a". if we are positional, we don't care. 
1615 # this is new logic in 1.1 1616 stmt = text("select a, b as a, c as a, d as a from text1").columns( 1617 c1, c2, c3, c4 1618 ) 1619 result = testing.db.execute(stmt) 1620 row = result.first() 1621 1622 eq_(row[c1], "a1") 1623 eq_(row[c2], "b1") 1624 eq_(row[c3], "c1") 1625 eq_(row[c4], "d1") 1626 1627 # fails, because we hit key fallback and find conflicts 1628 # in columns that are presnet 1629 assert_raises_message( 1630 exc.NoSuchColumnError, 1631 "Could not locate column in row for column 'text1.a'", 1632 lambda: row[text1.c.a], 1633 ) 1634 1635 1636class AlternateResultProxyTest(fixtures.TablesTest): 1637 __requires__ = ("sqlite",) 1638 1639 @classmethod 1640 def setup_bind(cls): 1641 cls.engine = engine = engines.testing_engine("sqlite://") 1642 return engine 1643 1644 @classmethod 1645 def define_tables(cls, metadata): 1646 Table( 1647 "test", 1648 metadata, 1649 Column("x", Integer, primary_key=True), 1650 Column("y", String(50, convert_unicode="force")), 1651 ) 1652 1653 @classmethod 1654 def insert_data(cls): 1655 cls.engine.execute( 1656 cls.tables.test.insert(), 1657 [{"x": i, "y": "t_%d" % i} for i in range(1, 12)], 1658 ) 1659 1660 @contextmanager 1661 def _proxy_fixture(self, cls): 1662 self.table = self.tables.test 1663 1664 class ExcCtx(default.DefaultExecutionContext): 1665 def get_result_proxy(self): 1666 return cls(self) 1667 1668 self.patcher = patch.object( 1669 self.engine.dialect, "execution_ctx_cls", ExcCtx 1670 ) 1671 with self.patcher: 1672 yield 1673 1674 def _test_proxy(self, cls): 1675 with self._proxy_fixture(cls): 1676 rows = [] 1677 r = self.engine.execute(select([self.table])) 1678 assert isinstance(r, cls) 1679 for i in range(5): 1680 rows.append(r.fetchone()) 1681 eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) 1682 1683 rows = r.fetchmany(3) 1684 eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) 1685 1686 rows = r.fetchall() 1687 eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) 1688 1689 r = 
self.engine.execute(select([self.table])) 1690 rows = r.fetchmany(None) 1691 eq_(rows[0], (1, "t_1")) 1692 # number of rows here could be one, or the whole thing 1693 assert len(rows) == 1 or len(rows) == 11 1694 1695 r = self.engine.execute(select([self.table]).limit(1)) 1696 r.fetchone() 1697 eq_(r.fetchone(), None) 1698 1699 r = self.engine.execute(select([self.table]).limit(5)) 1700 rows = r.fetchmany(6) 1701 eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) 1702 1703 # result keeps going just fine with blank results... 1704 eq_(r.fetchmany(2), []) 1705 1706 eq_(r.fetchmany(2), []) 1707 1708 eq_(r.fetchall(), []) 1709 1710 eq_(r.fetchone(), None) 1711 1712 # until we close 1713 r.close() 1714 1715 self._assert_result_closed(r) 1716 1717 r = self.engine.execute(select([self.table]).limit(5)) 1718 eq_(r.first(), (1, "t_1")) 1719 self._assert_result_closed(r) 1720 1721 r = self.engine.execute(select([self.table]).limit(5)) 1722 eq_(r.scalar(), 1) 1723 self._assert_result_closed(r) 1724 1725 def _assert_result_closed(self, r): 1726 assert_raises_message( 1727 sa_exc.ResourceClosedError, "object is closed", r.fetchone 1728 ) 1729 1730 assert_raises_message( 1731 sa_exc.ResourceClosedError, "object is closed", r.fetchmany, 2 1732 ) 1733 1734 assert_raises_message( 1735 sa_exc.ResourceClosedError, "object is closed", r.fetchall 1736 ) 1737 1738 def test_basic_plain(self): 1739 self._test_proxy(_result.ResultProxy) 1740 1741 def test_basic_buffered_row_result_proxy(self): 1742 self._test_proxy(_result.BufferedRowResultProxy) 1743 1744 def test_basic_fully_buffered_result_proxy(self): 1745 self._test_proxy(_result.FullyBufferedResultProxy) 1746 1747 def test_basic_buffered_column_result_proxy(self): 1748 self._test_proxy(_result.BufferedColumnResultProxy) 1749 1750 def test_resultprocessor_plain(self): 1751 self._test_result_processor(_result.ResultProxy, False) 1752 1753 def test_resultprocessor_plain_cached(self): 1754 self._test_result_processor(_result.ResultProxy, 
True) 1755 1756 def test_resultprocessor_buffered_column(self): 1757 self._test_result_processor(_result.BufferedColumnResultProxy, False) 1758 1759 def test_resultprocessor_buffered_column_cached(self): 1760 self._test_result_processor(_result.BufferedColumnResultProxy, True) 1761 1762 def test_resultprocessor_buffered_row(self): 1763 self._test_result_processor(_result.BufferedRowResultProxy, False) 1764 1765 def test_resultprocessor_buffered_row_cached(self): 1766 self._test_result_processor(_result.BufferedRowResultProxy, True) 1767 1768 def test_resultprocessor_fully_buffered(self): 1769 self._test_result_processor(_result.FullyBufferedResultProxy, False) 1770 1771 def test_resultprocessor_fully_buffered_cached(self): 1772 self._test_result_processor(_result.FullyBufferedResultProxy, True) 1773 1774 def _test_result_processor(self, cls, use_cache): 1775 class MyType(TypeDecorator): 1776 impl = String() 1777 1778 def process_result_value(self, value, dialect): 1779 return "HI " + value 1780 1781 with self._proxy_fixture(cls): 1782 with self.engine.connect() as conn: 1783 if use_cache: 1784 cache = {} 1785 conn = conn.execution_options(compiled_cache=cache) 1786 1787 stmt = select([literal("THERE", type_=MyType())]) 1788 for i in range(2): 1789 r = conn.execute(stmt) 1790 eq_(r.scalar(), "HI THERE") 1791 1792 def test_buffered_row_growth(self): 1793 with self._proxy_fixture(_result.BufferedRowResultProxy): 1794 with self.engine.connect() as conn: 1795 conn.execute( 1796 self.table.insert(), 1797 [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)], 1798 ) 1799 result = conn.execute(self.table.select()) 1800 checks = {0: 5, 1: 10, 9: 20, 135: 250, 274: 500, 1351: 1000} 1801 for idx, row in enumerate(result, 0): 1802 if idx in checks: 1803 eq_(result._bufsize, checks[idx]) 1804 le_(len(result._BufferedRowResultProxy__rowbuffer), 1000) 1805 1806 def test_max_row_buffer_option(self): 1807 with self._proxy_fixture(_result.BufferedRowResultProxy): 1808 with 
self.engine.connect() as conn: 1809 conn.execute( 1810 self.table.insert(), 1811 [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)], 1812 ) 1813 result = conn.execution_options(max_row_buffer=27).execute( 1814 self.table.select() 1815 ) 1816 for idx, row in enumerate(result, 0): 1817 if idx in (16, 70, 150, 250): 1818 eq_(result._bufsize, 27) 1819 le_(len(result._BufferedRowResultProxy__rowbuffer), 27) 1820