1from contextlib import contextmanager 2import re 3 4from sqlalchemy import Boolean 5from sqlalchemy import CheckConstraint 6from sqlalchemy import Column 7from sqlalchemy import DateTime 8from sqlalchemy import Enum 9from sqlalchemy import ForeignKey 10from sqlalchemy import ForeignKeyConstraint 11from sqlalchemy import func 12from sqlalchemy import Index 13from sqlalchemy import inspect 14from sqlalchemy import Integer 15from sqlalchemy import JSON 16from sqlalchemy import MetaData 17from sqlalchemy import PrimaryKeyConstraint 18from sqlalchemy import String 19from sqlalchemy import Table 20from sqlalchemy import Text 21from sqlalchemy import UniqueConstraint 22from sqlalchemy.dialects import sqlite as sqlite_dialect 23from sqlalchemy.schema import CreateIndex 24from sqlalchemy.schema import CreateTable 25from sqlalchemy.sql import column 26from sqlalchemy.sql import text 27 28from alembic import testing 29from alembic.ddl import sqlite 30from alembic.operations import Operations 31from alembic.operations.batch import ApplyBatchImpl 32from alembic.runtime.migration import MigrationContext 33from alembic.testing import assert_raises_message 34from alembic.testing import config 35from alembic.testing import eq_ 36from alembic.testing import exclusions 37from alembic.testing import is_ 38from alembic.testing import mock 39from alembic.testing import TestBase 40from alembic.testing.fixtures import op_fixture 41from alembic.util import exc as alembic_exc 42from alembic.util.sqla_compat import _safe_commit_connection_transaction 43from alembic.util.sqla_compat import _select 44from alembic.util.sqla_compat import has_computed 45from alembic.util.sqla_compat import has_identity 46from alembic.util.sqla_compat import sqla_14 47 48if has_computed: 49 from alembic.util.sqla_compat import Computed 50 51if has_identity: 52 from alembic.util.sqla_compat import Identity 53 54 55class BatchApplyTest(TestBase): 56 def setUp(self): 57 self.op = Operations(mock.Mock(opts={})) 58 self.impl = sqlite.SQLiteImpl( 59 sqlite_dialect.dialect(), None, False, False, None, {} 60 ) 61 62 def _simple_fixture(self, table_args=(), table_kwargs={}, **kw): 63 m = MetaData() 64 t = Table( 65 "tname", 66 m, 67 Column("id", Integer, primary_key=True), 68 Column("x", String(10)), 69 Column("y", Integer), 70 ) 71 return ApplyBatchImpl( 72 self.impl, t, table_args, table_kwargs, False, **kw 73 ) 74 75 def _uq_fixture(self, table_args=(), table_kwargs={}): 76 m = MetaData() 77 t = Table( 78 "tname", 79 m, 80 Column("id", Integer, primary_key=True), 81 Column("x", String()), 82 Column("y", Integer), 83 UniqueConstraint("y", name="uq1"), 84 ) 85 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 86 87 def _named_ck_table_fixture(self, table_args=(), table_kwargs={}): 88 m = MetaData() 89 t = Table( 90 "tname", 91 m, 92 Column("id", Integer, primary_key=True), 93 Column("x", String()), 94 Column("y", Integer), 95 CheckConstraint("y > 5", name="ck1"), 96 ) 97 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 98 99 def _named_ck_col_fixture(self, table_args=(), table_kwargs={}): 100 m = MetaData() 101 t = Table( 102 "tname", 103 m, 104 Column("id", Integer, primary_key=True), 105 Column("x", String()), 106 Column("y", Integer, CheckConstraint("y > 5", name="ck1")), 107 ) 108 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 109 110 def _ix_fixture(self, table_args=(), table_kwargs={}): 111 m = MetaData() 112 t = Table( 113 "tname", 114 m, 115 Column("id", Integer, primary_key=True), 116 Column("x", String()), 117 Column("y", Integer), 118 Index("ix1", "y"), 119 ) 120 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 121 122 def _pk_fixture(self): 123 m = MetaData() 124 t = Table( 125 "tname", 126 m, 127 Column("id", Integer), 128 Column("x", String()), 129 Column("y", Integer), 130 PrimaryKeyConstraint("id", name="mypk"), 131 ) 132 return ApplyBatchImpl(self.impl, t, (), {}, False) 133 134 def _literal_ck_fixture( 135 self, copy_from=None, table_args=(), table_kwargs={} 136 ): 137 m = MetaData() 138 if copy_from is not None: 139 t = copy_from 140 else: 141 t = Table( 142 "tname", 143 m, 144 Column("id", Integer, primary_key=True), 145 Column("email", String()), 146 CheckConstraint("email LIKE '%@%'"), 147 ) 148 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 149 150 def _sql_ck_fixture(self, table_args=(), table_kwargs={}): 151 m = MetaData() 152 t = Table( 153 "tname", 154 m, 155 Column("id", Integer, primary_key=True), 156 Column("email", String()), 157 ) 158 t.append_constraint(CheckConstraint(t.c.email.like("%@%"))) 159 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 160 161 def _fk_fixture(self, table_args=(), table_kwargs={}): 162 m = MetaData() 163 t = Table( 164 "tname", 165 m, 166 Column("id", Integer, primary_key=True), 167 Column("email", String()), 168 Column("user_id", Integer, ForeignKey("user.id")), 169 ) 170 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 171 172 def _multi_fk_fixture(self, table_args=(), table_kwargs={}, schema=None): 173 m = MetaData() 174 if schema: 175 schemaarg = "%s." % schema 176 else: 177 schemaarg = "" 178 179 t = Table( 180 "tname", 181 m, 182 Column("id", Integer, primary_key=True), 183 Column("email", String()), 184 Column("user_id_1", Integer, ForeignKey("%suser.id" % schemaarg)), 185 Column("user_id_2", Integer, ForeignKey("%suser.id" % schemaarg)), 186 Column("user_id_3", Integer), 187 Column("user_id_version", Integer), 188 ForeignKeyConstraint( 189 ["user_id_3", "user_id_version"], 190 ["%suser.id" % schemaarg, "%suser.id_version" % schemaarg], 191 ), 192 schema=schema, 193 ) 194 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 195 196 def _named_fk_fixture(self, table_args=(), table_kwargs={}): 197 m = MetaData() 198 t = Table( 199 "tname", 200 m, 201 Column("id", Integer, primary_key=True), 202 Column("email", String()), 203 Column("user_id", Integer, ForeignKey("user.id", name="ufk")), 204 ) 205 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 206 207 def _selfref_fk_fixture(self, table_args=(), table_kwargs={}): 208 m = MetaData() 209 t = Table( 210 "tname", 211 m, 212 Column("id", Integer, primary_key=True), 213 Column("parent_id", Integer, ForeignKey("tname.id")), 214 Column("data", String), 215 ) 216 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 217 218 def _boolean_fixture(self, table_args=(), table_kwargs={}): 219 m = MetaData() 220 t = Table( 221 "tname", 222 m, 223 Column("id", Integer, primary_key=True), 224 Column("flag", Boolean(create_constraint=True)), 225 ) 226 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 227 228 def _boolean_no_ck_fixture(self, table_args=(), table_kwargs={}): 229 m = MetaData() 230 t = Table( 231 "tname", 232 m, 233 Column("id", Integer, primary_key=True), 234 Column("flag", Boolean(create_constraint=False)), 235 ) 236 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 237 238 def _enum_fixture(self, table_args=(), table_kwargs={}): 239 m = MetaData() 240 t = Table( 241 "tname", 242 m, 243 Column("id", Integer, primary_key=True), 244 Column("thing", Enum("a", "b", "c", create_constraint=True)), 245 ) 246 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 247 248 def _server_default_fixture(self, table_args=(), table_kwargs={}): 249 m = MetaData() 250 t = Table( 251 "tname", 252 m, 253 Column("id", Integer, primary_key=True), 254 Column("thing", String(), server_default=""), 255 ) 256 return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False) 257 258 def _assert_impl( 259 self, 260 impl, 261 colnames=None, 262 ddl_contains=None, 263 ddl_not_contains=None, 264 dialect="default", 265 schema=None, 266 ): 267 context = op_fixture(dialect=dialect) 268 269 impl._create(context.impl) 270 271 if colnames is None: 272 colnames = ["id", "x", "y"] 273 eq_(impl.new_table.c.keys(), colnames) 274 275 pk_cols = [col for col in impl.new_table.c if col.primary_key] 276 eq_(list(impl.new_table.primary_key), pk_cols) 277 278 create_stmt = str( 279 CreateTable(impl.new_table).compile(dialect=context.dialect) 280 ) 281 create_stmt = re.sub(r"[\n\t]", "", create_stmt) 282 283 idx_stmt = "" 284 for idx in impl.indexes.values(): 285 idx_stmt += str(CreateIndex(idx).compile(dialect=context.dialect)) 286 for idx in impl.new_indexes.values(): 287 impl.new_table.name = impl.table.name 288 idx_stmt += str(CreateIndex(idx).compile(dialect=context.dialect)) 289 impl.new_table.name = ApplyBatchImpl._calc_temp_name( 290 impl.table.name 291 ) 292 idx_stmt = re.sub(r"[\n\t]", "", idx_stmt) 293 294 if ddl_contains: 295 assert ddl_contains in create_stmt + idx_stmt 296 if ddl_not_contains: 297 assert ddl_not_contains not in create_stmt + idx_stmt 298 299 expected = [create_stmt] 300 301 if schema: 302 args = {"schema": "%s." % schema} 303 else: 304 args = {"schema": ""} 305 306 args["temp_name"] = impl.new_table.name 307 308 args["colnames"] = ", ".join( 309 [ 310 impl.new_table.c[name].name 311 for name in colnames 312 if name in impl.table.c 313 ] 314 ) 315 316 args["tname_colnames"] = ", ".join( 317 "CAST(%(schema)stname.%(name)s AS %(type)s) AS %(cast_label)s" 318 % { 319 "schema": args["schema"], 320 "name": name, 321 "type": impl.new_table.c[name].type, 322 "cast_label": name if sqla_14 else "anon_1", 323 } 324 if ( 325 impl.new_table.c[name].type._type_affinity 326 is not impl.table.c[name].type._type_affinity 327 ) 328 else "%(schema)stname.%(name)s" 329 % {"schema": args["schema"], "name": name} 330 for name in colnames 331 if name in impl.table.c 332 ) 333 334 expected.extend( 335 [ 336 "INSERT INTO %(schema)s%(temp_name)s (%(colnames)s) " 337 "SELECT %(tname_colnames)s FROM %(schema)stname" % args, 338 "DROP TABLE %(schema)stname" % args, 339 "ALTER TABLE %(schema)s%(temp_name)s " 340 "RENAME TO %(schema)stname" % args, 341 ] 342 ) 343 if idx_stmt: 344 expected.append(idx_stmt) 345 context.assert_(*expected) 346 return impl.new_table 347 348 def test_change_type(self): 349 impl = self._simple_fixture() 350 impl.alter_column("tname", "x", type_=String) 351 new_table = self._assert_impl(impl) 352 assert new_table.c.x.type._type_affinity is String 353 354 def test_rename_col(self): 355 impl = self._simple_fixture() 356 impl.alter_column("tname", "x", name="q") 357 new_table = self._assert_impl(impl) 358 eq_(new_table.c.x.name, "q") 359 360 def test_alter_column_comment(self): 361 impl = self._simple_fixture() 362 impl.alter_column("tname", "x", comment="some comment") 363 new_table = self._assert_impl(impl) 364 eq_(new_table.c.x.comment, "some comment") 365 366 def test_add_column_comment(self): 367 impl = self._simple_fixture() 368 impl.add_column("tname", Column("q", Integer, comment="some comment")) 369 new_table = self._assert_impl(impl, colnames=["id", "x", "y", "q"]) 370 eq_(new_table.c.q.comment, "some comment") 371 372 def test_rename_col_boolean(self): 373 impl = self._boolean_fixture() 374 impl.alter_column("tname", "flag", name="bflag") 375 new_table = self._assert_impl( 376 impl, 377 ddl_contains="CHECK (bflag IN (0, 1)", 378 colnames=["id", "flag"], 379 ) 380 eq_(new_table.c.flag.name, "bflag") 381 eq_( 382 len( 383 [ 384 const 385 for const in new_table.constraints 386 if isinstance(const, CheckConstraint) 387 ] 388 ), 389 1, 390 ) 391 392 def test_change_type_schematype_to_non(self): 393 impl = self._boolean_fixture() 394 impl.alter_column("tname", "flag", type_=Integer) 395 new_table = self._assert_impl( 396 impl, colnames=["id", "flag"], ddl_not_contains="CHECK" 397 ) 398 assert new_table.c.flag.type._type_affinity is Integer 399 400 # NOTE: we can't do test_change_type_non_to_schematype 401 # at this level because the "add_constraint" part of this 402 # comes from toimpl.py, which we aren't testing here 403 404 def test_rename_col_boolean_no_ck(self): 405 impl = self._boolean_no_ck_fixture() 406 impl.alter_column("tname", "flag", name="bflag") 407 new_table = self._assert_impl( 408 impl, ddl_not_contains="CHECK", colnames=["id", "flag"] 409 ) 410 eq_(new_table.c.flag.name, "bflag") 411 eq_( 412 len( 413 [ 414 const 415 for const in new_table.constraints 416 if isinstance(const, CheckConstraint) 417 ] 418 ), 419 0, 420 ) 421 422 def test_rename_col_enum(self): 423 impl = self._enum_fixture() 424 impl.alter_column("tname", "thing", name="thang") 425 new_table = self._assert_impl( 426 impl, 427 ddl_contains="CHECK (thang IN ('a', 'b', 'c')", 428 colnames=["id", "thing"], 429 ) 430 eq_(new_table.c.thing.name, "thang") 431 eq_( 432 len( 433 [ 434 const 435 for const in new_table.constraints 436 if isinstance(const, CheckConstraint) 437 ] 438 ), 439 1, 440 ) 441 442 def test_rename_col_literal_ck(self): 443 impl = self._literal_ck_fixture() 444 impl.alter_column("tname", "email", name="emol") 445 new_table = self._assert_impl( 446 # note this is wrong, we don't dig into the SQL 447 impl, 448 ddl_contains="CHECK (email LIKE '%@%')", 449 colnames=["id", "email"], 450 ) 451 eq_( 452 len( 453 [ 454 c 455 for c in new_table.constraints 456 if isinstance(c, CheckConstraint) 457 ] 458 ), 459 1, 460 ) 461 462 eq_(new_table.c.email.name, "emol") 463 464 def test_rename_col_literal_ck_workaround(self): 465 impl = self._literal_ck_fixture( 466 copy_from=Table( 467 "tname", 468 MetaData(), 469 Column("id", Integer, primary_key=True), 470 Column("email", String), 471 ), 472 table_args=[CheckConstraint("emol LIKE '%@%'")], 473 ) 474 475 impl.alter_column("tname", "email", name="emol") 476 new_table = self._assert_impl( 477 impl, 478 ddl_contains="CHECK (emol LIKE '%@%')", 479 colnames=["id", "email"], 480 ) 481 eq_( 482 len( 483 [ 484 c 485 for c in new_table.constraints 486 if isinstance(c, CheckConstraint) 487 ] 488 ), 489 1, 490 ) 491 eq_(new_table.c.email.name, "emol") 492 493 def test_rename_col_sql_ck(self): 494 impl = self._sql_ck_fixture() 495 496 impl.alter_column("tname", "email", name="emol") 497 new_table = self._assert_impl( 498 impl, 499 ddl_contains="CHECK (emol LIKE '%@%')", 500 colnames=["id", "email"], 501 ) 502 eq_( 503 len( 504 [ 505 c 506 for c in new_table.constraints 507 if isinstance(c, CheckConstraint) 508 ] 509 ), 510 1, 511 ) 512 513 eq_(new_table.c.email.name, "emol") 514 515 def test_add_col(self): 516 impl = self._simple_fixture() 517 col = Column("g", Integer) 518 # operations.add_column produces a table 519 t = self.op.schema_obj.table("tname", col) # noqa 520 impl.add_column("tname", col) 521 new_table = self._assert_impl(impl, colnames=["id", "x", "y", "g"]) 522 eq_(new_table.c.g.name, "g") 523 524 def test_partial_reordering(self): 525 impl = self._simple_fixture(partial_reordering=[("x", "id", "y")]) 526 new_table = self._assert_impl(impl, colnames=["x", "id", "y"]) 527 eq_(new_table.c.x.name, "x") 528 529 def test_add_col_partial_reordering(self): 530 impl = self._simple_fixture(partial_reordering=[("id", "x", "g", "y")]) 531 col = Column("g", Integer) 532 # operations.add_column produces a table 533 t = self.op.schema_obj.table("tname", col) # noqa 534 impl.add_column("tname", col) 535 new_table = self._assert_impl(impl, colnames=["id", "x", "g", "y"]) 536 eq_(new_table.c.g.name, "g") 537 538 def test_add_col_insert_before(self): 539 impl = self._simple_fixture() 540 col = Column("g", Integer) 541 # operations.add_column produces a table 542 t = self.op.schema_obj.table("tname", col) # noqa 543 impl.add_column("tname", col, insert_before="x") 544 new_table = self._assert_impl(impl, colnames=["id", "g", "x", "y"]) 545 eq_(new_table.c.g.name, "g") 546 547 def test_add_col_insert_before_beginning(self): 548 impl = self._simple_fixture() 549 impl.add_column("tname", Column("g", Integer), insert_before="id") 550 new_table = self._assert_impl(impl, colnames=["g", "id", "x", "y"]) 551 eq_(new_table.c.g.name, "g") 552 553 def test_add_col_insert_before_middle(self): 554 impl = self._simple_fixture() 555 impl.add_column("tname", Column("g", Integer), insert_before="y") 556 new_table = self._assert_impl(impl, colnames=["id", "x", "g", "y"]) 557 eq_(new_table.c.g.name, "g") 558 559 def test_add_col_insert_after_middle(self): 560 impl = self._simple_fixture() 561 impl.add_column("tname", Column("g", Integer), insert_after="id") 562 new_table = self._assert_impl(impl, colnames=["id", "g", "x", "y"]) 563 eq_(new_table.c.g.name, "g") 564 565 def test_add_col_insert_after_penultimate(self): 566 impl = self._simple_fixture() 567 impl.add_column("tname", Column("g", Integer), insert_after="x") 568 self._assert_impl(impl, colnames=["id", "x", "g", "y"]) 569 570 def test_add_col_insert_after_end(self): 571 impl = self._simple_fixture() 572 impl.add_column("tname", Column("g", Integer), insert_after="y") 573 new_table = self._assert_impl(impl, colnames=["id", "x", "y", "g"]) 574 eq_(new_table.c.g.name, "g") 575 576 def test_add_col_insert_after_plus_no_order(self): 577 impl = self._simple_fixture() 578 # operations.add_column produces a table 579 impl.add_column("tname", Column("g", Integer), insert_after="id") 580 impl.add_column("tname", Column("q", Integer)) 581 new_table = self._assert_impl( 582 impl, colnames=["id", "g", "x", "y", "q"] 583 ) 584 eq_(new_table.c.g.name, "g") 585 586 def test_add_col_no_order_plus_insert_after(self): 587 impl = self._simple_fixture() 588 col = Column("g", Integer) 589 # operations.add_column produces a table 590 t = self.op.schema_obj.table("tname", col) # noqa 591 impl.add_column("tname", Column("q", Integer)) 592 impl.add_column("tname", Column("g", Integer), insert_after="id") 593 new_table = self._assert_impl( 594 impl, colnames=["id", "g", "x", "y", "q"] 595 ) 596 eq_(new_table.c.g.name, "g") 597 598 def test_add_col_insert_after_another_insert(self): 599 impl = self._simple_fixture() 600 impl.add_column("tname", Column("g", Integer), insert_after="id") 601 impl.add_column("tname", Column("q", Integer), insert_after="g") 602 new_table = self._assert_impl( 603 impl, colnames=["id", "g", "q", "x", "y"] 604 ) 605 eq_(new_table.c.g.name, "g") 606 607 def test_add_col_insert_before_another_insert(self): 608 impl = self._simple_fixture() 609 impl.add_column("tname", Column("g", Integer), insert_after="id") 610 impl.add_column("tname", Column("q", Integer), insert_before="g") 611 new_table = self._assert_impl( 612 impl, colnames=["id", "q", "g", "x", "y"] 613 ) 614 eq_(new_table.c.g.name, "g") 615 616 def test_add_server_default(self): 617 impl = self._simple_fixture() 618 impl.alter_column("tname", "y", server_default="10") 619 new_table = self._assert_impl(impl, ddl_contains="DEFAULT '10'") 620 eq_(new_table.c.y.server_default.arg, "10") 621 622 def test_drop_server_default(self): 623 impl = self._server_default_fixture() 624 impl.alter_column("tname", "thing", server_default=None) 625 new_table = self._assert_impl( 626 impl, colnames=["id", "thing"], ddl_not_contains="DEFAULT" 627 ) 628 eq_(new_table.c.thing.server_default, None) 629 630 def test_rename_col_pk(self): 631 impl = self._simple_fixture() 632 impl.alter_column("tname", "id", name="foobar") 633 new_table = self._assert_impl( 634 impl, ddl_contains="PRIMARY KEY (foobar)" 635 ) 636 eq_(new_table.c.id.name, "foobar") 637 eq_(list(new_table.primary_key), [new_table.c.id]) 638 639 def test_rename_col_fk(self): 640 impl = self._fk_fixture() 641 impl.alter_column("tname", "user_id", name="foobar") 642 new_table = self._assert_impl( 643 impl, 644 colnames=["id", "email", "user_id"], 645 ddl_contains='FOREIGN KEY(foobar) REFERENCES "user" (id)', 646 ) 647 eq_(new_table.c.user_id.name, "foobar") 648 eq_( 649 list(new_table.c.user_id.foreign_keys)[0]._get_colspec(), "user.id" 650 ) 651 652 def test_regen_multi_fk(self): 653 impl = self._multi_fk_fixture() 654 self._assert_impl( 655 impl, 656 colnames=[ 657 "id", 658 "email", 659 "user_id_1", 660 "user_id_2", 661 "user_id_3", 662 "user_id_version", 663 ], 664 ddl_contains="FOREIGN KEY(user_id_3, user_id_version) " 665 'REFERENCES "user" (id, id_version)', 666 ) 667 668 def test_regen_multi_fk_schema(self): 669 impl = self._multi_fk_fixture(schema="foo_schema") 670 self._assert_impl( 671 impl, 672 colnames=[ 673 "id", 674 "email", 675 "user_id_1", 676 "user_id_2", 677 "user_id_3", 678 "user_id_version", 679 ], 680 ddl_contains="FOREIGN KEY(user_id_3, user_id_version) " 681 'REFERENCES foo_schema."user" (id, id_version)', 682 schema="foo_schema", 683 ) 684 685 def test_do_not_add_existing_columns_columns(self): 686 impl = self._multi_fk_fixture() 687 meta = impl.table.metadata 688 689 cid = Column("id", Integer()) 690 user = Table("user", meta, cid) 691 692 fk = [ 693 c 694 for c in impl.unnamed_constraints 695 if isinstance(c, ForeignKeyConstraint) 696 ] 697 impl._setup_referent(meta, fk[0]) 698 is_(user.c.id, cid) 699 700 def test_drop_col(self): 701 impl = self._simple_fixture() 702 impl.drop_column("tname", column("x")) 703 new_table = self._assert_impl(impl, colnames=["id", "y"]) 704 assert "y" in new_table.c 705 assert "x" not in new_table.c 706 707 def test_drop_col_remove_pk(self): 708 impl = self._simple_fixture() 709 impl.drop_column("tname", column("id")) 710 new_table = self._assert_impl( 711 impl, colnames=["x", "y"], ddl_not_contains="PRIMARY KEY" 712 ) 713 assert "y" in new_table.c 714 assert "id" not in new_table.c 715 assert not new_table.primary_key 716 717 def test_drop_col_remove_fk(self): 718 impl = self._fk_fixture() 719 impl.drop_column("tname", column("user_id")) 720 new_table = self._assert_impl( 721 impl, colnames=["id", "email"], ddl_not_contains="FOREIGN KEY" 722 ) 723 assert "user_id" not in new_table.c 724 assert not new_table.foreign_keys 725 726 def test_drop_col_retain_fk(self): 727 impl = self._fk_fixture() 728 impl.drop_column("tname", column("email")) 729 new_table = self._assert_impl( 730 impl, 731 colnames=["id", "user_id"], 732 ddl_contains='FOREIGN KEY(user_id) REFERENCES "user" (id)', 733 ) 734 assert "email" not in new_table.c 735 assert new_table.c.user_id.foreign_keys 736 737 def test_drop_col_retain_fk_selfref(self): 738 impl = self._selfref_fk_fixture() 739 impl.drop_column("tname", column("data")) 740 new_table = self._assert_impl(impl, colnames=["id", "parent_id"]) 741 assert "data" not in new_table.c 742 assert new_table.c.parent_id.foreign_keys 743 744 def test_add_fk(self): 745 impl = self._simple_fixture() 746 impl.add_column("tname", Column("user_id", Integer)) 747 fk = self.op.schema_obj.foreign_key_constraint( 748 "fk1", "tname", "user", ["user_id"], ["id"] 749 ) 750 impl.add_constraint(fk) 751 new_table = self._assert_impl( 752 impl, 753 colnames=["id", "x", "y", "user_id"], 754 ddl_contains="CONSTRAINT fk1 FOREIGN KEY(user_id) " 755 'REFERENCES "user" (id)', 756 ) 757 eq_( 758 list(new_table.c.user_id.foreign_keys)[0]._get_colspec(), "user.id" 759 ) 760 761 def test_drop_fk(self): 762 impl = self._named_fk_fixture() 763 fk = ForeignKeyConstraint([], [], name="ufk") 764 impl.drop_constraint(fk) 765 new_table = self._assert_impl( 766 impl, 767 colnames=["id", "email", "user_id"], 768 ddl_not_contains="CONSTRANT fk1", 769 ) 770 eq_(list(new_table.foreign_keys), []) 771 772 def test_add_uq(self): 773 impl = self._simple_fixture() 774 uq = self.op.schema_obj.unique_constraint("uq1", "tname", ["y"]) 775 776 impl.add_constraint(uq) 777 self._assert_impl( 778 impl, 779 colnames=["id", "x", "y"], 780 ddl_contains="CONSTRAINT uq1 UNIQUE", 781 ) 782 783 def test_drop_uq(self): 784 impl = self._uq_fixture() 785 786 uq = self.op.schema_obj.unique_constraint("uq1", "tname", ["y"]) 787 impl.drop_constraint(uq) 788 self._assert_impl( 789 impl, 790 colnames=["id", "x", "y"], 791 ddl_not_contains="CONSTRAINT uq1 UNIQUE", 792 ) 793 794 def test_add_ck(self): 795 impl = self._simple_fixture() 796 ck = self.op.schema_obj.check_constraint("ck1", "tname", "y > 5") 797 798 impl.add_constraint(ck) 799 self._assert_impl( 800 impl, 801 colnames=["id", "x", "y"], 802 ddl_contains="CONSTRAINT ck1 CHECK (y > 5)", 803 ) 804 805 def test_drop_ck_table(self): 806 impl = self._named_ck_table_fixture() 807 808 ck = self.op.schema_obj.check_constraint("ck1", "tname", "y > 5") 809 impl.drop_constraint(ck) 810 self._assert_impl( 811 impl, 812 colnames=["id", "x", "y"], 813 ddl_not_contains="CONSTRAINT ck1 CHECK (y > 5)", 814 ) 815 816 def test_drop_ck_col(self): 817 impl = self._named_ck_col_fixture() 818 819 ck = self.op.schema_obj.check_constraint("ck1", "tname", "y > 5") 820 impl.drop_constraint(ck) 821 self._assert_impl( 822 impl, 823 colnames=["id", "x", "y"], 824 ddl_not_contains="CONSTRAINT ck1 CHECK (y > 5)", 825 ) 826 827 def test_create_index(self): 828 impl = self._simple_fixture() 829 ix = self.op.schema_obj.index("ix1", "tname", ["y"]) 830 831 impl.create_index(ix) 832 self._assert_impl( 833 impl, colnames=["id", "x", "y"], ddl_contains="CREATE INDEX ix1" 834 ) 835 836 def test_drop_index(self): 837 impl = self._ix_fixture() 838 839 ix = self.op.schema_obj.index("ix1", "tname", ["y"]) 840 impl.drop_index(ix) 841 self._assert_impl( 842 impl, 843 colnames=["id", "x", "y"], 844 ddl_not_contains="CONSTRAINT uq1 UNIQUE", 845 ) 846 847 def test_add_table_opts(self): 848 impl = self._simple_fixture(table_kwargs={"mysql_engine": "InnoDB"}) 849 self._assert_impl(impl, ddl_contains="ENGINE=InnoDB", dialect="mysql") 850 851 def test_drop_pk(self): 852 impl = self._pk_fixture() 853 pk = self.op.schema_obj.primary_key_constraint("mypk", "tname", ["id"]) 854 impl.drop_constraint(pk) 855 new_table = self._assert_impl(impl) 856 assert not new_table.c.id.primary_key 857 assert not len(new_table.primary_key) 858 859 860class BatchAPITest(TestBase): 861 @contextmanager 862 def _fixture(self, schema=None): 863 864 migration_context = mock.Mock( 865 opts={}, 866 impl=mock.MagicMock(__dialect__="sqlite", connection=object()), 867 ) 868 op = Operations(migration_context) 869 batch = op.batch_alter_table( 870 "tname", recreate="never", schema=schema 871 ).__enter__() 872 873 mock_schema = mock.MagicMock() 874 with mock.patch("alembic.operations.schemaobj.sa_schema", mock_schema): 875 yield batch 876 batch.impl.flush() 877 self.mock_schema = mock_schema 878 879 def test_drop_col(self): 880 with self._fixture() as batch: 881 batch.drop_column("q") 882 883 eq_( 884 batch.impl.operations.impl.mock_calls, 885 [ 886 mock.call.drop_column( 887 "tname", self.mock_schema.Column(), schema=None 888 ) 889 ], 890 ) 891 892 def test_add_col(self): 893 column = Column("w", String(50)) 894 895 with self._fixture() as batch: 896 batch.add_column(column) 897 898 assert ( 899 mock.call.add_column("tname", column, schema=None) 900 in batch.impl.operations.impl.mock_calls 901 ) 902 903 def test_create_fk(self): 904 with self._fixture() as batch: 905 batch.create_foreign_key("myfk", "user", ["x"], ["y"]) 906 907 eq_( 908 self.mock_schema.ForeignKeyConstraint.mock_calls, 909 [ 910 mock.call( 911 ["x"], 912 ["user.y"], 913 onupdate=None, 914 ondelete=None, 915 name="myfk", 916 initially=None, 917 deferrable=None, 918 match=None, 919 ) 920 ], 921 ) 922 eq_( 923 self.mock_schema.Table.mock_calls, 924 [ 925 mock.call( 926 "user", 927 self.mock_schema.MetaData(), 928 self.mock_schema.Column(), 929 schema=None, 930 ), 931 mock.call( 932 "tname", 933 self.mock_schema.MetaData(), 934 self.mock_schema.Column(), 935 schema=None, 936 ), 937 mock.call().append_constraint( 938 self.mock_schema.ForeignKeyConstraint() 939 ), 940 ], 941 ) 942 eq_( 943 batch.impl.operations.impl.mock_calls, 944 [ 945 mock.call.add_constraint( 946 self.mock_schema.ForeignKeyConstraint() 947 ) 948 ], 949 ) 950 951 def test_create_fk_schema(self): 952 with self._fixture(schema="foo") as batch: 953 batch.create_foreign_key("myfk", "user", ["x"], ["y"]) 954 955 eq_( 956 self.mock_schema.ForeignKeyConstraint.mock_calls, 957 [ 958 mock.call( 959 ["x"], 960 ["user.y"], 961 onupdate=None, 962 ondelete=None, 963 name="myfk", 964 initially=None, 965 deferrable=None, 966 match=None, 967 ) 968 ], 969 ) 970 eq_( 971 self.mock_schema.Table.mock_calls, 972 [ 973 mock.call( 974 "user", 975 self.mock_schema.MetaData(), 976 self.mock_schema.Column(), 977 schema=None, 978 ), 979 mock.call( 980 "tname", 981 self.mock_schema.MetaData(), 982 self.mock_schema.Column(), 983 schema="foo", 984 ), 985 mock.call().append_constraint( 986 self.mock_schema.ForeignKeyConstraint() 987 ), 988 ], 989 ) 990 eq_( 991 batch.impl.operations.impl.mock_calls, 992 [ 993 mock.call.add_constraint( 994 self.mock_schema.ForeignKeyConstraint() 995 ) 996 ], 997 ) 998 999 def test_create_uq(self): 1000 with self._fixture() as batch: 1001 batch.create_unique_constraint("uq1", ["a", "b"]) 1002 1003 eq_( 1004 self.mock_schema.Table().c.__getitem__.mock_calls, 1005 [mock.call("a"), mock.call("b")], 1006 ) 1007 1008 eq_( 1009 self.mock_schema.UniqueConstraint.mock_calls, 1010 [ 1011 mock.call( 1012 self.mock_schema.Table().c.__getitem__(), 1013 self.mock_schema.Table().c.__getitem__(), 1014 name="uq1", 1015 ) 1016 ], 1017 ) 1018 eq_( 1019 batch.impl.operations.impl.mock_calls, 1020 [mock.call.add_constraint(self.mock_schema.UniqueConstraint())], 1021 ) 1022 1023 def test_create_pk(self): 1024 with self._fixture() as batch: 1025 batch.create_primary_key("pk1", ["a", "b"]) 1026 1027 eq_( 1028 self.mock_schema.Table().c.__getitem__.mock_calls, 1029 [mock.call("a"), mock.call("b")], 1030 ) 1031 1032 eq_( 1033 self.mock_schema.PrimaryKeyConstraint.mock_calls, 1034 [ 1035 mock.call( 1036 self.mock_schema.Table().c.__getitem__(), 1037 self.mock_schema.Table().c.__getitem__(), 1038 name="pk1", 1039 ) 1040 ], 1041 ) 1042 eq_( 1043 batch.impl.operations.impl.mock_calls, 1044 [ 1045 mock.call.add_constraint( 1046 self.mock_schema.PrimaryKeyConstraint() 1047 ) 1048 ], 1049 ) 1050 1051 def test_create_check(self): 1052 expr = text("a > b") 1053 with self._fixture() as batch: 1054 batch.create_check_constraint("ck1", expr) 1055 1056 eq_( 1057 self.mock_schema.CheckConstraint.mock_calls, 1058 [mock.call(expr, name="ck1")], 1059 ) 1060 eq_( 1061 batch.impl.operations.impl.mock_calls, 1062 [mock.call.add_constraint(self.mock_schema.CheckConstraint())], 1063 ) 1064 1065 def test_drop_constraint(self): 1066 with self._fixture() as batch: 1067 batch.drop_constraint("uq1") 1068 1069 eq_(self.mock_schema.Constraint.mock_calls, [mock.call(name="uq1")]) 1070 eq_( 1071 batch.impl.operations.impl.mock_calls, 1072 [mock.call.drop_constraint(self.mock_schema.Constraint())], 1073 ) 1074 1075 1076class CopyFromTest(TestBase): 1077 def _fixture(self): 1078 self.metadata = MetaData() 1079 self.table = Table( 1080 "foo", 1081 self.metadata, 1082 Column("id", Integer, primary_key=True), 1083 Column("data", String(50)), 1084 Column("x", Integer), 1085 ) 1086 1087 context = op_fixture(dialect="sqlite", as_sql=True) 1088 self.op = Operations(context) 1089 return context 1090 1091 @config.requirements.sqlalchemy_13 1092 def test_change_type(self): 1093 context = self._fixture() 1094 self.table.append_column(Column("toj", Text)) 1095 self.table.append_column(Column("fromj", JSON)) 1096 with self.op.batch_alter_table( 1097 "foo", copy_from=self.table 1098 ) as batch_op: 1099 batch_op.alter_column("data", type_=Integer) 1100 batch_op.alter_column("toj", type_=JSON) 1101 batch_op.alter_column("fromj", type_=Text) 1102 context.assert_( 1103 "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, " 1104 "data INTEGER, x INTEGER, toj JSON, fromj TEXT, PRIMARY KEY (id))", 1105 "INSERT INTO _alembic_tmp_foo (id, data, x, toj, fromj) " 1106 "SELECT foo.id, " 1107 "CAST(foo.data AS INTEGER) AS %s, foo.x, foo.toj, " 1108 "CAST(foo.fromj AS TEXT) AS %s FROM foo" 1109 % ( 1110 ("data" if sqla_14 else "anon_1"), 1111 ("fromj" if sqla_14 else "anon_2"), 1112 ), 1113 "DROP TABLE foo", 1114 "ALTER TABLE _alembic_tmp_foo RENAME TO foo", 1115 ) 1116 1117 def test_change_type_from_schematype(self): 1118 context = self._fixture() 1119 self.table.append_column( 1120 Column("y", Boolean(create_constraint=True, name="ck1")) 1121 ) 1122 1123 with self.op.batch_alter_table( 1124 "foo", copy_from=self.table 1125 ) as batch_op: 1126 batch_op.alter_column( 1127 "y", 1128 type_=Integer, 1129 existing_type=Boolean(create_constraint=True, name="ck1"), 1130 ) 1131 context.assert_( 1132 "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, " 1133 "data VARCHAR(50), x INTEGER, y INTEGER, PRIMARY KEY (id))", 1134 "INSERT INTO _alembic_tmp_foo (id, data, x, y) SELECT foo.id, " 1135 "foo.data, foo.x, CAST(foo.y AS INTEGER) AS %s FROM foo" 1136 % (("y" if sqla_14 else "anon_1"),), 1137 "DROP TABLE foo", 1138 "ALTER TABLE _alembic_tmp_foo RENAME TO foo", 1139 ) 1140 1141 def test_change_type_to_schematype(self): 1142 context = self._fixture() 1143 self.table.append_column(Column("y", Integer)) 1144 1145 with self.op.batch_alter_table( 1146 "foo", copy_from=self.table 1147 ) as batch_op: 1148 batch_op.alter_column( 1149 "y", 1150 existing_type=Integer, 1151 type_=Boolean(create_constraint=True, name="ck1"), 1152 ) 1153 context.assert_( 1154 "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, " 1155 "data VARCHAR(50), x INTEGER, y BOOLEAN, PRIMARY KEY (id), " 1156 "CONSTRAINT ck1 CHECK (y IN (0, 1)))", 1157 "INSERT INTO _alembic_tmp_foo (id, data, x, y) SELECT foo.id, " 1158 "foo.data, foo.x, CAST(foo.y AS BOOLEAN) AS %s FROM foo" 1159 % (("y" if sqla_14 else "anon_1"),), 1160 "DROP TABLE foo", 1161 "ALTER TABLE _alembic_tmp_foo RENAME TO foo", 1162 ) 1163 1164 def test_create_drop_index_w_always(self): 1165 context = self._fixture() 1166 with self.op.batch_alter_table( 1167 "foo", copy_from=self.table, recreate="always" 1168 ) as batch_op: 1169 batch_op.create_index("ix_data", ["data"], unique=True) 1170 1171 context.assert_( 1172 "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, " 1173 "data VARCHAR(50), " 1174 "x INTEGER, PRIMARY KEY (id))", 1175 "INSERT INTO _alembic_tmp_foo (id, data, x) " 1176 "SELECT foo.id, foo.data, foo.x FROM foo", 1177 "DROP TABLE foo", 1178 "ALTER TABLE _alembic_tmp_foo RENAME TO foo", 1179 "CREATE UNIQUE INDEX ix_data ON foo (data)", 1180 ) 1181 1182 context.clear_assertions() 1183 1184 Index("ix_data", self.table.c.data, unique=True) 1185 with self.op.batch_alter_table( 1186 "foo", copy_from=self.table, recreate="always" 1187 ) as batch_op: 1188 batch_op.drop_index("ix_data") 1189 1190 context.assert_( 1191 "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, " 1192 "data VARCHAR(50), x INTEGER, PRIMARY KEY (id))", 1193 "INSERT INTO _alembic_tmp_foo (id, data, x) " 1194 "SELECT foo.id, foo.data, foo.x FROM foo", 1195 "DROP TABLE foo", 1196 "ALTER TABLE _alembic_tmp_foo RENAME TO foo", 1197 ) 1198 1199 def test_create_drop_index_wo_always(self): 1200 context = self._fixture() 1201 with self.op.batch_alter_table( 1202 "foo", copy_from=self.table 1203 ) as batch_op: 1204 batch_op.create_index("ix_data", ["data"], unique=True) 1205 1206 context.assert_("CREATE UNIQUE INDEX ix_data ON foo (data)") 1207 1208 context.clear_assertions() 1209 1210 Index("ix_data", self.table.c.data, unique=True) 1211 with self.op.batch_alter_table( 1212 "foo", copy_from=self.table 1213 ) as batch_op: 1214 batch_op.drop_index("ix_data") 1215 1216 context.assert_("DROP INDEX ix_data") 1217 1218 def test_create_drop_index_w_other_ops(self): 1219 context = self._fixture() 1220 with self.op.batch_alter_table( 1221 "foo", copy_from=self.table 1222 ) as batch_op: 1223 batch_op.alter_column("data", type_=Integer) 1224 batch_op.create_index("ix_data", ["data"], unique=True) 1225 1226 context.assert_( 1227 "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, " 1228 "data INTEGER, x INTEGER, PRIMARY KEY (id))", 1229 "INSERT INTO _alembic_tmp_foo (id, data, x) SELECT foo.id, " 1230 "CAST(foo.data AS INTEGER) AS %s, foo.x FROM foo" 1231 % (("data" if sqla_14 else "anon_1"),), 1232 "DROP TABLE foo", 1233 "ALTER TABLE _alembic_tmp_foo RENAME TO foo", 1234 "CREATE UNIQUE INDEX ix_data ON foo (data)", 1235 ) 1236 1237 context.clear_assertions() 1238 1239 Index("ix_data", self.table.c.data, unique=True) 1240 with self.op.batch_alter_table( 1241 "foo", copy_from=self.table 1242 ) as batch_op: 1243 batch_op.drop_index("ix_data") 1244 batch_op.alter_column("data", type_=String) 1245 1246 context.assert_( 1247 "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, " 1248 "data VARCHAR, x INTEGER, PRIMARY KEY (id))", 1249 "INSERT INTO _alembic_tmp_foo (id, data, x) SELECT foo.id, " 1250 "foo.data, foo.x FROM foo", 1251 "DROP TABLE foo", 1252 "ALTER TABLE _alembic_tmp_foo RENAME TO foo", 1253 ) 1254 1255 1256class BatchRoundTripTest(TestBase): 1257 __only_on__ = "sqlite" 1258 1259 def setUp(self): 1260 self.conn = config.db.connect() 1261 self.metadata = MetaData() 1262 t1 = Table( 1263 "foo", 1264 self.metadata, 1265 Column("id", Integer, primary_key=True), 1266 Column("data", String(50)), 1267 Column("x", Integer), 1268 mysql_engine="InnoDB", 1269 ) 1270 with self.conn.begin(): 1271 t1.create(self.conn) 1272 1273 self.conn.execute( 1274 t1.insert(), 1275 [ 1276 {"id": 1, "data": "d1", "x": 5}, 1277 {"id": 2, "data": "22", "x": 6}, 1278 {"id": 3, "data": "8.5", "x": 7}, 1279 {"id": 4, "data": "9.46", "x": 8}, 1280 {"id": 5, "data": "d5", "x": 9}, 1281 ], 1282 ) 1283 context = MigrationContext.configure(self.conn) 1284 self.op = Operations(context) 1285 1286 def tearDown(self): 1287 # why commit? because SQLite has inconsistent treatment 1288 # of transactional DDL. A test that runs CREATE TABLE and then 1289 # ALTER TABLE to change the name of that table, will end up 1290 # committing the CREATE TABLE but not the ALTER. As batch mode 1291 # does this with a temp table name that's not even in the 1292 # metadata collection, we don't have an explicit drop for it 1293 # (though we could do that too). calling commit means the 1294 # ALTER will go through and the drop_all() will then catch it. 1295 _safe_commit_connection_transaction(self.conn) 1296 with self.conn.begin(): 1297 self.metadata.drop_all(self.conn) 1298 self.conn.close() 1299 1300 @contextmanager 1301 def _sqlite_referential_integrity(self): 1302 self.conn.exec_driver_sql("PRAGMA foreign_keys=ON") 1303 try: 1304 yield 1305 finally: 1306 self.conn.exec_driver_sql("PRAGMA foreign_keys=OFF") 1307 1308 # as these tests are typically intentional fails, clean out 1309 # tables left over 1310 m = MetaData() 1311 m.reflect(self.conn) 1312 with self.conn.begin(): 1313 m.drop_all(self.conn) 1314 1315 def _no_pk_fixture(self): 1316 with self.conn.begin(): 1317 nopk = Table( 1318 "nopk", 1319 self.metadata, 1320 Column("a", Integer), 1321 Column("b", Integer), 1322 Column("c", Integer), 1323 mysql_engine="InnoDB", 1324 ) 1325 nopk.create(self.conn) 1326 self.conn.execute( 1327 nopk.insert(), 1328 [{"a": 1, "b": 2, "c": 3}, {"a": 2, "b": 4, "c": 5}], 1329 ) 1330 return nopk 1331 1332 def _table_w_index_fixture(self): 1333 with self.conn.begin(): 1334 t = Table( 1335 "t_w_ix", 1336 self.metadata, 1337 Column("id", Integer, primary_key=True), 1338 Column("thing", Integer), 1339 Column("data", String(20)), 1340 ) 1341 Index("ix_thing", t.c.thing) 1342 t.create(self.conn) 1343 return t 1344 1345 def _boolean_fixture(self): 1346 with self.conn.begin(): 1347 t = Table( 1348 "hasbool", 1349 self.metadata, 1350 Column("x", Boolean(create_constraint=True, name="ck1")), 1351 Column("y", Integer), 1352 ) 1353 t.create(self.conn) 1354 1355 def _timestamp_fixture(self): 1356 with self.conn.begin(): 1357 t = Table("hasts", self.metadata, Column("x", DateTime())) 1358 t.create(self.conn) 1359 return t 1360 1361 def _ck_constraint_fixture(self): 1362 with self.conn.begin(): 1363 t = Table( 1364 "ck_table", 1365 self.metadata, 1366 Column("id", Integer, nullable=False), 1367 CheckConstraint("id is not NULL", name="ck"), 1368 ) 1369 t.create(self.conn) 1370 return t 1371 1372 def _datetime_server_default_fixture(self): 1373 return func.datetime("now", "localtime") 1374 1375 def _timestamp_w_expr_default_fixture(self): 1376 with self.conn.begin(): 1377 t = Table( 1378 "hasts", 1379 self.metadata, 1380 Column( 1381 "x", 1382 DateTime(), 1383 server_default=self._datetime_server_default_fixture(), 1384 nullable=False, 1385 ), 1386 ) 1387 t.create(self.conn) 1388 return t 1389 1390 def _int_to_boolean_fixture(self): 1391 with self.conn.begin(): 1392 t = Table("hasbool", self.metadata, Column("x", Integer)) 1393 t.create(self.conn) 1394 1395 def test_change_type_boolean_to_int(self): 1396 self._boolean_fixture() 1397 with self.op.batch_alter_table("hasbool") as batch_op: 1398 batch_op.alter_column( 1399 "x", 1400 type_=Integer, 1401 existing_type=Boolean(create_constraint=True, name="ck1"), 1402 ) 1403 insp = inspect(self.conn) 1404 1405 eq_( 1406 [ 1407 c["type"]._type_affinity 1408 for c in insp.get_columns("hasbool") 1409 if c["name"] == "x" 1410 ], 1411 [Integer], 1412 ) 1413 1414 def test_no_net_change_timestamp(self): 1415 t = self._timestamp_fixture() 1416 1417 import datetime 1418 1419 with self.conn.begin(): 1420 self.conn.execute( 1421 t.insert(), {"x": datetime.datetime(2012, 5, 18, 15, 32, 5)} 1422 ) 1423 1424 with self.op.batch_alter_table("hasts") as batch_op: 1425 batch_op.alter_column("x", type_=DateTime()) 1426 1427 eq_( 1428 self.conn.execute(_select(t.c.x)).fetchall(), 1429 [(datetime.datetime(2012, 5, 18, 15, 32, 5),)], 1430 ) 1431 1432 def test_no_net_change_timestamp_w_default(self): 1433 t = self._timestamp_w_expr_default_fixture() 1434 1435 with self.op.batch_alter_table("hasts") as batch_op: 1436 batch_op.alter_column( 1437 "x", 1438 type_=DateTime(), 1439 nullable=False, 1440 server_default=self._datetime_server_default_fixture(), 1441 ) 1442 1443 with self.conn.begin(): 1444 self.conn.execute(t.insert()) 1445 res = self.conn.execute(_select(t.c.x)) 1446 if sqla_14: 1447 assert res.scalar_one_or_none() is not None 1448 else: 1449 row = res.fetchone() 1450 assert row["x"] is not None 1451 1452 def test_drop_col_schematype(self): 1453 self._boolean_fixture() 1454 with self.op.batch_alter_table("hasbool") as batch_op: 1455 batch_op.drop_column( 1456 "x", existing_type=Boolean(create_constraint=True, name="ck1") 1457 ) 1458 insp = inspect(self.conn) 1459 1460 assert "x" not in (c["name"] for c in insp.get_columns("hasbool")) 1461 1462 def test_change_type_int_to_boolean(self): 1463 self._int_to_boolean_fixture() 1464 with self.op.batch_alter_table("hasbool") as batch_op: 1465 batch_op.alter_column( 1466 "x", type_=Boolean(create_constraint=True, name="ck1") 1467 ) 1468 insp = inspect(self.conn) 1469 1470 if exclusions.against(config, "sqlite"): 1471 eq_( 1472 [ 1473 c["type"]._type_affinity 1474 for c in insp.get_columns("hasbool") 1475 if c["name"] == "x" 1476 ], 1477 [Boolean], 1478 ) 1479 elif exclusions.against(config, "mysql"): 1480 eq_( 1481 [ 1482 c["type"]._type_affinity 1483 for c in insp.get_columns("hasbool") 1484 if c["name"] == "x" 1485 ], 1486 [Integer], 1487 ) 1488 1489 def _assert_data(self, data, tablename="foo"): 1490 res = self.conn.execute(text("select * from %s" % tablename)) 1491 if sqla_14: 1492 res = res.mappings() 1493 eq_([dict(row) for row in res], data) 1494 1495 def test_ix_existing(self): 1496 self._table_w_index_fixture() 1497 1498 with self.op.batch_alter_table("t_w_ix") as batch_op: 1499 batch_op.alter_column("data", type_=String(30)) 1500 batch_op.create_index("ix_data", ["data"]) 1501 1502 insp = inspect(self.conn) 1503 eq_( 1504 set( 1505 (ix["name"], tuple(ix["column_names"])) 1506 for ix in insp.get_indexes("t_w_ix") 1507 ), 1508 set([("ix_data", ("data",)), ("ix_thing", ("thing",))]), 1509 ) 1510 1511 def test_fk_points_to_me_auto(self): 1512 self._test_fk_points_to_me("auto") 1513 1514 # in particular, this tests that the failures 1515 # on PG and MySQL result in recovery of the batch system, 1516 # e.g. that the _alembic_tmp_temp table is dropped 1517 @config.requirements.no_referential_integrity 1518 def test_fk_points_to_me_recreate(self): 1519 self._test_fk_points_to_me("always") 1520 1521 @exclusions.only_on("sqlite") 1522 @exclusions.fails( 1523 "intentionally asserting that this " 1524 "doesn't work w/ pragma foreign keys" 1525 ) 1526 def test_fk_points_to_me_sqlite_refinteg(self): 1527 with self._sqlite_referential_integrity(): 1528 self._test_fk_points_to_me("auto") 1529 1530 def _test_fk_points_to_me(self, recreate): 1531 bar = Table( 1532 "bar", 1533 self.metadata, 1534 Column("id", Integer, primary_key=True), 1535 Column("foo_id", Integer, ForeignKey("foo.id")), 1536 mysql_engine="InnoDB", 1537 ) 1538 with self.conn.begin(): 1539 bar.create(self.conn) 1540 self.conn.execute(bar.insert(), {"id": 1, "foo_id": 3}) 1541 1542 with self.op.batch_alter_table("foo", recreate=recreate) as batch_op: 1543 batch_op.alter_column( 1544 "data", new_column_name="newdata", existing_type=String(50) 1545 ) 1546 1547 insp = inspect(self.conn) 1548 eq_( 1549 [ 1550 ( 1551 key["referred_table"], 1552 key["referred_columns"], 1553 key["constrained_columns"], 1554 ) 1555 for key in insp.get_foreign_keys("bar") 1556 ], 1557 [("foo", ["id"], ["foo_id"])], 1558 ) 1559 1560 def test_selfref_fk_auto(self): 1561 self._test_selfref_fk("auto") 1562 1563 @config.requirements.no_referential_integrity 1564 def test_selfref_fk_recreate(self): 1565 self._test_selfref_fk("always") 1566 1567 @exclusions.only_on("sqlite") 1568 @exclusions.fails( 1569 "intentionally asserting that this " 1570 "doesn't work w/ pragma foreign keys" 1571 ) 1572 def test_selfref_fk_sqlite_refinteg(self): 1573 with self._sqlite_referential_integrity(): 1574 self._test_selfref_fk("auto") 1575 1576 def _test_selfref_fk(self, recreate): 1577 bar = Table( 1578 "bar", 1579 self.metadata, 1580 Column("id", Integer, primary_key=True), 1581 Column("bar_id", Integer, ForeignKey("bar.id")), 1582 Column("data", String(50)), 1583 mysql_engine="InnoDB", 1584 ) 1585 with self.conn.begin(): 1586 bar.create(self.conn) 1587 self.conn.execute( 1588 bar.insert(), {"id": 1, "data": "x", "bar_id": None} 1589 ) 1590 self.conn.execute( 1591 bar.insert(), {"id": 2, "data": "y", "bar_id": 1} 1592 ) 1593 1594 with self.op.batch_alter_table("bar", recreate=recreate) as batch_op: 1595 batch_op.alter_column( 1596 "data", new_column_name="newdata", existing_type=String(50) 1597 ) 1598 1599 insp = inspect(self.conn) 1600 1601 eq_( 1602 [ 1603 ( 1604 key["referred_table"], 1605 key["referred_columns"], 1606 key["constrained_columns"], 1607 ) 1608 for key in insp.get_foreign_keys("bar") 1609 ], 1610 [("bar", ["id"], ["bar_id"])], 1611 ) 1612 1613 def test_change_type(self): 1614 with self.op.batch_alter_table("foo") as batch_op: 1615 batch_op.alter_column("data", type_=Integer) 1616 1617 self._assert_data( 1618 [ 1619 {"id": 1, "data": 0, "x": 5}, 1620 {"id": 2, "data": 22, "x": 6}, 1621 {"id": 3, "data": 8, "x": 7}, 1622 {"id": 4, "data": 9, "x": 8}, 1623 {"id": 5, "data": 0, "x": 9}, 1624 ] 1625 ) 1626 1627 def test_drop_column(self): 1628 with self.op.batch_alter_table("foo") as batch_op: 1629 batch_op.drop_column("data") 1630 1631 self._assert_data( 1632 [ 1633 {"id": 1, "x": 5}, 1634 {"id": 2, "x": 6}, 1635 {"id": 3, "x": 7}, 1636 {"id": 4, "x": 8}, 1637 {"id": 5, "x": 9}, 1638 ] 1639 ) 1640 1641 def test_drop_pk_col_readd_col(self): 1642 # drop a column, add it back without primary_key=True, should no 1643 # longer be in the constraint 1644 with self.op.batch_alter_table("foo") as batch_op: 1645 batch_op.drop_column("id") 1646 batch_op.add_column(Column("id", Integer)) 1647 1648 pk_const = inspect(self.conn).get_pk_constraint("foo") 1649 eq_(pk_const["constrained_columns"], []) 1650 1651 def test_drop_pk_col_readd_pk_col(self): 1652 # drop a column, add it back with primary_key=True, should remain 1653 with self.op.batch_alter_table("foo") as batch_op: 1654 batch_op.drop_column("id") 1655 batch_op.add_column(Column("id", Integer, primary_key=True)) 1656 1657 pk_const = inspect(self.conn).get_pk_constraint("foo") 1658 eq_(pk_const["constrained_columns"], ["id"]) 1659 1660 def test_drop_pk_col_readd_col_also_pk_const(self): 1661 # drop a column, add it back without primary_key=True, but then 1662 # also make anew PK constraint that includes it, should remain 1663 with self.op.batch_alter_table("foo") as batch_op: 1664 batch_op.drop_column("id") 1665 batch_op.add_column(Column("id", Integer)) 1666 batch_op.create_primary_key("newpk", ["id"]) 1667 1668 pk_const = inspect(self.conn).get_pk_constraint("foo") 1669 eq_(pk_const["constrained_columns"], ["id"]) 1670 1671 @testing.combinations(("always",), ("auto",), argnames="recreate") 1672 def test_add_pk_constraint(self, recreate): 1673 self._no_pk_fixture() 1674 with self.op.batch_alter_table("nopk", recreate=recreate) as batch_op: 1675 batch_op.create_primary_key("newpk", ["a", "b"]) 1676 1677 pk_const = inspect(self.conn).get_pk_constraint("nopk") 1678 with config.requirements.reflects_pk_names.fail_if(): 1679 eq_(pk_const["name"], "newpk") 1680 eq_(pk_const["constrained_columns"], ["a", "b"]) 1681 1682 @testing.combinations(("always",), ("auto",), argnames="recreate") 1683 @config.requirements.check_constraint_reflection 1684 def test_add_ck_constraint(self, recreate): 1685 with self.op.batch_alter_table("foo", recreate=recreate) as batch_op: 1686 batch_op.create_check_constraint("newck", text("x > 0")) 1687 1688 ck_consts = inspect(self.conn).get_check_constraints("foo") 1689 ck_consts[0]["sqltext"] = re.sub( 1690 r"[\'\"`\(\)]", "", ck_consts[0]["sqltext"] 1691 ) 1692 eq_(ck_consts, [{"sqltext": "x > 0", "name": "newck"}]) 1693 1694 @testing.combinations(("always",), ("auto",), argnames="recreate") 1695 @config.requirements.check_constraint_reflection 1696 def test_drop_ck_constraint(self, recreate): 1697 self._ck_constraint_fixture() 1698 1699 with self.op.batch_alter_table( 1700 "ck_table", recreate=recreate 1701 ) as batch_op: 1702 batch_op.drop_constraint("ck", "check") 1703 1704 ck_consts = inspect(self.conn).get_check_constraints("ck_table") 1705 eq_(ck_consts, []) 1706 1707 @config.requirements.unnamed_constraints 1708 def test_drop_foreign_key(self): 1709 bar = Table( 1710 "bar", 1711 self.metadata, 1712 Column("id", Integer, primary_key=True), 1713 Column("foo_id", Integer, ForeignKey("foo.id")), 1714 mysql_engine="InnoDB", 1715 ) 1716 with self.conn.begin(): 1717 bar.create(self.conn) 1718 self.conn.execute(bar.insert(), {"id": 1, "foo_id": 3}) 1719 1720 naming_convention = { 1721 "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s" 1722 } 1723 with self.op.batch_alter_table( 1724 "bar", naming_convention=naming_convention 1725 ) as batch_op: 1726 batch_op.drop_constraint("fk_bar_foo_id_foo", type_="foreignkey") 1727 eq_(inspect(self.conn).get_foreign_keys("bar"), []) 1728 1729 def test_drop_column_fk_recreate(self): 1730 with self.op.batch_alter_table("foo", recreate="always") as batch_op: 1731 batch_op.drop_column("data") 1732 1733 self._assert_data( 1734 [ 1735 {"id": 1, "x": 5}, 1736 {"id": 2, "x": 6}, 1737 {"id": 3, "x": 7}, 1738 {"id": 4, "x": 8}, 1739 {"id": 5, "x": 9}, 1740 ] 1741 ) 1742 1743 def _assert_table_comment(self, tname, comment): 1744 insp = inspect(self.conn) 1745 1746 tcomment = insp.get_table_comment(tname) 1747 eq_(tcomment, {"text": comment}) 1748 1749 @testing.combinations(("always",), ("auto",), argnames="recreate") 1750 def test_add_uq(self, recreate): 1751 with self.op.batch_alter_table("foo", recreate=recreate) as batch_op: 1752 batch_op.create_unique_constraint("newuk", ["x"]) 1753 1754 uq_consts = inspect(self.conn).get_unique_constraints("foo") 1755 eq_( 1756 [ 1757 {"name": uc["name"], "column_names": uc["column_names"]} 1758 for uc in uq_consts 1759 ], 1760 [{"name": "newuk", "column_names": ["x"]}], 1761 ) 1762 1763 @testing.combinations(("always",), ("auto",), argnames="recreate") 1764 def test_add_uq_plus_col(self, recreate): 1765 with self.op.batch_alter_table("foo", recreate=recreate) as batch_op: 1766 batch_op.add_column(Column("y", Integer)) 1767 batch_op.create_unique_constraint("newuk", ["x", "y"]) 1768 1769 uq_consts = inspect(self.conn).get_unique_constraints("foo") 1770 1771 eq_( 1772 [ 1773 {"name": uc["name"], "column_names": uc["column_names"]} 1774 for uc in uq_consts 1775 ], 1776 [{"name": "newuk", "column_names": ["x", "y"]}], 1777 ) 1778 1779 @config.requirements.comments 1780 def test_add_table_comment(self): 1781 with self.op.batch_alter_table("foo") as batch_op: 1782 batch_op.create_table_comment("some comment") 1783 1784 self._assert_table_comment("foo", "some comment") 1785 1786 with self.op.batch_alter_table("foo") as batch_op: 1787 batch_op.create_table_comment( 1788 "some new comment", existing_comment="some comment" 1789 ) 1790 1791 self._assert_table_comment("foo", "some new comment") 1792 1793 @config.requirements.comments 1794 def test_drop_table_comment(self): 1795 with self.op.batch_alter_table("foo") as batch_op: 1796 batch_op.create_table_comment("some comment") 1797 1798 with self.op.batch_alter_table("foo") as batch_op: 1799 batch_op.drop_table_comment(existing_comment="some comment") 1800 1801 self._assert_table_comment("foo", None) 1802 1803 def _assert_column_comment(self, tname, cname, comment): 1804 insp = inspect(self.conn) 1805 1806 cols = {col["name"]: col for col in insp.get_columns(tname)} 1807 eq_(cols[cname]["comment"], comment) 1808 1809 @config.requirements.comments 1810 def test_add_column_comment(self): 1811 with self.op.batch_alter_table("foo") as batch_op: 1812 batch_op.add_column(Column("y", Integer, comment="some comment")) 1813 1814 self._assert_column_comment("foo", "y", "some comment") 1815 1816 self._assert_data( 1817 [ 1818 {"id": 1, "data": "d1", "x": 5, "y": None}, 1819 {"id": 2, "data": "22", "x": 6, "y": None}, 1820 {"id": 3, "data": "8.5", "x": 7, "y": None}, 1821 {"id": 4, "data": "9.46", "x": 8, "y": None}, 1822 {"id": 5, "data": "d5", "x": 9, "y": None}, 1823 ] 1824 ) 1825 1826 @config.requirements.comments 1827 def test_add_column_comment_recreate(self): 1828 with self.op.batch_alter_table("foo", recreate="always") as batch_op: 1829 batch_op.add_column(Column("y", Integer, comment="some comment")) 1830 1831 self._assert_column_comment("foo", "y", "some comment") 1832 1833 self._assert_data( 1834 [ 1835 {"id": 1, "data": "d1", "x": 5, "y": None}, 1836 {"id": 2, "data": "22", "x": 6, "y": None}, 1837 {"id": 3, "data": "8.5", "x": 7, "y": None}, 1838 {"id": 4, "data": "9.46", "x": 8, "y": None}, 1839 {"id": 5, "data": "d5", "x": 9, "y": None}, 1840 ] 1841 ) 1842 1843 @config.requirements.comments 1844 def test_alter_column_comment(self): 1845 with self.op.batch_alter_table("foo") as batch_op: 1846 batch_op.alter_column( 1847 "x", existing_type=Integer(), comment="some comment" 1848 ) 1849 1850 self._assert_column_comment("foo", "x", "some comment") 1851 1852 self._assert_data( 1853 [ 1854 {"id": 1, "data": "d1", "x": 5}, 1855 {"id": 2, "data": "22", "x": 6}, 1856 {"id": 3, "data": "8.5", "x": 7}, 1857 {"id": 4, "data": "9.46", "x": 8}, 1858 {"id": 5, "data": "d5", "x": 9}, 1859 ] 1860 ) 1861 1862 @config.requirements.comments 1863 def test_alter_column_comment_recreate(self): 1864 with self.op.batch_alter_table("foo", recreate="always") as batch_op: 1865 batch_op.alter_column("x", comment="some comment") 1866 1867 self._assert_column_comment("foo", "x", "some comment") 1868 1869 self._assert_data( 1870 [ 1871 {"id": 1, "data": "d1", "x": 5}, 1872 {"id": 2, "data": "22", "x": 6}, 1873 {"id": 3, "data": "8.5", "x": 7}, 1874 {"id": 4, "data": "9.46", "x": 8}, 1875 {"id": 5, "data": "d5", "x": 9}, 1876 ] 1877 ) 1878 1879 def test_rename_column(self): 1880 with self.op.batch_alter_table("foo") as batch_op: 1881 batch_op.alter_column("x", new_column_name="y") 1882 1883 self._assert_data( 1884 [ 1885 {"id": 1, "data": "d1", "y": 5}, 1886 {"id": 2, "data": "22", "y": 6}, 1887 {"id": 3, "data": "8.5", "y": 7}, 1888 {"id": 4, "data": "9.46", "y": 8}, 1889 {"id": 5, "data": "d5", "y": 9}, 1890 ] 1891 ) 1892 1893 def test_rename_column_boolean(self): 1894 bar = Table( 1895 "bar", 1896 self.metadata, 1897 Column("id", Integer, primary_key=True), 1898 Column("flag", Boolean(create_constraint=True)), 1899 mysql_engine="InnoDB", 1900 ) 1901 with self.conn.begin(): 1902 bar.create(self.conn) 1903 self.conn.execute(bar.insert(), {"id": 1, "flag": True}) 1904 self.conn.execute(bar.insert(), {"id": 2, "flag": False}) 1905 1906 with self.op.batch_alter_table("bar") as batch_op: 1907 batch_op.alter_column( 1908 "flag", new_column_name="bflag", existing_type=Boolean 1909 ) 1910 1911 self._assert_data( 1912 [{"id": 1, "bflag": True}, {"id": 2, "bflag": False}], "bar" 1913 ) 1914 1915 # @config.requirements.check_constraint_reflection 1916 def test_rename_column_boolean_named_ck(self): 1917 bar = Table( 1918 "bar", 1919 self.metadata, 1920 Column("id", Integer, primary_key=True), 1921 Column("flag", Boolean(create_constraint=True, name="ck1")), 1922 mysql_engine="InnoDB", 1923 ) 1924 with self.conn.begin(): 1925 bar.create(self.conn) 1926 self.conn.execute(bar.insert(), {"id": 1, "flag": True}) 1927 self.conn.execute(bar.insert(), {"id": 2, "flag": False}) 1928 1929 with self.op.batch_alter_table("bar", recreate="always") as batch_op: 1930 batch_op.alter_column( 1931 "flag", 1932 new_column_name="bflag", 1933 existing_type=Boolean(create_constraint=True, name="ck1"), 1934 ) 1935 1936 self._assert_data( 1937 [{"id": 1, "bflag": True}, {"id": 2, "bflag": False}], "bar" 1938 ) 1939 1940 @config.requirements.non_native_boolean 1941 def test_rename_column_non_native_boolean_no_ck(self): 1942 bar = Table( 1943 "bar", 1944 self.metadata, 1945 Column("id", Integer, primary_key=True), 1946 Column("flag", Boolean(create_constraint=False)), 1947 mysql_engine="InnoDB", 1948 ) 1949 with self.conn.begin(): 1950 bar.create(self.conn) 1951 self.conn.execute(bar.insert(), {"id": 1, "flag": True}) 1952 self.conn.execute(bar.insert(), {"id": 2, "flag": False}) 1953 self.conn.execute( 1954 # override Boolean type which as of 1.1 coerces numerics 1955 # to 1/0 1956 text("insert into bar (id, flag) values (:id, :flag)"), 1957 {"id": 3, "flag": 5}, 1958 ) 1959 1960 with self.op.batch_alter_table( 1961 "bar", 1962 reflect_args=[Column("flag", Boolean(create_constraint=False))], 1963 ) as batch_op: 1964 batch_op.alter_column( 1965 "flag", new_column_name="bflag", existing_type=Boolean 1966 ) 1967 1968 self._assert_data( 1969 [ 1970 {"id": 1, "bflag": True}, 1971 {"id": 2, "bflag": False}, 1972 {"id": 3, "bflag": 5}, 1973 ], 1974 "bar", 1975 ) 1976 1977 def test_drop_column_pk(self): 1978 with self.op.batch_alter_table("foo") as batch_op: 1979 batch_op.drop_column("id") 1980 1981 self._assert_data( 1982 [ 1983 {"data": "d1", "x": 5}, 1984 {"data": "22", "x": 6}, 1985 {"data": "8.5", "x": 7}, 1986 {"data": "9.46", "x": 8}, 1987 {"data": "d5", "x": 9}, 1988 ] 1989 ) 1990 1991 def test_rename_column_pk(self): 1992 with self.op.batch_alter_table("foo") as batch_op: 1993 batch_op.alter_column("id", new_column_name="ident") 1994 1995 self._assert_data( 1996 [ 1997 {"ident": 1, "data": "d1", "x": 5}, 1998 {"ident": 2, "data": "22", "x": 6}, 1999 {"ident": 3, "data": "8.5", "x": 7}, 2000 {"ident": 4, "data": "9.46", "x": 8}, 2001 {"ident": 5, "data": "d5", "x": 9}, 2002 ] 2003 ) 2004 2005 def test_add_column_auto(self): 2006 # note this uses ALTER 2007 with self.op.batch_alter_table("foo") as batch_op: 2008 batch_op.add_column( 2009 Column("data2", String(50), server_default="hi") 2010 ) 2011 2012 self._assert_data( 2013 [ 2014 {"id": 1, "data": "d1", "x": 5, "data2": "hi"}, 2015 {"id": 2, "data": "22", "x": 6, "data2": "hi"}, 2016 {"id": 3, "data": "8.5", "x": 7, "data2": "hi"}, 2017 {"id": 4, "data": "9.46", "x": 8, "data2": "hi"}, 2018 {"id": 5, "data": "d5", "x": 9, "data2": "hi"}, 2019 ] 2020 ) 2021 eq_( 2022 [col["name"] for col in inspect(config.db).get_columns("foo")], 2023 ["id", "data", "x", "data2"], 2024 ) 2025 2026 def test_add_column_auto_server_default_calculated(self): 2027 """test #883""" 2028 with self.op.batch_alter_table("foo") as batch_op: 2029 batch_op.add_column( 2030 Column( 2031 "data2", 2032 DateTime(), 2033 server_default=self._datetime_server_default_fixture(), 2034 ) 2035 ) 2036 2037 self._assert_data( 2038 [ 2039 {"id": 1, "data": "d1", "x": 5, "data2": mock.ANY}, 2040 {"id": 2, "data": "22", "x": 6, "data2": mock.ANY}, 2041 {"id": 3, "data": "8.5", "x": 7, "data2": mock.ANY}, 2042 {"id": 4, "data": "9.46", "x": 8, "data2": mock.ANY}, 2043 {"id": 5, "data": "d5", "x": 9, "data2": mock.ANY}, 2044 ] 2045 ) 2046 eq_( 2047 [col["name"] for col in inspect(self.conn).get_columns("foo")], 2048 ["id", "data", "x", "data2"], 2049 ) 2050 2051 @testing.combinations((True,), (False,)) 2052 @testing.exclusions.only_on("sqlite") 2053 @config.requirements.computed_columns 2054 def test_add_column_auto_generated(self, persisted): 2055 """test #883""" 2056 with self.op.batch_alter_table("foo") as batch_op: 2057 batch_op.add_column( 2058 Column( 2059 "data2", Integer, Computed("1 + 1", persisted=persisted) 2060 ) 2061 ) 2062 2063 self._assert_data( 2064 [ 2065 {"id": 1, "data": "d1", "x": 5, "data2": 2}, 2066 {"id": 2, "data": "22", "x": 6, "data2": 2}, 2067 {"id": 3, "data": "8.5", "x": 7, "data2": 2}, 2068 {"id": 4, "data": "9.46", "x": 8, "data2": 2}, 2069 {"id": 5, "data": "d5", "x": 9, "data2": 2}, 2070 ] 2071 ) 2072 eq_( 2073 [col["name"] for col in inspect(self.conn).get_columns("foo")], 2074 ["id", "data", "x", "data2"], 2075 ) 2076 2077 @config.requirements.identity_columns 2078 def test_add_column_auto_identity(self): 2079 """test #883""" 2080 2081 self._no_pk_fixture() 2082 2083 with self.op.batch_alter_table("nopk") as batch_op: 2084 batch_op.add_column(Column("id", Integer, Identity())) 2085 2086 self._assert_data( 2087 [ 2088 {"a": 1, "b": 2, "c": 3, "id": 1}, 2089 {"a": 2, "b": 4, "c": 5, "id": 2}, 2090 ], 2091 tablename="nopk", 2092 ) 2093 eq_( 2094 [col["name"] for col in inspect(self.conn).get_columns("foo")], 2095 ["id", "data", "x"], 2096 ) 2097 2098 def test_add_column_insert_before_recreate(self): 2099 with self.op.batch_alter_table("foo", recreate="always") as batch_op: 2100 batch_op.add_column( 2101 Column("data2", String(50), server_default="hi"), 2102 insert_before="data", 2103 ) 2104 self._assert_data( 2105 [ 2106 {"id": 1, "data": "d1", "x": 5, "data2": "hi"}, 2107 {"id": 2, "data": "22", "x": 6, "data2": "hi"}, 2108 {"id": 3, "data": "8.5", "x": 7, "data2": "hi"}, 2109 {"id": 4, "data": "9.46", "x": 8, "data2": "hi"}, 2110 {"id": 5, "data": "d5", "x": 9, "data2": "hi"}, 2111 ] 2112 ) 2113 eq_( 2114 [col["name"] for col in inspect(self.conn).get_columns("foo")], 2115 ["id", "data2", "data", "x"], 2116 ) 2117 2118 def test_add_column_insert_after_recreate(self): 2119 with self.op.batch_alter_table("foo", recreate="always") as batch_op: 2120 batch_op.add_column( 2121 Column("data2", String(50), server_default="hi"), 2122 insert_after="data", 2123 ) 2124 self._assert_data( 2125 [ 2126 {"id": 1, "data": "d1", "x": 5, "data2": "hi"}, 2127 {"id": 2, "data": "22", "x": 6, "data2": "hi"}, 2128 {"id": 3, "data": "8.5", "x": 7, "data2": "hi"}, 2129 {"id": 4, "data": "9.46", "x": 8, "data2": "hi"}, 2130 {"id": 5, "data": "d5", "x": 9, "data2": "hi"}, 2131 ] 2132 ) 2133 eq_( 2134 [col["name"] for col in inspect(self.conn).get_columns("foo")], 2135 ["id", "data", "data2", "x"], 2136 ) 2137 2138 def test_add_column_insert_before_raise_on_alter(self): 2139 def go(): 2140 with self.op.batch_alter_table("foo") as batch_op: 2141 batch_op.add_column( 2142 Column("data2", String(50), server_default="hi"), 2143 insert_before="data", 2144 ) 2145 2146 assert_raises_message( 2147 alembic_exc.CommandError, 2148 "Can't specify insert_before or insert_after when using ALTER", 2149 go, 2150 ) 2151 2152 def test_add_column_recreate(self): 2153 with self.op.batch_alter_table("foo", recreate="always") as batch_op: 2154 batch_op.add_column( 2155 Column("data2", String(50), server_default="hi") 2156 ) 2157 2158 self._assert_data( 2159 [ 2160 {"id": 1, "data": "d1", "x": 5, "data2": "hi"}, 2161 {"id": 2, "data": "22", "x": 6, "data2": "hi"}, 2162 {"id": 3, "data": "8.5", "x": 7, "data2": "hi"}, 2163 {"id": 4, "data": "9.46", "x": 8, "data2": "hi"}, 2164 {"id": 5, "data": "d5", "x": 9, "data2": "hi"}, 2165 ] 2166 ) 2167 eq_( 2168 [col["name"] for col in inspect(self.conn).get_columns("foo")], 2169 ["id", "data", "x", "data2"], 2170 ) 2171 2172 def test_create_drop_index(self): 2173 insp = inspect(self.conn) 2174 eq_(insp.get_indexes("foo"), []) 2175 2176 with self.op.batch_alter_table("foo", recreate="always") as batch_op: 2177 batch_op.create_index("ix_data", ["data"], unique=True) 2178 2179 self._assert_data( 2180 [ 2181 {"id": 1, "data": "d1", "x": 5}, 2182 {"id": 2, "data": "22", "x": 6}, 2183 {"id": 3, "data": "8.5", "x": 7}, 2184 {"id": 4, "data": "9.46", "x": 8}, 2185 {"id": 5, "data": "d5", "x": 9}, 2186 ] 2187 ) 2188 insp = inspect(self.conn) 2189 eq_( 2190 [ 2191 dict( 2192 unique=ix["unique"], 2193 name=ix["name"], 2194 column_names=ix["column_names"], 2195 ) 2196 for ix in insp.get_indexes("foo") 2197 ], 2198 [{"unique": True, "name": "ix_data", "column_names": ["data"]}], 2199 ) 2200 2201 with self.op.batch_alter_table("foo", recreate="always") as batch_op: 2202 batch_op.drop_index("ix_data") 2203 2204 insp = inspect(self.conn) 2205 eq_(insp.get_indexes("foo"), []) 2206 2207 2208class BatchRoundTripMySQLTest(BatchRoundTripTest): 2209 __only_on__ = "mysql", "mariadb" 2210 __backend__ = True 2211 2212 def _datetime_server_default_fixture(self): 2213 return func.current_timestamp() 2214 2215 @exclusions.fails() 2216 def test_drop_pk_col_readd_pk_col(self): 2217 super(BatchRoundTripMySQLTest, self).test_drop_pk_col_readd_pk_col() 2218 2219 @exclusions.fails() 2220 def test_drop_pk_col_readd_col_also_pk_const(self): 2221 super( 2222 BatchRoundTripMySQLTest, self 2223 ).test_drop_pk_col_readd_col_also_pk_const() 2224 2225 @exclusions.fails() 2226 def test_rename_column_pk(self): 2227 super(BatchRoundTripMySQLTest, self).test_rename_column_pk() 2228 2229 @exclusions.fails() 2230 def test_rename_column(self): 2231 super(BatchRoundTripMySQLTest, self).test_rename_column() 2232 2233 @exclusions.fails() 2234 def test_change_type(self): 2235 super(BatchRoundTripMySQLTest, self).test_change_type() 2236 2237 def test_create_drop_index(self): 2238 super(BatchRoundTripMySQLTest, self).test_create_drop_index() 2239 2240 # fails on mariadb 10.2, succeeds on 10.3 2241 @exclusions.fails_if(config.requirements.mysql_check_col_name_change) 2242 def test_rename_column_boolean(self): 2243 super(BatchRoundTripMySQLTest, self).test_rename_column_boolean() 2244 2245 def test_change_type_boolean_to_int(self): 2246 super(BatchRoundTripMySQLTest, self).test_change_type_boolean_to_int() 2247 2248 def test_change_type_int_to_boolean(self): 2249 super(BatchRoundTripMySQLTest, self).test_change_type_int_to_boolean() 2250 2251 2252class BatchRoundTripPostgresqlTest(BatchRoundTripTest): 2253 __only_on__ = "postgresql" 2254 __backend__ = True 2255 2256 def _native_boolean_fixture(self): 2257 t = Table( 2258 "has_native_bool", 2259 self.metadata, 2260 Column( 2261 "x", 2262 Boolean(create_constraint=True), 2263 server_default="false", 2264 nullable=False, 2265 ), 2266 Column("y", Integer), 2267 ) 2268 with self.conn.begin(): 2269 t.create(self.conn) 2270 2271 def _datetime_server_default_fixture(self): 2272 return func.current_timestamp() 2273 2274 @exclusions.fails() 2275 def test_drop_pk_col_readd_pk_col(self): 2276 super( 2277 BatchRoundTripPostgresqlTest, self 2278 ).test_drop_pk_col_readd_pk_col() 2279 2280 @exclusions.fails() 2281 def test_drop_pk_col_readd_col_also_pk_const(self): 2282 super( 2283 BatchRoundTripPostgresqlTest, self 2284 ).test_drop_pk_col_readd_col_also_pk_const() 2285 2286 @exclusions.fails() 2287 def test_change_type(self): 2288 super(BatchRoundTripPostgresqlTest, self).test_change_type() 2289 2290 def test_create_drop_index(self): 2291 super(BatchRoundTripPostgresqlTest, self).test_create_drop_index() 2292 2293 @exclusions.fails() 2294 def test_change_type_int_to_boolean(self): 2295 super( 2296 BatchRoundTripPostgresqlTest, self 2297 ).test_change_type_int_to_boolean() 2298 2299 @exclusions.fails() 2300 def test_change_type_boolean_to_int(self): 2301 super( 2302 BatchRoundTripPostgresqlTest, self 2303 ).test_change_type_boolean_to_int() 2304 2305 def test_add_col_table_has_native_boolean(self): 2306 self._native_boolean_fixture() 2307 2308 # to ensure test coverage on SQLAlchemy 1.4 and above, 2309 # force the create_constraint flag to True even though it 2310 # defaults to false in 1.4. this test wants to ensure that the 2311 # "should create" rule is consulted 2312 def listen_for_reflect(inspector, table, column_info): 2313 if isinstance(column_info["type"], Boolean): 2314 column_info["type"].create_constraint = True 2315 2316 with self.op.batch_alter_table( 2317 "has_native_bool", 2318 recreate="always", 2319 reflect_kwargs={ 2320 "listeners": [("column_reflect", listen_for_reflect)] 2321 }, 2322 ) as batch_op: 2323 batch_op.add_column(Column("data", Integer)) 2324 2325 insp = inspect(self.conn) 2326 2327 eq_( 2328 [ 2329 c["type"]._type_affinity 2330 for c in insp.get_columns("has_native_bool") 2331 if c["name"] == "data" 2332 ], 2333 [Integer], 2334 ) 2335 eq_( 2336 [ 2337 c["type"]._type_affinity 2338 for c in insp.get_columns("has_native_bool") 2339 if c["name"] == "x" 2340 ], 2341 [Boolean], 2342 ) 2343