# coding: utf-8
# Tests for the PostgreSQL dialect's Python-side behavior: version-string
# parsing, psycopg2 connect-arg construction, psycopg2 executemany modes
# (batch / values), and assorted backend round trips.
import contextlib
import datetime
import logging
import logging.handlers

from sqlalchemy import BigInteger
from sqlalchemy import bindparam
from sqlalchemy import cast
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import dialects
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import extract
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import literal_column
from sqlalchemy import MetaData
from sqlalchemy import Numeric
from sqlalchemy import schema
from sqlalchemy import select
from sqlalchemy import Sequence
from sqlalchemy import SmallInteger
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import TypeDecorator
from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import psycopg2 as psycopg2_dialect
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_DEFAULT
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES
from sqlalchemy.engine import engine_from_config
from sqlalchemy.engine import url
from sqlalchemy.testing import engines
from sqlalchemy.testing import expect_deprecated
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_raises_message
from sqlalchemy.testing.assertions import AssertsCompiledSQL
from sqlalchemy.testing.assertions import AssertsExecutionResults
from sqlalchemy.testing.assertions import eq_
from sqlalchemy.testing.assertions import eq_regex
from sqlalchemy.testing.assertions import ne_
from ...engine import test_execute


class DialectTest(fixtures.TestBase):
    """python-side dialect tests. """

    def test_version_parsing(self):
        """_get_server_version_info parses a variety of real-world
        ``SELECT version()`` strings into version tuples."""

        def mock_conn(res):
            # Minimal stand-in for a Connection: execute(...).scalar()
            # returns the canned version string.
            return mock.Mock(
                execute=mock.Mock(
                    return_value=mock.Mock(scalar=mock.Mock(return_value=res))
                )
            )

        dialect = postgresql.dialect()
        for string, version in [
            (
                "PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by "
                "GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)",
                (8, 3, 8),
            ),
            (
                # "devel" suffix is dropped; only numeric parts survive
                "PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, "
                "compiled by GCC gcc (GCC) 4.4.2, 64-bit",
                (8, 5),
            ),
            (
                # EnterpriseDB reports a four-part version; trailing part
                # is truncated to three
                "EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, "
                "compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), "
                "64-bit",
                (9, 1, 2),
            ),
            (
                "[PostgreSQL 9.2.4 ] VMware vFabric Postgres 9.2.4.0 "
                "release build 1080137",
                (9, 2, 4),
            ),
            (
                "PostgreSQL 10devel on x86_64-pc-linux-gnu"
                "compiled by gcc (GCC) 6.3.1 20170306, 64-bit",
                (10,),
            ),
            (
                "PostgreSQL 10beta1 on x86_64-pc-linux-gnu, "
                "compiled by gcc (GCC) 4.8.5 20150623 "
                "(Red Hat 4.8.5-11), 64-bit",
                (10,),
            ),
        ]:
            eq_(dialect._get_server_version_info(mock_conn(string)), version)

    def test_deprecated_dialect_name_still_loads(self):
        """The legacy 'postgres' dialect name resolves to the postgresql
        dialect, emitting a deprecation warning."""
        dialects.registry.clear()
        with expect_deprecated(
            "The 'postgres' dialect name " "has been renamed to 'postgresql'"
        ):
            dialect = url.URL("postgres").get_dialect()
        is_(dialect, postgresql.dialect)

    @testing.requires.psycopg2_compatibility
    def test_pg_dialect_use_native_unicode_from_config(self):
        """engine_from_config coerces the string 'false'/'true' for
        use_native_unicode into a bool on the dialect."""
        config = {
            "sqlalchemy.url": testing.db.url,
            "sqlalchemy.use_native_unicode": "false",
        }

        e = engine_from_config(config, _initialize=False)
        eq_(e.dialect.use_native_unicode, False)

        config = {
            "sqlalchemy.url": testing.db.url,
            "sqlalchemy.use_native_unicode": "true",
        }

        e = engine_from_config(config, _initialize=False)
        eq_(e.dialect.use_native_unicode, True)

    def test_psycopg2_empty_connection_string(self):
        # a fully-empty URL produces a single empty DSN positional arg
        dialect = psycopg2_dialect.dialect()
        u = url.make_url("postgresql://")
        cargs, cparams = dialect.create_connect_args(u)
        eq_(cargs, [""])
        eq_(cparams, {})

    def test_psycopg2_nonempty_connection_string(self):
        dialect = psycopg2_dialect.dialect()
        u = url.make_url("postgresql://host")
        cargs, cparams = dialect.create_connect_args(u)
        eq_(cargs, [])
        eq_(cparams, {"host": "host"})

    def test_psycopg2_empty_connection_string_w_query_one(self):
        # query-string-only URL: params pass through as connect kwargs
        dialect = psycopg2_dialect.dialect()
        u = url.make_url("postgresql:///?service=swh-log")
        cargs, cparams = dialect.create_connect_args(u)
        eq_(cargs, [])
        eq_(cparams, {"service": "swh-log"})

    def test_psycopg2_empty_connection_string_w_query_two(self):
        # arbitrary, unrecognized query params are passed through verbatim
        dialect = psycopg2_dialect.dialect()
        u = url.make_url("postgresql:///?any_random_thing=yes")
        cargs, cparams = dialect.create_connect_args(u)
        eq_(cargs, [])
        eq_(cparams, {"any_random_thing": "yes"})

    def test_psycopg2_nonempty_connection_string_w_query(self):
        dialect = psycopg2_dialect.dialect()
        u = url.make_url("postgresql://somehost/?any_random_thing=yes")
        cargs, cparams = dialect.create_connect_args(u)
        eq_(cargs, [])
        eq_(cparams, {"host": "somehost", "any_random_thing": "yes"})

    def test_psycopg2_nonempty_connection_string_w_query_two(self):
        # host given via query string rather than URL netloc
        dialect = psycopg2_dialect.dialect()
        url_string = "postgresql://USER:PASS@/DB?host=hostA"
        u = url.make_url(url_string)
        cargs, cparams = dialect.create_connect_args(u)
        eq_(cargs, [])
        eq_(cparams["host"], "hostA")

    def test_psycopg2_nonempty_connection_string_w_query_three(self):
        # multiple host entries collapse into a comma-separated multi-host
        # DSN value
        dialect = psycopg2_dialect.dialect()
        url_string = (
            "postgresql://USER:PASS@/DB"
            "?host=hostA:portA&host=hostB&host=hostC"
        )
        u = url.make_url(url_string)
        cargs, cparams = dialect.create_connect_args(u)
        eq_(cargs, [])
        eq_(cparams["host"], "hostA:portA,hostB,hostC")


class ExecuteManyMode(object):
    """Mixin exercising psycopg2 fast-executemany helpers.

    Subclasses supply ``options`` to select the engine's executemany mode;
    tests then verify which ``psycopg2.extras`` helper is invoked and with
    what arguments.
    """

    __only_on__ = "postgresql+psycopg2"
    __backend__ = True

    run_create_tables = "each"

    # engine options; set by subclasses
    options = None

    @classmethod
    def define_tables(cls, metadata):
        Table(
            "data",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("x", String),
            Column("y", String),
            Column("z", Integer, server_default="5"),
        )

    @contextlib.contextmanager
    def expect_deprecated_opts(self):
        # no-op by default; UseBatchModeTest overrides to expect a
        # deprecation warning
        yield

    def setup(self):
        super(ExecuteManyMode, self).setup()
        with self.expect_deprecated_opts():
            self.engine = engines.testing_engine(options=self.options)

    def teardown(self):
        self.engine.dispose()
        super(ExecuteManyMode, self).teardown()

    def test_insert(self):
        """executemany INSERT routes to execute_batch or execute_values
        depending on the configured mode."""
        from psycopg2 import extras

        if self.engine.dialect.executemany_mode is EXECUTEMANY_BATCH:
            meth = extras.execute_batch
            stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
            expected_kwargs = {}
        else:
            meth = extras.execute_values
            stmt = "INSERT INTO data (x, y) VALUES %s"
            expected_kwargs = {"template": "(%(x)s, %(y)s)"}

        with mock.patch.object(
            extras, meth.__name__, side_effect=meth
        ) as mock_exec:
            with self.engine.connect() as conn:
                conn.execute(
                    self.tables.data.insert(),
                    [
                        {"x": "x1", "y": "y1"},
                        {"x": "x2", "y": "y2"},
                        {"x": "x3", "y": "y3"},
                    ],
                )

                eq_(
                    conn.execute(select([self.tables.data])).fetchall(),
                    [
                        (1, "x1", "y1", 5),
                        (2, "x2", "y2", 5),
                        (3, "x3", "y3", 5),
                    ],
                )
        eq_(
            mock_exec.mock_calls,
            [
                mock.call(
                    mock.ANY,
                    stmt,
                    (
                        {"x": "x1", "y": "y1"},
                        {"x": "x2", "y": "y2"},
                        {"x": "x3", "y": "y3"},
                    ),
                    **expected_kwargs
                )
            ],
        )

    def test_insert_no_page_size(self):
        """With no page-size options configured, no page_size kwarg is
        passed to the extras helper."""
        from psycopg2 import extras

        eng = self.engine
        if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
            meth = extras.execute_batch
            stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
            expected_kwargs = {}
        else:
            meth = extras.execute_values
            stmt = "INSERT INTO data (x, y) VALUES %s"
            expected_kwargs = {"template": "(%(x)s, %(y)s)"}

        with mock.patch.object(
            extras, meth.__name__, side_effect=meth
        ) as mock_exec:
            with eng.connect() as conn:
                conn.execute(
                    self.tables.data.insert(),
                    [
                        {"x": "x1", "y": "y1"},
                        {"x": "x2", "y": "y2"},
                        {"x": "x3", "y": "y3"},
                    ],
                )

        eq_(
            mock_exec.mock_calls,
            [
                mock.call(
                    mock.ANY,
                    stmt,
                    (
                        {"x": "x1", "y": "y1"},
                        {"x": "x2", "y": "y2"},
                        {"x": "x3", "y": "y3"},
                    ),
                    **expected_kwargs
                )
            ],
        )

    def test_insert_page_size(self):
        """executemany_batch_page_size / executemany_values_page_size
        engine options flow through to the extras helper's page_size."""
        from psycopg2 import extras

        opts = self.options.copy()
        opts["executemany_batch_page_size"] = 500
        opts["executemany_values_page_size"] = 1000

        with self.expect_deprecated_opts():
            eng = engines.testing_engine(options=opts)

        if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
            meth = extras.execute_batch
            stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
            expected_kwargs = {"page_size": 500}
        else:
            meth = extras.execute_values
            stmt = "INSERT INTO data (x, y) VALUES %s"
            expected_kwargs = {"page_size": 1000, "template": "(%(x)s, %(y)s)"}

        with mock.patch.object(
            extras, meth.__name__, side_effect=meth
        ) as mock_exec:
            with eng.connect() as conn:
                conn.execute(
                    self.tables.data.insert(),
                    [
                        {"x": "x1", "y": "y1"},
                        {"x": "x2", "y": "y2"},
                        {"x": "x3", "y": "y3"},
                    ],
                )

        eq_(
            mock_exec.mock_calls,
            [
                mock.call(
                    mock.ANY,
                    stmt,
                    (
                        {"x": "x1", "y": "y1"},
                        {"x": "x2", "y": "y2"},
                        {"x": "x3", "y": "y3"},
                    ),
                    **expected_kwargs
                )
            ],
        )

    def test_update_fallback(self):
        """UPDATE statements always fall back to execute_batch; the
        "values" mode only applies to INSERT."""
        from psycopg2 import extras

        eng = self.engine
        meth = extras.execute_batch
        stmt = "UPDATE data SET y=%(yval)s WHERE data.x = %(xval)s"
        expected_kwargs = {}

        with mock.patch.object(
            extras, meth.__name__, side_effect=meth
        ) as mock_exec:
            with eng.connect() as conn:
                conn.execute(
                    self.tables.data.update()
                    .where(self.tables.data.c.x == bindparam("xval"))
                    .values(y=bindparam("yval")),
                    [
                        {"xval": "x1", "yval": "y5"},
                        {"xval": "x3", "yval": "y6"},
                    ],
                )

        eq_(
            mock_exec.mock_calls,
            [
                mock.call(
                    mock.ANY,
                    stmt,
                    (
                        {"xval": "x1", "yval": "y5"},
                        {"xval": "x3", "yval": "y6"},
                    ),
                    **expected_kwargs
                )
            ],
        )

    def test_not_sane_rowcount(self):
        # fast-executemany modes cannot report accurate multi-row rowcounts
        self.engine.connect().close()
        assert not self.engine.dialect.supports_sane_multi_rowcount

    def test_update(self):
        """end-to-end executemany UPDATE round trip."""
        with self.engine.connect() as conn:
            conn.execute(
                self.tables.data.insert(),
                [
                    {"x": "x1", "y": "y1"},
                    {"x": "x2", "y": "y2"},
                    {"x": "x3", "y": "y3"},
                ],
            )

            conn.execute(
                self.tables.data.update()
                .where(self.tables.data.c.x == bindparam("xval"))
                .values(y=bindparam("yval")),
                [{"xval": "x1", "yval": "y5"}, {"xval": "x3", "yval": "y6"}],
            )
            eq_(
                conn.execute(
                    select([self.tables.data]).order_by(self.tables.data.c.id)
                ).fetchall(),
                [(1, "x1", "y5", 5), (2, "x2", "y2", 5), (3, "x3", "y6", 5)],
            )


class UseBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
    """Legacy use_batch_mode=True flag; expects a deprecation warning."""

    options = {"use_batch_mode": True}

    def expect_deprecated_opts(self):
        return expect_deprecated(
            "The psycopg2 use_batch_mode flag is superseded by "
            "executemany_mode='batch'"
        )


class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
    """executemany_mode='batch' — uses psycopg2 execute_batch."""

    options = {"executemany_mode": "batch"}


class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
    """executemany_mode='values' — INSERTs use psycopg2 execute_values."""

    options = {"executemany_mode": "values"}

    def test_insert_w_newlines(self):
        """execute_values still works when the compiled INSERT contains
        embedded newlines (from an inline subquery)."""
        from psycopg2 import extras

        t = self.tables.data

        ins = t.insert(inline=True).values(
            id=bindparam("id"),
            x=select([literal_column("5")]).select_from(self.tables.data),
            y=bindparam("y"),
            z=bindparam("z"),
        )
        # compiled SQL has a newline in it
        eq_(
            str(ins.compile(testing.db)),
            "INSERT INTO data (id, x, y, z) VALUES (%(id)s, "
            "(SELECT 5 \nFROM data), %(y)s, %(z)s)",
        )
        meth = extras.execute_values
        with mock.patch.object(
            extras, "execute_values", side_effect=meth
        ) as mock_exec:

            with self.engine.connect() as conn:
                conn.execute(
                    ins,
                    [
                        {"id": 1, "y": "y1", "z": 1},
                        {"id": 2, "y": "y2", "z": 2},
                        {"id": 3, "y": "y3", "z": 3},
                    ],
                )

        eq_(
            mock_exec.mock_calls,
            [
                mock.call(
                    mock.ANY,
                    "INSERT INTO data (id, x, y, z) VALUES %s",
                    (
                        {"id": 1, "y": "y1", "z": 1},
                        {"id": 2, "y": "y2", "z": 2},
                        {"id": 3, "y": "y3", "z": 3},
                    ),
                    template="(%(id)s, (SELECT 5 \nFROM data), %(y)s, %(z)s)",
                )
            ],
        )

    def test_insert_modified_by_event(self):
        """If an event hook rewrites the statement, the dialect must not
        use execute_values (its cached VALUES clause no longer applies)
        and falls back to execute_batch."""
        from psycopg2 import extras

        t = self.tables.data

        ins = t.insert(inline=True).values(
            id=bindparam("id"),
            x=select([literal_column("5")]).select_from(self.tables.data),
            y=bindparam("y"),
            z=bindparam("z"),
        )
        # compiled SQL has a newline in it
        eq_(
            str(ins.compile(testing.db)),
            "INSERT INTO data (id, x, y, z) VALUES (%(id)s, "
            "(SELECT 5 \nFROM data), %(y)s, %(z)s)",
        )
        meth = extras.execute_batch
        with mock.patch.object(
            extras, "execute_values"
        ) as mock_values, mock.patch.object(
            extras, "execute_batch", side_effect=meth
        ) as mock_batch:

            with self.engine.connect() as conn:

                # create an event hook that will change the statement to
                # something else, meaning the dialect has to detect that
                # insert_single_values_expr is no longer useful
                @event.listens_for(conn, "before_cursor_execute", retval=True)
                def before_cursor_execute(
                    conn, cursor, statement, parameters, context, executemany
                ):
                    statement = (
                        "INSERT INTO data (id, y, z) VALUES "
                        "(%(id)s, %(y)s, %(z)s)"
                    )
                    return statement, parameters

                conn.execute(
                    ins,
                    [
                        {"id": 1, "y": "y1", "z": 1},
                        {"id": 2, "y": "y2", "z": 2},
                        {"id": 3, "y": "y3", "z": 3},
                    ],
                )

        eq_(mock_values.mock_calls, [])
        eq_(
            mock_batch.mock_calls,
            [
                mock.call(
                    mock.ANY,
                    "INSERT INTO data (id, y, z) VALUES "
                    "(%(id)s, %(y)s, %(z)s)",
                    (
                        {"id": 1, "y": "y1", "z": 1},
                        {"id": 2, "y": "y2", "z": 2},
                        {"id": 3, "y": "y3", "z": 3},
                    ),
                )
            ],
        )


class ExecutemanyFlagOptionsTest(fixtures.TablesTest):
    """Validation of the executemany_mode engine option itself."""

    __only_on__ = "postgresql+psycopg2"
    __backend__ = True

    def test_executemany_correct_flag_options(self):
        for opt, expected in [
            (None, EXECUTEMANY_DEFAULT),
            ("batch", EXECUTEMANY_BATCH),
            ("values", EXECUTEMANY_VALUES),
        ]:
            self.engine = engines.testing_engine(
                options={"executemany_mode": opt}
            )
            is_(self.engine.dialect.executemany_mode, expected)

    def test_executemany_wrong_flag_options(self):
        # non-string and unknown values are rejected at engine creation
        for opt in [1, True, "batch_insert"]:
            assert_raises_message(
                exc.ArgumentError,
                "Invalid value for 'executemany_mode': %r" % opt,
                engines.testing_engine,
                options={"executemany_mode": opt},
            )


class MiscBackendTest(
    fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
):
    """Assorted PostgreSQL backend round-trip and dialect-behavior tests."""

    __only_on__ = "postgresql"
    __backend__ = True

    @testing.provide_metadata
    def test_date_reflection(self):
        # timezone flag on DateTime survives a reflection round trip
        metadata = self.metadata
        Table(
            "pgdate",
            metadata,
            Column("date1", DateTime(timezone=True)),
            Column("date2", DateTime(timezone=False)),
        )
        metadata.create_all()
        m2 = MetaData(testing.db)
        t2 = Table("pgdate", m2, autoload=True)
        assert t2.c.date1.type.timezone is True
        assert t2.c.date2.type.timezone is False

    @testing.requires.psycopg2_compatibility
    def test_psycopg2_version(self):
        # dialect's parsed version tuple must be a prefix of the DBAPI's
        # reported __version__ string
        v = testing.db.dialect.psycopg2_version
        assert testing.db.dialect.dbapi.__version__.startswith(
            ".".join(str(x) for x in v)
        )

    @testing.requires.psycopg2_compatibility
    def test_psycopg2_non_standard_err(self):
        """psycopg2's extension-module TransactionRollbackError maps to
        OperationalError."""
        # under pypy the name here is psycopg2cffi
        psycopg2 = testing.db.dialect.dbapi
        TransactionRollbackError = __import__(
            "%s.extensions" % psycopg2.__name__
        ).extensions.TransactionRollbackError

        exception = exc.DBAPIError.instance(
            "some statement",
            {},
            TransactionRollbackError("foo"),
            psycopg2.Error,
        )
        assert isinstance(exception, exc.OperationalError)

    @testing.requires.no_coverage
    @testing.requires.psycopg2_compatibility
    def test_notice_logging(self):
        """server-side NOTICE messages are routed to the
        sqlalchemy.dialects.postgresql logger."""
        log = logging.getLogger("sqlalchemy.dialects.postgresql")
        buf = logging.handlers.BufferingHandler(100)
        lev = log.level
        log.addHandler(buf)
        log.setLevel(logging.INFO)
        try:
            conn = testing.db.connect()
            trans = conn.begin()
            try:
                conn.execute(
                    """
CREATE OR REPLACE FUNCTION note(message varchar) RETURNS integer AS $$
BEGIN
  RAISE NOTICE 'notice: %%', message;
  RETURN NULL;
END;
$$ LANGUAGE plpgsql;
"""
                )
                conn.execute("SELECT note('hi there')")
                conn.execute("SELECT note('another note')")
            finally:
                trans.rollback()
        finally:
            log.removeHandler(buf)
            log.setLevel(lev)
        msgs = " ".join(b.msg for b in buf.buffer)
        # NOTE(review): interior whitespace of this pattern may have been
        # collapsed in transit (stock psycopg2 notices read "NOTICE:  ..."
        # with two spaces) — confirm against a live run
        eq_regex(
            msgs,
            "NOTICE: notice: hi there(\nCONTEXT: .*?)? "
            "NOTICE: notice: another note(\nCONTEXT: .*?)?",
        )

    @testing.requires.psycopg2_or_pg8000_compatibility
    @engines.close_open_connections
    def test_client_encoding(self):
        c = testing.db.connect()
        current_encoding = c.execute("show client_encoding").fetchone()[0]
        c.close()

        # attempt to use an encoding that's not
        # already set
        if current_encoding == "UTF8":
            test_encoding = "LATIN1"
        else:
            test_encoding = "UTF8"

        e = engines.testing_engine(options={"client_encoding": test_encoding})
        c = e.connect()
        new_encoding = c.execute("show client_encoding").fetchone()[0]
        eq_(new_encoding, test_encoding)

    @testing.requires.psycopg2_or_pg8000_compatibility
    @engines.close_open_connections
    def test_autocommit_isolation_level(self):
        c = testing.db.connect().execution_options(
            isolation_level="AUTOCOMMIT"
        )
        # If we're really in autocommit mode then we'll get an error saying
        # that the prepared transaction doesn't exist. Otherwise, we'd
        # get an error saying that the command can't be run within a
        # transaction.
        assert_raises_message(
            exc.ProgrammingError,
            'prepared transaction with identifier "gilberte" does not exist',
            c.execute,
            "commit prepared 'gilberte'",
        )

    @testing.fails_on(
        "+zxjdbc",
        "Can't infer the SQL type to use for an instance "
        "of org.python.core.PyObjectDerived.",
    )
    def test_extract(self):
        # compare server-side EXTRACT against Python date arithmetic on
        # the same "five days ago" instant
        fivedaysago = testing.db.scalar(
            select([func.now()])
        ) - datetime.timedelta(days=5)
        for field, exp in (
            ("year", fivedaysago.year),
            ("month", fivedaysago.month),
            ("day", fivedaysago.day),
        ):
            r = testing.db.execute(
                select(
                    [extract(field, func.now() + datetime.timedelta(days=-5))]
                )
            ).scalar()
            eq_(r, exp)

    @testing.provide_metadata
    def test_checksfor_sequence(self):
        # create(checkfirst=True) must not fail when the sequence already
        # exists server-side
        meta1 = self.metadata
        seq = Sequence("fooseq")
        t = Table("mytable", meta1, Column("col1", Integer, seq))
        seq.drop()
        testing.db.execute("CREATE SEQUENCE fooseq")
        t.create(checkfirst=True)

    @testing.provide_metadata
    def test_schema_roundtrips(self):
        """CRUD round trip against a table in a non-default schema."""
        meta = self.metadata
        users = Table(
            "users",
            meta,
            Column("id", Integer, primary_key=True),
            Column("name", String(50)),
            schema="test_schema",
        )
        users.create()
        users.insert().execute(id=1, name="name1")
        users.insert().execute(id=2, name="name2")
        users.insert().execute(id=3, name="name3")
        users.insert().execute(id=4, name="name4")
        eq_(
            users.select().where(users.c.name == "name2").execute().fetchall(),
            [(2, "name2")],
        )
        eq_(
            users.select(use_labels=True)
            .where(users.c.name == "name2")
            .execute()
            .fetchall(),
            [(2, "name2")],
        )
        users.delete().where(users.c.id == 3).execute()
        eq_(
            users.select().where(users.c.name == "name3").execute().fetchall(),
            [],
        )
        users.update().where(users.c.name == "name4").execute(name="newname")
        eq_(
            users.select(use_labels=True)
            .where(users.c.id == 4)
            .execute()
            .fetchall(),
            [(4, "newname")],
        )

    def test_quoted_name_bindparam_ok(self):
        # quoted_name instances must pass through as plain strings in
        # bound parameters
        from sqlalchemy.sql.elements import quoted_name

        with testing.db.connect() as conn:
            eq_(
                conn.scalar(
                    select(
                        [
                            cast(
                                literal(quoted_name("some_name", False)),
                                String,
                            )
                        ]
                    )
                ),
                "some_name",
            )

    def test_preexecute_passivedefault(self):
        """test that when we get a primary key column back from
        reflecting a table which has a default value on it, we pre-
        execute that DefaultClause upon insert."""

        try:
            meta = MetaData(testing.db)
            testing.db.execute(
                """
 CREATE TABLE speedy_users
 (
     speedy_user_id SERIAL PRIMARY KEY,

     user_name VARCHAR NOT NULL,
     user_password VARCHAR NOT NULL
 );
 """
            )
            t = Table("speedy_users", meta, autoload=True)
            r = t.insert().execute(user_name="user", user_password="lala")
            assert r.inserted_primary_key == [1]
            result = t.select().execute().fetchall()
            assert result == [(1, "user", "lala")]
        finally:
            testing.db.execute("drop table speedy_users")

    @testing.fails_on("+zxjdbc", "psycopg2/pg8000 specific assertion")
    @testing.requires.psycopg2_or_pg8000_compatibility
    def test_numeric_raise(self):
        # a non-numeric value arriving for a Numeric result column raises
        stmt = text("select cast('hi' as char) as hi").columns(hi=Numeric)
        assert_raises(exc.InvalidRequestError, testing.db.execute, stmt)

    @testing.only_if(
        "postgresql >= 8.2", "requires standard_conforming_strings"
    )
    def test_serial_integer(self):
        """SERIAL / BIGSERIAL / SMALLSERIAL DDL selection per column type
        and server version."""

        class BITD(TypeDecorator):
            impl = Integer

            def load_dialect_impl(self, dialect):
                if dialect.name == "postgresql":
                    return BigInteger()
                else:
                    return Integer()

        for version, type_, expected in [
            (None, Integer, "SERIAL"),
            (None, BigInteger, "BIGSERIAL"),
            # SMALLSERIAL only exists as of PG 9.2
            ((9, 1), SmallInteger, "SMALLINT"),
            ((9, 2), SmallInteger, "SMALLSERIAL"),
            (None, postgresql.INTEGER, "SERIAL"),
            (None, postgresql.BIGINT, "BIGSERIAL"),
            (
                None,
                Integer().with_variant(BigInteger(), "postgresql"),
                "BIGSERIAL",
            ),
            (
                None,
                Integer().with_variant(postgresql.BIGINT, "postgresql"),
                "BIGSERIAL",
            ),
            (
                (9, 2),
                Integer().with_variant(SmallInteger, "postgresql"),
                "SMALLSERIAL",
            ),
            (None, BITD(), "BIGSERIAL"),
        ]:
            m = MetaData()

            t = Table("t", m, Column("c", type_, primary_key=True))

            if version:
                # pin the dialect to a specific server version
                dialect = postgresql.dialect()
                dialect._get_server_version_info = mock.Mock(
                    return_value=version
                )
                dialect.initialize(testing.db.connect())
            else:
                dialect = testing.db.dialect

            ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t))
            eq_(
                ddl_compiler.get_column_specification(t.c.c),
                "c %s NOT NULL" % expected,
            )

    @testing.requires.psycopg2_compatibility
    def test_initial_transaction_state(self):
        # a freshly checked-out connection must not already be inside a
        # transaction
        from psycopg2.extensions import STATUS_IN_TRANSACTION

        engine = engines.testing_engine()
        with engine.connect() as conn:
            ne_(conn.connection.status, STATUS_IN_TRANSACTION)


class AutocommitTextTest(test_execute.AutocommitTextTest):
    """PostgreSQL-specific keywords that trigger autocommit for text()
    statements."""

    __only_on__ = "postgresql"

    def test_grant(self):
        self._test_keyword("GRANT USAGE ON SCHEMA fooschema TO foorole")

    def test_import_foreign_schema(self):
        self._test_keyword("IMPORT FOREIGN SCHEMA foob")

    def test_refresh_view(self):
        self._test_keyword("REFRESH MATERIALIZED VIEW fooview")

    def test_revoke(self):
        self._test_keyword("REVOKE USAGE ON SCHEMA fooschema FROM foorole")

    def test_truncate(self):
        self._test_keyword("TRUNCATE footable")