1from sqlalchemy import BIGINT
2from sqlalchemy import BigInteger
3from sqlalchemy import Boolean
4from sqlalchemy import CHAR
5from sqlalchemy import CheckConstraint
6from sqlalchemy import Column
7from sqlalchemy import DATE
8from sqlalchemy import DateTime
9from sqlalchemy import DECIMAL
10from sqlalchemy import Enum
11from sqlalchemy import FLOAT
12from sqlalchemy import ForeignKey
13from sqlalchemy import ForeignKeyConstraint
14from sqlalchemy import Index
15from sqlalchemy import inspect
16from sqlalchemy import INTEGER
17from sqlalchemy import Integer
18from sqlalchemy import LargeBinary
19from sqlalchemy import MetaData
20from sqlalchemy import Numeric
21from sqlalchemy import PrimaryKeyConstraint
22from sqlalchemy import SmallInteger
23from sqlalchemy import String
24from sqlalchemy import Table
25from sqlalchemy import Text
26from sqlalchemy import text
27from sqlalchemy import TIMESTAMP
28from sqlalchemy import TypeDecorator
29from sqlalchemy import Unicode
30from sqlalchemy import UniqueConstraint
31from sqlalchemy import VARCHAR
32from sqlalchemy.dialects import mysql
33from sqlalchemy.dialects import sqlite
34from sqlalchemy.types import NULLTYPE
35from sqlalchemy.types import VARBINARY
36
37from alembic import autogenerate
38from alembic import testing
39from alembic.autogenerate import api
40from alembic.migration import MigrationContext
41from alembic.operations import ops
42from alembic.testing import assert_raises_message
43from alembic.testing import config
44from alembic.testing import eq_
45from alembic.testing import is_
46from alembic.testing import is_not_
47from alembic.testing import mock
48from alembic.testing import schemacompare
49from alembic.testing import TestBase
50from alembic.testing.env import clear_staging_env
51from alembic.testing.env import staging_env
52from alembic.testing.suite._autogen_fixtures import _default_name_filters
53from alembic.testing.suite._autogen_fixtures import _default_object_filters
54from alembic.testing.suite._autogen_fixtures import AutogenFixtureTest
55from alembic.testing.suite._autogen_fixtures import AutogenTest
56from alembic.util import CommandError
57
58# TODO: we should make an adaptation of CompareMetadataToInspectorTest that is
59#       more well suited towards generic backends (2021-06-10)
60
61
62class AutogenCrossSchemaTest(AutogenTest, TestBase):
63    __only_on__ = "postgresql"
64    __backend__ = True
65
66    @classmethod
67    def _get_db_schema(cls):
68        m = MetaData()
69        Table("t1", m, Column("x", Integer))
70        Table("t2", m, Column("y", Integer), schema=config.test_schema)
71        Table("t6", m, Column("u", Integer))
72        Table("t7", m, Column("v", Integer), schema=config.test_schema)
73
74        return m
75
76    @classmethod
77    def _get_model_schema(cls):
78        m = MetaData()
79        Table("t3", m, Column("q", Integer))
80        Table("t4", m, Column("z", Integer), schema=config.test_schema)
81        Table("t6", m, Column("u", Integer))
82        Table("t7", m, Column("v", Integer), schema=config.test_schema)
83        return m
84
85    def test_default_schema_omitted_upgrade(self):
86        def include_object(obj, name, type_, reflected, compare_to):
87            if type_ == "table":
88                return name == "t3"
89            else:
90                return True
91
92        self._update_context(
93            object_filters=include_object, include_schemas=True
94        )
95        uo = ops.UpgradeOps(ops=[])
96        autogenerate._produce_net_changes(self.autogen_context, uo)
97
98        diffs = uo.as_diffs()
99        eq_(diffs[0][0], "add_table")
100        eq_(diffs[0][1].schema, None)
101
102    def test_default_schema_omitted_by_table_name_upgrade(self):
103        def include_name(name, type_, parent_names):
104            if type_ == "table":
105                retval = name in ["t1", "t6"]
106                if retval:
107                    eq_(parent_names["schema_name"], None)
108                    eq_(parent_names["schema_qualified_table_name"], name)
109                else:
110                    eq_(parent_names["schema_name"], config.test_schema)
111                    eq_(
112                        parent_names["schema_qualified_table_name"],
113                        "%s.%s" % (config.test_schema, name),
114                    )
115                return retval
116            else:
117                return True
118
119        self._update_context(name_filters=include_name, include_schemas=True)
120        uo = ops.UpgradeOps(ops=[])
121        autogenerate._produce_net_changes(self.autogen_context, uo)
122
123        diffs = uo.as_diffs()
124        eq_(
125            {(d[0], d[1].name) for d in diffs},
126            {
127                ("add_table", "t3"),
128                ("add_table", "t4"),
129                ("remove_table", "t1"),
130                ("add_table", "t7"),
131            },
132        )
133
134    def test_default_schema_omitted_by_schema_name_upgrade(self):
135        def include_name(name, type_, parent_names):
136            if type_ == "schema":
137                assert not parent_names
138                return name is None
139            else:
140                return True
141
142        self._update_context(name_filters=include_name, include_schemas=True)
143        uo = ops.UpgradeOps(ops=[])
144        autogenerate._produce_net_changes(self.autogen_context, uo)
145
146        diffs = uo.as_diffs()
147        eq_(
148            {(d[0], d[1].name) for d in diffs},
149            {
150                ("add_table", "t3"),
151                ("add_table", "t4"),
152                ("remove_table", "t1"),
153                ("add_table", "t7"),
154            },
155        )
156
157    def test_alt_schema_included_upgrade(self):
158        def include_object(obj, name, type_, reflected, compare_to):
159            if type_ == "table":
160                return name == "t4"
161            else:
162                return True
163
164        self._update_context(
165            object_filters=include_object, include_schemas=True
166        )
167        uo = ops.UpgradeOps(ops=[])
168        autogenerate._produce_net_changes(self.autogen_context, uo)
169
170        diffs = uo.as_diffs()
171        eq_(diffs[0][0], "add_table")
172        eq_(diffs[0][1].schema, config.test_schema)
173
174    def test_alt_schema_included_by_schema_name(self):
175        def include_name(name, type_, parent_names):
176            if type_ == "schema":
177                assert not parent_names
178                return name == config.test_schema
179            else:
180                return True
181
182        self._update_context(name_filters=include_name, include_schemas=True)
183        uo = ops.UpgradeOps(ops=[])
184        autogenerate._produce_net_changes(self.autogen_context, uo)
185
186        # does not include "t1" in drops because t1 is in default schema
187        # includes "t6" in adds because t6 is in default schema, was omitted,
188        # so reflection added it
189        diffs = uo.as_diffs()
190        eq_(
191            {(d[0], d[1].name) for d in diffs},
192            {
193                ("add_table", "t3"),
194                ("add_table", "t6"),
195                ("add_table", "t4"),
196                ("remove_table", "t2"),
197            },
198        )
199
200    def test_default_schema_omitted_downgrade(self):
201        def include_object(obj, name, type_, reflected, compare_to):
202            if type_ == "table":
203                return name == "t1"
204            else:
205                return True
206
207        self._update_context(
208            object_filters=include_object, include_schemas=True
209        )
210        uo = ops.UpgradeOps(ops=[])
211        autogenerate._produce_net_changes(self.autogen_context, uo)
212
213        diffs = uo.as_diffs()
214        eq_(diffs[0][0], "remove_table")
215        eq_(diffs[0][1].schema, None)
216
217    def test_alt_schema_included_downgrade(self):
218        def include_object(obj, name, type_, reflected, compare_to):
219            if type_ == "table":
220                return name == "t2"
221            else:
222                return True
223
224        self._update_context(
225            object_filters=include_object, include_schemas=True
226        )
227        uo = ops.UpgradeOps(ops=[])
228        autogenerate._produce_net_changes(self.autogen_context, uo)
229        diffs = uo.as_diffs()
230        eq_(diffs[0][0], "remove_table")
231        eq_(diffs[0][1].schema, config.test_schema)
232
233
234class AutogenDefaultSchemaTest(AutogenFixtureTest, TestBase):
235    __only_on__ = "postgresql"
236    __backend__ = True
237
238    def test_uses_explcit_schema_in_default_one(self):
239
240        default_schema = self.bind.dialect.default_schema_name
241
242        m1 = MetaData()
243        m2 = MetaData()
244
245        Table("a", m1, Column("x", String(50)))
246        Table("a", m2, Column("x", String(50)), schema=default_schema)
247
248        diffs = self._fixture(m1, m2, include_schemas=True)
249        eq_(diffs, [])
250
251    def test_uses_explcit_schema_in_default_two(self):
252
253        default_schema = self.bind.dialect.default_schema_name
254
255        m1 = MetaData()
256        m2 = MetaData()
257
258        Table("a", m1, Column("x", String(50)))
259        Table("a", m2, Column("x", String(50)), schema=default_schema)
260        Table("a", m2, Column("y", String(50)), schema="test_schema")
261
262        diffs = self._fixture(m1, m2, include_schemas=True)
263        eq_(len(diffs), 1)
264        eq_(diffs[0][0], "add_table")
265        eq_(diffs[0][1].schema, "test_schema")
266        eq_(diffs[0][1].c.keys(), ["y"])
267
268    def test_uses_explcit_schema_in_default_three(self):
269
270        default_schema = self.bind.dialect.default_schema_name
271
272        m1 = MetaData()
273        m2 = MetaData()
274
275        Table("a", m1, Column("y", String(50)), schema="test_schema")
276
277        Table("a", m2, Column("x", String(50)), schema=default_schema)
278        Table("a", m2, Column("y", String(50)), schema="test_schema")
279
280        diffs = self._fixture(m1, m2, include_schemas=True)
281        eq_(len(diffs), 1)
282        eq_(diffs[0][0], "add_table")
283        eq_(diffs[0][1].schema, default_schema)
284        eq_(diffs[0][1].c.keys(), ["x"])
285
286
287class AutogenDefaultSchemaIsNoneTest(AutogenFixtureTest, TestBase):
288    __only_on__ = "sqlite"
289
290    def setUp(self):
291        super(AutogenDefaultSchemaIsNoneTest, self).setUp()
292
293        # in SQLAlchemy 1.4, SQLite dialect is setting this name
294        # to "main" as is the actual default schema name for SQLite.
295        self.bind.dialect.default_schema_name = None
296
297        # prerequisite
298        eq_(self.bind.dialect.default_schema_name, None)
299
300    def test_no_default_schema(self):
301
302        m1 = MetaData()
303        m2 = MetaData()
304
305        Table("a", m1, Column("x", String(50)))
306        Table("a", m2, Column("x", String(50)))
307
308        def _include_object(obj, name, type_, reflected, compare_to):
309            if type_ == "table":
310                return name in "a" and obj.schema != "main"
311            else:
312                return True
313
314        diffs = self._fixture(
315            m1, m2, include_schemas=True, object_filters=_include_object
316        )
317        eq_(len(diffs), 0)
318
319
320class ModelOne:
321    __requires__ = ("unique_constraint_reflection",)
322
323    schema = None
324
325    @classmethod
326    def _get_db_schema(cls):
327        schema = cls.schema
328
329        m = MetaData(schema=schema)
330
331        Table(
332            "user",
333            m,
334            Column("id", Integer, primary_key=True),
335            Column("name", String(50)),
336            Column("a1", Text),
337            Column("pw", String(50)),
338            Index("pw_idx", "pw"),
339        )
340
341        Table(
342            "address",
343            m,
344            Column("id", Integer, primary_key=True),
345            Column("email_address", String(100), nullable=False),
346        )
347
348        Table(
349            "order",
350            m,
351            Column("order_id", Integer, primary_key=True),
352            Column(
353                "amount",
354                Numeric(8, 2),
355                nullable=False,
356                server_default=text("0"),
357            ),
358            CheckConstraint("amount >= 0", name="ck_order_amount"),
359        )
360
361        Table(
362            "extra",
363            m,
364            Column("x", CHAR),
365            Column("uid", Integer, ForeignKey("user.id")),
366        )
367
368        return m
369
370    @classmethod
371    def _get_model_schema(cls):
372        schema = cls.schema
373
374        m = MetaData(schema=schema)
375
376        Table(
377            "user",
378            m,
379            Column("id", Integer, primary_key=True),
380            Column("name", String(50), nullable=False),
381            Column("a1", Text, server_default="x"),
382        )
383
384        Table(
385            "address",
386            m,
387            Column("id", Integer, primary_key=True),
388            Column("email_address", String(100), nullable=False),
389            Column("street", String(50)),
390            UniqueConstraint("email_address", name="uq_email"),
391        )
392
393        Table(
394            "order",
395            m,
396            Column("order_id", Integer, primary_key=True),
397            Column(
398                "amount",
399                Numeric(10, 2),
400                nullable=True,
401                server_default=text("0"),
402            ),
403            Column("user_id", Integer, ForeignKey("user.id")),
404            CheckConstraint("amount > -1", name="ck_order_amount"),
405        )
406
407        Table(
408            "item",
409            m,
410            Column("id", Integer, primary_key=True),
411            Column("description", String(100)),
412            Column("order_id", Integer, ForeignKey("order.order_id")),
413            CheckConstraint("len(description) > 5"),
414        )
415        return m
416
417
418class AutogenerateDiffTest(ModelOne, AutogenTest, TestBase):
419    __only_on__ = "sqlite"
420
421    def test_diffs(self):
422        """test generation of diff rules"""
423
424        metadata = self.m2
425        uo = ops.UpgradeOps(ops=[])
426        ctx = self.autogen_context
427
428        autogenerate._produce_net_changes(ctx, uo)
429
430        diffs = uo.as_diffs()
431        eq_(
432            diffs[0],
433            ("add_table", schemacompare.CompareTable(metadata.tables["item"])),
434        )
435
436        eq_(diffs[1][0], "remove_table")
437        eq_(diffs[1][1].name, "extra")
438
439        eq_(diffs[2][0], "add_column")
440        eq_(diffs[2][1], None)
441        eq_(diffs[2][2], "address")
442        eq_(diffs[2][3], metadata.tables["address"].c.street)
443
444        eq_(diffs[3][0], "add_constraint")
445        eq_(diffs[3][1].name, "uq_email")
446
447        eq_(diffs[4][0], "add_column")
448        eq_(diffs[4][1], None)
449        eq_(diffs[4][2], "order")
450        eq_(diffs[4][3], metadata.tables["order"].c.user_id)
451
452        eq_(diffs[5][0][0], "modify_type")
453        eq_(diffs[5][0][1], None)
454        eq_(diffs[5][0][2], "order")
455        eq_(diffs[5][0][3], "amount")
456        eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)")
457        eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)")
458
459        self._assert_fk_diff(
460            diffs[6], "add_fk", "order", ["user_id"], "user", ["id"]
461        )
462
463        eq_(diffs[7][0][0], "modify_nullable")
464        eq_(diffs[7][0][5], True)
465        eq_(diffs[7][0][6], False)
466
467        eq_(diffs[8][0][0], "modify_default")
468        eq_(diffs[8][0][1], None)
469        eq_(diffs[8][0][2], "user")
470        eq_(diffs[8][0][3], "a1")
471        eq_(diffs[8][0][6].arg, "x")
472
473        eq_(diffs[9][0], "remove_index")
474        eq_(diffs[9][1].name, "pw_idx")
475
476        eq_(diffs[10][0], "remove_column")
477        eq_(diffs[10][3].name, "pw")
478        eq_(diffs[10][3].table.name, "user")
479        assert isinstance(diffs[10][3].type, String)
480
481    def test_include_object(self):
482        def include_object(obj, name, type_, reflected, compare_to):
483            assert obj.name == name
484            if type_ == "table":
485                if reflected:
486                    assert obj.metadata is not self.m2
487                else:
488                    assert obj.metadata is self.m2
489                return name in ("address", "order", "user")
490            elif type_ == "column":
491                if reflected:
492                    assert obj.table.metadata is not self.m2
493                else:
494                    assert obj.table.metadata is self.m2
495                return name != "street"
496            else:
497                return True
498
499        context = MigrationContext.configure(
500            connection=self.bind.connect(),
501            opts={
502                "compare_type": True,
503                "compare_server_default": True,
504                "target_metadata": self.m2,
505                "include_object": include_object,
506            },
507        )
508
509        diffs = autogenerate.compare_metadata(
510            context, context.opts["target_metadata"]
511        )
512
513        alter_cols = (
514            set(
515                [
516                    d[2]
517                    for d in self._flatten_diffs(diffs)
518                    if d[0].startswith("modify")
519                ]
520            )
521            .union(
522                d[3].name
523                for d in self._flatten_diffs(diffs)
524                if d[0] == "add_column"
525            )
526            .union(
527                d[1].name
528                for d in self._flatten_diffs(diffs)
529                if d[0] == "add_table"
530            )
531        )
532        eq_(alter_cols, set(["user_id", "order", "user"]))
533
534    def test_include_name(self):
535        all_names = set()
536
537        def include_name(name, type_, parent_names):
538            all_names.add((name, type_, parent_names.get("table_name", None)))
539            if type_ == "table":
540                eq_(
541                    parent_names,
542                    {"schema_name": None, "schema_qualified_table_name": name},
543                )
544                return name in ("address", "order", "user")
545            elif type_ == "column":
546                return name != "street"
547            else:
548                return True
549
550        context = MigrationContext.configure(
551            connection=self.bind.connect(),
552            opts={
553                "compare_type": True,
554                "compare_server_default": True,
555                "target_metadata": self.m2,
556                "include_name": include_name,
557            },
558        )
559
560        diffs = autogenerate.compare_metadata(
561            context, context.opts["target_metadata"]
562        )
563        eq_(
564            all_names,
565            {
566                (None, "schema", None),
567                ("user", "table", None),
568                ("id", "column", "user"),
569                ("name", "column", "user"),
570                ("a1", "column", "user"),
571                ("pw", "column", "user"),
572                ("pw_idx", "index", "user"),
573                ("order", "table", None),
574                ("order_id", "column", "order"),
575                ("amount", "column", "order"),
576                ("address", "table", None),
577                ("id", "column", "address"),
578                ("email_address", "column", "address"),
579                ("extra", "table", None),
580            },
581        )
582
583        alter_cols = (
584            set(
585                [
586                    d[2]
587                    for d in self._flatten_diffs(diffs)
588                    if d[0].startswith("modify")
589                ]
590            )
591            .union(
592                d[3].name
593                for d in self._flatten_diffs(diffs)
594                if d[0] == "add_column"
595            )
596            .union(
597                d[1].name
598                for d in self._flatten_diffs(diffs)
599                if d[0] == "add_table"
600            )
601        )
602        eq_(alter_cols, {"user_id", "order", "user", "street", "item"})
603
604    def test_skip_null_type_comparison_reflected(self):
605        ac = ops.AlterColumnOp("sometable", "somecol")
606        autogenerate.compare._compare_type(
607            self.autogen_context,
608            ac,
609            None,
610            "sometable",
611            "somecol",
612            Column("somecol", NULLTYPE),
613            Column("somecol", Integer()),
614        )
615        diff = ac.to_diff_tuple()
616        assert not diff
617
618    def test_skip_null_type_comparison_local(self):
619        ac = ops.AlterColumnOp("sometable", "somecol")
620        autogenerate.compare._compare_type(
621            self.autogen_context,
622            ac,
623            None,
624            "sometable",
625            "somecol",
626            Column("somecol", Integer()),
627            Column("somecol", NULLTYPE),
628        )
629        diff = ac.to_diff_tuple()
630        assert not diff
631
632    def test_custom_type_compare(self):
633        class MyType(TypeDecorator):
634            impl = Integer
635
636            def compare_against_backend(self, dialect, conn_type):
637                return isinstance(conn_type, Integer)
638
639        ac = ops.AlterColumnOp("sometable", "somecol")
640        autogenerate.compare._compare_type(
641            self.autogen_context,
642            ac,
643            None,
644            "sometable",
645            "somecol",
646            Column("somecol", INTEGER()),
647            Column("somecol", MyType()),
648        )
649
650        assert not ac.has_changes()
651
652        ac = ops.AlterColumnOp("sometable", "somecol")
653        autogenerate.compare._compare_type(
654            self.autogen_context,
655            ac,
656            None,
657            "sometable",
658            "somecol",
659            Column("somecol", String()),
660            Column("somecol", MyType()),
661        )
662        diff = ac.to_diff_tuple()
663        eq_(diff[0][0:4], ("modify_type", None, "sometable", "somecol"))
664
665    def test_affinity_typedec(self):
666        class MyType(TypeDecorator):
667            impl = CHAR
668
669            def load_dialect_impl(self, dialect):
670                if dialect.name == "sqlite":
671                    return dialect.type_descriptor(Integer())
672                else:
673                    return dialect.type_descriptor(CHAR(32))
674
675        uo = ops.AlterColumnOp("sometable", "somecol")
676        autogenerate.compare._compare_type(
677            self.autogen_context,
678            uo,
679            None,
680            "sometable",
681            "somecol",
682            Column("somecol", Integer, nullable=True),
683            Column("somecol", MyType()),
684        )
685        assert not uo.has_changes()
686
687    def test_dont_barf_on_already_reflected(self):
688        from sqlalchemy.util import OrderedSet
689
690        inspector = inspect(self.bind)
691        uo = ops.UpgradeOps(ops=[])
692        autogenerate.compare._compare_tables(
693            OrderedSet([(None, "extra"), (None, "user")]),
694            OrderedSet(),
695            inspector,
696            uo,
697            self.autogen_context,
698        )
699        eq_(
700            [(rec[0], rec[1].name) for rec in uo.as_diffs()],
701            [
702                ("remove_table", "extra"),
703                ("remove_index", "pw_idx"),
704                ("remove_table", "user"),
705            ],
706        )
707
708
709class AutogenerateDiffTestWSchema(ModelOne, AutogenTest, TestBase):
710    __only_on__ = "postgresql"
711    __backend__ = True
712    schema = "test_schema"
713
714    def test_diffs(self):
715        """test generation of diff rules"""
716
717        metadata = self.m2
718
719        self._update_context(include_schemas=True)
720        uo = ops.UpgradeOps(ops=[])
721        autogenerate._produce_net_changes(self.autogen_context, uo)
722
723        diffs = uo.as_diffs()
724
725        eq_(
726            diffs[0],
727            (
728                "add_table",
729                schemacompare.CompareTable(
730                    metadata.tables["%s.item" % self.schema]
731                ),
732            ),
733        )
734
735        eq_(diffs[1][0], "remove_table")
736        eq_(diffs[1][1].name, "extra")
737
738        eq_(diffs[2][0], "add_column")
739        eq_(diffs[2][1], self.schema)
740        eq_(diffs[2][2], "address")
741        eq_(
742            schemacompare.CompareColumn(
743                metadata.tables["%s.address" % self.schema].c.street
744            ),
745            diffs[2][3],
746        )
747
748        eq_(diffs[3][0], "add_constraint")
749        eq_(diffs[3][1].name, "uq_email")
750
751        eq_(diffs[4][0], "add_column")
752        eq_(diffs[4][1], self.schema)
753        eq_(diffs[4][2], "order")
754        eq_(
755            schemacompare.CompareColumn(
756                metadata.tables["%s.order" % self.schema].c.user_id
757            ),
758            diffs[4][3],
759        )
760
761        eq_(diffs[5][0][0], "modify_type")
762        eq_(diffs[5][0][1], self.schema)
763        eq_(diffs[5][0][2], "order")
764        eq_(diffs[5][0][3], "amount")
765        eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)")
766        eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)")
767
768        self._assert_fk_diff(
769            diffs[6],
770            "add_fk",
771            "order",
772            ["user_id"],
773            "user",
774            ["id"],
775            source_schema=config.test_schema,
776        )
777
778        eq_(diffs[7][0][0], "modify_nullable")
779        eq_(diffs[7][0][5], True)
780        eq_(diffs[7][0][6], False)
781
782        eq_(diffs[8][0][0], "modify_default")
783        eq_(diffs[8][0][1], self.schema)
784        eq_(diffs[8][0][2], "user")
785        eq_(diffs[8][0][3], "a1")
786        eq_(diffs[8][0][6].arg, "x")
787
788        eq_(diffs[9][0], "remove_index")
789        eq_(diffs[9][1].name, "pw_idx")
790
791        eq_(diffs[10][0], "remove_column")
792        eq_(diffs[10][3].name, "pw")
793
794
795class CompareTypeSpecificityTest(TestBase):
796    @testing.fixture
797    def impl_fixture(self):
798        from alembic.ddl import impl
799        from sqlalchemy.engine import default
800
801        return impl.DefaultImpl(
802            default.DefaultDialect(), None, False, True, None, {}
803        )
804
805    def test_typedec_to_nonstandard(self, impl_fixture):
806        class PasswordType(TypeDecorator):
807            impl = VARBINARY
808
809            def copy(self, **kw):
810                return PasswordType(self.impl.length)
811
812            def load_dialect_impl(self, dialect):
813                if dialect.name == "default":
814                    impl = sqlite.NUMERIC(self.length)
815                else:
816                    impl = VARBINARY(self.length)
817                return dialect.type_descriptor(impl)
818
819        impl_fixture.compare_type(
820            Column("x", sqlite.NUMERIC(50)), Column("x", PasswordType(50))
821        )
822
823    @testing.combinations(
824        (VARCHAR(30), String(30), False),
825        (VARCHAR(30), String(40), True),
826        (VARCHAR(30), Integer(), True),
827        (Text(), String(255), True),
828        # insp + metadata types same number of
829        # args but are different; they're different
830        (DECIMAL(10, 5), DECIMAL(10, 6), True),
831        # insp + metadata types, inspected type
832        # has an additional arg; assume this is additional
833        # default precision on the part of the DB, assume they are
834        # equivalent
835        (DECIMAL(10, 5), DECIMAL(10), False),
836        # insp + metadata types, metadata type
837        # has an additional arg; this can go either way, either the
838        # metadata has extra precision, or the DB doesn't support the
839        # element, go with consider them equivalent for now
840        (DECIMAL(10), DECIMAL(10, 5), False),
841        (DECIMAL(10, 2), Numeric(10), False),
842        (DECIMAL(10, 5), Numeric(10, 5), False),
843        (DECIMAL(10, 5), Numeric(12, 5), True),
844        (DECIMAL(10, 5), DateTime(), True),
845        (Numeric(), Numeric(scale=5), False),
846        (INTEGER(), Integer(), False),
847        (BIGINT(), Integer(), True),
848        (BIGINT(), BigInteger(), False),
849        (BIGINT(), SmallInteger(), True),
850        (INTEGER(), SmallInteger(), True),
851        (Integer(), String(), True),
852        id_="ssa",
853        argnames="inspected_type,metadata_type,expected",
854    )
855    def test_compare_type(
856        self, impl_fixture, inspected_type, metadata_type, expected
857    ):
858
859        is_(
860            impl_fixture.compare_type(
861                Column("x", inspected_type), Column("x", metadata_type)
862            ),
863            expected,
864        )
865
866
867class CompareMetadataToInspectorTest(TestBase):
868    __backend__ = True
869
870    @classmethod
871    def _get_bind(cls):
872        return config.db
873
874    configure_opts = {}
875
876    def setUp(self):
877        staging_env()
878        self.bind = self._get_bind()
879        self.m1 = MetaData()
880
881    def tearDown(self):
882        self.m1.drop_all(self.bind)
883        clear_staging_env()
884
885    def _compare_columns(self, cola, colb):
886        Table("sometable", self.m1, Column("col", cola))
887        self.m1.create_all(self.bind)
888        m2 = MetaData()
889        Table("sometable", m2, Column("col", colb))
890
891        ctx_opts = {
892            "compare_type": True,
893            "compare_server_default": True,
894            "target_metadata": m2,
895            "upgrade_token": "upgrades",
896            "downgrade_token": "downgrades",
897            "alembic_module_prefix": "op.",
898            "sqlalchemy_module_prefix": "sa.",
899            "include_object": _default_object_filters,
900            "include_name": _default_name_filters,
901        }
902        if self.configure_opts:
903            ctx_opts.update(self.configure_opts)
904        with self.bind.connect() as conn:
905            context = MigrationContext.configure(
906                connection=conn, opts=ctx_opts
907            )
908            autogen_context = api.AutogenContext(context, m2)
909            uo = ops.UpgradeOps(ops=[])
910            autogenerate._produce_net_changes(autogen_context, uo)
911        return bool(uo.as_diffs())
912
913    @testing.combinations(
914        (INTEGER(),),
915        (CHAR(),),
916        (VARCHAR(32),),
917        (Text(),),
918        (FLOAT(),),
919        (Numeric(),),
920        (DECIMAL(),),
921        (TIMESTAMP(),),
922        (DateTime(),),
923        (Boolean(),),
924        (BigInteger(),),
925        (SmallInteger(),),
926        (DATE(),),
927        (String(32),),
928        (LargeBinary(),),
929        (Unicode(32),),
930        (Enum("one", "two", "three", name="the_enum"),),
931    )
932    def test_introspected_columns_match_metadata_columns(self, cola):
933        # this is ensuring false positives aren't generated for types
934        # that have not changed.
935        is_(self._compare_columns(cola, cola), False)
936
937    # TODO: ideally the backend-specific types would be tested
938    # within the test suites for those backends.
939    @testing.combinations(
940        (String(32), VARCHAR(32), False),
941        (VARCHAR(6), String(6), False),
942        (CHAR(), String(1), True),
943        (Text(), VARCHAR(255), True),
944        (Unicode(32), String(32), False, config.requirements.unicode_string),
945        (Unicode(32), VARCHAR(32), False, config.requirements.unicode_string),
946        (VARCHAR(6), VARCHAR(12), True),
947        (VARCHAR(6), String(12), True),
948        (Integer(), String(10), True),
949        (String(10), Integer(), True),
950        (
951            Unicode(30, collation="en_US"),
952            Unicode(30, collation="en_US"),
953            False,  # unfortunately dialects don't seem to consistently
954            # reflect collations right now so we can't test for
955            # positives here
956            config.requirements.postgresql,
957        ),
958        (
959            mysql.VARCHAR(200, charset="utf8"),
960            Unicode(200),
961            False,
962            config.requirements.mysql,
963        ),
964        (
965            mysql.VARCHAR(200, charset="latin1"),
966            mysql.VARCHAR(200, charset="utf-8"),
967            True,
968            config.requirements.mysql + config.requirements.sqlalchemy_13,
969        ),
970        (
971            String(255, collation="utf8_bin"),
972            String(255),
973            False,
974            config.requirements.mysql,
975        ),
976        (
977            String(255, collation="utf8_bin"),
978            String(255, collation="latin1_bin"),
979            True,
980            config.requirements.mysql + config.requirements.sqlalchemy_13,
981        ),
982    )
983    def test_string_comparisons(self, cola, colb, expect_changes):
984        is_(self._compare_columns(cola, colb), expect_changes)
985
986    @testing.combinations(
987        (
988            DateTime(),
989            DateTime(timezone=False),
990            False,
991            config.requirements.datetime_timezone,
992        ),
993        (
994            DateTime(),
995            DateTime(timezone=True),
996            True,
997            config.requirements.datetime_timezone,
998        ),
999        (
1000            DateTime(timezone=True),
1001            DateTime(timezone=False),
1002            True,
1003            config.requirements.datetime_timezone,
1004        ),
1005    )
1006    def test_datetime_comparisons(self, cola, colb, expect_changes):
1007        is_(self._compare_columns(cola, colb), expect_changes)
1008
1009    @testing.combinations(
1010        (Integer(), Integer(), False),
1011        (
1012            Integer(),
1013            Numeric(8, 0),
1014            True,
1015            config.requirements.integer_subtype_comparisons,
1016        ),
1017        (Numeric(8, 0), Numeric(8, 2), True),
1018        (
1019            BigInteger(),
1020            Integer(),
1021            True,
1022            config.requirements.integer_subtype_comparisons,
1023        ),
1024        (
1025            SmallInteger(),
1026            Integer(),
1027            True,
1028            config.requirements.integer_subtype_comparisons,
1029        ),
1030        (  # note that the mysql.INTEGER tests only use these params
1031            # if the dialect is "mysql".  however we also test that their
1032            # dialect-agnostic representation compares by running this
1033            # against other dialects.
1034            mysql.INTEGER(unsigned=True, display_width=10),
1035            mysql.INTEGER(unsigned=True, display_width=10),
1036            False,
1037        ),
1038        (mysql.INTEGER(unsigned=True), mysql.INTEGER(unsigned=True), False),
1039        (
1040            mysql.INTEGER(unsigned=True, display_width=10),
1041            mysql.INTEGER(unsigned=True),
1042            False,
1043        ),
1044        (
1045            mysql.INTEGER(unsigned=True),
1046            mysql.INTEGER(unsigned=True, display_width=10),
1047            False,
1048        ),
1049    )
1050    def test_numeric_comparisons(self, cola, colb, expect_changes):
1051        is_(self._compare_columns(cola, colb), expect_changes)
1052
1053
1054class AutogenSystemColTest(AutogenTest, TestBase):
1055    __only_on__ = "postgresql"
1056
1057    @classmethod
1058    def _get_db_schema(cls):
1059        m = MetaData()
1060
1061        Table("sometable", m, Column("id", Integer, primary_key=True))
1062        return m
1063
1064    @classmethod
1065    def _get_model_schema(cls):
1066        m = MetaData()
1067
1068        # 'xmin' is implicitly present, when added to a model should produce
1069        # no change
1070        Table(
1071            "sometable",
1072            m,
1073            Column("id", Integer, primary_key=True),
1074            Column("xmin", Integer, system=True),
1075        )
1076        return m
1077
1078    def test_dont_add_system(self):
1079        uo = ops.UpgradeOps(ops=[])
1080        autogenerate._produce_net_changes(self.autogen_context, uo)
1081
1082        diffs = uo.as_diffs()
1083        eq_(diffs, [])
1084
1085
1086class AutogenerateVariantCompareTest(AutogenTest, TestBase):
1087    __backend__ = True
1088
1089    @classmethod
1090    def _get_db_schema(cls):
1091        m = MetaData()
1092
1093        Table(
1094            "sometable",
1095            m,
1096            Column(
1097                "id",
1098                BigInteger().with_variant(Integer, "sqlite"),
1099                primary_key=True,
1100            ),
1101            Column("value", String(50)),
1102        )
1103        return m
1104
1105    @classmethod
1106    def _get_model_schema(cls):
1107        m = MetaData()
1108
1109        Table(
1110            "sometable",
1111            m,
1112            Column(
1113                "id",
1114                BigInteger().with_variant(Integer, "sqlite"),
1115                primary_key=True,
1116            ),
1117            Column("value", String(50)),
1118        )
1119        return m
1120
1121    def test_variant_no_issue(self):
1122        uo = ops.UpgradeOps(ops=[])
1123        autogenerate._produce_net_changes(self.autogen_context, uo)
1124
1125        diffs = uo.as_diffs()
1126        eq_(diffs, [])
1127
1128
1129class AutogenerateCustomCompareTypeTest(AutogenTest, TestBase):
1130    __only_on__ = "sqlite"
1131
1132    @classmethod
1133    def _get_db_schema(cls):
1134        m = MetaData()
1135
1136        Table(
1137            "sometable",
1138            m,
1139            Column("id", Integer, primary_key=True),
1140            Column("value", Integer),
1141        )
1142        return m
1143
1144    @classmethod
1145    def _get_model_schema(cls):
1146        m = MetaData()
1147
1148        Table(
1149            "sometable",
1150            m,
1151            Column("id", Integer, primary_key=True),
1152            Column("value", String),
1153        )
1154        return m
1155
1156    def test_uses_custom_compare_type_function(self):
1157        my_compare_type = mock.Mock()
1158        self.context._user_compare_type = my_compare_type
1159
1160        uo = ops.UpgradeOps(ops=[])
1161
1162        ctx = self.autogen_context
1163        autogenerate._produce_net_changes(ctx, uo)
1164
1165        first_table = self.m2.tables["sometable"]
1166        first_column = first_table.columns["id"]
1167
1168        eq_(len(my_compare_type.mock_calls), 2)
1169
1170        # We'll just test the first call
1171        _, args, _ = my_compare_type.mock_calls[0]
1172        (
1173            context,
1174            inspected_column,
1175            metadata_column,
1176            inspected_type,
1177            metadata_type,
1178        ) = args
1179        eq_(context, self.context)
1180        eq_(metadata_column, first_column)
1181        eq_(metadata_type, first_column.type)
1182        eq_(inspected_column.name, first_column.name)
1183        eq_(type(inspected_type), INTEGER)
1184
1185    def test_column_type_not_modified_custom_compare_type_returns_False(self):
1186        my_compare_type = mock.Mock()
1187        my_compare_type.return_value = False
1188        self.context._user_compare_type = my_compare_type
1189
1190        diffs = []
1191        ctx = self.autogen_context
1192        diffs = []
1193        autogenerate._produce_net_changes(ctx, diffs)
1194
1195        eq_(diffs, [])
1196
1197    def test_column_type_modified_custom_compare_type_returns_True(self):
1198        my_compare_type = mock.Mock()
1199        my_compare_type.return_value = True
1200        self.context._user_compare_type = my_compare_type
1201
1202        ctx = self.autogen_context
1203        uo = ops.UpgradeOps(ops=[])
1204        autogenerate._produce_net_changes(ctx, uo)
1205        diffs = uo.as_diffs()
1206
1207        eq_(diffs[0][0][0], "modify_type")
1208        eq_(diffs[1][0][0], "modify_type")
1209
1210
1211class IncludeFiltersAPITest(AutogenTest, TestBase):
1212    @classmethod
1213    def _get_db_schema(cls):
1214        return MetaData()
1215
1216    @classmethod
1217    def _get_model_schema(cls):
1218        return MetaData()
1219
1220    def test_run_name_filters_supports_extension_types(self):
1221        include_name = mock.Mock()
1222
1223        self._update_context(name_filters=include_name, include_schemas=True)
1224
1225        self.autogen_context.run_name_filters(
1226            name="some_function",
1227            type_="function",
1228            parent_names={"schema_name": "public"},
1229        )
1230
1231        eq_(
1232            include_name.mock_calls,
1233            [
1234                mock.call(
1235                    "some_function", "function", {"schema_name": "public"}
1236                )
1237            ],
1238        )
1239
1240    def test_run_object_filters_supports_extension_types(self):
1241        include_object = mock.Mock()
1242
1243        self._update_context(
1244            object_filters=include_object, include_schemas=True
1245        )
1246
1247        class ExtFunction:
1248            pass
1249
1250        extfunc = ExtFunction()
1251        self.autogen_context.run_object_filters(
1252            object_=extfunc,
1253            name="some_function",
1254            type_="function",
1255            reflected=False,
1256            compare_to=None,
1257        )
1258
1259        eq_(
1260            include_object.mock_calls,
1261            [mock.call(extfunc, "some_function", "function", False, None)],
1262        )
1263
1264
1265class PKConstraintUpgradesIgnoresNullableTest(AutogenTest, TestBase):
1266    __backend__ = True
1267
1268    # test behavior for issue originally observed in SQLAlchemy issue #3023,
1269    # alembic issue #199
1270    @classmethod
1271    def _get_db_schema(cls):
1272        m = MetaData()
1273
1274        Table(
1275            "person_to_role",
1276            m,
1277            Column("person_id", Integer, autoincrement=False),
1278            Column("role_id", Integer, autoincrement=False),
1279            PrimaryKeyConstraint("person_id", "role_id"),
1280        )
1281        return m
1282
1283    @classmethod
1284    def _get_model_schema(cls):
1285        return cls._get_db_schema()
1286
1287    def test_no_change(self):
1288        uo = ops.UpgradeOps(ops=[])
1289        ctx = self.autogen_context
1290        autogenerate._produce_net_changes(ctx, uo)
1291        diffs = uo.as_diffs()
1292        eq_(diffs, [])
1293
1294
1295class AutogenKeyTest(AutogenTest, TestBase):
1296    __only_on__ = "sqlite"
1297
1298    @classmethod
1299    def _get_db_schema(cls):
1300        m = MetaData()
1301
1302        Table(
1303            "someothertable",
1304            m,
1305            Column("id", Integer, primary_key=True),
1306            Column("value", Integer, key="somekey"),
1307        )
1308        return m
1309
1310    @classmethod
1311    def _get_model_schema(cls):
1312        m = MetaData()
1313
1314        Table(
1315            "sometable",
1316            m,
1317            Column("id", Integer, primary_key=True),
1318            Column("value", Integer, key="someotherkey"),
1319        )
1320        Table(
1321            "someothertable",
1322            m,
1323            Column("id", Integer, primary_key=True),
1324            Column("value", Integer, key="somekey"),
1325            Column("othervalue", Integer, key="otherkey"),
1326        )
1327        return m
1328
1329    symbols = ["someothertable", "sometable"]
1330
1331    def test_autogen(self):
1332
1333        uo = ops.UpgradeOps(ops=[])
1334
1335        ctx = self.autogen_context
1336        autogenerate._produce_net_changes(ctx, uo)
1337        diffs = uo.as_diffs()
1338        eq_(diffs[0][0], "add_table")
1339        eq_(diffs[0][1].name, "sometable")
1340        eq_(diffs[1][0], "add_column")
1341        eq_(diffs[1][3].key, "otherkey")
1342
1343
1344class AutogenVersionTableTest(AutogenTest, TestBase):
1345    __only_on__ = "sqlite"
1346    version_table_name = "alembic_version"
1347    version_table_schema = None
1348
1349    @classmethod
1350    def _get_db_schema(cls):
1351        m = MetaData()
1352        Table(
1353            cls.version_table_name,
1354            m,
1355            Column("x", Integer),
1356            schema=cls.version_table_schema,
1357        )
1358        return m
1359
1360    @classmethod
1361    def _get_model_schema(cls):
1362        m = MetaData()
1363        return m
1364
1365    def test_no_version_table(self):
1366        ctx = self.autogen_context
1367
1368        uo = ops.UpgradeOps(ops=[])
1369        autogenerate._produce_net_changes(ctx, uo)
1370        eq_(uo.as_diffs(), [])
1371
1372    def test_version_table_in_target(self):
1373        Table(
1374            self.version_table_name,
1375            self.m2,
1376            Column("x", Integer),
1377            schema=self.version_table_schema,
1378        )
1379
1380        ctx = self.autogen_context
1381        uo = ops.UpgradeOps(ops=[])
1382        autogenerate._produce_net_changes(ctx, uo)
1383        eq_(uo.as_diffs(), [])
1384
1385
1386class AutogenCustomVersionTableSchemaTest(AutogenVersionTableTest):
1387    __only_on__ = "postgresql"
1388    __backend__ = True
1389    version_table_schema = "test_schema"
1390    configure_opts = {"version_table_schema": "test_schema"}
1391
1392
1393class AutogenCustomVersionTableTest(AutogenVersionTableTest):
1394    version_table_name = "my_version_table"
1395    configure_opts = {"version_table": "my_version_table"}
1396
1397
1398class AutogenCustomVersionTableAndSchemaTest(AutogenVersionTableTest):
1399    __only_on__ = "postgresql"
1400    __backend__ = True
1401    version_table_name = "my_version_table"
1402    version_table_schema = "test_schema"
1403    configure_opts = {
1404        "version_table": "my_version_table",
1405        "version_table_schema": "test_schema",
1406    }
1407
1408
1409class AutogenerateDiffOrderTest(AutogenTest, TestBase):
1410    __only_on__ = "sqlite"
1411
1412    @classmethod
1413    def _get_db_schema(cls):
1414        return MetaData()
1415
1416    @classmethod
1417    def _get_model_schema(cls):
1418        m = MetaData()
1419        Table("parent", m, Column("id", Integer, primary_key=True))
1420
1421        Table(
1422            "child", m, Column("parent_id", Integer, ForeignKey("parent.id"))
1423        )
1424
1425        return m
1426
1427    def test_diffs_order(self):
1428        """
1429        Added in order to test that child tables(tables with FKs) are
1430        generated before their parent tables
1431        """
1432
1433        ctx = self.autogen_context
1434        uo = ops.UpgradeOps(ops=[])
1435        autogenerate._produce_net_changes(ctx, uo)
1436        diffs = uo.as_diffs()
1437
1438        eq_(diffs[0][0], "add_table")
1439        eq_(diffs[0][1].name, "parent")
1440        eq_(diffs[1][0], "add_table")
1441        eq_(diffs[1][1].name, "child")
1442
1443
1444class CompareMetadataTest(ModelOne, AutogenTest, TestBase):
1445    __only_on__ = "sqlite"
1446
1447    def test_compare_metadata(self):
1448        metadata = self.m2
1449
1450        diffs = autogenerate.compare_metadata(self.context, metadata)
1451
1452        eq_(
1453            diffs[0],
1454            ("add_table", schemacompare.CompareTable(metadata.tables["item"])),
1455        )
1456
1457        eq_(diffs[1][0], "remove_table")
1458        eq_(diffs[1][1].name, "extra")
1459
1460        eq_(diffs[2][0], "add_column")
1461        eq_(diffs[2][1], None)
1462        eq_(diffs[2][2], "address")
1463        eq_(diffs[2][3], metadata.tables["address"].c.street)
1464
1465        eq_(diffs[3][0], "add_constraint")
1466        eq_(diffs[3][1].name, "uq_email")
1467
1468        eq_(diffs[4][0], "add_column")
1469        eq_(diffs[4][1], None)
1470        eq_(diffs[4][2], "order")
1471        eq_(diffs[4][3], metadata.tables["order"].c.user_id)
1472
1473        eq_(diffs[5][0][0], "modify_type")
1474        eq_(diffs[5][0][1], None)
1475        eq_(diffs[5][0][2], "order")
1476        eq_(diffs[5][0][3], "amount")
1477        eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)")
1478        eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)")
1479
1480        self._assert_fk_diff(
1481            diffs[6], "add_fk", "order", ["user_id"], "user", ["id"]
1482        )
1483
1484        eq_(diffs[7][0][0], "modify_nullable")
1485        eq_(diffs[7][0][5], True)
1486        eq_(diffs[7][0][6], False)
1487
1488        eq_(diffs[8][0][0], "modify_default")
1489        eq_(diffs[8][0][1], None)
1490        eq_(diffs[8][0][2], "user")
1491        eq_(diffs[8][0][3], "a1")
1492        eq_(diffs[8][0][6].arg, "x")
1493
1494        eq_(diffs[9][0], "remove_index")
1495        eq_(diffs[9][1].name, "pw_idx")
1496
1497        eq_(diffs[10][0], "remove_column")
1498        eq_(diffs[10][3].name, "pw")
1499
1500    def test_compare_metadata_include_object(self):
1501        metadata = self.m2
1502
1503        def include_object(obj, name, type_, reflected, compare_to):
1504            if type_ == "table":
1505                return name in ("extra", "order")
1506            elif type_ == "column":
1507                return name != "amount"
1508            else:
1509                return True
1510
1511        context = MigrationContext.configure(
1512            connection=self.bind.connect(),
1513            opts={
1514                "compare_type": True,
1515                "compare_server_default": True,
1516                "include_object": include_object,
1517            },
1518        )
1519
1520        diffs = autogenerate.compare_metadata(context, metadata)
1521
1522        eq_(diffs[0][0], "remove_table")
1523        eq_(diffs[0][1].name, "extra")
1524
1525        eq_(diffs[1][0], "add_column")
1526        eq_(diffs[1][1], None)
1527        eq_(diffs[1][2], "order")
1528        eq_(diffs[1][3], metadata.tables["order"].c.user_id)
1529
1530    def test_compare_metadata_include_name(self):
1531        metadata = self.m2
1532
1533        all_names = set()
1534
1535        def include_name(name, type_, parent_names):
1536            all_names.add((name, type_, parent_names.get("table_name", None)))
1537            if type_ == "table":
1538                return name in ("extra", "order")
1539            elif type_ == "column":
1540                return name != "amount"
1541            else:
1542                return True
1543
1544        context = MigrationContext.configure(
1545            connection=self.bind.connect(),
1546            opts={
1547                "compare_type": True,
1548                "compare_server_default": True,
1549                "include_name": include_name,
1550            },
1551        )
1552
1553        diffs = autogenerate.compare_metadata(context, metadata)
1554        eq_(
1555            all_names,
1556            {
1557                ("user", "table", None),
1558                ("order", "table", None),
1559                ("address", "table", None),
1560                (None, "schema", None),
1561                ("amount", "column", "order"),
1562                ("extra", "table", None),
1563                ("order_id", "column", "order"),
1564            },
1565        )
1566
1567        eq_(
1568            {
1569                (
1570                    d[0],
1571                    d[3].name if d[0] == "add_column" else d[1].name,
1572                    d[2] if d[0] == "add_column" else None,
1573                )
1574                for d in diffs
1575            },
1576            {
1577                ("remove_table", "extra", None),
1578                ("add_fk", None, None),
1579                ("add_column", "amount", "order"),
1580                ("add_table", "user", None),
1581                ("add_table", "item", None),
1582                ("add_column", "user_id", "order"),
1583                ("add_table", "address", None),
1584            },
1585        )
1586
1587    def test_compare_metadata_as_sql(self):
1588        context = MigrationContext.configure(
1589            connection=self.bind.connect(), opts={"as_sql": True}
1590        )
1591        metadata = self.m2
1592
1593        assert_raises_message(
1594            CommandError,
1595            "autogenerate can't use as_sql=True as it prevents "
1596            "querying the database for schema information",
1597            autogenerate.compare_metadata,
1598            context,
1599            metadata,
1600        )
1601
1602
1603class PGCompareMetaData(ModelOne, AutogenTest, TestBase):
1604    __only_on__ = "postgresql"
1605    __backend__ = True
1606    schema = "test_schema"
1607
1608    def test_compare_metadata_schema(self):
1609        metadata = self.m2
1610
1611        context = MigrationContext.configure(
1612            connection=self.bind.connect(), opts={"include_schemas": True}
1613        )
1614
1615        diffs = autogenerate.compare_metadata(context, metadata)
1616
1617        eq_(
1618            diffs[0],
1619            (
1620                "add_table",
1621                schemacompare.CompareTable(
1622                    metadata.tables["test_schema.item"]
1623                ),
1624            ),
1625        )
1626
1627        eq_(diffs[1][0], "remove_table")
1628        eq_(diffs[1][1].name, "extra")
1629
1630        eq_(diffs[2][0], "add_column")
1631        eq_(diffs[2][1], "test_schema")
1632        eq_(diffs[2][2], "address")
1633        eq_(
1634            schemacompare.CompareColumn(
1635                metadata.tables["test_schema.address"].c.street
1636            ),
1637            diffs[2][3],
1638        )
1639
1640        eq_(diffs[3][0], "add_constraint")
1641        eq_(diffs[3][1].name, "uq_email")
1642
1643        eq_(diffs[4][0], "add_column")
1644        eq_(diffs[4][1], "test_schema")
1645        eq_(diffs[4][2], "order")
1646        eq_(
1647            schemacompare.CompareColumn(
1648                metadata.tables["test_schema.order"].c.user_id
1649            ),
1650            diffs[4][3],
1651        )
1652
1653        eq_(diffs[5][0][0], "modify_nullable")
1654        eq_(diffs[5][0][5], False)
1655        eq_(diffs[5][0][6], True)
1656
1657
1658class OrigObjectTest(TestBase):
1659    def setUp(self):
1660        self.metadata = m = MetaData()
1661        t = Table(
1662            "t",
1663            m,
1664            Column("id", Integer(), primary_key=True),
1665            Column("x", Integer()),
1666        )
1667        self.ix = Index("ix1", t.c.id)
1668        fk = ForeignKeyConstraint(["t_id"], ["t.id"])
1669        q = Table("q", m, Column("t_id", Integer()), fk)
1670        self.table = t
1671        self.fk = fk
1672        self.ck = CheckConstraint(t.c.x > 5)
1673        t.append_constraint(self.ck)
1674        self.uq = UniqueConstraint(q.c.t_id)
1675        self.pk = t.primary_key
1676
1677    def test_drop_fk(self):
1678        fk = self.fk
1679        op = ops.DropConstraintOp.from_constraint(fk)
1680        eq_(op.to_constraint(), schemacompare.CompareForeignKey(fk))
1681        eq_(op.reverse().to_constraint(), schemacompare.CompareForeignKey(fk))
1682
1683    def test_add_fk(self):
1684        fk = self.fk
1685        op = ops.AddConstraintOp.from_constraint(fk)
1686        eq_(op.to_constraint(), schemacompare.CompareForeignKey(fk))
1687        eq_(op.reverse().to_constraint(), schemacompare.CompareForeignKey(fk))
1688        is_not_(None, op.to_constraint().table)
1689
1690    def test_add_check(self):
1691        ck = self.ck
1692        op = ops.AddConstraintOp.from_constraint(ck)
1693        eq_(op.to_constraint(), schemacompare.CompareCheckConstraint(ck))
1694        eq_(
1695            op.reverse().to_constraint(),
1696            schemacompare.CompareCheckConstraint(ck),
1697        )
1698        is_not_(None, op.to_constraint().table)
1699
1700    def test_drop_check(self):
1701        ck = self.ck
1702        op = ops.DropConstraintOp.from_constraint(ck)
1703        eq_(op.to_constraint(), schemacompare.CompareCheckConstraint(ck))
1704        eq_(
1705            op.reverse().to_constraint(),
1706            schemacompare.CompareCheckConstraint(ck),
1707        )
1708        is_not_(None, op.to_constraint().table)
1709
1710    def test_add_unique(self):
1711        uq = self.uq
1712        op = ops.AddConstraintOp.from_constraint(uq)
1713        eq_(op.to_constraint(), schemacompare.CompareUniqueConstraint(uq))
1714        eq_(
1715            op.reverse().to_constraint(),
1716            schemacompare.CompareUniqueConstraint(uq),
1717        )
1718        is_not_(None, op.to_constraint().table)
1719
1720    def test_drop_unique(self):
1721        uq = self.uq
1722        op = ops.DropConstraintOp.from_constraint(uq)
1723        eq_(op.to_constraint(), schemacompare.CompareUniqueConstraint(uq))
1724        eq_(
1725            op.reverse().to_constraint(),
1726            schemacompare.CompareUniqueConstraint(uq),
1727        )
1728        is_not_(None, op.to_constraint().table)
1729
1730    def test_add_pk_no_orig(self):
1731        op = ops.CreatePrimaryKeyOp("pk1", "t", ["x", "y"])
1732        pk = op.to_constraint()
1733        eq_(pk.name, "pk1")
1734        eq_(pk.table.name, "t")
1735
1736    def test_add_pk(self):
1737        pk = self.pk
1738        op = ops.AddConstraintOp.from_constraint(pk)
1739        eq_(op.to_constraint(), schemacompare.ComparePrimaryKey(pk))
1740        eq_(op.reverse().to_constraint(), schemacompare.ComparePrimaryKey(pk))
1741        is_not_(None, op.to_constraint().table)
1742
1743    def test_drop_pk(self):
1744        pk = self.pk
1745        op = ops.DropConstraintOp.from_constraint(pk)
1746        eq_(op.to_constraint(), schemacompare.ComparePrimaryKey(pk))
1747        eq_(op.reverse().to_constraint(), schemacompare.ComparePrimaryKey(pk))
1748        is_not_(None, op.to_constraint().table)
1749
1750    def test_drop_column(self):
1751        t = self.table
1752
1753        op = ops.DropColumnOp.from_column_and_tablename(None, "t", t.c.x)
1754        is_(op.to_column(), t.c.x)
1755        is_(op.reverse().to_column(), t.c.x)
1756        is_not_(None, op.to_column().table)
1757
1758    def test_add_column(self):
1759        t = self.table
1760
1761        op = ops.AddColumnOp.from_column_and_tablename(None, "t", t.c.x)
1762        is_(op.to_column(), t.c.x)
1763        is_(op.reverse().to_column(), t.c.x)
1764        is_not_(None, op.to_column().table)
1765
1766    def test_drop_table(self):
1767        t = self.table
1768
1769        op = ops.DropTableOp.from_table(t)
1770        eq_(op.to_table(), schemacompare.CompareTable(t))
1771        eq_(op.reverse().to_table(), schemacompare.CompareTable(t))
1772
1773    def test_add_table(self):
1774        t = self.table
1775
1776        op = ops.CreateTableOp.from_table(t)
1777        eq_(op.to_table(), schemacompare.CompareTable(t))
1778        eq_(op.reverse().to_table(), schemacompare.CompareTable(t))
1779
1780    def test_drop_index(self):
1781        op = ops.DropIndexOp.from_index(self.ix)
1782        eq_(op.to_index(), schemacompare.CompareIndex(self.ix))
1783        eq_(op.reverse().to_index(), schemacompare.CompareIndex(self.ix))
1784
1785    def test_create_index(self):
1786        op = ops.CreateIndexOp.from_index(self.ix)
1787        eq_(op.to_index(), schemacompare.CompareIndex(self.ix))
1788        eq_(op.reverse().to_index(), schemacompare.CompareIndex(self.ix))
1789
1790
1791class MultipleMetaDataTest(AutogenFixtureTest, TestBase):
1792    def test_multiple(self):
1793        m1a = MetaData()
1794        m1b = MetaData()
1795        m1c = MetaData()
1796
1797        m2a = MetaData()
1798        m2b = MetaData()
1799        m2c = MetaData()
1800
1801        Table("a", m1a, Column("id", Integer, primary_key=True))
1802        Table("b1", m1b, Column("id", Integer, primary_key=True))
1803        Table("b2", m1b, Column("id", Integer, primary_key=True))
1804        Table(
1805            "c1",
1806            m1c,
1807            Column("id", Integer, primary_key=True),
1808            Column("x", Integer),
1809        )
1810
1811        a = Table(
1812            "a",
1813            m2a,
1814            Column("id", Integer, primary_key=True),
1815            Column("q", Integer),
1816        )
1817        Table("b1", m2b, Column("id", Integer, primary_key=True))
1818        Table("c1", m2c, Column("id", Integer, primary_key=True))
1819        c2 = Table("c2", m2c, Column("id", Integer, primary_key=True))
1820
1821        diffs = self._fixture([m1a, m1b, m1c], [m2a, m2b, m2c])
1822        eq_(diffs[0], ("add_table", schemacompare.CompareTable(c2)))
1823        eq_(diffs[1][0], "remove_table")
1824        eq_(diffs[1][1].name, "b2")
1825        eq_(diffs[2], ("add_column", None, "a", a.c.q))
1826        eq_(diffs[3][0:3], ("remove_column", None, "c1"))
1827        eq_(diffs[3][3].name, "x")
1828
1829    def test_empty_list(self):
1830        # because they're going to do it....
1831
1832        diffs = self._fixture([], [])
1833        eq_(diffs, [])
1834
1835    def test_non_list_sequence(self):
1836        # we call it "sequence", let's check that
1837
1838        m1a = MetaData()
1839        m1b = MetaData()
1840
1841        m2a = MetaData()
1842        m2b = MetaData()
1843
1844        Table("a", m1a, Column("id", Integer, primary_key=True))
1845        Table("b", m1b, Column("id", Integer, primary_key=True))
1846
1847        Table("a", m2a, Column("id", Integer, primary_key=True))
1848        b = Table(
1849            "b",
1850            m2b,
1851            Column("id", Integer, primary_key=True),
1852            Column("q", Integer),
1853        )
1854
1855        diffs = self._fixture((m1a, m1b), (m2a, m2b))
1856        eq_(diffs, [("add_column", None, "b", b.c.q)])
1857
1858    def test_raise_on_dupe(self):
1859        m1a = MetaData()
1860        m1b = MetaData()
1861
1862        m2a = MetaData()
1863        m2b = MetaData()
1864
1865        Table("a", m1a, Column("id", Integer, primary_key=True))
1866        Table("b1", m1b, Column("id", Integer, primary_key=True))
1867        Table("b2", m1b, Column("id", Integer, primary_key=True))
1868        Table("b3", m1b, Column("id", Integer, primary_key=True))
1869
1870        Table("a", m2a, Column("id", Integer, primary_key=True))
1871        Table("a", m2b, Column("id", Integer, primary_key=True))
1872        Table("b1", m2b, Column("id", Integer, primary_key=True))
1873        Table("b2", m2a, Column("id", Integer, primary_key=True))
1874        Table("b2", m2b, Column("id", Integer, primary_key=True))
1875
1876        assert_raises_message(
1877            ValueError,
1878            'Duplicate table keys across multiple MetaData objects: "a", "b2"',
1879            self._fixture,
1880            [m1a, m1b],
1881            [m2a, m2b],
1882        )
1883