1from sqlalchemy import CheckConstraint
2from sqlalchemy import Column
3from sqlalchemy import exc
4from sqlalchemy import ForeignKey
5from sqlalchemy import ForeignKeyConstraint
6from sqlalchemy import func
7from sqlalchemy import Index
8from sqlalchemy import Integer
9from sqlalchemy import MetaData
10from sqlalchemy import PrimaryKeyConstraint
11from sqlalchemy import schema
12from sqlalchemy import String
13from sqlalchemy import Table
14from sqlalchemy import testing
15from sqlalchemy import text
16from sqlalchemy import UniqueConstraint
17from sqlalchemy.engine import default
18from sqlalchemy.testing import assert_raises
19from sqlalchemy.testing import assert_raises_message
20from sqlalchemy.testing import AssertsCompiledSQL
21from sqlalchemy.testing import AssertsExecutionResults
22from sqlalchemy.testing import engines
23from sqlalchemy.testing import eq_
24from sqlalchemy.testing import fixtures
25from sqlalchemy.testing.assertions import expect_warnings
26from sqlalchemy.testing.assertsql import AllOf
27from sqlalchemy.testing.assertsql import CompiledSQL
28from sqlalchemy.testing.assertsql import DialectSQL
29from sqlalchemy.testing.assertsql import RegexSQL
30
31
32class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
33    __dialect__ = "default"
34    __backend__ = True
35
36    @testing.provide_metadata
37    def test_pk_fk_constraint_create(self):
38        metadata = self.metadata
39
40        Table(
41            "employees",
42            metadata,
43            Column("id", Integer),
44            Column("soc", String(40)),
45            Column("name", String(30)),
46            PrimaryKeyConstraint("id", "soc"),
47        )
48        Table(
49            "elements",
50            metadata,
51            Column("id", Integer),
52            Column("stuff", String(30)),
53            Column("emp_id", Integer),
54            Column("emp_soc", String(40)),
55            PrimaryKeyConstraint("id", name="elements_primkey"),
56            ForeignKeyConstraint(
57                ["emp_id", "emp_soc"], ["employees.id", "employees.soc"]
58            ),
59        )
60        self.assert_sql_execution(
61            testing.db,
62            lambda: metadata.create_all(checkfirst=False),
63            CompiledSQL(
64                "CREATE TABLE employees ("
65                "id INTEGER NOT NULL, "
66                "soc VARCHAR(40) NOT NULL, "
67                "name VARCHAR(30), "
68                "PRIMARY KEY (id, soc)"
69                ")"
70            ),
71            CompiledSQL(
72                "CREATE TABLE elements ("
73                "id INTEGER NOT NULL, "
74                "stuff VARCHAR(30), "
75                "emp_id INTEGER, "
76                "emp_soc VARCHAR(40), "
77                "CONSTRAINT elements_primkey PRIMARY KEY (id), "
78                "FOREIGN KEY(emp_id, emp_soc) "
79                "REFERENCES employees (id, soc)"
80                ")"
81            ),
82        )
83
84    @testing.force_drop_names("a", "b")
85    def test_fk_cant_drop_cycled_unnamed(self):
86        metadata = MetaData()
87
88        Table(
89            "a",
90            metadata,
91            Column("id", Integer, primary_key=True),
92            Column("bid", Integer),
93            ForeignKeyConstraint(["bid"], ["b.id"]),
94        )
95        Table(
96            "b",
97            metadata,
98            Column("id", Integer, primary_key=True),
99            Column("aid", Integer),
100            ForeignKeyConstraint(["aid"], ["a.id"]),
101        )
102        metadata.create_all(testing.db)
103        if testing.db.dialect.supports_alter:
104            assert_raises_message(
105                exc.CircularDependencyError,
106                "Can't sort tables for DROP; an unresolvable foreign key "
107                "dependency exists between tables: a, b.  Please ensure "
108                "that the ForeignKey and ForeignKeyConstraint objects "
109                "involved in the cycle have names so that they can be "
110                "dropped using DROP CONSTRAINT.",
111                metadata.drop_all,
112                testing.db,
113            )
114        else:
115            with expect_warnings(
116                "Can't sort tables for DROP; an unresolvable "
117                "foreign key dependency "
118            ):
119                with self.sql_execution_asserter() as asserter:
120                    metadata.drop_all(testing.db, checkfirst=False)
121
122            asserter.assert_(
123                AllOf(CompiledSQL("DROP TABLE a"), CompiledSQL("DROP TABLE b"))
124            )
125
126    @testing.provide_metadata
127    def test_fk_table_auto_alter_constraint_create(self):
128        metadata = self.metadata
129
130        Table(
131            "a",
132            metadata,
133            Column("id", Integer, primary_key=True),
134            Column("bid", Integer),
135            ForeignKeyConstraint(["bid"], ["b.id"]),
136        )
137        Table(
138            "b",
139            metadata,
140            Column("id", Integer, primary_key=True),
141            Column("aid", Integer),
142            ForeignKeyConstraint(["aid"], ["a.id"], name="bfk"),
143        )
144        self._assert_cyclic_constraint(
145            metadata, auto=True, sqlite_warning=True
146        )
147
148    @testing.provide_metadata
149    def test_fk_column_auto_alter_inline_constraint_create(self):
150        metadata = self.metadata
151
152        Table(
153            "a",
154            metadata,
155            Column("id", Integer, primary_key=True),
156            Column("bid", Integer, ForeignKey("b.id")),
157        )
158        Table(
159            "b",
160            metadata,
161            Column("id", Integer, primary_key=True),
162            Column("aid", Integer, ForeignKey("a.id", name="bfk")),
163        )
164        self._assert_cyclic_constraint(
165            metadata, auto=True, sqlite_warning=True
166        )
167
168    @testing.provide_metadata
169    def test_fk_column_use_alter_inline_constraint_create(self):
170        metadata = self.metadata
171
172        Table(
173            "a",
174            metadata,
175            Column("id", Integer, primary_key=True),
176            Column("bid", Integer, ForeignKey("b.id")),
177        )
178        Table(
179            "b",
180            metadata,
181            Column("id", Integer, primary_key=True),
182            Column(
183                "aid", Integer, ForeignKey("a.id", name="bfk", use_alter=True)
184            ),
185        )
186        self._assert_cyclic_constraint(metadata, auto=False)
187
188    @testing.provide_metadata
189    def test_fk_table_use_alter_constraint_create(self):
190        metadata = self.metadata
191
192        Table(
193            "a",
194            metadata,
195            Column("id", Integer, primary_key=True),
196            Column("bid", Integer),
197            ForeignKeyConstraint(["bid"], ["b.id"]),
198        )
199        Table(
200            "b",
201            metadata,
202            Column("id", Integer, primary_key=True),
203            Column("aid", Integer),
204            ForeignKeyConstraint(
205                ["aid"], ["a.id"], use_alter=True, name="bfk"
206            ),
207        )
208        self._assert_cyclic_constraint(metadata)
209
210    @testing.provide_metadata
211    def test_fk_column_use_alter_constraint_create(self):
212        metadata = self.metadata
213
214        Table(
215            "a",
216            metadata,
217            Column("id", Integer, primary_key=True),
218            Column("bid", Integer, ForeignKey("b.id")),
219        )
220        Table(
221            "b",
222            metadata,
223            Column("id", Integer, primary_key=True),
224            Column(
225                "aid", Integer, ForeignKey("a.id", use_alter=True, name="bfk")
226            ),
227        )
228        self._assert_cyclic_constraint(metadata, auto=False)
229
230    def _assert_cyclic_constraint(
231        self, metadata, auto=False, sqlite_warning=False
232    ):
233        if testing.db.dialect.supports_alter:
234            self._assert_cyclic_constraint_supports_alter(metadata, auto=auto)
235        else:
236            self._assert_cyclic_constraint_no_alter(
237                metadata, auto=auto, sqlite_warning=sqlite_warning
238            )
239
240    def _assert_cyclic_constraint_supports_alter(self, metadata, auto=False):
241        table_assertions = []
242        if auto:
243            table_assertions = [
244                CompiledSQL(
245                    "CREATE TABLE b ("
246                    "id INTEGER NOT NULL, "
247                    "aid INTEGER, "
248                    "PRIMARY KEY (id)"
249                    ")"
250                ),
251                CompiledSQL(
252                    "CREATE TABLE a ("
253                    "id INTEGER NOT NULL, "
254                    "bid INTEGER, "
255                    "PRIMARY KEY (id)"
256                    ")"
257                ),
258            ]
259        else:
260            table_assertions = [
261                CompiledSQL(
262                    "CREATE TABLE b ("
263                    "id INTEGER NOT NULL, "
264                    "aid INTEGER, "
265                    "PRIMARY KEY (id)"
266                    ")"
267                ),
268                CompiledSQL(
269                    "CREATE TABLE a ("
270                    "id INTEGER NOT NULL, "
271                    "bid INTEGER, "
272                    "PRIMARY KEY (id), "
273                    "FOREIGN KEY(bid) REFERENCES b (id)"
274                    ")"
275                ),
276            ]
277
278        assertions = [AllOf(*table_assertions)]
279        fk_assertions = []
280        fk_assertions.append(
281            CompiledSQL(
282                "ALTER TABLE b ADD CONSTRAINT bfk "
283                "FOREIGN KEY(aid) REFERENCES a (id)"
284            )
285        )
286        if auto:
287            fk_assertions.append(
288                CompiledSQL(
289                    "ALTER TABLE a ADD " "FOREIGN KEY(bid) REFERENCES b (id)"
290                )
291            )
292        assertions.append(AllOf(*fk_assertions))
293
294        with self.sql_execution_asserter() as asserter:
295            metadata.create_all(checkfirst=False)
296        asserter.assert_(*assertions)
297
298        assertions = [
299            CompiledSQL("ALTER TABLE b DROP CONSTRAINT bfk"),
300            CompiledSQL("DROP TABLE a"),
301            CompiledSQL("DROP TABLE b"),
302        ]
303
304        with self.sql_execution_asserter() as asserter:
305            metadata.drop_all(checkfirst=False),
306        asserter.assert_(*assertions)
307
308    def _assert_cyclic_constraint_no_alter(
309        self, metadata, auto=False, sqlite_warning=False
310    ):
311        table_assertions = []
312        if auto:
313            table_assertions.append(
314                DialectSQL(
315                    "CREATE TABLE b ("
316                    "id INTEGER NOT NULL, "
317                    "aid INTEGER, "
318                    "PRIMARY KEY (id), "
319                    "CONSTRAINT bfk FOREIGN KEY(aid) REFERENCES a (id)"
320                    ")"
321                )
322            )
323            table_assertions.append(
324                DialectSQL(
325                    "CREATE TABLE a ("
326                    "id INTEGER NOT NULL, "
327                    "bid INTEGER, "
328                    "PRIMARY KEY (id), "
329                    "FOREIGN KEY(bid) REFERENCES b (id)"
330                    ")"
331                )
332            )
333        else:
334            table_assertions.append(
335                DialectSQL(
336                    "CREATE TABLE b ("
337                    "id INTEGER NOT NULL, "
338                    "aid INTEGER, "
339                    "PRIMARY KEY (id), "
340                    "CONSTRAINT bfk FOREIGN KEY(aid) REFERENCES a (id)"
341                    ")"
342                )
343            )
344
345            table_assertions.append(
346                DialectSQL(
347                    "CREATE TABLE a ("
348                    "id INTEGER NOT NULL, "
349                    "bid INTEGER, "
350                    "PRIMARY KEY (id), "
351                    "FOREIGN KEY(bid) REFERENCES b (id)"
352                    ")"
353                )
354            )
355
356        assertions = [AllOf(*table_assertions)]
357
358        with self.sql_execution_asserter() as asserter:
359            metadata.create_all(checkfirst=False)
360        asserter.assert_(*assertions)
361
362        assertions = [
363            AllOf(CompiledSQL("DROP TABLE a"), CompiledSQL("DROP TABLE b"))
364        ]
365
366        if sqlite_warning:
367            with expect_warnings("Can't sort tables for DROP; "):
368                with self.sql_execution_asserter() as asserter:
369                    metadata.drop_all(checkfirst=False),
370        else:
371            with self.sql_execution_asserter() as asserter:
372                metadata.drop_all(checkfirst=False),
373        asserter.assert_(*assertions)
374
375    @testing.force_drop_names("a", "b")
376    def test_cycle_unnamed_fks(self):
377        metadata = MetaData(testing.db)
378
379        Table(
380            "a",
381            metadata,
382            Column("id", Integer, primary_key=True),
383            Column("bid", Integer, ForeignKey("b.id")),
384        )
385
386        Table(
387            "b",
388            metadata,
389            Column("id", Integer, primary_key=True),
390            Column("aid", Integer, ForeignKey("a.id")),
391        )
392
393        assertions = [
394            AllOf(
395                CompiledSQL(
396                    "CREATE TABLE b ("
397                    "id INTEGER NOT NULL, "
398                    "aid INTEGER, "
399                    "PRIMARY KEY (id)"
400                    ")"
401                ),
402                CompiledSQL(
403                    "CREATE TABLE a ("
404                    "id INTEGER NOT NULL, "
405                    "bid INTEGER, "
406                    "PRIMARY KEY (id)"
407                    ")"
408                ),
409            ),
410            AllOf(
411                CompiledSQL(
412                    "ALTER TABLE b ADD " "FOREIGN KEY(aid) REFERENCES a (id)"
413                ),
414                CompiledSQL(
415                    "ALTER TABLE a ADD " "FOREIGN KEY(bid) REFERENCES b (id)"
416                ),
417            ),
418        ]
419        with self.sql_execution_asserter() as asserter:
420            metadata.create_all(checkfirst=False)
421
422        if testing.db.dialect.supports_alter:
423            asserter.assert_(*assertions)
424
425            assert_raises_message(
426                exc.CircularDependencyError,
427                "Can't sort tables for DROP; an unresolvable foreign key "
428                "dependency exists between tables: a, b.  "
429                "Please ensure that the "
430                "ForeignKey and ForeignKeyConstraint objects involved in the "
431                "cycle have names so that they can be dropped using "
432                "DROP CONSTRAINT.",
433                metadata.drop_all,
434                checkfirst=False,
435            )
436        else:
437            with expect_warnings(
438                "Can't sort tables for DROP; an unresolvable "
439                "foreign key dependency exists between tables"
440            ):
441                with self.sql_execution_asserter() as asserter:
442                    metadata.drop_all(checkfirst=False)
443
444            asserter.assert_(
445                AllOf(CompiledSQL("DROP TABLE b"), CompiledSQL("DROP TABLE a"))
446            )
447
448    @testing.force_drop_names("a", "b")
449    def test_cycle_named_fks(self):
450        metadata = MetaData(testing.db)
451
452        Table(
453            "a",
454            metadata,
455            Column("id", Integer, primary_key=True),
456            Column("bid", Integer, ForeignKey("b.id")),
457        )
458
459        Table(
460            "b",
461            metadata,
462            Column("id", Integer, primary_key=True),
463            Column(
464                "aid",
465                Integer,
466                ForeignKey("a.id", use_alter=True, name="aidfk"),
467            ),
468        )
469
470        assertions = [
471            AllOf(
472                CompiledSQL(
473                    "CREATE TABLE b ("
474                    "id INTEGER NOT NULL, "
475                    "aid INTEGER, "
476                    "PRIMARY KEY (id)"
477                    ")"
478                ),
479                CompiledSQL(
480                    "CREATE TABLE a ("
481                    "id INTEGER NOT NULL, "
482                    "bid INTEGER, "
483                    "PRIMARY KEY (id), "
484                    "FOREIGN KEY(bid) REFERENCES b (id)"
485                    ")"
486                ),
487            ),
488            CompiledSQL(
489                "ALTER TABLE b ADD CONSTRAINT aidfk "
490                "FOREIGN KEY(aid) REFERENCES a (id)"
491            ),
492        ]
493        with self.sql_execution_asserter() as asserter:
494            metadata.create_all(checkfirst=False)
495
496        if testing.db.dialect.supports_alter:
497            asserter.assert_(*assertions)
498
499            with self.sql_execution_asserter() as asserter:
500                metadata.drop_all(checkfirst=False)
501
502            asserter.assert_(
503                CompiledSQL("ALTER TABLE b DROP CONSTRAINT aidfk"),
504                AllOf(
505                    CompiledSQL("DROP TABLE b"), CompiledSQL("DROP TABLE a")
506                ),
507            )
508        else:
509            with self.sql_execution_asserter() as asserter:
510                metadata.drop_all(checkfirst=False)
511
512            asserter.assert_(
513                AllOf(CompiledSQL("DROP TABLE b"), CompiledSQL("DROP TABLE a"))
514            )
515
516    @testing.requires.check_constraints
517    @testing.provide_metadata
518    def test_check_constraint_create(self):
519        metadata = self.metadata
520
521        Table(
522            "foo",
523            metadata,
524            Column("id", Integer, primary_key=True),
525            Column("x", Integer),
526            Column("y", Integer),
527            CheckConstraint("x>y"),
528        )
529        Table(
530            "bar",
531            metadata,
532            Column("id", Integer, primary_key=True),
533            Column("x", Integer, CheckConstraint("x>7")),
534            Column("z", Integer),
535        )
536
537        self.assert_sql_execution(
538            testing.db,
539            lambda: metadata.create_all(checkfirst=False),
540            AllOf(
541                CompiledSQL(
542                    "CREATE TABLE foo ("
543                    "id INTEGER NOT NULL, "
544                    "x INTEGER, "
545                    "y INTEGER, "
546                    "PRIMARY KEY (id), "
547                    "CHECK (x>y)"
548                    ")"
549                ),
550                CompiledSQL(
551                    "CREATE TABLE bar ("
552                    "id INTEGER NOT NULL, "
553                    "x INTEGER CHECK (x>7), "
554                    "z INTEGER, "
555                    "PRIMARY KEY (id)"
556                    ")"
557                ),
558            ),
559        )
560
561    @testing.provide_metadata
562    def test_unique_constraint_create(self):
563        metadata = self.metadata
564
565        Table(
566            "foo",
567            metadata,
568            Column("id", Integer, primary_key=True),
569            Column("value", String(30), unique=True),
570        )
571        Table(
572            "bar",
573            metadata,
574            Column("id", Integer, primary_key=True),
575            Column("value", String(30)),
576            Column("value2", String(30)),
577            UniqueConstraint("value", "value2", name="uix1"),
578        )
579
580        self.assert_sql_execution(
581            testing.db,
582            lambda: metadata.create_all(checkfirst=False),
583            AllOf(
584                CompiledSQL(
585                    "CREATE TABLE foo ("
586                    "id INTEGER NOT NULL, "
587                    "value VARCHAR(30), "
588                    "PRIMARY KEY (id), "
589                    "UNIQUE (value)"
590                    ")"
591                ),
592                CompiledSQL(
593                    "CREATE TABLE bar ("
594                    "id INTEGER NOT NULL, "
595                    "value VARCHAR(30), "
596                    "value2 VARCHAR(30), "
597                    "PRIMARY KEY (id), "
598                    "CONSTRAINT uix1 UNIQUE (value, value2)"
599                    ")"
600                ),
601            ),
602        )
603
604    @testing.provide_metadata
605    def test_index_create(self):
606        metadata = self.metadata
607
608        employees = Table(
609            "employees",
610            metadata,
611            Column("id", Integer, primary_key=True),
612            Column("first_name", String(30)),
613            Column("last_name", String(30)),
614            Column("email_address", String(30)),
615        )
616
617        i = Index(
618            "employee_name_index",
619            employees.c.last_name,
620            employees.c.first_name,
621        )
622        assert i in employees.indexes
623
624        i2 = Index(
625            "employee_email_index", employees.c.email_address, unique=True
626        )
627        assert i2 in employees.indexes
628
629        self.assert_sql_execution(
630            testing.db,
631            lambda: metadata.create_all(checkfirst=False),
632            RegexSQL("^CREATE TABLE"),
633            AllOf(
634                CompiledSQL(
635                    "CREATE INDEX employee_name_index ON "
636                    "employees (last_name, first_name)",
637                    [],
638                ),
639                CompiledSQL(
640                    "CREATE UNIQUE INDEX employee_email_index ON "
641                    "employees (email_address)",
642                    [],
643                ),
644            ),
645        )
646
647    @testing.provide_metadata
648    def test_index_create_camelcase(self):
649        """test that mixed-case index identifiers are legal"""
650
651        metadata = self.metadata
652
653        employees = Table(
654            "companyEmployees",
655            metadata,
656            Column("id", Integer, primary_key=True),
657            Column("firstName", String(30)),
658            Column("lastName", String(30)),
659            Column("emailAddress", String(30)),
660        )
661
662        Index("employeeNameIndex", employees.c.lastName, employees.c.firstName)
663
664        Index("employeeEmailIndex", employees.c.emailAddress, unique=True)
665
666        self.assert_sql_execution(
667            testing.db,
668            lambda: metadata.create_all(checkfirst=False),
669            RegexSQL("^CREATE TABLE"),
670            AllOf(
671                CompiledSQL(
672                    'CREATE INDEX "employeeNameIndex" ON '
673                    '"companyEmployees" ("lastName", "firstName")',
674                    [],
675                ),
676                CompiledSQL(
677                    'CREATE UNIQUE INDEX "employeeEmailIndex" ON '
678                    '"companyEmployees" ("emailAddress")',
679                    [],
680                ),
681            ),
682        )
683
684    @testing.provide_metadata
685    def test_index_create_inline(self):
686        # test an index create using index=True, unique=True
687
688        metadata = self.metadata
689
690        events = Table(
691            "events",
692            metadata,
693            Column("id", Integer, primary_key=True),
694            Column("name", String(30), index=True, unique=True),
695            Column("location", String(30), index=True),
696            Column("sport", String(30)),
697            Column("announcer", String(30)),
698            Column("winner", String(30)),
699        )
700
701        Index(
702            "sport_announcer", events.c.sport, events.c.announcer, unique=True
703        )
704        Index("idx_winners", events.c.winner)
705
706        eq_(
707            set(ix.name for ix in events.indexes),
708            set(
709                [
710                    "ix_events_name",
711                    "ix_events_location",
712                    "sport_announcer",
713                    "idx_winners",
714                ]
715            ),
716        )
717
718        self.assert_sql_execution(
719            testing.db,
720            lambda: events.create(testing.db),
721            RegexSQL("^CREATE TABLE events"),
722            AllOf(
723                CompiledSQL(
724                    "CREATE UNIQUE INDEX ix_events_name ON events " "(name)"
725                ),
726                CompiledSQL(
727                    "CREATE INDEX ix_events_location ON events " "(location)"
728                ),
729                CompiledSQL(
730                    "CREATE UNIQUE INDEX sport_announcer ON events "
731                    "(sport, announcer)"
732                ),
733                CompiledSQL("CREATE INDEX idx_winners ON events (winner)"),
734            ),
735        )
736
737    @testing.provide_metadata
738    def test_index_functional_create(self):
739        metadata = self.metadata
740
741        t = Table(
742            "sometable",
743            metadata,
744            Column("id", Integer, primary_key=True),
745            Column("data", String(50)),
746        )
747        Index("myindex", t.c.data.desc())
748        self.assert_sql_execution(
749            testing.db,
750            lambda: t.create(testing.db),
751            CompiledSQL(
752                "CREATE TABLE sometable (id INTEGER NOT NULL, "
753                "data VARCHAR(50), PRIMARY KEY (id))"
754            ),
755            CompiledSQL("CREATE INDEX myindex ON sometable (data DESC)"),
756        )
757
758
759class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL):
760    __dialect__ = "default"
761
762    def test_create_index_plain(self):
763        t = Table("t", MetaData(), Column("x", Integer))
764        i = Index("xyz", t.c.x)
765        self.assert_compile(schema.CreateIndex(i), "CREATE INDEX xyz ON t (x)")
766
767    def test_drop_index_plain_unattached(self):
768        self.assert_compile(
769            schema.DropIndex(Index(name="xyz")), "DROP INDEX xyz"
770        )
771
772    def test_drop_index_plain(self):
773        self.assert_compile(
774            schema.DropIndex(Index(name="xyz")), "DROP INDEX xyz"
775        )
776
777    def test_create_index_schema(self):
778        t = Table("t", MetaData(), Column("x", Integer), schema="foo")
779        i = Index("xyz", t.c.x)
780        self.assert_compile(
781            schema.CreateIndex(i), "CREATE INDEX xyz ON foo.t (x)"
782        )
783
784    def test_drop_index_schema(self):
785        t = Table("t", MetaData(), Column("x", Integer), schema="foo")
786        i = Index("xyz", t.c.x)
787        self.assert_compile(schema.DropIndex(i), "DROP INDEX foo.xyz")
788
789    def test_too_long_index_name(self):
790        dialect = testing.db.dialect.__class__()
791
792        for max_ident, max_index in [(22, None), (256, 22)]:
793            dialect.max_identifier_length = max_ident
794            dialect.max_index_name_length = max_index
795
796            for tname, cname, exp in [
797                ("sometable", "this_name_is_too_long", "ix_sometable_t_09aa"),
798                ("sometable", "this_name_alsois_long", "ix_sometable_t_3cf1"),
799            ]:
800
801                t1 = Table(
802                    tname, MetaData(), Column(cname, Integer, index=True)
803                )
804                ix1 = list(t1.indexes)[0]
805
806                self.assert_compile(
807                    schema.CreateIndex(ix1),
808                    "CREATE INDEX %s " "ON %s (%s)" % (exp, tname, cname),
809                    dialect=dialect,
810                )
811
812        dialect.max_identifier_length = 22
813        dialect.max_index_name_length = None
814
815        t1 = Table("t", MetaData(), Column("c", Integer))
816        assert_raises(
817            exc.IdentifierError,
818            schema.CreateIndex(
819                Index(
820                    "this_other_name_is_too_long_for_what_were_doing", t1.c.c
821                )
822            ).compile,
823            dialect=dialect,
824        )
825
826    def test_functional_index(self):
827        metadata = MetaData()
828        x = Table("x", metadata, Column("q", String(50)))
829        idx = Index("y", func.lower(x.c.q))
830
831        self.assert_compile(
832            schema.CreateIndex(idx), "CREATE INDEX y ON x (lower(q))"
833        )
834
835        self.assert_compile(
836            schema.CreateIndex(idx),
837            "CREATE INDEX y ON x (lower(q))",
838            dialect=testing.db.dialect,
839        )
840
841    def test_index_against_text_separate(self):
842        metadata = MetaData()
843        idx = Index("y", text("some_function(q)"))
844        t = Table("x", metadata, Column("q", String(50)))
845        t.append_constraint(idx)
846        self.assert_compile(
847            schema.CreateIndex(idx), "CREATE INDEX y ON x (some_function(q))"
848        )
849
850    def test_index_against_text_inline(self):
851        metadata = MetaData()
852        idx = Index("y", text("some_function(q)"))
853        x = Table("x", metadata, Column("q", String(50)), idx)
854
855        self.assert_compile(
856            schema.CreateIndex(idx), "CREATE INDEX y ON x (some_function(q))"
857        )
858
859    def test_index_declaration_inline(self):
860        metadata = MetaData()
861
862        t1 = Table(
863            "t1",
864            metadata,
865            Column("x", Integer),
866            Column("y", Integer),
867            Index("foo", "x", "y"),
868        )
869        self.assert_compile(
870            schema.CreateIndex(list(t1.indexes)[0]),
871            "CREATE INDEX foo ON t1 (x, y)",
872        )
873
874    def _test_deferrable(self, constraint_factory):
875        dialect = default.DefaultDialect()
876
877        t = Table(
878            "tbl",
879            MetaData(),
880            Column("a", Integer),
881            Column("b", Integer),
882            constraint_factory(deferrable=True),
883        )
884
885        sql = str(schema.CreateTable(t).compile(dialect=dialect))
886        assert "DEFERRABLE" in sql, sql
887        assert "NOT DEFERRABLE" not in sql, sql
888
889        t = Table(
890            "tbl",
891            MetaData(),
892            Column("a", Integer),
893            Column("b", Integer),
894            constraint_factory(deferrable=False),
895        )
896
897        sql = str(schema.CreateTable(t).compile(dialect=dialect))
898        assert "NOT DEFERRABLE" in sql
899
900        t = Table(
901            "tbl",
902            MetaData(),
903            Column("a", Integer),
904            Column("b", Integer),
905            constraint_factory(deferrable=True, initially="IMMEDIATE"),
906        )
907        sql = str(schema.CreateTable(t).compile(dialect=dialect))
908        assert "NOT DEFERRABLE" not in sql
909        assert "INITIALLY IMMEDIATE" in sql
910
911        t = Table(
912            "tbl",
913            MetaData(),
914            Column("a", Integer),
915            Column("b", Integer),
916            constraint_factory(deferrable=True, initially="DEFERRED"),
917        )
918        sql = str(schema.CreateTable(t).compile(dialect=dialect))
919
920        assert "NOT DEFERRABLE" not in sql
921        assert "INITIALLY DEFERRED" in sql
922
923    def test_column_level_ck_name(self):
924        t = Table(
925            "tbl",
926            MetaData(),
927            Column(
928                "a",
929                Integer,
930                CheckConstraint("a > 5", name="ck_a_greater_five"),
931            ),
932        )
933        self.assert_compile(
934            schema.CreateTable(t),
935            "CREATE TABLE tbl (a INTEGER CONSTRAINT "
936            "ck_a_greater_five CHECK (a > 5))",
937        )
938
939    def test_deferrable_pk(self):
940        def factory(**kw):
941            return PrimaryKeyConstraint("a", **kw)
942
943        self._test_deferrable(factory)
944
945    def test_deferrable_table_fk(self):
946        def factory(**kw):
947            return ForeignKeyConstraint(["b"], ["tbl.a"], **kw)
948
949        self._test_deferrable(factory)
950
951    def test_deferrable_column_fk(self):
952        t = Table(
953            "tbl",
954            MetaData(),
955            Column("a", Integer),
956            Column(
957                "b",
958                Integer,
959                ForeignKey("tbl.a", deferrable=True, initially="DEFERRED"),
960            ),
961        )
962
963        self.assert_compile(
964            schema.CreateTable(t),
965            "CREATE TABLE tbl (a INTEGER, b INTEGER, "
966            "FOREIGN KEY(b) REFERENCES tbl "
967            "(a) DEFERRABLE INITIALLY DEFERRED)",
968        )
969
970    def test_fk_match_clause(self):
971        t = Table(
972            "tbl",
973            MetaData(),
974            Column("a", Integer),
975            Column("b", Integer, ForeignKey("tbl.a", match="SIMPLE")),
976        )
977
978        self.assert_compile(
979            schema.CreateTable(t),
980            "CREATE TABLE tbl (a INTEGER, b INTEGER, "
981            "FOREIGN KEY(b) REFERENCES tbl "
982            "(a) MATCH SIMPLE)",
983        )
984
985        self.assert_compile(
986            schema.AddConstraint(list(t.foreign_keys)[0].constraint),
987            "ALTER TABLE tbl ADD FOREIGN KEY(b) "
988            "REFERENCES tbl (a) MATCH SIMPLE",
989        )
990
991    def test_create_table_omit_fks(self):
992        fkcs = [
993            ForeignKeyConstraint(["a"], ["remote.id"], name="foo"),
994            ForeignKeyConstraint(["b"], ["remote.id"], name="bar"),
995            ForeignKeyConstraint(["c"], ["remote.id"], name="bat"),
996        ]
997        m = MetaData()
998        t = Table(
999            "t",
1000            m,
1001            Column("a", Integer),
1002            Column("b", Integer),
1003            Column("c", Integer),
1004            *fkcs
1005        )
1006        Table("remote", m, Column("id", Integer, primary_key=True))
1007
1008        self.assert_compile(
1009            schema.CreateTable(t, include_foreign_key_constraints=[]),
1010            "CREATE TABLE t (a INTEGER, b INTEGER, c INTEGER)",
1011        )
1012        self.assert_compile(
1013            schema.CreateTable(t, include_foreign_key_constraints=fkcs[0:2]),
1014            "CREATE TABLE t (a INTEGER, b INTEGER, c INTEGER, "
1015            "CONSTRAINT foo FOREIGN KEY(a) REFERENCES remote (id), "
1016            "CONSTRAINT bar FOREIGN KEY(b) REFERENCES remote (id))",
1017        )
1018
1019    def test_deferrable_unique(self):
1020        def factory(**kw):
1021            return UniqueConstraint("b", **kw)
1022
1023        self._test_deferrable(factory)
1024
1025    def test_deferrable_table_check(self):
1026        def factory(**kw):
1027            return CheckConstraint("a < b", **kw)
1028
1029        self._test_deferrable(factory)
1030
1031    def test_multiple(self):
1032        m = MetaData()
1033        Table(
1034            "foo",
1035            m,
1036            Column("id", Integer, primary_key=True),
1037            Column("bar", Integer, primary_key=True),
1038        )
1039        tb = Table(
1040            "some_table",
1041            m,
1042            Column("id", Integer, primary_key=True),
1043            Column("foo_id", Integer, ForeignKey("foo.id")),
1044            Column("foo_bar", Integer, ForeignKey("foo.bar")),
1045        )
1046        self.assert_compile(
1047            schema.CreateTable(tb),
1048            "CREATE TABLE some_table ("
1049            "id INTEGER NOT NULL, "
1050            "foo_id INTEGER, "
1051            "foo_bar INTEGER, "
1052            "PRIMARY KEY (id), "
1053            "FOREIGN KEY(foo_id) REFERENCES foo (id), "
1054            "FOREIGN KEY(foo_bar) REFERENCES foo (bar))",
1055        )
1056
1057    def test_empty_pkc(self):
1058        # test that an empty primary key is ignored
1059        metadata = MetaData()
1060        tbl = Table(
1061            "test",
1062            metadata,
1063            Column("x", Integer, autoincrement=False),
1064            Column("y", Integer, autoincrement=False),
1065            PrimaryKeyConstraint(),
1066        )
1067        self.assert_compile(
1068            schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER, y INTEGER)"
1069        )
1070
1071    def test_empty_uc(self):
1072        # test that an empty constraint is ignored
1073        metadata = MetaData()
1074        tbl = Table(
1075            "test",
1076            metadata,
1077            Column("x", Integer, autoincrement=False),
1078            Column("y", Integer, autoincrement=False),
1079            UniqueConstraint(),
1080        )
1081        self.assert_compile(
1082            schema.CreateTable(tbl), "CREATE TABLE test (x INTEGER, y INTEGER)"
1083        )
1084
1085    def test_deferrable_column_check(self):
1086        t = Table(
1087            "tbl",
1088            MetaData(),
1089            Column("a", Integer),
1090            Column(
1091                "b",
1092                Integer,
1093                CheckConstraint(
1094                    "a < b", deferrable=True, initially="DEFERRED"
1095                ),
1096            ),
1097        )
1098
1099        self.assert_compile(
1100            schema.CreateTable(t),
1101            "CREATE TABLE tbl (a INTEGER, b INTEGER CHECK (a < b) "
1102            "DEFERRABLE INITIALLY DEFERRED)",
1103        )
1104
1105    def test_use_alter(self):
1106        m = MetaData()
1107        Table("t", m, Column("a", Integer))
1108
1109        Table(
1110            "t2",
1111            m,
1112            Column(
1113                "a", Integer, ForeignKey("t.a", use_alter=True, name="fk_ta")
1114            ),
1115            Column("b", Integer, ForeignKey("t.a", name="fk_tb")),
1116        )
1117
1118        e = engines.mock_engine(dialect_name="postgresql")
1119        m.create_all(e)
1120        m.drop_all(e)
1121
1122        e.assert_sql(
1123            [
1124                "CREATE TABLE t (a INTEGER)",
1125                "CREATE TABLE t2 (a INTEGER, b INTEGER, CONSTRAINT fk_tb "
1126                "FOREIGN KEY(b) REFERENCES t (a))",
1127                "ALTER TABLE t2 "
1128                "ADD CONSTRAINT fk_ta FOREIGN KEY(a) REFERENCES t (a)",
1129                "ALTER TABLE t2 DROP CONSTRAINT fk_ta",
1130                "DROP TABLE t2",
1131                "DROP TABLE t",
1132            ]
1133        )
1134
1135    def _constraint_create_fixture(self):
1136        m = MetaData()
1137
1138        t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
1139
1140        t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
1141
1142        return t, t2
1143
1144    def test_render_ck_constraint_inline(self):
1145        t, t2 = self._constraint_create_fixture()
1146
1147        CheckConstraint(
1148            "a < b",
1149            name="my_test_constraint",
1150            deferrable=True,
1151            initially="DEFERRED",
1152            table=t,
1153        )
1154
1155        # before we create an AddConstraint,
1156        # the CONSTRAINT comes out inline
1157        self.assert_compile(
1158            schema.CreateTable(t),
1159            "CREATE TABLE tbl ("
1160            "a INTEGER, "
1161            "b INTEGER, "
1162            "CONSTRAINT my_test_constraint CHECK (a < b) "
1163            "DEFERRABLE INITIALLY DEFERRED"
1164            ")",
1165        )
1166
1167    def test_render_ck_constraint_external(self):
1168        t, t2 = self._constraint_create_fixture()
1169
1170        constraint = CheckConstraint(
1171            "a < b",
1172            name="my_test_constraint",
1173            deferrable=True,
1174            initially="DEFERRED",
1175            table=t,
1176        )
1177
1178        self.assert_compile(
1179            schema.AddConstraint(constraint),
1180            "ALTER TABLE tbl ADD CONSTRAINT my_test_constraint "
1181            "CHECK (a < b) DEFERRABLE INITIALLY DEFERRED",
1182        )
1183
1184    def test_external_ck_constraint_cancels_internal(self):
1185        t, t2 = self._constraint_create_fixture()
1186
1187        constraint = CheckConstraint(
1188            "a < b",
1189            name="my_test_constraint",
1190            deferrable=True,
1191            initially="DEFERRED",
1192            table=t,
1193        )
1194
1195        schema.AddConstraint(constraint)
1196
1197        # once we make an AddConstraint,
1198        # inline compilation of the CONSTRAINT
1199        # is disabled
1200        self.assert_compile(
1201            schema.CreateTable(t),
1202            "CREATE TABLE tbl (" "a INTEGER, " "b INTEGER" ")",
1203        )
1204
1205    def test_render_drop_constraint(self):
1206        t, t2 = self._constraint_create_fixture()
1207
1208        constraint = CheckConstraint(
1209            "a < b",
1210            name="my_test_constraint",
1211            deferrable=True,
1212            initially="DEFERRED",
1213            table=t,
1214        )
1215
1216        self.assert_compile(
1217            schema.DropConstraint(constraint),
1218            "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint",
1219        )
1220
1221    def test_render_drop_constraint_cascade(self):
1222        t, t2 = self._constraint_create_fixture()
1223
1224        constraint = CheckConstraint(
1225            "a < b",
1226            name="my_test_constraint",
1227            deferrable=True,
1228            initially="DEFERRED",
1229            table=t,
1230        )
1231
1232        self.assert_compile(
1233            schema.DropConstraint(constraint, cascade=True),
1234            "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint CASCADE",
1235        )
1236
1237    def test_render_add_fk_constraint_stringcol(self):
1238        t, t2 = self._constraint_create_fixture()
1239
1240        constraint = ForeignKeyConstraint(["b"], ["t2.a"])
1241        t.append_constraint(constraint)
1242        self.assert_compile(
1243            schema.AddConstraint(constraint),
1244            "ALTER TABLE tbl ADD FOREIGN KEY(b) REFERENCES t2 (a)",
1245        )
1246
1247    def test_render_add_fk_constraint_realcol(self):
1248        t, t2 = self._constraint_create_fixture()
1249
1250        constraint = ForeignKeyConstraint([t.c.a], [t2.c.b])
1251        t.append_constraint(constraint)
1252        self.assert_compile(
1253            schema.AddConstraint(constraint),
1254            "ALTER TABLE tbl ADD FOREIGN KEY(a) REFERENCES t2 (b)",
1255        )
1256
1257    def test_render_add_uq_constraint_stringcol(self):
1258        t, t2 = self._constraint_create_fixture()
1259
1260        constraint = UniqueConstraint("a", "b", name="uq_cst")
1261        t2.append_constraint(constraint)
1262        self.assert_compile(
1263            schema.AddConstraint(constraint),
1264            "ALTER TABLE t2 ADD CONSTRAINT uq_cst UNIQUE (a, b)",
1265        )
1266
1267    def test_render_add_uq_constraint_realcol(self):
1268        t, t2 = self._constraint_create_fixture()
1269
1270        constraint = UniqueConstraint(t2.c.a, t2.c.b, name="uq_cs2")
1271        self.assert_compile(
1272            schema.AddConstraint(constraint),
1273            "ALTER TABLE t2 ADD CONSTRAINT uq_cs2 UNIQUE (a, b)",
1274        )
1275
1276    def test_render_add_pk_constraint(self):
1277        t, t2 = self._constraint_create_fixture()
1278
1279        assert t.c.a.primary_key is False
1280        constraint = PrimaryKeyConstraint(t.c.a)
1281        assert t.c.a.primary_key is True
1282        self.assert_compile(
1283            schema.AddConstraint(constraint),
1284            "ALTER TABLE tbl ADD PRIMARY KEY (a)",
1285        )
1286
1287    def test_render_check_constraint_sql_literal(self):
1288        t, t2 = self._constraint_create_fixture()
1289
1290        constraint = CheckConstraint(t.c.a > 5)
1291
1292        self.assert_compile(
1293            schema.AddConstraint(constraint),
1294            "ALTER TABLE tbl ADD CHECK (a > 5)",
1295        )
1296
1297    def test_render_check_constraint_inline_sql_literal(self):
1298        t, t2 = self._constraint_create_fixture()
1299
1300        m = MetaData()
1301        t = Table(
1302            "t",
1303            m,
1304            Column("a", Integer, CheckConstraint(Column("a", Integer) > 5)),
1305        )
1306
1307        self.assert_compile(
1308            schema.CreateColumn(t.c.a), "a INTEGER CHECK (a > 5)"
1309        )
1310
1311    def test_render_index_sql_literal(self):
1312        t, t2 = self._constraint_create_fixture()
1313
1314        constraint = Index("name", t.c.a + 5)
1315
1316        self.assert_compile(
1317            schema.CreateIndex(constraint), "CREATE INDEX name ON tbl (a + 5)"
1318        )
1319