1import sqlalchemy as tsa
2from sqlalchemy import create_engine
3from sqlalchemy import create_mock_engine
4from sqlalchemy import event
5from sqlalchemy import Integer
6from sqlalchemy import MetaData
7from sqlalchemy import String
8from sqlalchemy import testing
9from sqlalchemy import text
10from sqlalchemy.schema import AddConstraint
11from sqlalchemy.schema import CheckConstraint
12from sqlalchemy.schema import DDL
13from sqlalchemy.schema import DropConstraint
14from sqlalchemy.testing import AssertsCompiledSQL
15from sqlalchemy.testing import engines
16from sqlalchemy.testing import eq_
17from sqlalchemy.testing import fixtures
18from sqlalchemy.testing import mock
19from sqlalchemy.testing.schema import Column
20from sqlalchemy.testing.schema import Table
21
22
class DDLEventTest(fixtures.TestBase):
    """Check which DDL events fire for ``Table`` and ``MetaData``.

    Each test registers attributes of a ``mock.Mock`` "canary" as event
    listeners, runs create / drop against a mock engine and compares
    ``canary.mock_calls`` with the exact list of expected calls.

    """

    def setup_test(self):
        """Build a fresh mock engine, MetaData and single-column table."""
        self.bind = engines.mock_engine()
        self.metadata = MetaData()
        self.table = Table("t", self.metadata, Column("id", Integer))

    def _listen(self, target, *event_names):
        """Return a new canary whose attributes listen on ``target``.

        For every name in ``event_names`` the canary attribute of the
        same name is registered as listener for that event, so each
        invocation ends up in ``canary.mock_calls`` under the event name.
        """
        canary = mock.Mock()
        for event_name in event_names:
            event.listen(target, event_name, getattr(canary, event_name))
        return canary

    def _table_call(self, event_name):
        """Return the call expected for a table-level ``event_name``."""
        return getattr(mock.call, event_name)(
            self.table,
            self.bind,
            checkfirst=False,
            _ddl_runner=mock.ANY,
            _is_metadata_operation=mock.ANY,
        )

    def _metadata_call(self, event_name):
        """Return the call expected for a metadata-level ``event_name``."""
        # checkfirst is False because of the MockConnection
        # used in the current testing strategy.
        return getattr(mock.call, event_name)(
            self.metadata,
            self.bind,
            checkfirst=False,
            tables=list(self.metadata.tables.values()),
            _ddl_runner=mock.ANY,
        )

    def _assert_table_events(self, *event_names):
        """Create and drop the table; assert exactly ``event_names`` fired.

        ``event_names`` must be given in the order the events are
        expected to fire, as the recorded calls are compared in order.
        """
        canary = self._listen(self.table, *event_names)

        self.table.create(self.bind)
        self.table.drop(self.bind)
        eq_(
            canary.mock_calls,
            [self._table_call(event_name) for event_name in event_names],
        )

    def _assert_metadata_events(self, *event_names):
        """Run create_all / drop_all; assert exactly ``event_names`` fired.

        ``event_names`` must be given in the order the events are
        expected to fire, as the recorded calls are compared in order.
        """
        canary = self._listen(self.metadata, *event_names)

        self.metadata.create_all(self.bind)
        self.metadata.drop_all(self.bind)
        eq_(
            canary.mock_calls,
            [self._metadata_call(event_name) for event_name in event_names],
        )

    def test_table_create_before(self):
        self._assert_table_events("before_create")

    def test_table_create_after(self):
        self._assert_table_events("after_create")

    def test_table_create_both(self):
        self._assert_table_events("before_create", "after_create")

    def test_table_drop_before(self):
        self._assert_table_events("before_drop")

    def test_table_drop_after(self):
        self._assert_table_events("after_drop")

    def test_table_drop_both(self):
        self._assert_table_events("before_drop", "after_drop")

    def test_table_all(self):
        self._assert_table_events(
            "before_create", "after_create", "before_drop", "after_drop"
        )

    def test_metadata_create_before(self):
        self._assert_metadata_events("before_create")

    def test_metadata_create_after(self):
        self._assert_metadata_events("after_create")

    def test_metadata_create_both(self):
        self._assert_metadata_events("before_create", "after_create")

    def test_metadata_drop_before(self):
        self._assert_metadata_events("before_drop")

    def test_metadata_drop_after(self):
        self._assert_metadata_events("after_drop")

    def test_metadata_drop_both(self):
        self._assert_metadata_events("before_drop", "after_drop")

    def test_metadata_table_isolation(self):
        """Creating the table alone must not fire the MetaData listener."""
        table_canary = self._listen(self.table, "before_create")
        metadata_canary = self._listen(self.metadata, "before_create")

        self.table.create(self.bind)
        eq_(table_canary.mock_calls, [self._table_call("before_create")])
        eq_(metadata_canary.mock_calls, [])
374
375
class DDLExecutionTest(fixtures.TestBase):
    """Check that ``DDL`` constructs registered as event listeners are
    emitted during the matching create / drop operation only."""

    def setup_test(self):
        """Build a mock engine plus a MetaData holding a ``users`` table."""
        self.engine = engines.mock_engine()
        self.metadata = MetaData()
        self.users = Table(
            "users",
            self.metadata,
            Column("user_id", Integer, primary_key=True),
            Column("user_name", String(40)),
        )

    def _listen_marker_ddl(self, target):
        """Attach one distinct marker ``DDL`` per create / drop event."""
        event.listen(target, "before_create", DDL("mxyzptlk"))
        event.listen(target, "after_create", DDL("klptzyxm"))
        event.listen(target, "before_drop", DDL("xyzzy"))
        event.listen(target, "after_drop", DDL("fnord"))

    def _assert_markers_emitted(self, create, drop):
        """Call ``create`` then ``drop`` with the mock engine and assert
        that each one emits only its own pair of marker statements."""
        engine = self.engine

        create(engine)
        strings = [str(x) for x in engine.mock]
        assert "mxyzptlk" in strings
        assert "klptzyxm" in strings
        assert "xyzzy" not in strings
        assert "fnord" not in strings

        # empty the buffer so the drop phase is checked on its own
        del engine.mock[:]
        drop(engine)
        strings = [str(x) for x in engine.mock]
        assert "mxyzptlk" not in strings
        assert "klptzyxm" not in strings
        assert "xyzzy" in strings
        assert "fnord" in strings

    def test_table_standalone(self):
        self._listen_marker_ddl(self.users)
        self._assert_markers_emitted(self.users.create, self.users.drop)

    def test_table_by_metadata(self):
        self._listen_marker_ddl(self.users)
        self._assert_markers_emitted(
            self.metadata.create_all, self.metadata.drop_all
        )

    def test_metadata(self):
        self._listen_marker_ddl(self.metadata)
        self._assert_markers_emitted(
            self.metadata.create_all, self.metadata.drop_all
        )

    def test_conditional_constraint(self):
        """``execute_if(dialect=...)`` limits Add/DropConstraint to the
        named dialect."""
        metadata, users = self.metadata, self.users
        nonpg_mock = engines.mock_engine(dialect_name="sqlite")
        pg_mock = engines.mock_engine(dialect_name="postgresql")
        constraint = CheckConstraint(
            "a < b", name="my_test_constraint", table=users
        )

        # by placing the constraint in an Add/Drop construct, the
        # 'inline_ddl' flag is set to False

        event.listen(
            users,
            "after_create",
            AddConstraint(constraint).execute_if(dialect="postgresql"),
        )

        event.listen(
            users,
            "before_drop",
            DropConstraint(constraint).execute_if(dialect="postgresql"),
        )

        def drain(mock_bind):
            """Return all SQL emitted so far and empty the buffer.

            Emptying matters: without it the statements from create_all()
            would still be present when the drop phase is checked, and
            the drop assertion could never fail.
            """
            emitted = " ".join(str(x) for x in mock_bind.mock)
            del mock_bind.mock[:]
            return emitted

        metadata.create_all(bind=nonpg_mock)
        assert "my_test_constraint" not in drain(nonpg_mock)
        metadata.drop_all(bind=nonpg_mock)
        assert "my_test_constraint" not in drain(nonpg_mock)
        metadata.create_all(bind=pg_mock)
        assert "my_test_constraint" in drain(pg_mock)
        metadata.drop_all(bind=pg_mock)
        assert "my_test_constraint" in drain(pg_mock)

    @testing.requires.sqlite
    def test_ddl_execute(self):
        """A ``DDL`` construct can be executed directly on a connection."""
        engine = create_engine("sqlite:///")
        try:
            # context managers close the transaction and the connection
            # even when the assertion fails
            with engine.connect() as cx, cx.begin():
                r = cx.execute(DDL("SELECT 1"))
                eq_(list(r), [(1,)])
        finally:
            engine.dispose()

    def test_platform_escape(self):
        """test the escaping of % characters in the DDL construct."""

        default_from = testing.db.dialect.statement_compiler(
            testing.db.dialect, None
        ).default_from()

        # We're abusing the DDL()
        # construct here by pushing a SELECT through it
        # so that we can verify the round trip.
        # the DDL() will trigger autocommit, which prohibits
        # some DBAPIs from returning results (pyodbc), so we
        # run in an explicit transaction.
        with testing.db.begin() as conn:
            eq_(
                conn.execute(
                    text("select 'foo%something'" + default_from)
                ).scalar(),
                "foo%something",
            )

            eq_(
                conn.execute(
                    DDL("select 'foo%%something'" + default_from)
                ).scalar(),
                "foo%something",
            )
525
526
class DDLTransactionTest(fixtures.TestBase):
    """test DDL transactional behavior as of SQLAlchemy 1.4."""

    @staticmethod
    def _one_transaction_calls():
        """Expected calls when both CREATE TABLE statements run between
        a single begin and a single commit."""
        create_table = mock.call.cursor_execute("CREATE TABLE ...")
        return [
            mock.call.begin(mock.ANY),
            create_table,
            create_table,
            mock.call.commit(mock.ANY),
        ]

    @testing.fixture
    def metadata_fixture(self):
        """Yield a MetaData with tables ``t1`` and ``t2``; drop them from
        the test database once the test is done."""
        metadata = MetaData()
        for table_name in ("t1", "t2"):
            Table(table_name, metadata, Column("q", Integer))

        try:
            yield metadata
        finally:
            metadata.drop_all(testing.db)

    def _listening_engine_fixture(self, future=False):
        """Return ``(engine, recorder)`` for a new testing engine.

        ``recorder`` is a mock that receives the engine's begin, commit
        and rollback events, plus one ``cursor_execute("CREATE TABLE ...")``
        call for every executed statement containing "CREATE TABLE".
        """
        engine = engines.testing_engine(future=future)
        recorder = mock.Mock()

        for event_name in ("begin", "commit", "rollback"):
            event.listen(engine, event_name, getattr(recorder, event_name))

        def record_create_table(
            conn, cursor, statement, parameters, context, executemany
        ):
            if "CREATE TABLE" in statement:
                recorder.cursor_execute("CREATE TABLE ...")

        event.listen(engine, "before_cursor_execute", record_create_table)

        # NOTE(review): presumably opened and closed up front so that
        # first-connect work is finished before the test body runs; confirm
        engine.connect().close()

        return engine, recorder

    @testing.fixture
    def listening_engine_fixture(self):
        """Legacy (non-future) listening engine and its recorder."""
        return self._listening_engine_fixture(future=False)

    @testing.fixture
    def future_listening_engine_fixture(self):
        """Future-mode listening engine and its recorder."""
        return self._listening_engine_fixture(future=True)

    def test_ddl_legacy_engine(
        self, metadata_fixture, listening_engine_fixture
    ):
        engine, recorder = listening_engine_fixture

        metadata_fixture.create_all(engine)

        eq_(recorder.mock_calls, self._one_transaction_calls())

    def test_ddl_future_engine(
        self, metadata_fixture, future_listening_engine_fixture
    ):
        engine, recorder = future_listening_engine_fixture

        metadata_fixture.create_all(engine)

        eq_(recorder.mock_calls, self._one_transaction_calls())

    def test_ddl_legacy_connection_no_transaction(
        self, metadata_fixture, listening_engine_fixture
    ):
        engine, recorder = listening_engine_fixture

        with engine.connect() as conn:
            with testing.expect_deprecated(
                "The current statement is being autocommitted using "
                "implicit autocommit"
            ):
                metadata_fixture.create_all(conn)

        # no begin is recorded; each statement is followed by its own commit
        create_table = mock.call.cursor_execute("CREATE TABLE ...")
        committed = mock.call.commit(mock.ANY)
        eq_(
            recorder.mock_calls,
            [create_table, committed, create_table, committed],
        )

    def test_ddl_legacy_connection_transaction(
        self, metadata_fixture, listening_engine_fixture
    ):
        engine, recorder = listening_engine_fixture

        with engine.connect() as conn:
            with conn.begin():
                metadata_fixture.create_all(conn)

        eq_(recorder.mock_calls, self._one_transaction_calls())

    def test_ddl_future_connection_autobegin_transaction(
        self, metadata_fixture, future_listening_engine_fixture
    ):
        engine, recorder = future_listening_engine_fixture

        with engine.connect() as conn:
            metadata_fixture.create_all(conn)

            conn.commit()

        eq_(recorder.mock_calls, self._one_transaction_calls())

    def test_ddl_future_connection_explicit_begin_transaction(
        self, metadata_fixture, future_listening_engine_fixture
    ):
        engine, recorder = future_listening_engine_fixture

        with engine.connect() as conn:
            with conn.begin():
                metadata_fixture.create_all(conn)

        eq_(recorder.mock_calls, self._one_transaction_calls())
682
683
class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
    """Compilation and conditional-execution tests for ``DDL``."""

    def mock_engine(self):
        """Return a mock engine named after the current test database.

        Its executor discards every statement, and the dialect's
        identifier preparer is replaced by the generic
        ``IdentifierPreparer``.
        """

        def executor(*a, **kw):
            return None

        engine = create_mock_engine(testing.db.name + "://", executor)
        dialect = engine.dialect
        dialect.identifier_preparer = tsa.sql.compiler.IdentifierPreparer(
            dialect
        )
        return engine

    def test_tokens(self):
        """%(schema)s / %(table)s / %(fullname)s substitution in DDL."""
        m = MetaData()
        tables = (
            Table("t", m, Column("id", Integer)),
            Table("t", m, Column("id", Integer), schema="s"),
            Table("t t", m, Column("id", Integer)),
            Table("t t", m, Column("id", Integer), schema="s s"),
        )
        dialect = self.mock_engine().dialect

        def check(ddl, expected_sql):
            # expected_sql lines up with ``tables``: plain, with schema,
            # needing quotes, needing quotes with schema
            for tbl, expected in zip(tables, expected_sql):
                self.assert_compile(
                    ddl.against(tbl), expected, dialect=dialect
                )

        check(
            DDL("%(schema)s-%(table)s-%(fullname)s"),
            (
                "-t-t",
                "s-t-s.t",
                '-"t t"-"t t"',
                '"s s"-"t t"-"s s"."t t"',
            ),
        )

        # overrides are used piece-meal and verbatim.

        check(
            DDL(
                "%(schema)s-%(table)s-%(fullname)s-%(bonus)s",
                context={"schema": "S S", "table": "T T", "bonus": "b"},
            ),
            (
                "S S-T T-t-b",
                "S S-T T-s.t-b",
                'S S-T T-"t t"-b',
                'S S-T T-"s s"."t t"-b',
            ),
        )

    def test_filter(self):
        """``execute_if()`` criteria as evaluated by ``_should_execute``."""
        cx = self.mock_engine()

        tbl = Table("t", MetaData(), Column("id", Integer))
        target = cx.name

        def should_execute(ddl):
            return ddl._should_execute(tbl, cx)

        unconditional = DDL("")
        same_dialect = DDL("").execute_if(dialect=target)
        other_dialect = DDL("").execute_if(dialect="bogus")
        always_true = DDL("").execute_if(
            callable_=lambda d, y, z, **kw: True
        )
        checks_bind = DDL("").execute_if(
            callable_=lambda d, y, z, **kw: z.engine.name != "bogus"
        )

        assert should_execute(unconditional)
        assert should_execute(same_dialect)
        assert not should_execute(other_dialect)
        assert should_execute(always_true)
        assert should_execute(checks_bind)
761