1import sqlalchemy as tsa 2from sqlalchemy import create_engine 3from sqlalchemy import create_mock_engine 4from sqlalchemy import event 5from sqlalchemy import Integer 6from sqlalchemy import MetaData 7from sqlalchemy import String 8from sqlalchemy import testing 9from sqlalchemy import text 10from sqlalchemy.schema import AddConstraint 11from sqlalchemy.schema import CheckConstraint 12from sqlalchemy.schema import DDL 13from sqlalchemy.schema import DropConstraint 14from sqlalchemy.testing import AssertsCompiledSQL 15from sqlalchemy.testing import engines 16from sqlalchemy.testing import eq_ 17from sqlalchemy.testing import fixtures 18from sqlalchemy.testing import mock 19from sqlalchemy.testing.schema import Column 20from sqlalchemy.testing.schema import Table 21 22 23class DDLEventTest(fixtures.TestBase): 24 def setup_test(self): 25 self.bind = engines.mock_engine() 26 self.metadata = MetaData() 27 self.table = Table("t", self.metadata, Column("id", Integer)) 28 29 def test_table_create_before(self): 30 table, bind = self.table, self.bind 31 canary = mock.Mock() 32 event.listen(table, "before_create", canary.before_create) 33 34 table.create(bind) 35 table.drop(bind) 36 eq_( 37 canary.mock_calls, 38 [ 39 mock.call.before_create( 40 table, 41 self.bind, 42 checkfirst=False, 43 _ddl_runner=mock.ANY, 44 _is_metadata_operation=mock.ANY, 45 ) 46 ], 47 ) 48 49 def test_table_create_after(self): 50 table, bind = self.table, self.bind 51 canary = mock.Mock() 52 event.listen(table, "after_create", canary.after_create) 53 54 table.create(bind) 55 table.drop(bind) 56 eq_( 57 canary.mock_calls, 58 [ 59 mock.call.after_create( 60 table, 61 self.bind, 62 checkfirst=False, 63 _ddl_runner=mock.ANY, 64 _is_metadata_operation=mock.ANY, 65 ) 66 ], 67 ) 68 69 def test_table_create_both(self): 70 table, bind = self.table, self.bind 71 canary = mock.Mock() 72 event.listen(table, "before_create", canary.before_create) 73 event.listen(table, "after_create", canary.after_create) 74 75 table.create(bind) 76 table.drop(bind) 77 eq_( 78 canary.mock_calls, 79 [ 80 mock.call.before_create( 81 table, 82 self.bind, 83 checkfirst=False, 84 _ddl_runner=mock.ANY, 85 _is_metadata_operation=mock.ANY, 86 ), 87 mock.call.after_create( 88 table, 89 self.bind, 90 checkfirst=False, 91 _ddl_runner=mock.ANY, 92 _is_metadata_operation=mock.ANY, 93 ), 94 ], 95 ) 96 97 def test_table_drop_before(self): 98 table, bind = self.table, self.bind 99 canary = mock.Mock() 100 event.listen(table, "before_drop", canary.before_drop) 101 102 table.create(bind) 103 table.drop(bind) 104 eq_( 105 canary.mock_calls, 106 [ 107 mock.call.before_drop( 108 table, 109 self.bind, 110 checkfirst=False, 111 _ddl_runner=mock.ANY, 112 _is_metadata_operation=mock.ANY, 113 ) 114 ], 115 ) 116 117 def test_table_drop_after(self): 118 table, bind = self.table, self.bind 119 canary = mock.Mock() 120 event.listen(table, "after_drop", canary.after_drop) 121 122 table.create(bind) 123 canary.state = "skipped" 124 table.drop(bind) 125 eq_( 126 canary.mock_calls, 127 [ 128 mock.call.after_drop( 129 table, 130 self.bind, 131 checkfirst=False, 132 _ddl_runner=mock.ANY, 133 _is_metadata_operation=mock.ANY, 134 ) 135 ], 136 ) 137 138 def test_table_drop_both(self): 139 table, bind = self.table, self.bind 140 canary = mock.Mock() 141 142 event.listen(table, "before_drop", canary.before_drop) 143 event.listen(table, "after_drop", canary.after_drop) 144 145 table.create(bind) 146 table.drop(bind) 147 eq_( 148 canary.mock_calls, 149 [ 150 mock.call.before_drop( 151 table, 152 self.bind, 153 checkfirst=False, 154 _ddl_runner=mock.ANY, 155 _is_metadata_operation=mock.ANY, 156 ), 157 mock.call.after_drop( 158 table, 159 self.bind, 160 checkfirst=False, 161 _ddl_runner=mock.ANY, 162 _is_metadata_operation=mock.ANY, 163 ), 164 ], 165 ) 166 167 def test_table_all(self): 168 table, bind = self.table, self.bind 169 canary = mock.Mock() 170 171 event.listen(table, "before_create", canary.before_create) 172 event.listen(table, "after_create", canary.after_create) 173 event.listen(table, "before_drop", canary.before_drop) 174 event.listen(table, "after_drop", canary.after_drop) 175 176 table.create(bind) 177 table.drop(bind) 178 eq_( 179 canary.mock_calls, 180 [ 181 mock.call.before_create( 182 table, 183 self.bind, 184 checkfirst=False, 185 _ddl_runner=mock.ANY, 186 _is_metadata_operation=mock.ANY, 187 ), 188 mock.call.after_create( 189 table, 190 self.bind, 191 checkfirst=False, 192 _ddl_runner=mock.ANY, 193 _is_metadata_operation=mock.ANY, 194 ), 195 mock.call.before_drop( 196 table, 197 self.bind, 198 checkfirst=False, 199 _ddl_runner=mock.ANY, 200 _is_metadata_operation=mock.ANY, 201 ), 202 mock.call.after_drop( 203 table, 204 self.bind, 205 checkfirst=False, 206 _ddl_runner=mock.ANY, 207 _is_metadata_operation=mock.ANY, 208 ), 209 ], 210 ) 211 212 def test_metadata_create_before(self): 213 metadata, bind = self.metadata, self.bind 214 canary = mock.Mock() 215 event.listen(metadata, "before_create", canary.before_create) 216 217 metadata.create_all(bind) 218 metadata.drop_all(bind) 219 eq_( 220 canary.mock_calls, 221 [ 222 mock.call.before_create( 223 # checkfirst is False because of the MockConnection 224 # used in the current testing strategy. 225 metadata, 226 self.bind, 227 checkfirst=False, 228 tables=list(metadata.tables.values()), 229 _ddl_runner=mock.ANY, 230 ) 231 ], 232 ) 233 234 def test_metadata_create_after(self): 235 metadata, bind = self.metadata, self.bind 236 canary = mock.Mock() 237 event.listen(metadata, "after_create", canary.after_create) 238 239 metadata.create_all(bind) 240 metadata.drop_all(bind) 241 eq_( 242 canary.mock_calls, 243 [ 244 mock.call.after_create( 245 metadata, 246 self.bind, 247 checkfirst=False, 248 tables=list(metadata.tables.values()), 249 _ddl_runner=mock.ANY, 250 ) 251 ], 252 ) 253 254 def test_metadata_create_both(self): 255 metadata, bind = self.metadata, self.bind 256 canary = mock.Mock() 257 258 event.listen(metadata, "before_create", canary.before_create) 259 event.listen(metadata, "after_create", canary.after_create) 260 261 metadata.create_all(bind) 262 metadata.drop_all(bind) 263 eq_( 264 canary.mock_calls, 265 [ 266 mock.call.before_create( 267 metadata, 268 self.bind, 269 checkfirst=False, 270 tables=list(metadata.tables.values()), 271 _ddl_runner=mock.ANY, 272 ), 273 mock.call.after_create( 274 metadata, 275 self.bind, 276 checkfirst=False, 277 tables=list(metadata.tables.values()), 278 _ddl_runner=mock.ANY, 279 ), 280 ], 281 ) 282 283 def test_metadata_drop_before(self): 284 metadata, bind = self.metadata, self.bind 285 canary = mock.Mock() 286 event.listen(metadata, "before_drop", canary.before_drop) 287 288 metadata.create_all(bind) 289 metadata.drop_all(bind) 290 eq_( 291 canary.mock_calls, 292 [ 293 mock.call.before_drop( 294 metadata, 295 self.bind, 296 checkfirst=False, 297 tables=list(metadata.tables.values()), 298 _ddl_runner=mock.ANY, 299 ) 300 ], 301 ) 302 303 def test_metadata_drop_after(self): 304 metadata, bind = self.metadata, self.bind 305 canary = mock.Mock() 306 event.listen(metadata, "after_drop", canary.after_drop) 307 308 metadata.create_all(bind) 309 metadata.drop_all(bind) 310 eq_( 311 canary.mock_calls, 312 [ 313 mock.call.after_drop( 314 metadata, 315 self.bind, 316 checkfirst=False, 317 tables=list(metadata.tables.values()), 318 _ddl_runner=mock.ANY, 319 ) 320 ], 321 ) 322 323 def test_metadata_drop_both(self): 324 metadata, bind = self.metadata, self.bind 325 canary = mock.Mock() 326 327 event.listen(metadata, "before_drop", canary.before_drop) 328 event.listen(metadata, "after_drop", canary.after_drop) 329 330 metadata.create_all(bind) 331 metadata.drop_all(bind) 332 eq_( 333 canary.mock_calls, 334 [ 335 mock.call.before_drop( 336 metadata, 337 self.bind, 338 checkfirst=False, 339 tables=list(metadata.tables.values()), 340 _ddl_runner=mock.ANY, 341 ), 342 mock.call.after_drop( 343 metadata, 344 self.bind, 345 checkfirst=False, 346 tables=list(metadata.tables.values()), 347 _ddl_runner=mock.ANY, 348 ), 349 ], 350 ) 351 352 def test_metadata_table_isolation(self): 353 metadata, table = self.metadata, self.table 354 table_canary = mock.Mock() 355 metadata_canary = mock.Mock() 356 357 event.listen(table, "before_create", table_canary.before_create) 358 359 event.listen(metadata, "before_create", metadata_canary.before_create) 360 self.table.create(self.bind) 361 eq_( 362 table_canary.mock_calls, 363 [ 364 mock.call.before_create( 365 table, 366 self.bind, 367 checkfirst=False, 368 _ddl_runner=mock.ANY, 369 _is_metadata_operation=mock.ANY, 370 ) 371 ], 372 ) 373 eq_(metadata_canary.mock_calls, []) 374 375 376class DDLExecutionTest(fixtures.TestBase): 377 def setup_test(self): 378 self.engine = engines.mock_engine() 379 self.metadata = MetaData() 380 self.users = Table( 381 "users", 382 self.metadata, 383 Column("user_id", Integer, primary_key=True), 384 Column("user_name", String(40)), 385 ) 386 387 def test_table_standalone(self): 388 users, engine = self.users, self.engine 389 event.listen(users, "before_create", DDL("mxyzptlk")) 390 event.listen(users, "after_create", DDL("klptzyxm")) 391 event.listen(users, "before_drop", DDL("xyzzy")) 392 event.listen(users, "after_drop", DDL("fnord")) 393 394 users.create(self.engine) 395 strings = [str(x) for x in engine.mock] 396 assert "mxyzptlk" in strings 397 assert "klptzyxm" in strings 398 assert "xyzzy" not in strings 399 assert "fnord" not in strings 400 del engine.mock[:] 401 users.drop(self.engine) 402 strings = [str(x) for x in engine.mock] 403 assert "mxyzptlk" not in strings 404 assert "klptzyxm" not in strings 405 assert "xyzzy" in strings 406 assert "fnord" in strings 407 408 def test_table_by_metadata(self): 409 metadata, users, engine = self.metadata, self.users, self.engine 410 411 event.listen(users, "before_create", DDL("mxyzptlk")) 412 event.listen(users, "after_create", DDL("klptzyxm")) 413 event.listen(users, "before_drop", DDL("xyzzy")) 414 event.listen(users, "after_drop", DDL("fnord")) 415 416 metadata.create_all(self.engine) 417 strings = [str(x) for x in engine.mock] 418 assert "mxyzptlk" in strings 419 assert "klptzyxm" in strings 420 assert "xyzzy" not in strings 421 assert "fnord" not in strings 422 del engine.mock[:] 423 metadata.drop_all(self.engine) 424 strings = [str(x) for x in engine.mock] 425 assert "mxyzptlk" not in strings 426 assert "klptzyxm" not in strings 427 assert "xyzzy" in strings 428 assert "fnord" in strings 429 430 def test_metadata(self): 431 metadata, engine = self.metadata, self.engine 432 433 event.listen(metadata, "before_create", DDL("mxyzptlk")) 434 event.listen(metadata, "after_create", DDL("klptzyxm")) 435 event.listen(metadata, "before_drop", DDL("xyzzy")) 436 event.listen(metadata, "after_drop", DDL("fnord")) 437 438 metadata.create_all(self.engine) 439 strings = [str(x) for x in engine.mock] 440 assert "mxyzptlk" in strings 441 assert "klptzyxm" in strings 442 assert "xyzzy" not in strings 443 assert "fnord" not in strings 444 del engine.mock[:] 445 metadata.drop_all(self.engine) 446 strings = [str(x) for x in engine.mock] 447 assert "mxyzptlk" not in strings 448 assert "klptzyxm" not in strings 449 assert "xyzzy" in strings 450 assert "fnord" in strings 451 452 def test_conditional_constraint(self): 453 metadata, users = self.metadata, self.users 454 nonpg_mock = engines.mock_engine(dialect_name="sqlite") 455 pg_mock = engines.mock_engine(dialect_name="postgresql") 456 constraint = CheckConstraint( 457 "a < b", name="my_test_constraint", table=users 458 ) 459 460 # by placing the constraint in an Add/Drop construct, the 461 # 'inline_ddl' flag is set to False 462 463 event.listen( 464 users, 465 "after_create", 466 AddConstraint(constraint).execute_if(dialect="postgresql"), 467 ) 468 469 event.listen( 470 users, 471 "before_drop", 472 DropConstraint(constraint).execute_if(dialect="postgresql"), 473 ) 474 475 metadata.create_all(bind=nonpg_mock) 476 strings = " ".join(str(x) for x in nonpg_mock.mock) 477 assert "my_test_constraint" not in strings 478 metadata.drop_all(bind=nonpg_mock) 479 strings = " ".join(str(x) for x in nonpg_mock.mock) 480 assert "my_test_constraint" not in strings 481 metadata.create_all(bind=pg_mock) 482 strings = " ".join(str(x) for x in pg_mock.mock) 483 assert "my_test_constraint" in strings 484 metadata.drop_all(bind=pg_mock) 485 strings = " ".join(str(x) for x in pg_mock.mock) 486 assert "my_test_constraint" in strings 487 488 @testing.requires.sqlite 489 def test_ddl_execute(self): 490 engine = create_engine("sqlite:///") 491 cx = engine.connect() 492 cx.begin() 493 ddl = DDL("SELECT 1") 494 495 r = cx.execute(ddl) 496 eq_(list(r), [(1,)]) 497 498 def test_platform_escape(self): 499 """test the escaping of % characters in the DDL construct.""" 500 501 default_from = testing.db.dialect.statement_compiler( 502 testing.db.dialect, None 503 ).default_from() 504 505 # We're abusing the DDL() 506 # construct here by pushing a SELECT through it 507 # so that we can verify the round trip. 508 # the DDL() will trigger autocommit, which prohibits 509 # some DBAPIs from returning results (pyodbc), so we 510 # run in an explicit transaction. 511 with testing.db.begin() as conn: 512 eq_( 513 conn.execute( 514 text("select 'foo%something'" + default_from) 515 ).scalar(), 516 "foo%something", 517 ) 518 519 eq_( 520 conn.execute( 521 DDL("select 'foo%%something'" + default_from) 522 ).scalar(), 523 "foo%something", 524 ) 525 526 527class DDLTransactionTest(fixtures.TestBase): 528 """test DDL transactional behavior as of SQLAlchemy 1.4.""" 529 530 @testing.fixture 531 def metadata_fixture(self): 532 m = MetaData() 533 Table("t1", m, Column("q", Integer)) 534 Table("t2", m, Column("q", Integer)) 535 536 try: 537 yield m 538 finally: 539 m.drop_all(testing.db) 540 541 def _listening_engine_fixture(self, future=False): 542 eng = engines.testing_engine(future=future) 543 544 m1 = mock.Mock() 545 546 event.listen(eng, "begin", m1.begin) 547 event.listen(eng, "commit", m1.commit) 548 event.listen(eng, "rollback", m1.rollback) 549 550 @event.listens_for(eng, "before_cursor_execute") 551 def before_cursor_execute( 552 conn, cursor, statement, parameters, context, executemany 553 ): 554 if "CREATE TABLE" in statement: 555 m1.cursor_execute("CREATE TABLE ...") 556 557 eng.connect().close() 558 559 return eng, m1 560 561 @testing.fixture 562 def listening_engine_fixture(self): 563 return self._listening_engine_fixture(future=False) 564 565 @testing.fixture 566 def future_listening_engine_fixture(self): 567 return self._listening_engine_fixture(future=True) 568 569 def test_ddl_legacy_engine( 570 self, metadata_fixture, listening_engine_fixture 571 ): 572 eng, m1 = listening_engine_fixture 573 574 metadata_fixture.create_all(eng) 575 576 eq_( 577 m1.mock_calls, 578 [ 579 mock.call.begin(mock.ANY), 580 mock.call.cursor_execute("CREATE TABLE ..."), 581 mock.call.cursor_execute("CREATE TABLE ..."), 582 mock.call.commit(mock.ANY), 583 ], 584 ) 585 586 def test_ddl_future_engine( 587 self, metadata_fixture, future_listening_engine_fixture 588 ): 589 eng, m1 = future_listening_engine_fixture 590 591 metadata_fixture.create_all(eng) 592 593 eq_( 594 m1.mock_calls, 595 [ 596 mock.call.begin(mock.ANY), 597 mock.call.cursor_execute("CREATE TABLE ..."), 598 mock.call.cursor_execute("CREATE TABLE ..."), 599 mock.call.commit(mock.ANY), 600 ], 601 ) 602 603 def test_ddl_legacy_connection_no_transaction( 604 self, metadata_fixture, listening_engine_fixture 605 ): 606 eng, m1 = listening_engine_fixture 607 608 with eng.connect() as conn: 609 with testing.expect_deprecated( 610 "The current statement is being autocommitted using " 611 "implicit autocommit" 612 ): 613 metadata_fixture.create_all(conn) 614 615 eq_( 616 m1.mock_calls, 617 [ 618 mock.call.cursor_execute("CREATE TABLE ..."), 619 mock.call.commit(mock.ANY), 620 mock.call.cursor_execute("CREATE TABLE ..."), 621 mock.call.commit(mock.ANY), 622 ], 623 ) 624 625 def test_ddl_legacy_connection_transaction( 626 self, metadata_fixture, listening_engine_fixture 627 ): 628 eng, m1 = listening_engine_fixture 629 630 with eng.connect() as conn: 631 with conn.begin(): 632 metadata_fixture.create_all(conn) 633 634 eq_( 635 m1.mock_calls, 636 [ 637 mock.call.begin(mock.ANY), 638 mock.call.cursor_execute("CREATE TABLE ..."), 639 mock.call.cursor_execute("CREATE TABLE ..."), 640 mock.call.commit(mock.ANY), 641 ], 642 ) 643 644 def test_ddl_future_connection_autobegin_transaction( 645 self, metadata_fixture, future_listening_engine_fixture 646 ): 647 eng, m1 = future_listening_engine_fixture 648 649 with eng.connect() as conn: 650 metadata_fixture.create_all(conn) 651 652 conn.commit() 653 654 eq_( 655 m1.mock_calls, 656 [ 657 mock.call.begin(mock.ANY), 658 mock.call.cursor_execute("CREATE TABLE ..."), 659 mock.call.cursor_execute("CREATE TABLE ..."), 660 mock.call.commit(mock.ANY), 661 ], 662 ) 663 664 def test_ddl_future_connection_explicit_begin_transaction( 665 self, metadata_fixture, future_listening_engine_fixture 666 ): 667 eng, m1 = future_listening_engine_fixture 668 669 with eng.connect() as conn: 670 with conn.begin(): 671 metadata_fixture.create_all(conn) 672 673 eq_( 674 m1.mock_calls, 675 [ 676 mock.call.begin(mock.ANY), 677 mock.call.cursor_execute("CREATE TABLE ..."), 678 mock.call.cursor_execute("CREATE TABLE ..."), 679 mock.call.commit(mock.ANY), 680 ], 681 ) 682 683 684class DDLTest(fixtures.TestBase, AssertsCompiledSQL): 685 def mock_engine(self): 686 def executor(*a, **kw): 687 return None 688 689 engine = create_mock_engine(testing.db.name + "://", executor) 690 # fmt: off 691 engine.dialect.identifier_preparer = \ 692 tsa.sql.compiler.IdentifierPreparer( 693 engine.dialect 694 ) 695 # fmt: on 696 return engine 697 698 def test_tokens(self): 699 m = MetaData() 700 sane_alone = Table("t", m, Column("id", Integer)) 701 sane_schema = Table("t", m, Column("id", Integer), schema="s") 702 insane_alone = Table("t t", m, Column("id", Integer)) 703 insane_schema = Table("t t", m, Column("id", Integer), schema="s s") 704 ddl = DDL("%(schema)s-%(table)s-%(fullname)s") 705 dialect = self.mock_engine().dialect 706 self.assert_compile(ddl.against(sane_alone), "-t-t", dialect=dialect) 707 self.assert_compile( 708 ddl.against(sane_schema), "s-t-s.t", dialect=dialect 709 ) 710 self.assert_compile( 711 ddl.against(insane_alone), '-"t t"-"t t"', dialect=dialect 712 ) 713 self.assert_compile( 714 ddl.against(insane_schema), 715 '"s s"-"t t"-"s s"."t t"', 716 dialect=dialect, 717 ) 718 719 # overrides are used piece-meal and verbatim. 720 721 ddl = DDL( 722 "%(schema)s-%(table)s-%(fullname)s-%(bonus)s", 723 context={"schema": "S S", "table": "T T", "bonus": "b"}, 724 ) 725 self.assert_compile( 726 ddl.against(sane_alone), "S S-T T-t-b", dialect=dialect 727 ) 728 self.assert_compile( 729 ddl.against(sane_schema), "S S-T T-s.t-b", dialect=dialect 730 ) 731 self.assert_compile( 732 ddl.against(insane_alone), 'S S-T T-"t t"-b', dialect=dialect 733 ) 734 self.assert_compile( 735 ddl.against(insane_schema), 736 'S S-T T-"s s"."t t"-b', 737 dialect=dialect, 738 ) 739 740 def test_filter(self): 741 cx = self.mock_engine() 742 743 tbl = Table("t", MetaData(), Column("id", Integer)) 744 target = cx.name 745 746 assert DDL("")._should_execute(tbl, cx) 747 assert DDL("").execute_if(dialect=target)._should_execute(tbl, cx) 748 assert not DDL("").execute_if(dialect="bogus")._should_execute(tbl, cx) 749 assert ( 750 DDL("") 751 .execute_if(callable_=lambda d, y, z, **kw: True) 752 ._should_execute(tbl, cx) 753 ) 754 assert ( 755 DDL("") 756 .execute_if( 757 callable_=lambda d, y, z, **kw: z.engine.name != "bogus" 758 ) 759 ._should_execute(tbl, cx) 760 ) 761