1from sqlalchemy import BIGINT 2from sqlalchemy import BigInteger 3from sqlalchemy import Boolean 4from sqlalchemy import CHAR 5from sqlalchemy import CheckConstraint 6from sqlalchemy import Column 7from sqlalchemy import DATE 8from sqlalchemy import DateTime 9from sqlalchemy import DECIMAL 10from sqlalchemy import Enum 11from sqlalchemy import FLOAT 12from sqlalchemy import ForeignKey 13from sqlalchemy import ForeignKeyConstraint 14from sqlalchemy import Index 15from sqlalchemy import inspect 16from sqlalchemy import INTEGER 17from sqlalchemy import Integer 18from sqlalchemy import LargeBinary 19from sqlalchemy import MetaData 20from sqlalchemy import Numeric 21from sqlalchemy import PrimaryKeyConstraint 22from sqlalchemy import SmallInteger 23from sqlalchemy import String 24from sqlalchemy import Table 25from sqlalchemy import Text 26from sqlalchemy import text 27from sqlalchemy import TIMESTAMP 28from sqlalchemy import TypeDecorator 29from sqlalchemy import Unicode 30from sqlalchemy import UniqueConstraint 31from sqlalchemy import VARCHAR 32from sqlalchemy.dialects import mysql 33from sqlalchemy.dialects import sqlite 34from sqlalchemy.types import NULLTYPE 35from sqlalchemy.types import VARBINARY 36 37from alembic import autogenerate 38from alembic import testing 39from alembic.autogenerate import api 40from alembic.migration import MigrationContext 41from alembic.operations import ops 42from alembic.testing import assert_raises_message 43from alembic.testing import config 44from alembic.testing import eq_ 45from alembic.testing import is_ 46from alembic.testing import is_not_ 47from alembic.testing import mock 48from alembic.testing import schemacompare 49from alembic.testing import TestBase 50from alembic.testing.env import clear_staging_env 51from alembic.testing.env import staging_env 52from alembic.testing.suite._autogen_fixtures import _default_name_filters 53from alembic.testing.suite._autogen_fixtures import _default_object_filters 54from alembic.testing.suite._autogen_fixtures import AutogenFixtureTest 55from alembic.testing.suite._autogen_fixtures import AutogenTest 56from alembic.util import CommandError 57 58# TODO: we should make an adaptation of CompareMetadataToInspectorTest that is 59# more well suited towards generic backends (2021-06-10) 60 61 62class AutogenCrossSchemaTest(AutogenTest, TestBase): 63 __only_on__ = "postgresql" 64 __backend__ = True 65 66 @classmethod 67 def _get_db_schema(cls): 68 m = MetaData() 69 Table("t1", m, Column("x", Integer)) 70 Table("t2", m, Column("y", Integer), schema=config.test_schema) 71 Table("t6", m, Column("u", Integer)) 72 Table("t7", m, Column("v", Integer), schema=config.test_schema) 73 74 return m 75 76 @classmethod 77 def _get_model_schema(cls): 78 m = MetaData() 79 Table("t3", m, Column("q", Integer)) 80 Table("t4", m, Column("z", Integer), schema=config.test_schema) 81 Table("t6", m, Column("u", Integer)) 82 Table("t7", m, Column("v", Integer), schema=config.test_schema) 83 return m 84 85 def test_default_schema_omitted_upgrade(self): 86 def include_object(obj, name, type_, reflected, compare_to): 87 if type_ == "table": 88 return name == "t3" 89 else: 90 return True 91 92 self._update_context( 93 object_filters=include_object, include_schemas=True 94 ) 95 uo = ops.UpgradeOps(ops=[]) 96 autogenerate._produce_net_changes(self.autogen_context, uo) 97 98 diffs = uo.as_diffs() 99 eq_(diffs[0][0], "add_table") 100 eq_(diffs[0][1].schema, None) 101 102 def test_default_schema_omitted_by_table_name_upgrade(self): 103 def include_name(name, type_, parent_names): 104 if type_ == "table": 105 retval = name in ["t1", "t6"] 106 if retval: 107 eq_(parent_names["schema_name"], None) 108 eq_(parent_names["schema_qualified_table_name"], name) 109 else: 110 eq_(parent_names["schema_name"], config.test_schema) 111 eq_( 112 parent_names["schema_qualified_table_name"], 113 "%s.%s" % (config.test_schema, name), 114 ) 115 return retval 116 else: 117 return True 118 119 self._update_context(name_filters=include_name, include_schemas=True) 120 uo = ops.UpgradeOps(ops=[]) 121 autogenerate._produce_net_changes(self.autogen_context, uo) 122 123 diffs = uo.as_diffs() 124 eq_( 125 {(d[0], d[1].name) for d in diffs}, 126 { 127 ("add_table", "t3"), 128 ("add_table", "t4"), 129 ("remove_table", "t1"), 130 ("add_table", "t7"), 131 }, 132 ) 133 134 def test_default_schema_omitted_by_schema_name_upgrade(self): 135 def include_name(name, type_, parent_names): 136 if type_ == "schema": 137 assert not parent_names 138 return name is None 139 else: 140 return True 141 142 self._update_context(name_filters=include_name, include_schemas=True) 143 uo = ops.UpgradeOps(ops=[]) 144 autogenerate._produce_net_changes(self.autogen_context, uo) 145 146 diffs = uo.as_diffs() 147 eq_( 148 {(d[0], d[1].name) for d in diffs}, 149 { 150 ("add_table", "t3"), 151 ("add_table", "t4"), 152 ("remove_table", "t1"), 153 ("add_table", "t7"), 154 }, 155 ) 156 157 def test_alt_schema_included_upgrade(self): 158 def include_object(obj, name, type_, reflected, compare_to): 159 if type_ == "table": 160 return name == "t4" 161 else: 162 return True 163 164 self._update_context( 165 object_filters=include_object, include_schemas=True 166 ) 167 uo = ops.UpgradeOps(ops=[]) 168 autogenerate._produce_net_changes(self.autogen_context, uo) 169 170 diffs = uo.as_diffs() 171 eq_(diffs[0][0], "add_table") 172 eq_(diffs[0][1].schema, config.test_schema) 173 174 def test_alt_schema_included_by_schema_name(self): 175 def include_name(name, type_, parent_names): 176 if type_ == "schema": 177 assert not parent_names 178 return name == config.test_schema 179 else: 180 return True 181 182 self._update_context(name_filters=include_name, include_schemas=True) 183 uo = ops.UpgradeOps(ops=[]) 184 autogenerate._produce_net_changes(self.autogen_context, uo) 185 186 # does not include "t1" in drops because t1 is in default schema 187 # includes "t6" in adds because t6 is in default schema, was omitted, 188 # so reflection added it 189 diffs = uo.as_diffs() 190 eq_( 191 {(d[0], d[1].name) for d in diffs}, 192 { 193 ("add_table", "t3"), 194 ("add_table", "t6"), 195 ("add_table", "t4"), 196 ("remove_table", "t2"), 197 }, 198 ) 199 200 def test_default_schema_omitted_downgrade(self): 201 def include_object(obj, name, type_, reflected, compare_to): 202 if type_ == "table": 203 return name == "t1" 204 else: 205 return True 206 207 self._update_context( 208 object_filters=include_object, include_schemas=True 209 ) 210 uo = ops.UpgradeOps(ops=[]) 211 autogenerate._produce_net_changes(self.autogen_context, uo) 212 213 diffs = uo.as_diffs() 214 eq_(diffs[0][0], "remove_table") 215 eq_(diffs[0][1].schema, None) 216 217 def test_alt_schema_included_downgrade(self): 218 def include_object(obj, name, type_, reflected, compare_to): 219 if type_ == "table": 220 return name == "t2" 221 else: 222 return True 223 224 self._update_context( 225 object_filters=include_object, include_schemas=True 226 ) 227 uo = ops.UpgradeOps(ops=[]) 228 autogenerate._produce_net_changes(self.autogen_context, uo) 229 diffs = uo.as_diffs() 230 eq_(diffs[0][0], "remove_table") 231 eq_(diffs[0][1].schema, config.test_schema) 232 233 234class AutogenDefaultSchemaTest(AutogenFixtureTest, TestBase): 235 __only_on__ = "postgresql" 236 __backend__ = True 237 238 def test_uses_explcit_schema_in_default_one(self): 239 240 default_schema = self.bind.dialect.default_schema_name 241 242 m1 = MetaData() 243 m2 = MetaData() 244 245 Table("a", m1, Column("x", String(50))) 246 Table("a", m2, Column("x", String(50)), schema=default_schema) 247 248 diffs = self._fixture(m1, m2, include_schemas=True) 249 eq_(diffs, []) 250 251 def test_uses_explcit_schema_in_default_two(self): 252 253 default_schema = self.bind.dialect.default_schema_name 254 255 m1 = MetaData() 256 m2 = MetaData() 257 258 Table("a", m1, Column("x", String(50))) 259 Table("a", m2, Column("x", String(50)), schema=default_schema) 260 Table("a", m2, Column("y", String(50)), schema="test_schema") 261 262 diffs = self._fixture(m1, m2, include_schemas=True) 263 eq_(len(diffs), 1) 264 eq_(diffs[0][0], "add_table") 265 eq_(diffs[0][1].schema, "test_schema") 266 eq_(diffs[0][1].c.keys(), ["y"]) 267 268 def test_uses_explcit_schema_in_default_three(self): 269 270 default_schema = self.bind.dialect.default_schema_name 271 272 m1 = MetaData() 273 m2 = MetaData() 274 275 Table("a", m1, Column("y", String(50)), schema="test_schema") 276 277 Table("a", m2, Column("x", String(50)), schema=default_schema) 278 Table("a", m2, Column("y", String(50)), schema="test_schema") 279 280 diffs = self._fixture(m1, m2, include_schemas=True) 281 eq_(len(diffs), 1) 282 eq_(diffs[0][0], "add_table") 283 eq_(diffs[0][1].schema, default_schema) 284 eq_(diffs[0][1].c.keys(), ["x"]) 285 286 287class AutogenDefaultSchemaIsNoneTest(AutogenFixtureTest, TestBase): 288 __only_on__ = "sqlite" 289 290 def setUp(self): 291 super(AutogenDefaultSchemaIsNoneTest, self).setUp() 292 293 # in SQLAlchemy 1.4, SQLite dialect is setting this name 294 # to "main" as is the actual default schema name for SQLite. 295 self.bind.dialect.default_schema_name = None 296 297 # prerequisite 298 eq_(self.bind.dialect.default_schema_name, None) 299 300 def test_no_default_schema(self): 301 302 m1 = MetaData() 303 m2 = MetaData() 304 305 Table("a", m1, Column("x", String(50))) 306 Table("a", m2, Column("x", String(50))) 307 308 def _include_object(obj, name, type_, reflected, compare_to): 309 if type_ == "table": 310 return name in "a" and obj.schema != "main" 311 else: 312 return True 313 314 diffs = self._fixture( 315 m1, m2, include_schemas=True, object_filters=_include_object 316 ) 317 eq_(len(diffs), 0) 318 319 320class ModelOne: 321 __requires__ = ("unique_constraint_reflection",) 322 323 schema = None 324 325 @classmethod 326 def _get_db_schema(cls): 327 schema = cls.schema 328 329 m = MetaData(schema=schema) 330 331 Table( 332 "user", 333 m, 334 Column("id", Integer, primary_key=True), 335 Column("name", String(50)), 336 Column("a1", Text), 337 Column("pw", String(50)), 338 Index("pw_idx", "pw"), 339 ) 340 341 Table( 342 "address", 343 m, 344 Column("id", Integer, primary_key=True), 345 Column("email_address", String(100), nullable=False), 346 ) 347 348 Table( 349 "order", 350 m, 351 Column("order_id", Integer, primary_key=True), 352 Column( 353 "amount", 354 Numeric(8, 2), 355 nullable=False, 356 server_default=text("0"), 357 ), 358 CheckConstraint("amount >= 0", name="ck_order_amount"), 359 ) 360 361 Table( 362 "extra", 363 m, 364 Column("x", CHAR), 365 Column("uid", Integer, ForeignKey("user.id")), 366 ) 367 368 return m 369 370 @classmethod 371 def _get_model_schema(cls): 372 schema = cls.schema 373 374 m = MetaData(schema=schema) 375 376 Table( 377 "user", 378 m, 379 Column("id", Integer, primary_key=True), 380 Column("name", String(50), nullable=False), 381 Column("a1", Text, server_default="x"), 382 ) 383 384 Table( 385 "address", 386 m, 387 Column("id", Integer, primary_key=True), 388 Column("email_address", String(100), nullable=False), 389 Column("street", String(50)), 390 UniqueConstraint("email_address", name="uq_email"), 391 ) 392 393 Table( 394 "order", 395 m, 396 Column("order_id", Integer, primary_key=True), 397 Column( 398 "amount", 399 Numeric(10, 2), 400 nullable=True, 401 server_default=text("0"), 402 ), 403 Column("user_id", Integer, ForeignKey("user.id")), 404 CheckConstraint("amount > -1", name="ck_order_amount"), 405 ) 406 407 Table( 408 "item", 409 m, 410 Column("id", Integer, primary_key=True), 411 Column("description", String(100)), 412 Column("order_id", Integer, ForeignKey("order.order_id")), 413 CheckConstraint("len(description) > 5"), 414 ) 415 return m 416 417 418class AutogenerateDiffTest(ModelOne, AutogenTest, TestBase): 419 __only_on__ = "sqlite" 420 421 def test_diffs(self): 422 """test generation of diff rules""" 423 424 metadata = self.m2 425 uo = ops.UpgradeOps(ops=[]) 426 ctx = self.autogen_context 427 428 autogenerate._produce_net_changes(ctx, uo) 429 430 diffs = uo.as_diffs() 431 eq_( 432 diffs[0], 433 ("add_table", schemacompare.CompareTable(metadata.tables["item"])), 434 ) 435 436 eq_(diffs[1][0], "remove_table") 437 eq_(diffs[1][1].name, "extra") 438 439 eq_(diffs[2][0], "add_column") 440 eq_(diffs[2][1], None) 441 eq_(diffs[2][2], "address") 442 eq_(diffs[2][3], metadata.tables["address"].c.street) 443 444 eq_(diffs[3][0], "add_constraint") 445 eq_(diffs[3][1].name, "uq_email") 446 447 eq_(diffs[4][0], "add_column") 448 eq_(diffs[4][1], None) 449 eq_(diffs[4][2], "order") 450 eq_(diffs[4][3], metadata.tables["order"].c.user_id) 451 452 eq_(diffs[5][0][0], "modify_type") 453 eq_(diffs[5][0][1], None) 454 eq_(diffs[5][0][2], "order") 455 eq_(diffs[5][0][3], "amount") 456 eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)") 457 eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)") 458 459 self._assert_fk_diff( 460 diffs[6], "add_fk", "order", ["user_id"], "user", ["id"] 461 ) 462 463 eq_(diffs[7][0][0], "modify_nullable") 464 eq_(diffs[7][0][5], True) 465 eq_(diffs[7][0][6], False) 466 467 eq_(diffs[8][0][0], "modify_default") 468 eq_(diffs[8][0][1], None) 469 eq_(diffs[8][0][2], "user") 470 eq_(diffs[8][0][3], "a1") 471 eq_(diffs[8][0][6].arg, "x") 472 473 eq_(diffs[9][0], "remove_index") 474 eq_(diffs[9][1].name, "pw_idx") 475 476 eq_(diffs[10][0], "remove_column") 477 eq_(diffs[10][3].name, "pw") 478 eq_(diffs[10][3].table.name, "user") 479 assert isinstance(diffs[10][3].type, String) 480 481 def test_include_object(self): 482 def include_object(obj, name, type_, reflected, compare_to): 483 assert obj.name == name 484 if type_ == "table": 485 if reflected: 486 assert obj.metadata is not self.m2 487 else: 488 assert obj.metadata is self.m2 489 return name in ("address", "order", "user") 490 elif type_ == "column": 491 if reflected: 492 assert obj.table.metadata is not self.m2 493 else: 494 assert obj.table.metadata is self.m2 495 return name != "street" 496 else: 497 return True 498 499 context = MigrationContext.configure( 500 connection=self.bind.connect(), 501 opts={ 502 "compare_type": True, 503 "compare_server_default": True, 504 "target_metadata": self.m2, 505 "include_object": include_object, 506 }, 507 ) 508 509 diffs = autogenerate.compare_metadata( 510 context, context.opts["target_metadata"] 511 ) 512 513 alter_cols = ( 514 set( 515 [ 516 d[2] 517 for d in self._flatten_diffs(diffs) 518 if d[0].startswith("modify") 519 ] 520 ) 521 .union( 522 d[3].name 523 for d in self._flatten_diffs(diffs) 524 if d[0] == "add_column" 525 ) 526 .union( 527 d[1].name 528 for d in self._flatten_diffs(diffs) 529 if d[0] == "add_table" 530 ) 531 ) 532 eq_(alter_cols, set(["user_id", "order", "user"])) 533 534 def test_include_name(self): 535 all_names = set() 536 537 def include_name(name, type_, parent_names): 538 all_names.add((name, type_, parent_names.get("table_name", None))) 539 if type_ == "table": 540 eq_( 541 parent_names, 542 {"schema_name": None, "schema_qualified_table_name": name}, 543 ) 544 return name in ("address", "order", "user") 545 elif type_ == "column": 546 return name != "street" 547 else: 548 return True 549 550 context = MigrationContext.configure( 551 connection=self.bind.connect(), 552 opts={ 553 "compare_type": True, 554 "compare_server_default": True, 555 "target_metadata": self.m2, 556 "include_name": include_name, 557 }, 558 ) 559 560 diffs = autogenerate.compare_metadata( 561 context, context.opts["target_metadata"] 562 ) 563 eq_( 564 all_names, 565 { 566 (None, "schema", None), 567 ("user", "table", None), 568 ("id", "column", "user"), 569 ("name", "column", "user"), 570 ("a1", "column", "user"), 571 ("pw", "column", "user"), 572 ("pw_idx", "index", "user"), 573 ("order", "table", None), 574 ("order_id", "column", "order"), 575 ("amount", "column", "order"), 576 ("address", "table", None), 577 ("id", "column", "address"), 578 ("email_address", "column", "address"), 579 ("extra", "table", None), 580 }, 581 ) 582 583 alter_cols = ( 584 set( 585 [ 586 d[2] 587 for d in self._flatten_diffs(diffs) 588 if d[0].startswith("modify") 589 ] 590 ) 591 .union( 592 d[3].name 593 for d in self._flatten_diffs(diffs) 594 if d[0] == "add_column" 595 ) 596 .union( 597 d[1].name 598 for d in self._flatten_diffs(diffs) 599 if d[0] == "add_table" 600 ) 601 ) 602 eq_(alter_cols, {"user_id", "order", "user", "street", "item"}) 603 604 def test_skip_null_type_comparison_reflected(self): 605 ac = ops.AlterColumnOp("sometable", "somecol") 606 autogenerate.compare._compare_type( 607 self.autogen_context, 608 ac, 609 None, 610 "sometable", 611 "somecol", 612 Column("somecol", NULLTYPE), 613 Column("somecol", Integer()), 614 ) 615 diff = ac.to_diff_tuple() 616 assert not diff 617 618 def test_skip_null_type_comparison_local(self): 619 ac = ops.AlterColumnOp("sometable", "somecol") 620 autogenerate.compare._compare_type( 621 self.autogen_context, 622 ac, 623 None, 624 "sometable", 625 "somecol", 626 Column("somecol", Integer()), 627 Column("somecol", NULLTYPE), 628 ) 629 diff = ac.to_diff_tuple() 630 assert not diff 631 632 def test_custom_type_compare(self): 633 class MyType(TypeDecorator): 634 impl = Integer 635 636 def compare_against_backend(self, dialect, conn_type): 637 return isinstance(conn_type, Integer) 638 639 ac = ops.AlterColumnOp("sometable", "somecol") 640 autogenerate.compare._compare_type( 641 self.autogen_context, 642 ac, 643 None, 644 "sometable", 645 "somecol", 646 Column("somecol", INTEGER()), 647 Column("somecol", MyType()), 648 ) 649 650 assert not ac.has_changes() 651 652 ac = ops.AlterColumnOp("sometable", "somecol") 653 autogenerate.compare._compare_type( 654 self.autogen_context, 655 ac, 656 None, 657 "sometable", 658 "somecol", 659 Column("somecol", String()), 660 Column("somecol", MyType()), 661 ) 662 diff = ac.to_diff_tuple() 663 eq_(diff[0][0:4], ("modify_type", None, "sometable", "somecol")) 664 665 def test_affinity_typedec(self): 666 class MyType(TypeDecorator): 667 impl = CHAR 668 669 def load_dialect_impl(self, dialect): 670 if dialect.name == "sqlite": 671 return dialect.type_descriptor(Integer()) 672 else: 673 return dialect.type_descriptor(CHAR(32)) 674 675 uo = ops.AlterColumnOp("sometable", "somecol") 676 autogenerate.compare._compare_type( 677 self.autogen_context, 678 uo, 679 None, 680 "sometable", 681 "somecol", 682 Column("somecol", Integer, nullable=True), 683 Column("somecol", MyType()), 684 ) 685 assert not uo.has_changes() 686 687 def test_dont_barf_on_already_reflected(self): 688 from sqlalchemy.util import OrderedSet 689 690 inspector = inspect(self.bind) 691 uo = ops.UpgradeOps(ops=[]) 692 autogenerate.compare._compare_tables( 693 OrderedSet([(None, "extra"), (None, "user")]), 694 OrderedSet(), 695 inspector, 696 uo, 697 self.autogen_context, 698 ) 699 eq_( 700 [(rec[0], rec[1].name) for rec in uo.as_diffs()], 701 [ 702 ("remove_table", "extra"), 703 ("remove_index", "pw_idx"), 704 ("remove_table", "user"), 705 ], 706 ) 707 708 709class AutogenerateDiffTestWSchema(ModelOne, AutogenTest, TestBase): 710 __only_on__ = "postgresql" 711 __backend__ = True 712 schema = "test_schema" 713 714 def test_diffs(self): 715 """test generation of diff rules""" 716 717 metadata = self.m2 718 719 self._update_context(include_schemas=True) 720 uo = ops.UpgradeOps(ops=[]) 721 autogenerate._produce_net_changes(self.autogen_context, uo) 722 723 diffs = uo.as_diffs() 724 725 eq_( 726 diffs[0], 727 ( 728 "add_table", 729 schemacompare.CompareTable( 730 metadata.tables["%s.item" % self.schema] 731 ), 732 ), 733 ) 734 735 eq_(diffs[1][0], "remove_table") 736 eq_(diffs[1][1].name, "extra") 737 738 eq_(diffs[2][0], "add_column") 739 eq_(diffs[2][1], self.schema) 740 eq_(diffs[2][2], "address") 741 eq_( 742 schemacompare.CompareColumn( 743 metadata.tables["%s.address" % self.schema].c.street 744 ), 745 diffs[2][3], 746 ) 747 748 eq_(diffs[3][0], "add_constraint") 749 eq_(diffs[3][1].name, "uq_email") 750 751 eq_(diffs[4][0], "add_column") 752 eq_(diffs[4][1], self.schema) 753 eq_(diffs[4][2], "order") 754 eq_( 755 schemacompare.CompareColumn( 756 metadata.tables["%s.order" % self.schema].c.user_id 757 ), 758 diffs[4][3], 759 ) 760 761 eq_(diffs[5][0][0], "modify_type") 762 eq_(diffs[5][0][1], self.schema) 763 eq_(diffs[5][0][2], "order") 764 eq_(diffs[5][0][3], "amount") 765 eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)") 766 eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)") 767 768 self._assert_fk_diff( 769 diffs[6], 770 "add_fk", 771 "order", 772 ["user_id"], 773 "user", 774 ["id"], 775 source_schema=config.test_schema, 776 ) 777 778 eq_(diffs[7][0][0], "modify_nullable") 779 eq_(diffs[7][0][5], True) 780 eq_(diffs[7][0][6], False) 781 782 eq_(diffs[8][0][0], "modify_default") 783 eq_(diffs[8][0][1], self.schema) 784 eq_(diffs[8][0][2], "user") 785 eq_(diffs[8][0][3], "a1") 786 eq_(diffs[8][0][6].arg, "x") 787 788 eq_(diffs[9][0], "remove_index") 789 eq_(diffs[9][1].name, "pw_idx") 790 791 eq_(diffs[10][0], "remove_column") 792 eq_(diffs[10][3].name, "pw") 793 794 795class CompareTypeSpecificityTest(TestBase): 796 @testing.fixture 797 def impl_fixture(self): 798 from alembic.ddl import impl 799 from sqlalchemy.engine import default 800 801 return impl.DefaultImpl( 802 default.DefaultDialect(), None, False, True, None, {} 803 ) 804 805 def test_typedec_to_nonstandard(self, impl_fixture): 806 class PasswordType(TypeDecorator): 807 impl = VARBINARY 808 809 def copy(self, **kw): 810 return PasswordType(self.impl.length) 811 812 def load_dialect_impl(self, dialect): 813 if dialect.name == "default": 814 impl = sqlite.NUMERIC(self.length) 815 else: 816 impl = VARBINARY(self.length) 817 return dialect.type_descriptor(impl) 818 819 impl_fixture.compare_type( 820 Column("x", sqlite.NUMERIC(50)), Column("x", PasswordType(50)) 821 ) 822 823 @testing.combinations( 824 (VARCHAR(30), String(30), False), 825 (VARCHAR(30), String(40), True), 826 (VARCHAR(30), Integer(), True), 827 (Text(), String(255), True), 828 # insp + metadata types same number of 829 # args but are different; they're different 830 (DECIMAL(10, 5), DECIMAL(10, 6), True), 831 # insp + metadata types, inspected type 832 # has an additional arg; assume this is additional 833 # default precision on the part of the DB, assume they are 834 # equivalent 835 (DECIMAL(10, 5), DECIMAL(10), False), 836 # insp + metadata types, metadata type 837 # has an additional arg; this can go either way, either the 838 # metadata has extra precision, or the DB doesn't support the 839 # element, go with consider them equivalent for now 840 (DECIMAL(10), DECIMAL(10, 5), False), 841 (DECIMAL(10, 2), Numeric(10), False), 842 (DECIMAL(10, 5), Numeric(10, 5), False), 843 (DECIMAL(10, 5), Numeric(12, 5), True), 844 (DECIMAL(10, 5), DateTime(), True), 845 (Numeric(), Numeric(scale=5), False), 846 (INTEGER(), Integer(), False), 847 (BIGINT(), Integer(), True), 848 (BIGINT(), BigInteger(), False), 849 (BIGINT(), SmallInteger(), True), 850 (INTEGER(), SmallInteger(), True), 851 (Integer(), String(), True), 852 id_="ssa", 853 argnames="inspected_type,metadata_type,expected", 854 ) 855 def test_compare_type( 856 self, impl_fixture, inspected_type, metadata_type, expected 857 ): 858 859 is_( 860 impl_fixture.compare_type( 861 Column("x", inspected_type), Column("x", metadata_type) 862 ), 863 expected, 864 ) 865 866 867class CompareMetadataToInspectorTest(TestBase): 868 __backend__ = True 869 870 @classmethod 871 def _get_bind(cls): 872 return config.db 873 874 configure_opts = {} 875 876 def setUp(self): 877 staging_env() 878 self.bind = self._get_bind() 879 self.m1 = MetaData() 880 881 def tearDown(self): 882 self.m1.drop_all(self.bind) 883 clear_staging_env() 884 885 def _compare_columns(self, cola, colb): 886 Table("sometable", self.m1, Column("col", cola)) 887 self.m1.create_all(self.bind) 888 m2 = MetaData() 889 Table("sometable", m2, Column("col", colb)) 890 891 ctx_opts = { 892 "compare_type": True, 893 "compare_server_default": True, 894 "target_metadata": m2, 895 "upgrade_token": "upgrades", 896 "downgrade_token": "downgrades", 897 "alembic_module_prefix": "op.", 898 "sqlalchemy_module_prefix": "sa.", 899 "include_object": _default_object_filters, 900 "include_name": _default_name_filters, 901 } 902 if self.configure_opts: 903 ctx_opts.update(self.configure_opts) 904 with self.bind.connect() as conn: 905 context = MigrationContext.configure( 906 connection=conn, opts=ctx_opts 907 ) 908 autogen_context = api.AutogenContext(context, m2) 909 uo = ops.UpgradeOps(ops=[]) 910 autogenerate._produce_net_changes(autogen_context, uo) 911 return bool(uo.as_diffs()) 912 913 @testing.combinations( 914 (INTEGER(),), 915 (CHAR(),), 916 (VARCHAR(32),), 917 (Text(),), 918 (FLOAT(),), 919 (Numeric(),), 920 (DECIMAL(),), 921 (TIMESTAMP(),), 922 (DateTime(),), 923 (Boolean(),), 924 (BigInteger(),), 925 (SmallInteger(),), 926 (DATE(),), 927 (String(32),), 928 (LargeBinary(),), 929 (Unicode(32),), 930 (Enum("one", "two", "three", name="the_enum"),), 931 ) 932 def test_introspected_columns_match_metadata_columns(self, cola): 933 # this is ensuring false positives aren't generated for types 934 # that have not changed. 935 is_(self._compare_columns(cola, cola), False) 936 937 # TODO: ideally the backend-specific types would be tested 938 # within the test suites for those backends. 939 @testing.combinations( 940 (String(32), VARCHAR(32), False), 941 (VARCHAR(6), String(6), False), 942 (CHAR(), String(1), True), 943 (Text(), VARCHAR(255), True), 944 (Unicode(32), String(32), False, config.requirements.unicode_string), 945 (Unicode(32), VARCHAR(32), False, config.requirements.unicode_string), 946 (VARCHAR(6), VARCHAR(12), True), 947 (VARCHAR(6), String(12), True), 948 (Integer(), String(10), True), 949 (String(10), Integer(), True), 950 ( 951 Unicode(30, collation="en_US"), 952 Unicode(30, collation="en_US"), 953 False, # unfortunately dialects don't seem to consistently 954 # reflect collations right now so we can't test for 955 # positives here 956 config.requirements.postgresql, 957 ), 958 ( 959 mysql.VARCHAR(200, charset="utf8"), 960 Unicode(200), 961 False, 962 config.requirements.mysql, 963 ), 964 ( 965 mysql.VARCHAR(200, charset="latin1"), 966 mysql.VARCHAR(200, charset="utf-8"), 967 True, 968 config.requirements.mysql + config.requirements.sqlalchemy_13, 969 ), 970 ( 971 String(255, collation="utf8_bin"), 972 String(255), 973 False, 974 config.requirements.mysql, 975 ), 976 ( 977 String(255, collation="utf8_bin"), 978 String(255, collation="latin1_bin"), 979 True, 980 config.requirements.mysql + config.requirements.sqlalchemy_13, 981 ), 982 ) 983 def test_string_comparisons(self, cola, colb, expect_changes): 984 is_(self._compare_columns(cola, colb), expect_changes) 985 986 @testing.combinations( 987 ( 988 DateTime(), 989 DateTime(timezone=False), 990 False, 991 config.requirements.datetime_timezone, 992 ), 993 ( 994 DateTime(), 995 DateTime(timezone=True), 996 True, 997 config.requirements.datetime_timezone, 998 ), 999 ( 1000 DateTime(timezone=True), 1001 DateTime(timezone=False), 1002 True, 1003 config.requirements.datetime_timezone, 1004 ), 1005 ) 1006 def test_datetime_comparisons(self, cola, colb, expect_changes): 1007 is_(self._compare_columns(cola, colb), expect_changes) 1008 1009 @testing.combinations( 1010 (Integer(), Integer(), False), 1011 ( 1012 Integer(), 1013 Numeric(8, 0), 1014 True, 1015 config.requirements.integer_subtype_comparisons, 1016 ), 1017 (Numeric(8, 0), Numeric(8, 2), True), 1018 ( 1019 BigInteger(), 1020 Integer(), 1021 True, 1022 config.requirements.integer_subtype_comparisons, 1023 ), 1024 ( 1025 SmallInteger(), 1026 Integer(), 1027 True, 1028 config.requirements.integer_subtype_comparisons, 1029 ), 1030 ( # note that the mysql.INTEGER tests only use these params 1031 # if the dialect is "mysql". however we also test that their 1032 # dialect-agnostic representation compares by running this 1033 # against other dialects. 1034 mysql.INTEGER(unsigned=True, display_width=10), 1035 mysql.INTEGER(unsigned=True, display_width=10), 1036 False, 1037 ), 1038 (mysql.INTEGER(unsigned=True), mysql.INTEGER(unsigned=True), False), 1039 ( 1040 mysql.INTEGER(unsigned=True, display_width=10), 1041 mysql.INTEGER(unsigned=True), 1042 False, 1043 ), 1044 ( 1045 mysql.INTEGER(unsigned=True), 1046 mysql.INTEGER(unsigned=True, display_width=10), 1047 False, 1048 ), 1049 ) 1050 def test_numeric_comparisons(self, cola, colb, expect_changes): 1051 is_(self._compare_columns(cola, colb), expect_changes) 1052 1053 1054class AutogenSystemColTest(AutogenTest, TestBase): 1055 __only_on__ = "postgresql" 1056 1057 @classmethod 1058 def _get_db_schema(cls): 1059 m = MetaData() 1060 1061 Table("sometable", m, Column("id", Integer, primary_key=True)) 1062 return m 1063 1064 @classmethod 1065 def _get_model_schema(cls): 1066 m = MetaData() 1067 1068 # 'xmin' is implicitly present, when added to a model should produce 1069 # no change 1070 Table( 1071 "sometable", 1072 m, 1073 Column("id", Integer, primary_key=True), 1074 Column("xmin", Integer, system=True), 1075 ) 1076 return m 1077 1078 def test_dont_add_system(self): 1079 uo = ops.UpgradeOps(ops=[]) 1080 autogenerate._produce_net_changes(self.autogen_context, uo) 1081 1082 diffs = uo.as_diffs() 1083 eq_(diffs, []) 1084 1085 1086class AutogenerateVariantCompareTest(AutogenTest, TestBase): 1087 __backend__ = True 1088 1089 @classmethod 1090 def _get_db_schema(cls): 1091 m = MetaData() 1092 1093 Table( 1094 "sometable", 1095 m, 1096 Column( 1097 "id", 1098 BigInteger().with_variant(Integer, "sqlite"), 1099 primary_key=True, 1100 ), 1101 Column("value", String(50)), 1102 ) 1103 return m 1104 1105 @classmethod 1106 def _get_model_schema(cls): 1107 m = MetaData() 1108 1109 Table( 1110 "sometable", 1111 m, 1112 Column( 1113 "id", 1114 BigInteger().with_variant(Integer, "sqlite"), 1115 primary_key=True, 1116 ), 1117 Column("value", String(50)), 1118 ) 1119 return m 1120 1121 def test_variant_no_issue(self): 1122 uo = ops.UpgradeOps(ops=[]) 1123 autogenerate._produce_net_changes(self.autogen_context, uo) 1124 1125 diffs = uo.as_diffs() 1126 eq_(diffs, []) 1127 1128 1129class AutogenerateCustomCompareTypeTest(AutogenTest, TestBase): 1130 __only_on__ = "sqlite" 1131 1132 @classmethod 1133 def _get_db_schema(cls): 1134 m = MetaData() 1135 1136 Table( 1137 "sometable", 1138 m, 1139 Column("id", Integer, primary_key=True), 1140 Column("value", Integer), 1141 ) 1142 return m 1143 1144 @classmethod 1145 def _get_model_schema(cls): 1146 m = MetaData() 1147 1148 Table( 1149 "sometable", 1150 m, 1151 Column("id", Integer, primary_key=True), 1152 Column("value", String), 1153 ) 1154 return m 1155 1156 def test_uses_custom_compare_type_function(self): 1157 my_compare_type = mock.Mock() 1158 self.context._user_compare_type = my_compare_type 1159 1160 uo = ops.UpgradeOps(ops=[]) 1161 1162 ctx = self.autogen_context 1163 autogenerate._produce_net_changes(ctx, uo) 1164 1165 first_table = self.m2.tables["sometable"] 1166 first_column = first_table.columns["id"] 1167 1168 eq_(len(my_compare_type.mock_calls), 2) 1169 1170 # We'll just test the first call 1171 _, args, _ = my_compare_type.mock_calls[0] 1172 ( 1173 context, 1174 inspected_column, 1175 metadata_column, 1176 inspected_type, 1177 metadata_type, 1178 ) = args 1179 eq_(context, self.context) 1180 eq_(metadata_column, first_column) 1181 eq_(metadata_type, first_column.type) 1182 eq_(inspected_column.name, first_column.name) 1183 eq_(type(inspected_type), INTEGER) 1184 1185 def test_column_type_not_modified_custom_compare_type_returns_False(self): 1186 my_compare_type = mock.Mock() 1187 my_compare_type.return_value = False 1188 self.context._user_compare_type = my_compare_type 1189 1190 diffs = [] 1191 ctx = self.autogen_context 1192 diffs = [] 1193 autogenerate._produce_net_changes(ctx, diffs) 1194 1195 eq_(diffs, []) 1196 1197 def test_column_type_modified_custom_compare_type_returns_True(self): 1198 my_compare_type = mock.Mock() 1199 my_compare_type.return_value = True 1200 self.context._user_compare_type = my_compare_type 1201 1202 ctx = self.autogen_context 1203 uo = ops.UpgradeOps(ops=[]) 1204 autogenerate._produce_net_changes(ctx, uo) 1205 diffs = uo.as_diffs() 1206 1207 eq_(diffs[0][0][0], "modify_type") 1208 eq_(diffs[1][0][0], "modify_type") 1209 1210 1211class IncludeFiltersAPITest(AutogenTest, TestBase): 1212 @classmethod 1213 def _get_db_schema(cls): 1214 return MetaData() 1215 1216 @classmethod 1217 def _get_model_schema(cls): 1218 return MetaData() 1219 1220 def test_run_name_filters_supports_extension_types(self): 1221 include_name = mock.Mock() 1222 1223 self._update_context(name_filters=include_name, include_schemas=True) 1224 1225 self.autogen_context.run_name_filters( 1226 name="some_function", 1227 type_="function", 1228 parent_names={"schema_name": "public"}, 1229 ) 1230 1231 eq_( 1232 include_name.mock_calls, 1233 [ 1234 mock.call( 1235 "some_function", "function", {"schema_name": "public"} 1236 ) 1237 ], 1238 ) 1239 1240 def test_run_object_filters_supports_extension_types(self): 1241 include_object = mock.Mock() 1242 1243 self._update_context( 1244 object_filters=include_object, include_schemas=True 1245 ) 1246 1247 class ExtFunction: 1248 pass 1249 1250 extfunc = ExtFunction() 1251 self.autogen_context.run_object_filters( 1252 object_=extfunc, 1253 name="some_function", 1254 type_="function", 1255 reflected=False, 1256 compare_to=None, 1257 ) 1258 1259 eq_( 1260 include_object.mock_calls, 1261 [mock.call(extfunc, "some_function", "function", False, None)], 1262 ) 1263 1264 1265class PKConstraintUpgradesIgnoresNullableTest(AutogenTest, TestBase): 1266 __backend__ = True 1267 1268 # test behavior for issue originally observed in SQLAlchemy issue #3023, 1269 # alembic issue #199 1270 @classmethod 1271 def _get_db_schema(cls): 1272 m = MetaData() 1273 1274 Table( 1275 "person_to_role", 1276 m, 1277 Column("person_id", Integer, autoincrement=False), 1278 Column("role_id", Integer, autoincrement=False), 1279 PrimaryKeyConstraint("person_id", "role_id"), 1280 ) 1281 return m 1282 1283 @classmethod 1284 def _get_model_schema(cls): 1285 return cls._get_db_schema() 1286 1287 def test_no_change(self): 1288 uo = ops.UpgradeOps(ops=[]) 1289 ctx = self.autogen_context 1290 autogenerate._produce_net_changes(ctx, uo) 1291 diffs = uo.as_diffs() 1292 eq_(diffs, []) 1293 1294 1295class AutogenKeyTest(AutogenTest, TestBase): 1296 __only_on__ = "sqlite" 1297 1298 @classmethod 1299 def _get_db_schema(cls): 1300 m = MetaData() 1301 1302 Table( 1303 "someothertable", 1304 m, 1305 Column("id", Integer, primary_key=True), 1306 Column("value", Integer, key="somekey"), 1307 ) 1308 return m 1309 1310 @classmethod 1311 def _get_model_schema(cls): 1312 m = MetaData() 1313 1314 Table( 1315 "sometable", 1316 m, 1317 Column("id", Integer, primary_key=True), 1318 Column("value", Integer, key="someotherkey"), 1319 ) 1320 Table( 1321 "someothertable", 1322 m, 1323 Column("id", Integer, primary_key=True), 1324 Column("value", Integer, key="somekey"), 1325 Column("othervalue", Integer, key="otherkey"), 1326 ) 1327 return m 1328 1329 symbols = ["someothertable", "sometable"] 1330 1331 def test_autogen(self): 1332 1333 uo = ops.UpgradeOps(ops=[]) 1334 1335 ctx = self.autogen_context 1336 autogenerate._produce_net_changes(ctx, uo) 1337 diffs = uo.as_diffs() 1338 eq_(diffs[0][0], "add_table") 1339 eq_(diffs[0][1].name, "sometable") 1340 eq_(diffs[1][0], "add_column") 1341 eq_(diffs[1][3].key, "otherkey") 1342 1343 1344class AutogenVersionTableTest(AutogenTest, TestBase): 1345 __only_on__ = "sqlite" 1346 version_table_name = "alembic_version" 1347 version_table_schema = None 1348 1349 @classmethod 1350 def _get_db_schema(cls): 1351 m = MetaData() 1352 Table( 1353 cls.version_table_name, 1354 m, 1355 Column("x", Integer), 1356 schema=cls.version_table_schema, 1357 ) 1358 return m 1359 1360 @classmethod 1361 def _get_model_schema(cls): 1362 m = MetaData() 1363 return m 1364 1365 def test_no_version_table(self): 1366 ctx = self.autogen_context 1367 1368 uo = ops.UpgradeOps(ops=[]) 1369 autogenerate._produce_net_changes(ctx, uo) 1370 eq_(uo.as_diffs(), []) 1371 1372 def test_version_table_in_target(self): 1373 Table( 1374 self.version_table_name, 1375 self.m2, 1376 Column("x", Integer), 1377 schema=self.version_table_schema, 1378 ) 1379 1380 ctx = self.autogen_context 1381 uo = ops.UpgradeOps(ops=[]) 1382 autogenerate._produce_net_changes(ctx, uo) 1383 eq_(uo.as_diffs(), []) 1384 1385 1386class AutogenCustomVersionTableSchemaTest(AutogenVersionTableTest): 1387 __only_on__ = "postgresql" 1388 __backend__ = True 1389 version_table_schema = "test_schema" 1390 configure_opts = {"version_table_schema": "test_schema"} 1391 1392 1393class AutogenCustomVersionTableTest(AutogenVersionTableTest): 1394 version_table_name = "my_version_table" 1395 configure_opts = {"version_table": "my_version_table"} 1396 1397 1398class AutogenCustomVersionTableAndSchemaTest(AutogenVersionTableTest): 1399 __only_on__ = "postgresql" 1400 __backend__ = True 1401 version_table_name = "my_version_table" 1402 version_table_schema = "test_schema" 1403 configure_opts = { 1404 "version_table": "my_version_table", 1405 "version_table_schema": "test_schema", 1406 } 1407 1408 1409class AutogenerateDiffOrderTest(AutogenTest, TestBase): 1410 __only_on__ = "sqlite" 1411 1412 @classmethod 1413 def _get_db_schema(cls): 1414 return MetaData() 1415 1416 @classmethod 1417 def _get_model_schema(cls): 1418 m = MetaData() 1419 Table("parent", m, Column("id", Integer, primary_key=True)) 1420 1421 Table( 1422 "child", m, Column("parent_id", Integer, ForeignKey("parent.id")) 1423 ) 1424 1425 return m 1426 1427 def test_diffs_order(self): 1428 """ 1429 Added in order to test that child tables(tables with FKs) are 1430 generated before their parent tables 1431 """ 1432 1433 ctx = self.autogen_context 1434 uo = ops.UpgradeOps(ops=[]) 1435 autogenerate._produce_net_changes(ctx, uo) 1436 diffs = uo.as_diffs() 1437 1438 eq_(diffs[0][0], "add_table") 1439 eq_(diffs[0][1].name, "parent") 1440 eq_(diffs[1][0], "add_table") 1441 eq_(diffs[1][1].name, "child") 1442 1443 1444class CompareMetadataTest(ModelOne, AutogenTest, TestBase): 1445 __only_on__ = "sqlite" 1446 1447 def test_compare_metadata(self): 1448 metadata = self.m2 1449 1450 diffs = autogenerate.compare_metadata(self.context, metadata) 1451 1452 eq_( 1453 diffs[0], 1454 ("add_table", schemacompare.CompareTable(metadata.tables["item"])), 1455 ) 1456 1457 eq_(diffs[1][0], "remove_table") 1458 eq_(diffs[1][1].name, "extra") 1459 1460 eq_(diffs[2][0], "add_column") 1461 eq_(diffs[2][1], None) 1462 eq_(diffs[2][2], "address") 1463 eq_(diffs[2][3], metadata.tables["address"].c.street) 1464 1465 eq_(diffs[3][0], "add_constraint") 1466 eq_(diffs[3][1].name, "uq_email") 1467 1468 eq_(diffs[4][0], "add_column") 1469 eq_(diffs[4][1], None) 1470 eq_(diffs[4][2], "order") 1471 eq_(diffs[4][3], metadata.tables["order"].c.user_id) 1472 1473 eq_(diffs[5][0][0], "modify_type") 1474 eq_(diffs[5][0][1], None) 1475 eq_(diffs[5][0][2], "order") 1476 eq_(diffs[5][0][3], "amount") 1477 eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)") 1478 eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)") 1479 1480 self._assert_fk_diff( 1481 diffs[6], "add_fk", "order", ["user_id"], "user", ["id"] 1482 ) 1483 1484 eq_(diffs[7][0][0], "modify_nullable") 1485 eq_(diffs[7][0][5], True) 1486 eq_(diffs[7][0][6], False) 1487 1488 eq_(diffs[8][0][0], "modify_default") 1489 eq_(diffs[8][0][1], None) 1490 eq_(diffs[8][0][2], "user") 1491 eq_(diffs[8][0][3], "a1") 1492 eq_(diffs[8][0][6].arg, "x") 1493 1494 eq_(diffs[9][0], "remove_index") 1495 eq_(diffs[9][1].name, "pw_idx") 1496 1497 eq_(diffs[10][0], "remove_column") 1498 eq_(diffs[10][3].name, "pw") 1499 1500 def test_compare_metadata_include_object(self): 1501 metadata = self.m2 1502 1503 def include_object(obj, name, type_, reflected, compare_to): 1504 if type_ == "table": 1505 return name in ("extra", "order") 1506 elif type_ == "column": 1507 return name != "amount" 1508 else: 1509 return True 1510 1511 context = MigrationContext.configure( 1512 connection=self.bind.connect(), 1513 opts={ 1514 "compare_type": True, 1515 "compare_server_default": True, 1516 "include_object": include_object, 1517 }, 1518 ) 1519 1520 diffs = autogenerate.compare_metadata(context, metadata) 1521 1522 eq_(diffs[0][0], "remove_table") 1523 eq_(diffs[0][1].name, "extra") 1524 1525 eq_(diffs[1][0], "add_column") 1526 eq_(diffs[1][1], None) 1527 eq_(diffs[1][2], "order") 1528 eq_(diffs[1][3], metadata.tables["order"].c.user_id) 1529 1530 def test_compare_metadata_include_name(self): 1531 metadata = self.m2 1532 1533 all_names = set() 1534 1535 def include_name(name, type_, parent_names): 1536 all_names.add((name, type_, parent_names.get("table_name", None))) 1537 if type_ == "table": 1538 return name in ("extra", "order") 1539 elif type_ == "column": 1540 return name != "amount" 1541 else: 1542 return True 1543 1544 context = MigrationContext.configure( 1545 connection=self.bind.connect(), 1546 opts={ 1547 "compare_type": True, 1548 "compare_server_default": True, 1549 "include_name": include_name, 1550 }, 1551 ) 1552 1553 diffs = autogenerate.compare_metadata(context, metadata) 1554 eq_( 1555 all_names, 1556 { 1557 ("user", "table", None), 1558 ("order", "table", None), 1559 ("address", "table", None), 1560 (None, "schema", None), 1561 ("amount", "column", "order"), 1562 ("extra", "table", None), 1563 ("order_id", "column", "order"), 1564 }, 1565 ) 1566 1567 eq_( 1568 { 1569 ( 1570 d[0], 1571 d[3].name if d[0] == "add_column" else d[1].name, 1572 d[2] if d[0] == "add_column" else None, 1573 ) 1574 for d in diffs 1575 }, 1576 { 1577 ("remove_table", "extra", None), 1578 ("add_fk", None, None), 1579 ("add_column", "amount", "order"), 1580 ("add_table", "user", None), 1581 ("add_table", "item", None), 1582 ("add_column", "user_id", "order"), 1583 ("add_table", "address", None), 1584 }, 1585 ) 1586 1587 def test_compare_metadata_as_sql(self): 1588 context = MigrationContext.configure( 1589 connection=self.bind.connect(), opts={"as_sql": True} 1590 ) 1591 metadata = self.m2 1592 1593 assert_raises_message( 1594 CommandError, 1595 "autogenerate can't use as_sql=True as it prevents " 1596 "querying the database for schema information", 1597 autogenerate.compare_metadata, 1598 context, 1599 metadata, 1600 ) 1601 1602 1603class PGCompareMetaData(ModelOne, AutogenTest, TestBase): 1604 __only_on__ = "postgresql" 1605 __backend__ = True 1606 schema = "test_schema" 1607 1608 def test_compare_metadata_schema(self): 1609 metadata = self.m2 1610 1611 context = MigrationContext.configure( 1612 connection=self.bind.connect(), opts={"include_schemas": True} 1613 ) 1614 1615 diffs = autogenerate.compare_metadata(context, metadata) 1616 1617 eq_( 1618 diffs[0], 1619 ( 1620 "add_table", 1621 schemacompare.CompareTable( 1622 metadata.tables["test_schema.item"] 1623 ), 1624 ), 1625 ) 1626 1627 eq_(diffs[1][0], "remove_table") 1628 eq_(diffs[1][1].name, "extra") 1629 1630 eq_(diffs[2][0], "add_column") 1631 eq_(diffs[2][1], "test_schema") 1632 eq_(diffs[2][2], "address") 1633 eq_( 1634 schemacompare.CompareColumn( 1635 metadata.tables["test_schema.address"].c.street 1636 ), 1637 diffs[2][3], 1638 ) 1639 1640 eq_(diffs[3][0], "add_constraint") 1641 eq_(diffs[3][1].name, "uq_email") 1642 1643 eq_(diffs[4][0], "add_column") 1644 eq_(diffs[4][1], "test_schema") 1645 eq_(diffs[4][2], "order") 1646 eq_( 1647 schemacompare.CompareColumn( 1648 metadata.tables["test_schema.order"].c.user_id 1649 ), 1650 diffs[4][3], 1651 ) 1652 1653 eq_(diffs[5][0][0], "modify_nullable") 1654 eq_(diffs[5][0][5], False) 1655 eq_(diffs[5][0][6], True) 1656 1657 1658class OrigObjectTest(TestBase): 1659 def setUp(self): 1660 self.metadata = m = MetaData() 1661 t = Table( 1662 "t", 1663 m, 1664 Column("id", Integer(), primary_key=True), 1665 Column("x", Integer()), 1666 ) 1667 self.ix = Index("ix1", t.c.id) 1668 fk = ForeignKeyConstraint(["t_id"], ["t.id"]) 1669 q = Table("q", m, Column("t_id", Integer()), fk) 1670 self.table = t 1671 self.fk = fk 1672 self.ck = CheckConstraint(t.c.x > 5) 1673 t.append_constraint(self.ck) 1674 self.uq = UniqueConstraint(q.c.t_id) 1675 self.pk = t.primary_key 1676 1677 def test_drop_fk(self): 1678 fk = self.fk 1679 op = ops.DropConstraintOp.from_constraint(fk) 1680 eq_(op.to_constraint(), schemacompare.CompareForeignKey(fk)) 1681 eq_(op.reverse().to_constraint(), schemacompare.CompareForeignKey(fk)) 1682 1683 def test_add_fk(self): 1684 fk = self.fk 1685 op = ops.AddConstraintOp.from_constraint(fk) 1686 eq_(op.to_constraint(), schemacompare.CompareForeignKey(fk)) 1687 eq_(op.reverse().to_constraint(), schemacompare.CompareForeignKey(fk)) 1688 is_not_(None, op.to_constraint().table) 1689 1690 def test_add_check(self): 1691 ck = self.ck 1692 op = ops.AddConstraintOp.from_constraint(ck) 1693 eq_(op.to_constraint(), schemacompare.CompareCheckConstraint(ck)) 1694 eq_( 1695 op.reverse().to_constraint(), 1696 schemacompare.CompareCheckConstraint(ck), 1697 ) 1698 is_not_(None, op.to_constraint().table) 1699 1700 def test_drop_check(self): 1701 ck = self.ck 1702 op = ops.DropConstraintOp.from_constraint(ck) 1703 eq_(op.to_constraint(), schemacompare.CompareCheckConstraint(ck)) 1704 eq_( 1705 op.reverse().to_constraint(), 1706 schemacompare.CompareCheckConstraint(ck), 1707 ) 1708 is_not_(None, op.to_constraint().table) 1709 1710 def test_add_unique(self): 1711 uq = self.uq 1712 op = ops.AddConstraintOp.from_constraint(uq) 1713 eq_(op.to_constraint(), schemacompare.CompareUniqueConstraint(uq)) 1714 eq_( 1715 op.reverse().to_constraint(), 1716 schemacompare.CompareUniqueConstraint(uq), 1717 ) 1718 is_not_(None, op.to_constraint().table) 1719 1720 def test_drop_unique(self): 1721 uq = self.uq 1722 op = ops.DropConstraintOp.from_constraint(uq) 1723 eq_(op.to_constraint(), schemacompare.CompareUniqueConstraint(uq)) 1724 eq_( 1725 op.reverse().to_constraint(), 1726 schemacompare.CompareUniqueConstraint(uq), 1727 ) 1728 is_not_(None, op.to_constraint().table) 1729 1730 def test_add_pk_no_orig(self): 1731 op = ops.CreatePrimaryKeyOp("pk1", "t", ["x", "y"]) 1732 pk = op.to_constraint() 1733 eq_(pk.name, "pk1") 1734 eq_(pk.table.name, "t") 1735 1736 def test_add_pk(self): 1737 pk = self.pk 1738 op = ops.AddConstraintOp.from_constraint(pk) 1739 eq_(op.to_constraint(), schemacompare.ComparePrimaryKey(pk)) 1740 eq_(op.reverse().to_constraint(), schemacompare.ComparePrimaryKey(pk)) 1741 is_not_(None, op.to_constraint().table) 1742 1743 def test_drop_pk(self): 1744 pk = self.pk 1745 op = ops.DropConstraintOp.from_constraint(pk) 1746 eq_(op.to_constraint(), schemacompare.ComparePrimaryKey(pk)) 1747 eq_(op.reverse().to_constraint(), schemacompare.ComparePrimaryKey(pk)) 1748 is_not_(None, op.to_constraint().table) 1749 1750 def test_drop_column(self): 1751 t = self.table 1752 1753 op = ops.DropColumnOp.from_column_and_tablename(None, "t", t.c.x) 1754 is_(op.to_column(), t.c.x) 1755 is_(op.reverse().to_column(), t.c.x) 1756 is_not_(None, op.to_column().table) 1757 1758 def test_add_column(self): 1759 t = self.table 1760 1761 op = ops.AddColumnOp.from_column_and_tablename(None, "t", t.c.x) 1762 is_(op.to_column(), t.c.x) 1763 is_(op.reverse().to_column(), t.c.x) 1764 is_not_(None, op.to_column().table) 1765 1766 def test_drop_table(self): 1767 t = self.table 1768 1769 op = ops.DropTableOp.from_table(t) 1770 eq_(op.to_table(), schemacompare.CompareTable(t)) 1771 eq_(op.reverse().to_table(), schemacompare.CompareTable(t)) 1772 1773 def test_add_table(self): 1774 t = self.table 1775 1776 op = ops.CreateTableOp.from_table(t) 1777 eq_(op.to_table(), schemacompare.CompareTable(t)) 1778 eq_(op.reverse().to_table(), schemacompare.CompareTable(t)) 1779 1780 def test_drop_index(self): 1781 op = ops.DropIndexOp.from_index(self.ix) 1782 eq_(op.to_index(), schemacompare.CompareIndex(self.ix)) 1783 eq_(op.reverse().to_index(), schemacompare.CompareIndex(self.ix)) 1784 1785 def test_create_index(self): 1786 op = ops.CreateIndexOp.from_index(self.ix) 1787 eq_(op.to_index(), schemacompare.CompareIndex(self.ix)) 1788 eq_(op.reverse().to_index(), schemacompare.CompareIndex(self.ix)) 1789 1790 1791class MultipleMetaDataTest(AutogenFixtureTest, TestBase): 1792 def test_multiple(self): 1793 m1a = MetaData() 1794 m1b = MetaData() 1795 m1c = MetaData() 1796 1797 m2a = MetaData() 1798 m2b = MetaData() 1799 m2c = MetaData() 1800 1801 Table("a", m1a, Column("id", Integer, primary_key=True)) 1802 Table("b1", m1b, Column("id", Integer, primary_key=True)) 1803 Table("b2", m1b, Column("id", Integer, primary_key=True)) 1804 Table( 1805 "c1", 1806 m1c, 1807 Column("id", Integer, primary_key=True), 1808 Column("x", Integer), 1809 ) 1810 1811 a = Table( 1812 "a", 1813 m2a, 1814 Column("id", Integer, primary_key=True), 1815 Column("q", Integer), 1816 ) 1817 Table("b1", m2b, Column("id", Integer, primary_key=True)) 1818 Table("c1", m2c, Column("id", Integer, primary_key=True)) 1819 c2 = Table("c2", m2c, Column("id", Integer, primary_key=True)) 1820 1821 diffs = self._fixture([m1a, m1b, m1c], [m2a, m2b, m2c]) 1822 eq_(diffs[0], ("add_table", schemacompare.CompareTable(c2))) 1823 eq_(diffs[1][0], "remove_table") 1824 eq_(diffs[1][1].name, "b2") 1825 eq_(diffs[2], ("add_column", None, "a", a.c.q)) 1826 eq_(diffs[3][0:3], ("remove_column", None, "c1")) 1827 eq_(diffs[3][3].name, "x") 1828 1829 def test_empty_list(self): 1830 # because they're going to do it.... 1831 1832 diffs = self._fixture([], []) 1833 eq_(diffs, []) 1834 1835 def test_non_list_sequence(self): 1836 # we call it "sequence", let's check that 1837 1838 m1a = MetaData() 1839 m1b = MetaData() 1840 1841 m2a = MetaData() 1842 m2b = MetaData() 1843 1844 Table("a", m1a, Column("id", Integer, primary_key=True)) 1845 Table("b", m1b, Column("id", Integer, primary_key=True)) 1846 1847 Table("a", m2a, Column("id", Integer, primary_key=True)) 1848 b = Table( 1849 "b", 1850 m2b, 1851 Column("id", Integer, primary_key=True), 1852 Column("q", Integer), 1853 ) 1854 1855 diffs = self._fixture((m1a, m1b), (m2a, m2b)) 1856 eq_(diffs, [("add_column", None, "b", b.c.q)]) 1857 1858 def test_raise_on_dupe(self): 1859 m1a = MetaData() 1860 m1b = MetaData() 1861 1862 m2a = MetaData() 1863 m2b = MetaData() 1864 1865 Table("a", m1a, Column("id", Integer, primary_key=True)) 1866 Table("b1", m1b, Column("id", Integer, primary_key=True)) 1867 Table("b2", m1b, Column("id", Integer, primary_key=True)) 1868 Table("b3", m1b, Column("id", Integer, primary_key=True)) 1869 1870 Table("a", m2a, Column("id", Integer, primary_key=True)) 1871 Table("a", m2b, Column("id", Integer, primary_key=True)) 1872 Table("b1", m2b, Column("id", Integer, primary_key=True)) 1873 Table("b2", m2a, Column("id", Integer, primary_key=True)) 1874 Table("b2", m2b, Column("id", Integer, primary_key=True)) 1875 1876 assert_raises_message( 1877 ValueError, 1878 'Duplicate table keys across multiple MetaData objects: "a", "b2"', 1879 self._fixture, 1880 [m1a, m1b], 1881 [m2a, m2b], 1882 ) 1883