1from contextlib import contextmanager
2import re
3
4from sqlalchemy import Boolean
5from sqlalchemy import CheckConstraint
6from sqlalchemy import Column
7from sqlalchemy import DateTime
8from sqlalchemy import Enum
9from sqlalchemy import ForeignKey
10from sqlalchemy import ForeignKeyConstraint
11from sqlalchemy import func
12from sqlalchemy import Index
13from sqlalchemy import inspect
14from sqlalchemy import Integer
15from sqlalchemy import JSON
16from sqlalchemy import MetaData
17from sqlalchemy import PrimaryKeyConstraint
18from sqlalchemy import String
19from sqlalchemy import Table
20from sqlalchemy import Text
21from sqlalchemy import UniqueConstraint
22from sqlalchemy.dialects import sqlite as sqlite_dialect
23from sqlalchemy.schema import CreateIndex
24from sqlalchemy.schema import CreateTable
25from sqlalchemy.sql import column
26from sqlalchemy.sql import text
27
28from alembic import testing
29from alembic.ddl import sqlite
30from alembic.operations import Operations
31from alembic.operations.batch import ApplyBatchImpl
32from alembic.runtime.migration import MigrationContext
33from alembic.testing import assert_raises_message
34from alembic.testing import config
35from alembic.testing import eq_
36from alembic.testing import exclusions
37from alembic.testing import is_
38from alembic.testing import mock
39from alembic.testing import TestBase
40from alembic.testing.fixtures import op_fixture
41from alembic.util import exc as alembic_exc
42from alembic.util.sqla_compat import _safe_commit_connection_transaction
43from alembic.util.sqla_compat import _select
44from alembic.util.sqla_compat import has_computed
45from alembic.util.sqla_compat import has_identity
46from alembic.util.sqla_compat import sqla_14
47
48if has_computed:
49    from alembic.util.sqla_compat import Computed
50
51if has_identity:
52    from alembic.util.sqla_compat import Identity
53
54
55class BatchApplyTest(TestBase):
56    def setUp(self):
57        self.op = Operations(mock.Mock(opts={}))
58        self.impl = sqlite.SQLiteImpl(
59            sqlite_dialect.dialect(), None, False, False, None, {}
60        )
61
62    def _simple_fixture(self, table_args=(), table_kwargs={}, **kw):
63        m = MetaData()
64        t = Table(
65            "tname",
66            m,
67            Column("id", Integer, primary_key=True),
68            Column("x", String(10)),
69            Column("y", Integer),
70        )
71        return ApplyBatchImpl(
72            self.impl, t, table_args, table_kwargs, False, **kw
73        )
74
75    def _uq_fixture(self, table_args=(), table_kwargs={}):
76        m = MetaData()
77        t = Table(
78            "tname",
79            m,
80            Column("id", Integer, primary_key=True),
81            Column("x", String()),
82            Column("y", Integer),
83            UniqueConstraint("y", name="uq1"),
84        )
85        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
86
87    def _named_ck_table_fixture(self, table_args=(), table_kwargs={}):
88        m = MetaData()
89        t = Table(
90            "tname",
91            m,
92            Column("id", Integer, primary_key=True),
93            Column("x", String()),
94            Column("y", Integer),
95            CheckConstraint("y > 5", name="ck1"),
96        )
97        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
98
99    def _named_ck_col_fixture(self, table_args=(), table_kwargs={}):
100        m = MetaData()
101        t = Table(
102            "tname",
103            m,
104            Column("id", Integer, primary_key=True),
105            Column("x", String()),
106            Column("y", Integer, CheckConstraint("y > 5", name="ck1")),
107        )
108        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
109
110    def _ix_fixture(self, table_args=(), table_kwargs={}):
111        m = MetaData()
112        t = Table(
113            "tname",
114            m,
115            Column("id", Integer, primary_key=True),
116            Column("x", String()),
117            Column("y", Integer),
118            Index("ix1", "y"),
119        )
120        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
121
122    def _pk_fixture(self):
123        m = MetaData()
124        t = Table(
125            "tname",
126            m,
127            Column("id", Integer),
128            Column("x", String()),
129            Column("y", Integer),
130            PrimaryKeyConstraint("id", name="mypk"),
131        )
132        return ApplyBatchImpl(self.impl, t, (), {}, False)
133
134    def _literal_ck_fixture(
135        self, copy_from=None, table_args=(), table_kwargs={}
136    ):
137        m = MetaData()
138        if copy_from is not None:
139            t = copy_from
140        else:
141            t = Table(
142                "tname",
143                m,
144                Column("id", Integer, primary_key=True),
145                Column("email", String()),
146                CheckConstraint("email LIKE '%@%'"),
147            )
148        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
149
150    def _sql_ck_fixture(self, table_args=(), table_kwargs={}):
151        m = MetaData()
152        t = Table(
153            "tname",
154            m,
155            Column("id", Integer, primary_key=True),
156            Column("email", String()),
157        )
158        t.append_constraint(CheckConstraint(t.c.email.like("%@%")))
159        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
160
161    def _fk_fixture(self, table_args=(), table_kwargs={}):
162        m = MetaData()
163        t = Table(
164            "tname",
165            m,
166            Column("id", Integer, primary_key=True),
167            Column("email", String()),
168            Column("user_id", Integer, ForeignKey("user.id")),
169        )
170        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
171
172    def _multi_fk_fixture(self, table_args=(), table_kwargs={}, schema=None):
173        m = MetaData()
174        if schema:
175            schemaarg = "%s." % schema
176        else:
177            schemaarg = ""
178
179        t = Table(
180            "tname",
181            m,
182            Column("id", Integer, primary_key=True),
183            Column("email", String()),
184            Column("user_id_1", Integer, ForeignKey("%suser.id" % schemaarg)),
185            Column("user_id_2", Integer, ForeignKey("%suser.id" % schemaarg)),
186            Column("user_id_3", Integer),
187            Column("user_id_version", Integer),
188            ForeignKeyConstraint(
189                ["user_id_3", "user_id_version"],
190                ["%suser.id" % schemaarg, "%suser.id_version" % schemaarg],
191            ),
192            schema=schema,
193        )
194        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
195
196    def _named_fk_fixture(self, table_args=(), table_kwargs={}):
197        m = MetaData()
198        t = Table(
199            "tname",
200            m,
201            Column("id", Integer, primary_key=True),
202            Column("email", String()),
203            Column("user_id", Integer, ForeignKey("user.id", name="ufk")),
204        )
205        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
206
207    def _selfref_fk_fixture(self, table_args=(), table_kwargs={}):
208        m = MetaData()
209        t = Table(
210            "tname",
211            m,
212            Column("id", Integer, primary_key=True),
213            Column("parent_id", Integer, ForeignKey("tname.id")),
214            Column("data", String),
215        )
216        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
217
218    def _boolean_fixture(self, table_args=(), table_kwargs={}):
219        m = MetaData()
220        t = Table(
221            "tname",
222            m,
223            Column("id", Integer, primary_key=True),
224            Column("flag", Boolean(create_constraint=True)),
225        )
226        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
227
228    def _boolean_no_ck_fixture(self, table_args=(), table_kwargs={}):
229        m = MetaData()
230        t = Table(
231            "tname",
232            m,
233            Column("id", Integer, primary_key=True),
234            Column("flag", Boolean(create_constraint=False)),
235        )
236        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
237
238    def _enum_fixture(self, table_args=(), table_kwargs={}):
239        m = MetaData()
240        t = Table(
241            "tname",
242            m,
243            Column("id", Integer, primary_key=True),
244            Column("thing", Enum("a", "b", "c", create_constraint=True)),
245        )
246        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
247
248    def _server_default_fixture(self, table_args=(), table_kwargs={}):
249        m = MetaData()
250        t = Table(
251            "tname",
252            m,
253            Column("id", Integer, primary_key=True),
254            Column("thing", String(), server_default=""),
255        )
256        return ApplyBatchImpl(self.impl, t, table_args, table_kwargs, False)
257
258    def _assert_impl(
259        self,
260        impl,
261        colnames=None,
262        ddl_contains=None,
263        ddl_not_contains=None,
264        dialect="default",
265        schema=None,
266    ):
267        context = op_fixture(dialect=dialect)
268
269        impl._create(context.impl)
270
271        if colnames is None:
272            colnames = ["id", "x", "y"]
273        eq_(impl.new_table.c.keys(), colnames)
274
275        pk_cols = [col for col in impl.new_table.c if col.primary_key]
276        eq_(list(impl.new_table.primary_key), pk_cols)
277
278        create_stmt = str(
279            CreateTable(impl.new_table).compile(dialect=context.dialect)
280        )
281        create_stmt = re.sub(r"[\n\t]", "", create_stmt)
282
283        idx_stmt = ""
284        for idx in impl.indexes.values():
285            idx_stmt += str(CreateIndex(idx).compile(dialect=context.dialect))
286        for idx in impl.new_indexes.values():
287            impl.new_table.name = impl.table.name
288            idx_stmt += str(CreateIndex(idx).compile(dialect=context.dialect))
289            impl.new_table.name = ApplyBatchImpl._calc_temp_name(
290                impl.table.name
291            )
292        idx_stmt = re.sub(r"[\n\t]", "", idx_stmt)
293
294        if ddl_contains:
295            assert ddl_contains in create_stmt + idx_stmt
296        if ddl_not_contains:
297            assert ddl_not_contains not in create_stmt + idx_stmt
298
299        expected = [create_stmt]
300
301        if schema:
302            args = {"schema": "%s." % schema}
303        else:
304            args = {"schema": ""}
305
306        args["temp_name"] = impl.new_table.name
307
308        args["colnames"] = ", ".join(
309            [
310                impl.new_table.c[name].name
311                for name in colnames
312                if name in impl.table.c
313            ]
314        )
315
316        args["tname_colnames"] = ", ".join(
317            "CAST(%(schema)stname.%(name)s AS %(type)s) AS %(cast_label)s"
318            % {
319                "schema": args["schema"],
320                "name": name,
321                "type": impl.new_table.c[name].type,
322                "cast_label": name if sqla_14 else "anon_1",
323            }
324            if (
325                impl.new_table.c[name].type._type_affinity
326                is not impl.table.c[name].type._type_affinity
327            )
328            else "%(schema)stname.%(name)s"
329            % {"schema": args["schema"], "name": name}
330            for name in colnames
331            if name in impl.table.c
332        )
333
334        expected.extend(
335            [
336                "INSERT INTO %(schema)s%(temp_name)s (%(colnames)s) "
337                "SELECT %(tname_colnames)s FROM %(schema)stname" % args,
338                "DROP TABLE %(schema)stname" % args,
339                "ALTER TABLE %(schema)s%(temp_name)s "
340                "RENAME TO %(schema)stname" % args,
341            ]
342        )
343        if idx_stmt:
344            expected.append(idx_stmt)
345        context.assert_(*expected)
346        return impl.new_table
347
348    def test_change_type(self):
349        impl = self._simple_fixture()
350        impl.alter_column("tname", "x", type_=String)
351        new_table = self._assert_impl(impl)
352        assert new_table.c.x.type._type_affinity is String
353
354    def test_rename_col(self):
355        impl = self._simple_fixture()
356        impl.alter_column("tname", "x", name="q")
357        new_table = self._assert_impl(impl)
358        eq_(new_table.c.x.name, "q")
359
360    def test_alter_column_comment(self):
361        impl = self._simple_fixture()
362        impl.alter_column("tname", "x", comment="some comment")
363        new_table = self._assert_impl(impl)
364        eq_(new_table.c.x.comment, "some comment")
365
366    def test_add_column_comment(self):
367        impl = self._simple_fixture()
368        impl.add_column("tname", Column("q", Integer, comment="some comment"))
369        new_table = self._assert_impl(impl, colnames=["id", "x", "y", "q"])
370        eq_(new_table.c.q.comment, "some comment")
371
372    def test_rename_col_boolean(self):
373        impl = self._boolean_fixture()
374        impl.alter_column("tname", "flag", name="bflag")
375        new_table = self._assert_impl(
376            impl,
377            ddl_contains="CHECK (bflag IN (0, 1)",
378            colnames=["id", "flag"],
379        )
380        eq_(new_table.c.flag.name, "bflag")
381        eq_(
382            len(
383                [
384                    const
385                    for const in new_table.constraints
386                    if isinstance(const, CheckConstraint)
387                ]
388            ),
389            1,
390        )
391
392    def test_change_type_schematype_to_non(self):
393        impl = self._boolean_fixture()
394        impl.alter_column("tname", "flag", type_=Integer)
395        new_table = self._assert_impl(
396            impl, colnames=["id", "flag"], ddl_not_contains="CHECK"
397        )
398        assert new_table.c.flag.type._type_affinity is Integer
399
400        # NOTE: we can't do test_change_type_non_to_schematype
401        # at this level because the "add_constraint" part of this
402        # comes from toimpl.py, which we aren't testing here
403
404    def test_rename_col_boolean_no_ck(self):
405        impl = self._boolean_no_ck_fixture()
406        impl.alter_column("tname", "flag", name="bflag")
407        new_table = self._assert_impl(
408            impl, ddl_not_contains="CHECK", colnames=["id", "flag"]
409        )
410        eq_(new_table.c.flag.name, "bflag")
411        eq_(
412            len(
413                [
414                    const
415                    for const in new_table.constraints
416                    if isinstance(const, CheckConstraint)
417                ]
418            ),
419            0,
420        )
421
422    def test_rename_col_enum(self):
423        impl = self._enum_fixture()
424        impl.alter_column("tname", "thing", name="thang")
425        new_table = self._assert_impl(
426            impl,
427            ddl_contains="CHECK (thang IN ('a', 'b', 'c')",
428            colnames=["id", "thing"],
429        )
430        eq_(new_table.c.thing.name, "thang")
431        eq_(
432            len(
433                [
434                    const
435                    for const in new_table.constraints
436                    if isinstance(const, CheckConstraint)
437                ]
438            ),
439            1,
440        )
441
442    def test_rename_col_literal_ck(self):
443        impl = self._literal_ck_fixture()
444        impl.alter_column("tname", "email", name="emol")
445        new_table = self._assert_impl(
446            # note this is wrong, we don't dig into the SQL
447            impl,
448            ddl_contains="CHECK (email LIKE '%@%')",
449            colnames=["id", "email"],
450        )
451        eq_(
452            len(
453                [
454                    c
455                    for c in new_table.constraints
456                    if isinstance(c, CheckConstraint)
457                ]
458            ),
459            1,
460        )
461
462        eq_(new_table.c.email.name, "emol")
463
464    def test_rename_col_literal_ck_workaround(self):
465        impl = self._literal_ck_fixture(
466            copy_from=Table(
467                "tname",
468                MetaData(),
469                Column("id", Integer, primary_key=True),
470                Column("email", String),
471            ),
472            table_args=[CheckConstraint("emol LIKE '%@%'")],
473        )
474
475        impl.alter_column("tname", "email", name="emol")
476        new_table = self._assert_impl(
477            impl,
478            ddl_contains="CHECK (emol LIKE '%@%')",
479            colnames=["id", "email"],
480        )
481        eq_(
482            len(
483                [
484                    c
485                    for c in new_table.constraints
486                    if isinstance(c, CheckConstraint)
487                ]
488            ),
489            1,
490        )
491        eq_(new_table.c.email.name, "emol")
492
493    def test_rename_col_sql_ck(self):
494        impl = self._sql_ck_fixture()
495
496        impl.alter_column("tname", "email", name="emol")
497        new_table = self._assert_impl(
498            impl,
499            ddl_contains="CHECK (emol LIKE '%@%')",
500            colnames=["id", "email"],
501        )
502        eq_(
503            len(
504                [
505                    c
506                    for c in new_table.constraints
507                    if isinstance(c, CheckConstraint)
508                ]
509            ),
510            1,
511        )
512
513        eq_(new_table.c.email.name, "emol")
514
515    def test_add_col(self):
516        impl = self._simple_fixture()
517        col = Column("g", Integer)
518        # operations.add_column produces a table
519        t = self.op.schema_obj.table("tname", col)  # noqa
520        impl.add_column("tname", col)
521        new_table = self._assert_impl(impl, colnames=["id", "x", "y", "g"])
522        eq_(new_table.c.g.name, "g")
523
524    def test_partial_reordering(self):
525        impl = self._simple_fixture(partial_reordering=[("x", "id", "y")])
526        new_table = self._assert_impl(impl, colnames=["x", "id", "y"])
527        eq_(new_table.c.x.name, "x")
528
529    def test_add_col_partial_reordering(self):
530        impl = self._simple_fixture(partial_reordering=[("id", "x", "g", "y")])
531        col = Column("g", Integer)
532        # operations.add_column produces a table
533        t = self.op.schema_obj.table("tname", col)  # noqa
534        impl.add_column("tname", col)
535        new_table = self._assert_impl(impl, colnames=["id", "x", "g", "y"])
536        eq_(new_table.c.g.name, "g")
537
538    def test_add_col_insert_before(self):
539        impl = self._simple_fixture()
540        col = Column("g", Integer)
541        # operations.add_column produces a table
542        t = self.op.schema_obj.table("tname", col)  # noqa
543        impl.add_column("tname", col, insert_before="x")
544        new_table = self._assert_impl(impl, colnames=["id", "g", "x", "y"])
545        eq_(new_table.c.g.name, "g")
546
547    def test_add_col_insert_before_beginning(self):
548        impl = self._simple_fixture()
549        impl.add_column("tname", Column("g", Integer), insert_before="id")
550        new_table = self._assert_impl(impl, colnames=["g", "id", "x", "y"])
551        eq_(new_table.c.g.name, "g")
552
553    def test_add_col_insert_before_middle(self):
554        impl = self._simple_fixture()
555        impl.add_column("tname", Column("g", Integer), insert_before="y")
556        new_table = self._assert_impl(impl, colnames=["id", "x", "g", "y"])
557        eq_(new_table.c.g.name, "g")
558
559    def test_add_col_insert_after_middle(self):
560        impl = self._simple_fixture()
561        impl.add_column("tname", Column("g", Integer), insert_after="id")
562        new_table = self._assert_impl(impl, colnames=["id", "g", "x", "y"])
563        eq_(new_table.c.g.name, "g")
564
565    def test_add_col_insert_after_penultimate(self):
566        impl = self._simple_fixture()
567        impl.add_column("tname", Column("g", Integer), insert_after="x")
568        self._assert_impl(impl, colnames=["id", "x", "g", "y"])
569
570    def test_add_col_insert_after_end(self):
571        impl = self._simple_fixture()
572        impl.add_column("tname", Column("g", Integer), insert_after="y")
573        new_table = self._assert_impl(impl, colnames=["id", "x", "y", "g"])
574        eq_(new_table.c.g.name, "g")
575
576    def test_add_col_insert_after_plus_no_order(self):
577        impl = self._simple_fixture()
578        # operations.add_column produces a table
579        impl.add_column("tname", Column("g", Integer), insert_after="id")
580        impl.add_column("tname", Column("q", Integer))
581        new_table = self._assert_impl(
582            impl, colnames=["id", "g", "x", "y", "q"]
583        )
584        eq_(new_table.c.g.name, "g")
585
586    def test_add_col_no_order_plus_insert_after(self):
587        impl = self._simple_fixture()
588        col = Column("g", Integer)
589        # operations.add_column produces a table
590        t = self.op.schema_obj.table("tname", col)  # noqa
591        impl.add_column("tname", Column("q", Integer))
592        impl.add_column("tname", Column("g", Integer), insert_after="id")
593        new_table = self._assert_impl(
594            impl, colnames=["id", "g", "x", "y", "q"]
595        )
596        eq_(new_table.c.g.name, "g")
597
598    def test_add_col_insert_after_another_insert(self):
599        impl = self._simple_fixture()
600        impl.add_column("tname", Column("g", Integer), insert_after="id")
601        impl.add_column("tname", Column("q", Integer), insert_after="g")
602        new_table = self._assert_impl(
603            impl, colnames=["id", "g", "q", "x", "y"]
604        )
605        eq_(new_table.c.g.name, "g")
606
607    def test_add_col_insert_before_another_insert(self):
608        impl = self._simple_fixture()
609        impl.add_column("tname", Column("g", Integer), insert_after="id")
610        impl.add_column("tname", Column("q", Integer), insert_before="g")
611        new_table = self._assert_impl(
612            impl, colnames=["id", "q", "g", "x", "y"]
613        )
614        eq_(new_table.c.g.name, "g")
615
616    def test_add_server_default(self):
617        impl = self._simple_fixture()
618        impl.alter_column("tname", "y", server_default="10")
619        new_table = self._assert_impl(impl, ddl_contains="DEFAULT '10'")
620        eq_(new_table.c.y.server_default.arg, "10")
621
622    def test_drop_server_default(self):
623        impl = self._server_default_fixture()
624        impl.alter_column("tname", "thing", server_default=None)
625        new_table = self._assert_impl(
626            impl, colnames=["id", "thing"], ddl_not_contains="DEFAULT"
627        )
628        eq_(new_table.c.thing.server_default, None)
629
630    def test_rename_col_pk(self):
631        impl = self._simple_fixture()
632        impl.alter_column("tname", "id", name="foobar")
633        new_table = self._assert_impl(
634            impl, ddl_contains="PRIMARY KEY (foobar)"
635        )
636        eq_(new_table.c.id.name, "foobar")
637        eq_(list(new_table.primary_key), [new_table.c.id])
638
639    def test_rename_col_fk(self):
640        impl = self._fk_fixture()
641        impl.alter_column("tname", "user_id", name="foobar")
642        new_table = self._assert_impl(
643            impl,
644            colnames=["id", "email", "user_id"],
645            ddl_contains='FOREIGN KEY(foobar) REFERENCES "user" (id)',
646        )
647        eq_(new_table.c.user_id.name, "foobar")
648        eq_(
649            list(new_table.c.user_id.foreign_keys)[0]._get_colspec(), "user.id"
650        )
651
652    def test_regen_multi_fk(self):
653        impl = self._multi_fk_fixture()
654        self._assert_impl(
655            impl,
656            colnames=[
657                "id",
658                "email",
659                "user_id_1",
660                "user_id_2",
661                "user_id_3",
662                "user_id_version",
663            ],
664            ddl_contains="FOREIGN KEY(user_id_3, user_id_version) "
665            'REFERENCES "user" (id, id_version)',
666        )
667
668    def test_regen_multi_fk_schema(self):
669        impl = self._multi_fk_fixture(schema="foo_schema")
670        self._assert_impl(
671            impl,
672            colnames=[
673                "id",
674                "email",
675                "user_id_1",
676                "user_id_2",
677                "user_id_3",
678                "user_id_version",
679            ],
680            ddl_contains="FOREIGN KEY(user_id_3, user_id_version) "
681            'REFERENCES foo_schema."user" (id, id_version)',
682            schema="foo_schema",
683        )
684
685    def test_do_not_add_existing_columns_columns(self):
686        impl = self._multi_fk_fixture()
687        meta = impl.table.metadata
688
689        cid = Column("id", Integer())
690        user = Table("user", meta, cid)
691
692        fk = [
693            c
694            for c in impl.unnamed_constraints
695            if isinstance(c, ForeignKeyConstraint)
696        ]
697        impl._setup_referent(meta, fk[0])
698        is_(user.c.id, cid)
699
700    def test_drop_col(self):
701        impl = self._simple_fixture()
702        impl.drop_column("tname", column("x"))
703        new_table = self._assert_impl(impl, colnames=["id", "y"])
704        assert "y" in new_table.c
705        assert "x" not in new_table.c
706
707    def test_drop_col_remove_pk(self):
708        impl = self._simple_fixture()
709        impl.drop_column("tname", column("id"))
710        new_table = self._assert_impl(
711            impl, colnames=["x", "y"], ddl_not_contains="PRIMARY KEY"
712        )
713        assert "y" in new_table.c
714        assert "id" not in new_table.c
715        assert not new_table.primary_key
716
717    def test_drop_col_remove_fk(self):
718        impl = self._fk_fixture()
719        impl.drop_column("tname", column("user_id"))
720        new_table = self._assert_impl(
721            impl, colnames=["id", "email"], ddl_not_contains="FOREIGN KEY"
722        )
723        assert "user_id" not in new_table.c
724        assert not new_table.foreign_keys
725
726    def test_drop_col_retain_fk(self):
727        impl = self._fk_fixture()
728        impl.drop_column("tname", column("email"))
729        new_table = self._assert_impl(
730            impl,
731            colnames=["id", "user_id"],
732            ddl_contains='FOREIGN KEY(user_id) REFERENCES "user" (id)',
733        )
734        assert "email" not in new_table.c
735        assert new_table.c.user_id.foreign_keys
736
737    def test_drop_col_retain_fk_selfref(self):
738        impl = self._selfref_fk_fixture()
739        impl.drop_column("tname", column("data"))
740        new_table = self._assert_impl(impl, colnames=["id", "parent_id"])
741        assert "data" not in new_table.c
742        assert new_table.c.parent_id.foreign_keys
743
744    def test_add_fk(self):
745        impl = self._simple_fixture()
746        impl.add_column("tname", Column("user_id", Integer))
747        fk = self.op.schema_obj.foreign_key_constraint(
748            "fk1", "tname", "user", ["user_id"], ["id"]
749        )
750        impl.add_constraint(fk)
751        new_table = self._assert_impl(
752            impl,
753            colnames=["id", "x", "y", "user_id"],
754            ddl_contains="CONSTRAINT fk1 FOREIGN KEY(user_id) "
755            'REFERENCES "user" (id)',
756        )
757        eq_(
758            list(new_table.c.user_id.foreign_keys)[0]._get_colspec(), "user.id"
759        )
760
761    def test_drop_fk(self):
762        impl = self._named_fk_fixture()
763        fk = ForeignKeyConstraint([], [], name="ufk")
764        impl.drop_constraint(fk)
765        new_table = self._assert_impl(
766            impl,
767            colnames=["id", "email", "user_id"],
768            ddl_not_contains="CONSTRANT fk1",
769        )
770        eq_(list(new_table.foreign_keys), [])
771
772    def test_add_uq(self):
773        impl = self._simple_fixture()
774        uq = self.op.schema_obj.unique_constraint("uq1", "tname", ["y"])
775
776        impl.add_constraint(uq)
777        self._assert_impl(
778            impl,
779            colnames=["id", "x", "y"],
780            ddl_contains="CONSTRAINT uq1 UNIQUE",
781        )
782
783    def test_drop_uq(self):
784        impl = self._uq_fixture()
785
786        uq = self.op.schema_obj.unique_constraint("uq1", "tname", ["y"])
787        impl.drop_constraint(uq)
788        self._assert_impl(
789            impl,
790            colnames=["id", "x", "y"],
791            ddl_not_contains="CONSTRAINT uq1 UNIQUE",
792        )
793
794    def test_add_ck(self):
795        impl = self._simple_fixture()
796        ck = self.op.schema_obj.check_constraint("ck1", "tname", "y > 5")
797
798        impl.add_constraint(ck)
799        self._assert_impl(
800            impl,
801            colnames=["id", "x", "y"],
802            ddl_contains="CONSTRAINT ck1 CHECK (y > 5)",
803        )
804
805    def test_drop_ck_table(self):
806        impl = self._named_ck_table_fixture()
807
808        ck = self.op.schema_obj.check_constraint("ck1", "tname", "y > 5")
809        impl.drop_constraint(ck)
810        self._assert_impl(
811            impl,
812            colnames=["id", "x", "y"],
813            ddl_not_contains="CONSTRAINT ck1 CHECK (y > 5)",
814        )
815
816    def test_drop_ck_col(self):
817        impl = self._named_ck_col_fixture()
818
819        ck = self.op.schema_obj.check_constraint("ck1", "tname", "y > 5")
820        impl.drop_constraint(ck)
821        self._assert_impl(
822            impl,
823            colnames=["id", "x", "y"],
824            ddl_not_contains="CONSTRAINT ck1 CHECK (y > 5)",
825        )
826
827    def test_create_index(self):
828        impl = self._simple_fixture()
829        ix = self.op.schema_obj.index("ix1", "tname", ["y"])
830
831        impl.create_index(ix)
832        self._assert_impl(
833            impl, colnames=["id", "x", "y"], ddl_contains="CREATE INDEX ix1"
834        )
835
836    def test_drop_index(self):
837        impl = self._ix_fixture()
838
839        ix = self.op.schema_obj.index("ix1", "tname", ["y"])
840        impl.drop_index(ix)
841        self._assert_impl(
842            impl,
843            colnames=["id", "x", "y"],
844            ddl_not_contains="CONSTRAINT uq1 UNIQUE",
845        )
846
847    def test_add_table_opts(self):
848        impl = self._simple_fixture(table_kwargs={"mysql_engine": "InnoDB"})
849        self._assert_impl(impl, ddl_contains="ENGINE=InnoDB", dialect="mysql")
850
851    def test_drop_pk(self):
852        impl = self._pk_fixture()
853        pk = self.op.schema_obj.primary_key_constraint("mypk", "tname", ["id"])
854        impl.drop_constraint(pk)
855        new_table = self._assert_impl(impl)
856        assert not new_table.c.id.primary_key
857        assert not len(new_table.primary_key)
858
859
860class BatchAPITest(TestBase):
861    @contextmanager
862    def _fixture(self, schema=None):
863
864        migration_context = mock.Mock(
865            opts={},
866            impl=mock.MagicMock(__dialect__="sqlite", connection=object()),
867        )
868        op = Operations(migration_context)
869        batch = op.batch_alter_table(
870            "tname", recreate="never", schema=schema
871        ).__enter__()
872
873        mock_schema = mock.MagicMock()
874        with mock.patch("alembic.operations.schemaobj.sa_schema", mock_schema):
875            yield batch
876        batch.impl.flush()
877        self.mock_schema = mock_schema
878
879    def test_drop_col(self):
880        with self._fixture() as batch:
881            batch.drop_column("q")
882
883        eq_(
884            batch.impl.operations.impl.mock_calls,
885            [
886                mock.call.drop_column(
887                    "tname", self.mock_schema.Column(), schema=None
888                )
889            ],
890        )
891
892    def test_add_col(self):
893        column = Column("w", String(50))
894
895        with self._fixture() as batch:
896            batch.add_column(column)
897
898        assert (
899            mock.call.add_column("tname", column, schema=None)
900            in batch.impl.operations.impl.mock_calls
901        )
902
903    def test_create_fk(self):
904        with self._fixture() as batch:
905            batch.create_foreign_key("myfk", "user", ["x"], ["y"])
906
907        eq_(
908            self.mock_schema.ForeignKeyConstraint.mock_calls,
909            [
910                mock.call(
911                    ["x"],
912                    ["user.y"],
913                    onupdate=None,
914                    ondelete=None,
915                    name="myfk",
916                    initially=None,
917                    deferrable=None,
918                    match=None,
919                )
920            ],
921        )
922        eq_(
923            self.mock_schema.Table.mock_calls,
924            [
925                mock.call(
926                    "user",
927                    self.mock_schema.MetaData(),
928                    self.mock_schema.Column(),
929                    schema=None,
930                ),
931                mock.call(
932                    "tname",
933                    self.mock_schema.MetaData(),
934                    self.mock_schema.Column(),
935                    schema=None,
936                ),
937                mock.call().append_constraint(
938                    self.mock_schema.ForeignKeyConstraint()
939                ),
940            ],
941        )
942        eq_(
943            batch.impl.operations.impl.mock_calls,
944            [
945                mock.call.add_constraint(
946                    self.mock_schema.ForeignKeyConstraint()
947                )
948            ],
949        )
950
951    def test_create_fk_schema(self):
952        with self._fixture(schema="foo") as batch:
953            batch.create_foreign_key("myfk", "user", ["x"], ["y"])
954
955        eq_(
956            self.mock_schema.ForeignKeyConstraint.mock_calls,
957            [
958                mock.call(
959                    ["x"],
960                    ["user.y"],
961                    onupdate=None,
962                    ondelete=None,
963                    name="myfk",
964                    initially=None,
965                    deferrable=None,
966                    match=None,
967                )
968            ],
969        )
970        eq_(
971            self.mock_schema.Table.mock_calls,
972            [
973                mock.call(
974                    "user",
975                    self.mock_schema.MetaData(),
976                    self.mock_schema.Column(),
977                    schema=None,
978                ),
979                mock.call(
980                    "tname",
981                    self.mock_schema.MetaData(),
982                    self.mock_schema.Column(),
983                    schema="foo",
984                ),
985                mock.call().append_constraint(
986                    self.mock_schema.ForeignKeyConstraint()
987                ),
988            ],
989        )
990        eq_(
991            batch.impl.operations.impl.mock_calls,
992            [
993                mock.call.add_constraint(
994                    self.mock_schema.ForeignKeyConstraint()
995                )
996            ],
997        )
998
999    def test_create_uq(self):
1000        with self._fixture() as batch:
1001            batch.create_unique_constraint("uq1", ["a", "b"])
1002
1003        eq_(
1004            self.mock_schema.Table().c.__getitem__.mock_calls,
1005            [mock.call("a"), mock.call("b")],
1006        )
1007
1008        eq_(
1009            self.mock_schema.UniqueConstraint.mock_calls,
1010            [
1011                mock.call(
1012                    self.mock_schema.Table().c.__getitem__(),
1013                    self.mock_schema.Table().c.__getitem__(),
1014                    name="uq1",
1015                )
1016            ],
1017        )
1018        eq_(
1019            batch.impl.operations.impl.mock_calls,
1020            [mock.call.add_constraint(self.mock_schema.UniqueConstraint())],
1021        )
1022
1023    def test_create_pk(self):
1024        with self._fixture() as batch:
1025            batch.create_primary_key("pk1", ["a", "b"])
1026
1027        eq_(
1028            self.mock_schema.Table().c.__getitem__.mock_calls,
1029            [mock.call("a"), mock.call("b")],
1030        )
1031
1032        eq_(
1033            self.mock_schema.PrimaryKeyConstraint.mock_calls,
1034            [
1035                mock.call(
1036                    self.mock_schema.Table().c.__getitem__(),
1037                    self.mock_schema.Table().c.__getitem__(),
1038                    name="pk1",
1039                )
1040            ],
1041        )
1042        eq_(
1043            batch.impl.operations.impl.mock_calls,
1044            [
1045                mock.call.add_constraint(
1046                    self.mock_schema.PrimaryKeyConstraint()
1047                )
1048            ],
1049        )
1050
1051    def test_create_check(self):
1052        expr = text("a > b")
1053        with self._fixture() as batch:
1054            batch.create_check_constraint("ck1", expr)
1055
1056        eq_(
1057            self.mock_schema.CheckConstraint.mock_calls,
1058            [mock.call(expr, name="ck1")],
1059        )
1060        eq_(
1061            batch.impl.operations.impl.mock_calls,
1062            [mock.call.add_constraint(self.mock_schema.CheckConstraint())],
1063        )
1064
1065    def test_drop_constraint(self):
1066        with self._fixture() as batch:
1067            batch.drop_constraint("uq1")
1068
1069        eq_(self.mock_schema.Constraint.mock_calls, [mock.call(name="uq1")])
1070        eq_(
1071            batch.impl.operations.impl.mock_calls,
1072            [mock.call.drop_constraint(self.mock_schema.Constraint())],
1073        )
1074
1075
1076class CopyFromTest(TestBase):
1077    def _fixture(self):
1078        self.metadata = MetaData()
1079        self.table = Table(
1080            "foo",
1081            self.metadata,
1082            Column("id", Integer, primary_key=True),
1083            Column("data", String(50)),
1084            Column("x", Integer),
1085        )
1086
1087        context = op_fixture(dialect="sqlite", as_sql=True)
1088        self.op = Operations(context)
1089        return context
1090
1091    @config.requirements.sqlalchemy_13
1092    def test_change_type(self):
1093        context = self._fixture()
1094        self.table.append_column(Column("toj", Text))
1095        self.table.append_column(Column("fromj", JSON))
1096        with self.op.batch_alter_table(
1097            "foo", copy_from=self.table
1098        ) as batch_op:
1099            batch_op.alter_column("data", type_=Integer)
1100            batch_op.alter_column("toj", type_=JSON)
1101            batch_op.alter_column("fromj", type_=Text)
1102        context.assert_(
1103            "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, "
1104            "data INTEGER, x INTEGER, toj JSON, fromj TEXT, PRIMARY KEY (id))",
1105            "INSERT INTO _alembic_tmp_foo (id, data, x, toj, fromj) "
1106            "SELECT foo.id, "
1107            "CAST(foo.data AS INTEGER) AS %s, foo.x, foo.toj, "
1108            "CAST(foo.fromj AS TEXT) AS %s FROM foo"
1109            % (
1110                ("data" if sqla_14 else "anon_1"),
1111                ("fromj" if sqla_14 else "anon_2"),
1112            ),
1113            "DROP TABLE foo",
1114            "ALTER TABLE _alembic_tmp_foo RENAME TO foo",
1115        )
1116
1117    def test_change_type_from_schematype(self):
1118        context = self._fixture()
1119        self.table.append_column(
1120            Column("y", Boolean(create_constraint=True, name="ck1"))
1121        )
1122
1123        with self.op.batch_alter_table(
1124            "foo", copy_from=self.table
1125        ) as batch_op:
1126            batch_op.alter_column(
1127                "y",
1128                type_=Integer,
1129                existing_type=Boolean(create_constraint=True, name="ck1"),
1130            )
1131        context.assert_(
1132            "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, "
1133            "data VARCHAR(50), x INTEGER, y INTEGER, PRIMARY KEY (id))",
1134            "INSERT INTO _alembic_tmp_foo (id, data, x, y) SELECT foo.id, "
1135            "foo.data, foo.x, CAST(foo.y AS INTEGER) AS %s FROM foo"
1136            % (("y" if sqla_14 else "anon_1"),),
1137            "DROP TABLE foo",
1138            "ALTER TABLE _alembic_tmp_foo RENAME TO foo",
1139        )
1140
1141    def test_change_type_to_schematype(self):
1142        context = self._fixture()
1143        self.table.append_column(Column("y", Integer))
1144
1145        with self.op.batch_alter_table(
1146            "foo", copy_from=self.table
1147        ) as batch_op:
1148            batch_op.alter_column(
1149                "y",
1150                existing_type=Integer,
1151                type_=Boolean(create_constraint=True, name="ck1"),
1152            )
1153        context.assert_(
1154            "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, "
1155            "data VARCHAR(50), x INTEGER, y BOOLEAN, PRIMARY KEY (id), "
1156            "CONSTRAINT ck1 CHECK (y IN (0, 1)))",
1157            "INSERT INTO _alembic_tmp_foo (id, data, x, y) SELECT foo.id, "
1158            "foo.data, foo.x, CAST(foo.y AS BOOLEAN) AS %s FROM foo"
1159            % (("y" if sqla_14 else "anon_1"),),
1160            "DROP TABLE foo",
1161            "ALTER TABLE _alembic_tmp_foo RENAME TO foo",
1162        )
1163
1164    def test_create_drop_index_w_always(self):
1165        context = self._fixture()
1166        with self.op.batch_alter_table(
1167            "foo", copy_from=self.table, recreate="always"
1168        ) as batch_op:
1169            batch_op.create_index("ix_data", ["data"], unique=True)
1170
1171        context.assert_(
1172            "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, "
1173            "data VARCHAR(50), "
1174            "x INTEGER, PRIMARY KEY (id))",
1175            "INSERT INTO _alembic_tmp_foo (id, data, x) "
1176            "SELECT foo.id, foo.data, foo.x FROM foo",
1177            "DROP TABLE foo",
1178            "ALTER TABLE _alembic_tmp_foo RENAME TO foo",
1179            "CREATE UNIQUE INDEX ix_data ON foo (data)",
1180        )
1181
1182        context.clear_assertions()
1183
1184        Index("ix_data", self.table.c.data, unique=True)
1185        with self.op.batch_alter_table(
1186            "foo", copy_from=self.table, recreate="always"
1187        ) as batch_op:
1188            batch_op.drop_index("ix_data")
1189
1190        context.assert_(
1191            "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, "
1192            "data VARCHAR(50), x INTEGER, PRIMARY KEY (id))",
1193            "INSERT INTO _alembic_tmp_foo (id, data, x) "
1194            "SELECT foo.id, foo.data, foo.x FROM foo",
1195            "DROP TABLE foo",
1196            "ALTER TABLE _alembic_tmp_foo RENAME TO foo",
1197        )
1198
1199    def test_create_drop_index_wo_always(self):
1200        context = self._fixture()
1201        with self.op.batch_alter_table(
1202            "foo", copy_from=self.table
1203        ) as batch_op:
1204            batch_op.create_index("ix_data", ["data"], unique=True)
1205
1206        context.assert_("CREATE UNIQUE INDEX ix_data ON foo (data)")
1207
1208        context.clear_assertions()
1209
1210        Index("ix_data", self.table.c.data, unique=True)
1211        with self.op.batch_alter_table(
1212            "foo", copy_from=self.table
1213        ) as batch_op:
1214            batch_op.drop_index("ix_data")
1215
1216        context.assert_("DROP INDEX ix_data")
1217
1218    def test_create_drop_index_w_other_ops(self):
1219        context = self._fixture()
1220        with self.op.batch_alter_table(
1221            "foo", copy_from=self.table
1222        ) as batch_op:
1223            batch_op.alter_column("data", type_=Integer)
1224            batch_op.create_index("ix_data", ["data"], unique=True)
1225
1226        context.assert_(
1227            "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, "
1228            "data INTEGER, x INTEGER, PRIMARY KEY (id))",
1229            "INSERT INTO _alembic_tmp_foo (id, data, x) SELECT foo.id, "
1230            "CAST(foo.data AS INTEGER) AS %s, foo.x FROM foo"
1231            % (("data" if sqla_14 else "anon_1"),),
1232            "DROP TABLE foo",
1233            "ALTER TABLE _alembic_tmp_foo RENAME TO foo",
1234            "CREATE UNIQUE INDEX ix_data ON foo (data)",
1235        )
1236
1237        context.clear_assertions()
1238
1239        Index("ix_data", self.table.c.data, unique=True)
1240        with self.op.batch_alter_table(
1241            "foo", copy_from=self.table
1242        ) as batch_op:
1243            batch_op.drop_index("ix_data")
1244            batch_op.alter_column("data", type_=String)
1245
1246        context.assert_(
1247            "CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, "
1248            "data VARCHAR, x INTEGER, PRIMARY KEY (id))",
1249            "INSERT INTO _alembic_tmp_foo (id, data, x) SELECT foo.id, "
1250            "foo.data, foo.x FROM foo",
1251            "DROP TABLE foo",
1252            "ALTER TABLE _alembic_tmp_foo RENAME TO foo",
1253        )
1254
1255
1256class BatchRoundTripTest(TestBase):
1257    __only_on__ = "sqlite"
1258
1259    def setUp(self):
1260        self.conn = config.db.connect()
1261        self.metadata = MetaData()
1262        t1 = Table(
1263            "foo",
1264            self.metadata,
1265            Column("id", Integer, primary_key=True),
1266            Column("data", String(50)),
1267            Column("x", Integer),
1268            mysql_engine="InnoDB",
1269        )
1270        with self.conn.begin():
1271            t1.create(self.conn)
1272
1273            self.conn.execute(
1274                t1.insert(),
1275                [
1276                    {"id": 1, "data": "d1", "x": 5},
1277                    {"id": 2, "data": "22", "x": 6},
1278                    {"id": 3, "data": "8.5", "x": 7},
1279                    {"id": 4, "data": "9.46", "x": 8},
1280                    {"id": 5, "data": "d5", "x": 9},
1281                ],
1282            )
1283        context = MigrationContext.configure(self.conn)
1284        self.op = Operations(context)
1285
1286    def tearDown(self):
1287        # why commit?  because SQLite has inconsistent treatment
1288        # of transactional DDL. A test that runs CREATE TABLE and then
1289        # ALTER TABLE to change the name of that table, will end up
1290        # committing the CREATE TABLE but not the ALTER. As batch mode
1291        # does this with a temp table name that's not even in the
1292        # metadata collection, we don't have an explicit drop for it
1293        # (though we could do that too).  calling commit means the
1294        # ALTER will go through and the drop_all() will then catch it.
1295        _safe_commit_connection_transaction(self.conn)
1296        with self.conn.begin():
1297            self.metadata.drop_all(self.conn)
1298        self.conn.close()
1299
1300    @contextmanager
1301    def _sqlite_referential_integrity(self):
1302        self.conn.exec_driver_sql("PRAGMA foreign_keys=ON")
1303        try:
1304            yield
1305        finally:
1306            self.conn.exec_driver_sql("PRAGMA foreign_keys=OFF")
1307
1308            # as these tests are typically intentional fails, clean out
1309            # tables left over
1310            m = MetaData()
1311            m.reflect(self.conn)
1312            with self.conn.begin():
1313                m.drop_all(self.conn)
1314
1315    def _no_pk_fixture(self):
1316        with self.conn.begin():
1317            nopk = Table(
1318                "nopk",
1319                self.metadata,
1320                Column("a", Integer),
1321                Column("b", Integer),
1322                Column("c", Integer),
1323                mysql_engine="InnoDB",
1324            )
1325            nopk.create(self.conn)
1326            self.conn.execute(
1327                nopk.insert(),
1328                [{"a": 1, "b": 2, "c": 3}, {"a": 2, "b": 4, "c": 5}],
1329            )
1330            return nopk
1331
1332    def _table_w_index_fixture(self):
1333        with self.conn.begin():
1334            t = Table(
1335                "t_w_ix",
1336                self.metadata,
1337                Column("id", Integer, primary_key=True),
1338                Column("thing", Integer),
1339                Column("data", String(20)),
1340            )
1341            Index("ix_thing", t.c.thing)
1342            t.create(self.conn)
1343            return t
1344
1345    def _boolean_fixture(self):
1346        with self.conn.begin():
1347            t = Table(
1348                "hasbool",
1349                self.metadata,
1350                Column("x", Boolean(create_constraint=True, name="ck1")),
1351                Column("y", Integer),
1352            )
1353            t.create(self.conn)
1354
1355    def _timestamp_fixture(self):
1356        with self.conn.begin():
1357            t = Table("hasts", self.metadata, Column("x", DateTime()))
1358            t.create(self.conn)
1359            return t
1360
1361    def _ck_constraint_fixture(self):
1362        with self.conn.begin():
1363            t = Table(
1364                "ck_table",
1365                self.metadata,
1366                Column("id", Integer, nullable=False),
1367                CheckConstraint("id is not NULL", name="ck"),
1368            )
1369            t.create(self.conn)
1370            return t
1371
1372    def _datetime_server_default_fixture(self):
1373        return func.datetime("now", "localtime")
1374
1375    def _timestamp_w_expr_default_fixture(self):
1376        with self.conn.begin():
1377            t = Table(
1378                "hasts",
1379                self.metadata,
1380                Column(
1381                    "x",
1382                    DateTime(),
1383                    server_default=self._datetime_server_default_fixture(),
1384                    nullable=False,
1385                ),
1386            )
1387            t.create(self.conn)
1388            return t
1389
1390    def _int_to_boolean_fixture(self):
1391        with self.conn.begin():
1392            t = Table("hasbool", self.metadata, Column("x", Integer))
1393            t.create(self.conn)
1394
1395    def test_change_type_boolean_to_int(self):
1396        self._boolean_fixture()
1397        with self.op.batch_alter_table("hasbool") as batch_op:
1398            batch_op.alter_column(
1399                "x",
1400                type_=Integer,
1401                existing_type=Boolean(create_constraint=True, name="ck1"),
1402            )
1403        insp = inspect(self.conn)
1404
1405        eq_(
1406            [
1407                c["type"]._type_affinity
1408                for c in insp.get_columns("hasbool")
1409                if c["name"] == "x"
1410            ],
1411            [Integer],
1412        )
1413
1414    def test_no_net_change_timestamp(self):
1415        t = self._timestamp_fixture()
1416
1417        import datetime
1418
1419        with self.conn.begin():
1420            self.conn.execute(
1421                t.insert(), {"x": datetime.datetime(2012, 5, 18, 15, 32, 5)}
1422            )
1423
1424        with self.op.batch_alter_table("hasts") as batch_op:
1425            batch_op.alter_column("x", type_=DateTime())
1426
1427        eq_(
1428            self.conn.execute(_select(t.c.x)).fetchall(),
1429            [(datetime.datetime(2012, 5, 18, 15, 32, 5),)],
1430        )
1431
1432    def test_no_net_change_timestamp_w_default(self):
1433        t = self._timestamp_w_expr_default_fixture()
1434
1435        with self.op.batch_alter_table("hasts") as batch_op:
1436            batch_op.alter_column(
1437                "x",
1438                type_=DateTime(),
1439                nullable=False,
1440                server_default=self._datetime_server_default_fixture(),
1441            )
1442
1443        with self.conn.begin():
1444            self.conn.execute(t.insert())
1445        res = self.conn.execute(_select(t.c.x))
1446        if sqla_14:
1447            assert res.scalar_one_or_none() is not None
1448        else:
1449            row = res.fetchone()
1450            assert row["x"] is not None
1451
1452    def test_drop_col_schematype(self):
1453        self._boolean_fixture()
1454        with self.op.batch_alter_table("hasbool") as batch_op:
1455            batch_op.drop_column(
1456                "x", existing_type=Boolean(create_constraint=True, name="ck1")
1457            )
1458        insp = inspect(self.conn)
1459
1460        assert "x" not in (c["name"] for c in insp.get_columns("hasbool"))
1461
1462    def test_change_type_int_to_boolean(self):
1463        self._int_to_boolean_fixture()
1464        with self.op.batch_alter_table("hasbool") as batch_op:
1465            batch_op.alter_column(
1466                "x", type_=Boolean(create_constraint=True, name="ck1")
1467            )
1468        insp = inspect(self.conn)
1469
1470        if exclusions.against(config, "sqlite"):
1471            eq_(
1472                [
1473                    c["type"]._type_affinity
1474                    for c in insp.get_columns("hasbool")
1475                    if c["name"] == "x"
1476                ],
1477                [Boolean],
1478            )
1479        elif exclusions.against(config, "mysql"):
1480            eq_(
1481                [
1482                    c["type"]._type_affinity
1483                    for c in insp.get_columns("hasbool")
1484                    if c["name"] == "x"
1485                ],
1486                [Integer],
1487            )
1488
1489    def _assert_data(self, data, tablename="foo"):
1490        res = self.conn.execute(text("select * from %s" % tablename))
1491        if sqla_14:
1492            res = res.mappings()
1493        eq_([dict(row) for row in res], data)
1494
1495    def test_ix_existing(self):
1496        self._table_w_index_fixture()
1497
1498        with self.op.batch_alter_table("t_w_ix") as batch_op:
1499            batch_op.alter_column("data", type_=String(30))
1500            batch_op.create_index("ix_data", ["data"])
1501
1502        insp = inspect(self.conn)
1503        eq_(
1504            set(
1505                (ix["name"], tuple(ix["column_names"]))
1506                for ix in insp.get_indexes("t_w_ix")
1507            ),
1508            set([("ix_data", ("data",)), ("ix_thing", ("thing",))]),
1509        )
1510
1511    def test_fk_points_to_me_auto(self):
1512        self._test_fk_points_to_me("auto")
1513
1514    # in particular, this tests that the failures
1515    # on PG and MySQL result in recovery of the batch system,
1516    # e.g. that the _alembic_tmp_temp table is dropped
1517    @config.requirements.no_referential_integrity
1518    def test_fk_points_to_me_recreate(self):
1519        self._test_fk_points_to_me("always")
1520
1521    @exclusions.only_on("sqlite")
1522    @exclusions.fails(
1523        "intentionally asserting that this "
1524        "doesn't work w/ pragma foreign keys"
1525    )
1526    def test_fk_points_to_me_sqlite_refinteg(self):
1527        with self._sqlite_referential_integrity():
1528            self._test_fk_points_to_me("auto")
1529
1530    def _test_fk_points_to_me(self, recreate):
1531        bar = Table(
1532            "bar",
1533            self.metadata,
1534            Column("id", Integer, primary_key=True),
1535            Column("foo_id", Integer, ForeignKey("foo.id")),
1536            mysql_engine="InnoDB",
1537        )
1538        with self.conn.begin():
1539            bar.create(self.conn)
1540            self.conn.execute(bar.insert(), {"id": 1, "foo_id": 3})
1541
1542        with self.op.batch_alter_table("foo", recreate=recreate) as batch_op:
1543            batch_op.alter_column(
1544                "data", new_column_name="newdata", existing_type=String(50)
1545            )
1546
1547        insp = inspect(self.conn)
1548        eq_(
1549            [
1550                (
1551                    key["referred_table"],
1552                    key["referred_columns"],
1553                    key["constrained_columns"],
1554                )
1555                for key in insp.get_foreign_keys("bar")
1556            ],
1557            [("foo", ["id"], ["foo_id"])],
1558        )
1559
1560    def test_selfref_fk_auto(self):
1561        self._test_selfref_fk("auto")
1562
1563    @config.requirements.no_referential_integrity
1564    def test_selfref_fk_recreate(self):
1565        self._test_selfref_fk("always")
1566
1567    @exclusions.only_on("sqlite")
1568    @exclusions.fails(
1569        "intentionally asserting that this "
1570        "doesn't work w/ pragma foreign keys"
1571    )
1572    def test_selfref_fk_sqlite_refinteg(self):
1573        with self._sqlite_referential_integrity():
1574            self._test_selfref_fk("auto")
1575
1576    def _test_selfref_fk(self, recreate):
1577        bar = Table(
1578            "bar",
1579            self.metadata,
1580            Column("id", Integer, primary_key=True),
1581            Column("bar_id", Integer, ForeignKey("bar.id")),
1582            Column("data", String(50)),
1583            mysql_engine="InnoDB",
1584        )
1585        with self.conn.begin():
1586            bar.create(self.conn)
1587            self.conn.execute(
1588                bar.insert(), {"id": 1, "data": "x", "bar_id": None}
1589            )
1590            self.conn.execute(
1591                bar.insert(), {"id": 2, "data": "y", "bar_id": 1}
1592            )
1593
1594        with self.op.batch_alter_table("bar", recreate=recreate) as batch_op:
1595            batch_op.alter_column(
1596                "data", new_column_name="newdata", existing_type=String(50)
1597            )
1598
1599        insp = inspect(self.conn)
1600
1601        eq_(
1602            [
1603                (
1604                    key["referred_table"],
1605                    key["referred_columns"],
1606                    key["constrained_columns"],
1607                )
1608                for key in insp.get_foreign_keys("bar")
1609            ],
1610            [("bar", ["id"], ["bar_id"])],
1611        )
1612
1613    def test_change_type(self):
1614        with self.op.batch_alter_table("foo") as batch_op:
1615            batch_op.alter_column("data", type_=Integer)
1616
1617        self._assert_data(
1618            [
1619                {"id": 1, "data": 0, "x": 5},
1620                {"id": 2, "data": 22, "x": 6},
1621                {"id": 3, "data": 8, "x": 7},
1622                {"id": 4, "data": 9, "x": 8},
1623                {"id": 5, "data": 0, "x": 9},
1624            ]
1625        )
1626
1627    def test_drop_column(self):
1628        with self.op.batch_alter_table("foo") as batch_op:
1629            batch_op.drop_column("data")
1630
1631        self._assert_data(
1632            [
1633                {"id": 1, "x": 5},
1634                {"id": 2, "x": 6},
1635                {"id": 3, "x": 7},
1636                {"id": 4, "x": 8},
1637                {"id": 5, "x": 9},
1638            ]
1639        )
1640
1641    def test_drop_pk_col_readd_col(self):
1642        # drop a column, add it back without primary_key=True, should no
1643        # longer be in the constraint
1644        with self.op.batch_alter_table("foo") as batch_op:
1645            batch_op.drop_column("id")
1646            batch_op.add_column(Column("id", Integer))
1647
1648        pk_const = inspect(self.conn).get_pk_constraint("foo")
1649        eq_(pk_const["constrained_columns"], [])
1650
1651    def test_drop_pk_col_readd_pk_col(self):
1652        # drop a column, add it back with primary_key=True, should remain
1653        with self.op.batch_alter_table("foo") as batch_op:
1654            batch_op.drop_column("id")
1655            batch_op.add_column(Column("id", Integer, primary_key=True))
1656
1657        pk_const = inspect(self.conn).get_pk_constraint("foo")
1658        eq_(pk_const["constrained_columns"], ["id"])
1659
1660    def test_drop_pk_col_readd_col_also_pk_const(self):
1661        # drop a column, add it back without primary_key=True, but then
1662        # also make anew PK constraint that includes it, should remain
1663        with self.op.batch_alter_table("foo") as batch_op:
1664            batch_op.drop_column("id")
1665            batch_op.add_column(Column("id", Integer))
1666            batch_op.create_primary_key("newpk", ["id"])
1667
1668        pk_const = inspect(self.conn).get_pk_constraint("foo")
1669        eq_(pk_const["constrained_columns"], ["id"])
1670
1671    @testing.combinations(("always",), ("auto",), argnames="recreate")
1672    def test_add_pk_constraint(self, recreate):
1673        self._no_pk_fixture()
1674        with self.op.batch_alter_table("nopk", recreate=recreate) as batch_op:
1675            batch_op.create_primary_key("newpk", ["a", "b"])
1676
1677        pk_const = inspect(self.conn).get_pk_constraint("nopk")
1678        with config.requirements.reflects_pk_names.fail_if():
1679            eq_(pk_const["name"], "newpk")
1680        eq_(pk_const["constrained_columns"], ["a", "b"])
1681
1682    @testing.combinations(("always",), ("auto",), argnames="recreate")
1683    @config.requirements.check_constraint_reflection
1684    def test_add_ck_constraint(self, recreate):
1685        with self.op.batch_alter_table("foo", recreate=recreate) as batch_op:
1686            batch_op.create_check_constraint("newck", text("x > 0"))
1687
1688        ck_consts = inspect(self.conn).get_check_constraints("foo")
1689        ck_consts[0]["sqltext"] = re.sub(
1690            r"[\'\"`\(\)]", "", ck_consts[0]["sqltext"]
1691        )
1692        eq_(ck_consts, [{"sqltext": "x > 0", "name": "newck"}])
1693
1694    @testing.combinations(("always",), ("auto",), argnames="recreate")
1695    @config.requirements.check_constraint_reflection
1696    def test_drop_ck_constraint(self, recreate):
1697        self._ck_constraint_fixture()
1698
1699        with self.op.batch_alter_table(
1700            "ck_table", recreate=recreate
1701        ) as batch_op:
1702            batch_op.drop_constraint("ck", "check")
1703
1704        ck_consts = inspect(self.conn).get_check_constraints("ck_table")
1705        eq_(ck_consts, [])
1706
1707    @config.requirements.unnamed_constraints
1708    def test_drop_foreign_key(self):
1709        bar = Table(
1710            "bar",
1711            self.metadata,
1712            Column("id", Integer, primary_key=True),
1713            Column("foo_id", Integer, ForeignKey("foo.id")),
1714            mysql_engine="InnoDB",
1715        )
1716        with self.conn.begin():
1717            bar.create(self.conn)
1718            self.conn.execute(bar.insert(), {"id": 1, "foo_id": 3})
1719
1720        naming_convention = {
1721            "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s"
1722        }
1723        with self.op.batch_alter_table(
1724            "bar", naming_convention=naming_convention
1725        ) as batch_op:
1726            batch_op.drop_constraint("fk_bar_foo_id_foo", type_="foreignkey")
1727        eq_(inspect(self.conn).get_foreign_keys("bar"), [])
1728
1729    def test_drop_column_fk_recreate(self):
1730        with self.op.batch_alter_table("foo", recreate="always") as batch_op:
1731            batch_op.drop_column("data")
1732
1733        self._assert_data(
1734            [
1735                {"id": 1, "x": 5},
1736                {"id": 2, "x": 6},
1737                {"id": 3, "x": 7},
1738                {"id": 4, "x": 8},
1739                {"id": 5, "x": 9},
1740            ]
1741        )
1742
1743    def _assert_table_comment(self, tname, comment):
1744        insp = inspect(self.conn)
1745
1746        tcomment = insp.get_table_comment(tname)
1747        eq_(tcomment, {"text": comment})
1748
1749    @testing.combinations(("always",), ("auto",), argnames="recreate")
1750    def test_add_uq(self, recreate):
1751        with self.op.batch_alter_table("foo", recreate=recreate) as batch_op:
1752            batch_op.create_unique_constraint("newuk", ["x"])
1753
1754        uq_consts = inspect(self.conn).get_unique_constraints("foo")
1755        eq_(
1756            [
1757                {"name": uc["name"], "column_names": uc["column_names"]}
1758                for uc in uq_consts
1759            ],
1760            [{"name": "newuk", "column_names": ["x"]}],
1761        )
1762
1763    @testing.combinations(("always",), ("auto",), argnames="recreate")
1764    def test_add_uq_plus_col(self, recreate):
1765        with self.op.batch_alter_table("foo", recreate=recreate) as batch_op:
1766            batch_op.add_column(Column("y", Integer))
1767            batch_op.create_unique_constraint("newuk", ["x", "y"])
1768
1769        uq_consts = inspect(self.conn).get_unique_constraints("foo")
1770
1771        eq_(
1772            [
1773                {"name": uc["name"], "column_names": uc["column_names"]}
1774                for uc in uq_consts
1775            ],
1776            [{"name": "newuk", "column_names": ["x", "y"]}],
1777        )
1778
1779    @config.requirements.comments
1780    def test_add_table_comment(self):
1781        with self.op.batch_alter_table("foo") as batch_op:
1782            batch_op.create_table_comment("some comment")
1783
1784        self._assert_table_comment("foo", "some comment")
1785
1786        with self.op.batch_alter_table("foo") as batch_op:
1787            batch_op.create_table_comment(
1788                "some new comment", existing_comment="some comment"
1789            )
1790
1791        self._assert_table_comment("foo", "some new comment")
1792
1793    @config.requirements.comments
1794    def test_drop_table_comment(self):
1795        with self.op.batch_alter_table("foo") as batch_op:
1796            batch_op.create_table_comment("some comment")
1797
1798        with self.op.batch_alter_table("foo") as batch_op:
1799            batch_op.drop_table_comment(existing_comment="some comment")
1800
1801        self._assert_table_comment("foo", None)
1802
1803    def _assert_column_comment(self, tname, cname, comment):
1804        insp = inspect(self.conn)
1805
1806        cols = {col["name"]: col for col in insp.get_columns(tname)}
1807        eq_(cols[cname]["comment"], comment)
1808
1809    @config.requirements.comments
1810    def test_add_column_comment(self):
1811        with self.op.batch_alter_table("foo") as batch_op:
1812            batch_op.add_column(Column("y", Integer, comment="some comment"))
1813
1814        self._assert_column_comment("foo", "y", "some comment")
1815
1816        self._assert_data(
1817            [
1818                {"id": 1, "data": "d1", "x": 5, "y": None},
1819                {"id": 2, "data": "22", "x": 6, "y": None},
1820                {"id": 3, "data": "8.5", "x": 7, "y": None},
1821                {"id": 4, "data": "9.46", "x": 8, "y": None},
1822                {"id": 5, "data": "d5", "x": 9, "y": None},
1823            ]
1824        )
1825
1826    @config.requirements.comments
1827    def test_add_column_comment_recreate(self):
1828        with self.op.batch_alter_table("foo", recreate="always") as batch_op:
1829            batch_op.add_column(Column("y", Integer, comment="some comment"))
1830
1831        self._assert_column_comment("foo", "y", "some comment")
1832
1833        self._assert_data(
1834            [
1835                {"id": 1, "data": "d1", "x": 5, "y": None},
1836                {"id": 2, "data": "22", "x": 6, "y": None},
1837                {"id": 3, "data": "8.5", "x": 7, "y": None},
1838                {"id": 4, "data": "9.46", "x": 8, "y": None},
1839                {"id": 5, "data": "d5", "x": 9, "y": None},
1840            ]
1841        )
1842
1843    @config.requirements.comments
1844    def test_alter_column_comment(self):
1845        with self.op.batch_alter_table("foo") as batch_op:
1846            batch_op.alter_column(
1847                "x", existing_type=Integer(), comment="some comment"
1848            )
1849
1850        self._assert_column_comment("foo", "x", "some comment")
1851
1852        self._assert_data(
1853            [
1854                {"id": 1, "data": "d1", "x": 5},
1855                {"id": 2, "data": "22", "x": 6},
1856                {"id": 3, "data": "8.5", "x": 7},
1857                {"id": 4, "data": "9.46", "x": 8},
1858                {"id": 5, "data": "d5", "x": 9},
1859            ]
1860        )
1861
1862    @config.requirements.comments
1863    def test_alter_column_comment_recreate(self):
1864        with self.op.batch_alter_table("foo", recreate="always") as batch_op:
1865            batch_op.alter_column("x", comment="some comment")
1866
1867        self._assert_column_comment("foo", "x", "some comment")
1868
1869        self._assert_data(
1870            [
1871                {"id": 1, "data": "d1", "x": 5},
1872                {"id": 2, "data": "22", "x": 6},
1873                {"id": 3, "data": "8.5", "x": 7},
1874                {"id": 4, "data": "9.46", "x": 8},
1875                {"id": 5, "data": "d5", "x": 9},
1876            ]
1877        )
1878
1879    def test_rename_column(self):
1880        with self.op.batch_alter_table("foo") as batch_op:
1881            batch_op.alter_column("x", new_column_name="y")
1882
1883        self._assert_data(
1884            [
1885                {"id": 1, "data": "d1", "y": 5},
1886                {"id": 2, "data": "22", "y": 6},
1887                {"id": 3, "data": "8.5", "y": 7},
1888                {"id": 4, "data": "9.46", "y": 8},
1889                {"id": 5, "data": "d5", "y": 9},
1890            ]
1891        )
1892
1893    def test_rename_column_boolean(self):
1894        bar = Table(
1895            "bar",
1896            self.metadata,
1897            Column("id", Integer, primary_key=True),
1898            Column("flag", Boolean(create_constraint=True)),
1899            mysql_engine="InnoDB",
1900        )
1901        with self.conn.begin():
1902            bar.create(self.conn)
1903            self.conn.execute(bar.insert(), {"id": 1, "flag": True})
1904            self.conn.execute(bar.insert(), {"id": 2, "flag": False})
1905
1906        with self.op.batch_alter_table("bar") as batch_op:
1907            batch_op.alter_column(
1908                "flag", new_column_name="bflag", existing_type=Boolean
1909            )
1910
1911        self._assert_data(
1912            [{"id": 1, "bflag": True}, {"id": 2, "bflag": False}], "bar"
1913        )
1914
1915    #    @config.requirements.check_constraint_reflection
1916    def test_rename_column_boolean_named_ck(self):
1917        bar = Table(
1918            "bar",
1919            self.metadata,
1920            Column("id", Integer, primary_key=True),
1921            Column("flag", Boolean(create_constraint=True, name="ck1")),
1922            mysql_engine="InnoDB",
1923        )
1924        with self.conn.begin():
1925            bar.create(self.conn)
1926            self.conn.execute(bar.insert(), {"id": 1, "flag": True})
1927            self.conn.execute(bar.insert(), {"id": 2, "flag": False})
1928
1929        with self.op.batch_alter_table("bar", recreate="always") as batch_op:
1930            batch_op.alter_column(
1931                "flag",
1932                new_column_name="bflag",
1933                existing_type=Boolean(create_constraint=True, name="ck1"),
1934            )
1935
1936        self._assert_data(
1937            [{"id": 1, "bflag": True}, {"id": 2, "bflag": False}], "bar"
1938        )
1939
1940    @config.requirements.non_native_boolean
1941    def test_rename_column_non_native_boolean_no_ck(self):
1942        bar = Table(
1943            "bar",
1944            self.metadata,
1945            Column("id", Integer, primary_key=True),
1946            Column("flag", Boolean(create_constraint=False)),
1947            mysql_engine="InnoDB",
1948        )
1949        with self.conn.begin():
1950            bar.create(self.conn)
1951            self.conn.execute(bar.insert(), {"id": 1, "flag": True})
1952            self.conn.execute(bar.insert(), {"id": 2, "flag": False})
1953            self.conn.execute(
1954                # override Boolean type which as of 1.1 coerces numerics
1955                # to 1/0
1956                text("insert into bar (id, flag) values (:id, :flag)"),
1957                {"id": 3, "flag": 5},
1958            )
1959
1960        with self.op.batch_alter_table(
1961            "bar",
1962            reflect_args=[Column("flag", Boolean(create_constraint=False))],
1963        ) as batch_op:
1964            batch_op.alter_column(
1965                "flag", new_column_name="bflag", existing_type=Boolean
1966            )
1967
1968        self._assert_data(
1969            [
1970                {"id": 1, "bflag": True},
1971                {"id": 2, "bflag": False},
1972                {"id": 3, "bflag": 5},
1973            ],
1974            "bar",
1975        )
1976
1977    def test_drop_column_pk(self):
1978        with self.op.batch_alter_table("foo") as batch_op:
1979            batch_op.drop_column("id")
1980
1981        self._assert_data(
1982            [
1983                {"data": "d1", "x": 5},
1984                {"data": "22", "x": 6},
1985                {"data": "8.5", "x": 7},
1986                {"data": "9.46", "x": 8},
1987                {"data": "d5", "x": 9},
1988            ]
1989        )
1990
1991    def test_rename_column_pk(self):
1992        with self.op.batch_alter_table("foo") as batch_op:
1993            batch_op.alter_column("id", new_column_name="ident")
1994
1995        self._assert_data(
1996            [
1997                {"ident": 1, "data": "d1", "x": 5},
1998                {"ident": 2, "data": "22", "x": 6},
1999                {"ident": 3, "data": "8.5", "x": 7},
2000                {"ident": 4, "data": "9.46", "x": 8},
2001                {"ident": 5, "data": "d5", "x": 9},
2002            ]
2003        )
2004
2005    def test_add_column_auto(self):
2006        # note this uses ALTER
2007        with self.op.batch_alter_table("foo") as batch_op:
2008            batch_op.add_column(
2009                Column("data2", String(50), server_default="hi")
2010            )
2011
2012        self._assert_data(
2013            [
2014                {"id": 1, "data": "d1", "x": 5, "data2": "hi"},
2015                {"id": 2, "data": "22", "x": 6, "data2": "hi"},
2016                {"id": 3, "data": "8.5", "x": 7, "data2": "hi"},
2017                {"id": 4, "data": "9.46", "x": 8, "data2": "hi"},
2018                {"id": 5, "data": "d5", "x": 9, "data2": "hi"},
2019            ]
2020        )
2021        eq_(
2022            [col["name"] for col in inspect(config.db).get_columns("foo")],
2023            ["id", "data", "x", "data2"],
2024        )
2025
2026    def test_add_column_auto_server_default_calculated(self):
2027        """test #883"""
2028        with self.op.batch_alter_table("foo") as batch_op:
2029            batch_op.add_column(
2030                Column(
2031                    "data2",
2032                    DateTime(),
2033                    server_default=self._datetime_server_default_fixture(),
2034                )
2035            )
2036
2037        self._assert_data(
2038            [
2039                {"id": 1, "data": "d1", "x": 5, "data2": mock.ANY},
2040                {"id": 2, "data": "22", "x": 6, "data2": mock.ANY},
2041                {"id": 3, "data": "8.5", "x": 7, "data2": mock.ANY},
2042                {"id": 4, "data": "9.46", "x": 8, "data2": mock.ANY},
2043                {"id": 5, "data": "d5", "x": 9, "data2": mock.ANY},
2044            ]
2045        )
2046        eq_(
2047            [col["name"] for col in inspect(self.conn).get_columns("foo")],
2048            ["id", "data", "x", "data2"],
2049        )
2050
2051    @testing.combinations((True,), (False,))
2052    @testing.exclusions.only_on("sqlite")
2053    @config.requirements.computed_columns
2054    def test_add_column_auto_generated(self, persisted):
2055        """test #883"""
2056        with self.op.batch_alter_table("foo") as batch_op:
2057            batch_op.add_column(
2058                Column(
2059                    "data2", Integer, Computed("1 + 1", persisted=persisted)
2060                )
2061            )
2062
2063        self._assert_data(
2064            [
2065                {"id": 1, "data": "d1", "x": 5, "data2": 2},
2066                {"id": 2, "data": "22", "x": 6, "data2": 2},
2067                {"id": 3, "data": "8.5", "x": 7, "data2": 2},
2068                {"id": 4, "data": "9.46", "x": 8, "data2": 2},
2069                {"id": 5, "data": "d5", "x": 9, "data2": 2},
2070            ]
2071        )
2072        eq_(
2073            [col["name"] for col in inspect(self.conn).get_columns("foo")],
2074            ["id", "data", "x", "data2"],
2075        )
2076
2077    @config.requirements.identity_columns
2078    def test_add_column_auto_identity(self):
2079        """test #883"""
2080
2081        self._no_pk_fixture()
2082
2083        with self.op.batch_alter_table("nopk") as batch_op:
2084            batch_op.add_column(Column("id", Integer, Identity()))
2085
2086        self._assert_data(
2087            [
2088                {"a": 1, "b": 2, "c": 3, "id": 1},
2089                {"a": 2, "b": 4, "c": 5, "id": 2},
2090            ],
2091            tablename="nopk",
2092        )
2093        eq_(
2094            [col["name"] for col in inspect(self.conn).get_columns("foo")],
2095            ["id", "data", "x"],
2096        )
2097
2098    def test_add_column_insert_before_recreate(self):
2099        with self.op.batch_alter_table("foo", recreate="always") as batch_op:
2100            batch_op.add_column(
2101                Column("data2", String(50), server_default="hi"),
2102                insert_before="data",
2103            )
2104        self._assert_data(
2105            [
2106                {"id": 1, "data": "d1", "x": 5, "data2": "hi"},
2107                {"id": 2, "data": "22", "x": 6, "data2": "hi"},
2108                {"id": 3, "data": "8.5", "x": 7, "data2": "hi"},
2109                {"id": 4, "data": "9.46", "x": 8, "data2": "hi"},
2110                {"id": 5, "data": "d5", "x": 9, "data2": "hi"},
2111            ]
2112        )
2113        eq_(
2114            [col["name"] for col in inspect(self.conn).get_columns("foo")],
2115            ["id", "data2", "data", "x"],
2116        )
2117
2118    def test_add_column_insert_after_recreate(self):
2119        with self.op.batch_alter_table("foo", recreate="always") as batch_op:
2120            batch_op.add_column(
2121                Column("data2", String(50), server_default="hi"),
2122                insert_after="data",
2123            )
2124        self._assert_data(
2125            [
2126                {"id": 1, "data": "d1", "x": 5, "data2": "hi"},
2127                {"id": 2, "data": "22", "x": 6, "data2": "hi"},
2128                {"id": 3, "data": "8.5", "x": 7, "data2": "hi"},
2129                {"id": 4, "data": "9.46", "x": 8, "data2": "hi"},
2130                {"id": 5, "data": "d5", "x": 9, "data2": "hi"},
2131            ]
2132        )
2133        eq_(
2134            [col["name"] for col in inspect(self.conn).get_columns("foo")],
2135            ["id", "data", "data2", "x"],
2136        )
2137
2138    def test_add_column_insert_before_raise_on_alter(self):
2139        def go():
2140            with self.op.batch_alter_table("foo") as batch_op:
2141                batch_op.add_column(
2142                    Column("data2", String(50), server_default="hi"),
2143                    insert_before="data",
2144                )
2145
2146        assert_raises_message(
2147            alembic_exc.CommandError,
2148            "Can't specify insert_before or insert_after when using ALTER",
2149            go,
2150        )
2151
2152    def test_add_column_recreate(self):
2153        with self.op.batch_alter_table("foo", recreate="always") as batch_op:
2154            batch_op.add_column(
2155                Column("data2", String(50), server_default="hi")
2156            )
2157
2158        self._assert_data(
2159            [
2160                {"id": 1, "data": "d1", "x": 5, "data2": "hi"},
2161                {"id": 2, "data": "22", "x": 6, "data2": "hi"},
2162                {"id": 3, "data": "8.5", "x": 7, "data2": "hi"},
2163                {"id": 4, "data": "9.46", "x": 8, "data2": "hi"},
2164                {"id": 5, "data": "d5", "x": 9, "data2": "hi"},
2165            ]
2166        )
2167        eq_(
2168            [col["name"] for col in inspect(self.conn).get_columns("foo")],
2169            ["id", "data", "x", "data2"],
2170        )
2171
2172    def test_create_drop_index(self):
2173        insp = inspect(self.conn)
2174        eq_(insp.get_indexes("foo"), [])
2175
2176        with self.op.batch_alter_table("foo", recreate="always") as batch_op:
2177            batch_op.create_index("ix_data", ["data"], unique=True)
2178
2179        self._assert_data(
2180            [
2181                {"id": 1, "data": "d1", "x": 5},
2182                {"id": 2, "data": "22", "x": 6},
2183                {"id": 3, "data": "8.5", "x": 7},
2184                {"id": 4, "data": "9.46", "x": 8},
2185                {"id": 5, "data": "d5", "x": 9},
2186            ]
2187        )
2188        insp = inspect(self.conn)
2189        eq_(
2190            [
2191                dict(
2192                    unique=ix["unique"],
2193                    name=ix["name"],
2194                    column_names=ix["column_names"],
2195                )
2196                for ix in insp.get_indexes("foo")
2197            ],
2198            [{"unique": True, "name": "ix_data", "column_names": ["data"]}],
2199        )
2200
2201        with self.op.batch_alter_table("foo", recreate="always") as batch_op:
2202            batch_op.drop_index("ix_data")
2203
2204        insp = inspect(self.conn)
2205        eq_(insp.get_indexes("foo"), [])
2206
2207
2208class BatchRoundTripMySQLTest(BatchRoundTripTest):
2209    __only_on__ = "mysql", "mariadb"
2210    __backend__ = True
2211
2212    def _datetime_server_default_fixture(self):
2213        return func.current_timestamp()
2214
2215    @exclusions.fails()
2216    def test_drop_pk_col_readd_pk_col(self):
2217        super(BatchRoundTripMySQLTest, self).test_drop_pk_col_readd_pk_col()
2218
2219    @exclusions.fails()
2220    def test_drop_pk_col_readd_col_also_pk_const(self):
2221        super(
2222            BatchRoundTripMySQLTest, self
2223        ).test_drop_pk_col_readd_col_also_pk_const()
2224
2225    @exclusions.fails()
2226    def test_rename_column_pk(self):
2227        super(BatchRoundTripMySQLTest, self).test_rename_column_pk()
2228
2229    @exclusions.fails()
2230    def test_rename_column(self):
2231        super(BatchRoundTripMySQLTest, self).test_rename_column()
2232
2233    @exclusions.fails()
2234    def test_change_type(self):
2235        super(BatchRoundTripMySQLTest, self).test_change_type()
2236
2237    def test_create_drop_index(self):
2238        super(BatchRoundTripMySQLTest, self).test_create_drop_index()
2239
2240    # fails on mariadb 10.2, succeeds on 10.3
2241    @exclusions.fails_if(config.requirements.mysql_check_col_name_change)
2242    def test_rename_column_boolean(self):
2243        super(BatchRoundTripMySQLTest, self).test_rename_column_boolean()
2244
2245    def test_change_type_boolean_to_int(self):
2246        super(BatchRoundTripMySQLTest, self).test_change_type_boolean_to_int()
2247
2248    def test_change_type_int_to_boolean(self):
2249        super(BatchRoundTripMySQLTest, self).test_change_type_int_to_boolean()
2250
2251
2252class BatchRoundTripPostgresqlTest(BatchRoundTripTest):
2253    __only_on__ = "postgresql"
2254    __backend__ = True
2255
2256    def _native_boolean_fixture(self):
2257        t = Table(
2258            "has_native_bool",
2259            self.metadata,
2260            Column(
2261                "x",
2262                Boolean(create_constraint=True),
2263                server_default="false",
2264                nullable=False,
2265            ),
2266            Column("y", Integer),
2267        )
2268        with self.conn.begin():
2269            t.create(self.conn)
2270
2271    def _datetime_server_default_fixture(self):
2272        return func.current_timestamp()
2273
2274    @exclusions.fails()
2275    def test_drop_pk_col_readd_pk_col(self):
2276        super(
2277            BatchRoundTripPostgresqlTest, self
2278        ).test_drop_pk_col_readd_pk_col()
2279
2280    @exclusions.fails()
2281    def test_drop_pk_col_readd_col_also_pk_const(self):
2282        super(
2283            BatchRoundTripPostgresqlTest, self
2284        ).test_drop_pk_col_readd_col_also_pk_const()
2285
2286    @exclusions.fails()
2287    def test_change_type(self):
2288        super(BatchRoundTripPostgresqlTest, self).test_change_type()
2289
2290    def test_create_drop_index(self):
2291        super(BatchRoundTripPostgresqlTest, self).test_create_drop_index()
2292
2293    @exclusions.fails()
2294    def test_change_type_int_to_boolean(self):
2295        super(
2296            BatchRoundTripPostgresqlTest, self
2297        ).test_change_type_int_to_boolean()
2298
2299    @exclusions.fails()
2300    def test_change_type_boolean_to_int(self):
2301        super(
2302            BatchRoundTripPostgresqlTest, self
2303        ).test_change_type_boolean_to_int()
2304
2305    def test_add_col_table_has_native_boolean(self):
2306        self._native_boolean_fixture()
2307
2308        # to ensure test coverage on SQLAlchemy 1.4 and above,
2309        # force the create_constraint flag to True even though it
2310        # defaults to false in 1.4.  this test wants to ensure that the
2311        # "should create" rule is consulted
2312        def listen_for_reflect(inspector, table, column_info):
2313            if isinstance(column_info["type"], Boolean):
2314                column_info["type"].create_constraint = True
2315
2316        with self.op.batch_alter_table(
2317            "has_native_bool",
2318            recreate="always",
2319            reflect_kwargs={
2320                "listeners": [("column_reflect", listen_for_reflect)]
2321            },
2322        ) as batch_op:
2323            batch_op.add_column(Column("data", Integer))
2324
2325        insp = inspect(self.conn)
2326
2327        eq_(
2328            [
2329                c["type"]._type_affinity
2330                for c in insp.get_columns("has_native_bool")
2331                if c["name"] == "data"
2332            ],
2333            [Integer],
2334        )
2335        eq_(
2336            [
2337                c["type"]._type_affinity
2338                for c in insp.get_columns("has_native_bool")
2339                if c["name"] == "x"
2340            ],
2341            [Boolean],
2342        )
2343