1from contextlib import contextmanager
2import pickle
3
4import sqlalchemy as tsa
5from sqlalchemy import ARRAY
6from sqlalchemy import bindparam
7from sqlalchemy import BLANK_SCHEMA
8from sqlalchemy import Boolean
9from sqlalchemy import CheckConstraint
10from sqlalchemy import Column
11from sqlalchemy import column
12from sqlalchemy import ColumnDefault
13from sqlalchemy import desc
14from sqlalchemy import Enum
15from sqlalchemy import event
16from sqlalchemy import exc
17from sqlalchemy import ForeignKey
18from sqlalchemy import ForeignKeyConstraint
19from sqlalchemy import func
20from sqlalchemy import Index
21from sqlalchemy import Integer
22from sqlalchemy import MetaData
23from sqlalchemy import PrimaryKeyConstraint
24from sqlalchemy import schema
25from sqlalchemy import Sequence
26from sqlalchemy import String
27from sqlalchemy import Table
28from sqlalchemy import table
29from sqlalchemy import testing
30from sqlalchemy import text
31from sqlalchemy import TypeDecorator
32from sqlalchemy import types as sqltypes
33from sqlalchemy import Unicode
34from sqlalchemy import UniqueConstraint
35from sqlalchemy.engine import default
36from sqlalchemy.schema import AddConstraint
37from sqlalchemy.schema import CreateIndex
38from sqlalchemy.schema import DefaultClause
39from sqlalchemy.schema import DropIndex
40from sqlalchemy.sql import naming
41from sqlalchemy.sql import operators
42from sqlalchemy.sql.elements import _NONE_NAME
43from sqlalchemy.sql.elements import literal_column
44from sqlalchemy.testing import assert_raises
45from sqlalchemy.testing import assert_raises_message
46from sqlalchemy.testing import AssertsCompiledSQL
47from sqlalchemy.testing import ComparesTables
48from sqlalchemy.testing import emits_warning
49from sqlalchemy.testing import eq_
50from sqlalchemy.testing import expect_raises_message
51from sqlalchemy.testing import fixtures
52from sqlalchemy.testing import is_
53from sqlalchemy.testing import is_false
54from sqlalchemy.testing import is_true
55from sqlalchemy.testing import mock
56
57
58class MetaDataTest(fixtures.TestBase, ComparesTables):
59    def test_metadata_contains(self):
60        metadata = MetaData()
61        t1 = Table("t1", metadata, Column("x", Integer))
62        t2 = Table("t2", metadata, Column("x", Integer), schema="foo")
63        t3 = Table("t2", MetaData(), Column("x", Integer))
64        t4 = Table("t1", MetaData(), Column("x", Integer), schema="foo")
65
66        assert "t1" in metadata
67        assert "foo.t2" in metadata
68        assert "t2" not in metadata
69        assert "foo.t1" not in metadata
70        assert t1 in metadata
71        assert t2 in metadata
72        assert t3 not in metadata
73        assert t4 not in metadata
74
75    def test_uninitialized_column_copy(self):
76        for col in [
77            Column("foo", String(), nullable=False),
78            Column("baz", String(), unique=True),
79            Column(Integer(), primary_key=True),
80            Column(
81                "bar",
82                Integer(),
83                Sequence("foo_seq"),
84                primary_key=True,
85                key="bar",
86            ),
87            Column(Integer(), ForeignKey("bat.blah"), doc="this is a col"),
88            Column(
89                "bar",
90                Integer(),
91                ForeignKey("bat.blah"),
92                primary_key=True,
93                key="bar",
94            ),
95            Column("bar", Integer(), info={"foo": "bar"}),
96        ]:
97            c2 = col._copy()
98            for attr in (
99                "name",
100                "type",
101                "nullable",
102                "primary_key",
103                "key",
104                "unique",
105                "info",
106                "doc",
107            ):
108                eq_(getattr(col, attr), getattr(c2, attr))
109            eq_(len(col.foreign_keys), len(c2.foreign_keys))
110            if col.default:
111                eq_(c2.default.name, "foo_seq")
112            for a1, a2 in zip(col.foreign_keys, c2.foreign_keys):
113                assert a1 is not a2
114                eq_(a2._colspec, "bat.blah")
115
116    def test_col_subclass_copy(self):
117        class MyColumn(schema.Column):
118            def __init__(self, *args, **kw):
119                self.widget = kw.pop("widget", None)
120                super(MyColumn, self).__init__(*args, **kw)
121
122            def _copy(self, *arg, **kw):
123                c = super(MyColumn, self)._copy(*arg, **kw)
124                c.widget = self.widget
125                return c
126
127        c1 = MyColumn("foo", Integer, widget="x")
128        c2 = c1._copy()
129        assert isinstance(c2, MyColumn)
130        eq_(c2.widget, "x")
131
132    def test_uninitialized_column_copy_events(self):
133        msgs = []
134
135        def write(c, t):
136            msgs.append("attach %s.%s" % (t.name, c.name))
137
138        c1 = Column("foo", String())
139        m = MetaData()
140        for i in range(3):
141            cx = c1._copy()
142            # as of 0.7, these events no longer copy.  its expected
143            # that listeners will be re-established from the
144            # natural construction of things.
145            cx._on_table_attach(write)
146            Table("foo%d" % i, m, cx)
147        eq_(msgs, ["attach foo0.foo", "attach foo1.foo", "attach foo2.foo"])
148
149    def test_schema_collection_add(self):
150        metadata = MetaData()
151
152        Table("t1", metadata, Column("x", Integer), schema="foo")
153        Table("t2", metadata, Column("x", Integer), schema="bar")
154        Table("t3", metadata, Column("x", Integer))
155
156        eq_(metadata._schemas, set(["foo", "bar"]))
157        eq_(len(metadata.tables), 3)
158
159    def test_schema_collection_remove(self):
160        metadata = MetaData()
161
162        t1 = Table("t1", metadata, Column("x", Integer), schema="foo")
163        Table("t2", metadata, Column("x", Integer), schema="bar")
164        t3 = Table("t3", metadata, Column("x", Integer), schema="bar")
165
166        metadata.remove(t3)
167        eq_(metadata._schemas, set(["foo", "bar"]))
168        eq_(len(metadata.tables), 2)
169
170        metadata.remove(t1)
171        eq_(metadata._schemas, set(["bar"]))
172        eq_(len(metadata.tables), 1)
173
174    def test_schema_collection_remove_all(self):
175        metadata = MetaData()
176
177        Table("t1", metadata, Column("x", Integer), schema="foo")
178        Table("t2", metadata, Column("x", Integer), schema="bar")
179
180        metadata.clear()
181        eq_(metadata._schemas, set())
182        eq_(len(metadata.tables), 0)
183
184    def test_metadata_tables_immutable(self):
185        # this use case was added due to #1917.
186        metadata = MetaData()
187
188        Table("t1", metadata, Column("x", Integer))
189        assert "t1" in metadata.tables
190
191        assert_raises(TypeError, lambda: metadata.tables.pop("t1"))
192
193    @testing.provide_metadata
194    def test_dupe_tables(self):
195        metadata = self.metadata
196        Table(
197            "table1",
198            metadata,
199            Column("col1", Integer, primary_key=True),
200            Column("col2", String(20)),
201        )
202
203        metadata.create_all(testing.db)
204        Table("table1", metadata, autoload_with=testing.db)
205
206        def go():
207            Table(
208                "table1",
209                metadata,
210                Column("col1", Integer, primary_key=True),
211                Column("col2", String(20)),
212            )
213
214        assert_raises_message(
215            tsa.exc.InvalidRequestError,
216            "Table 'table1' is already defined for this "
217            "MetaData instance.  Specify 'extend_existing=True' "
218            "to redefine options and columns on an existing "
219            "Table object.",
220            go,
221        )
222
223    def test_fk_copy(self):
224        c1 = Column("foo", Integer)
225        c2 = Column("bar", Integer)
226        m = MetaData()
227        t1 = Table("t", m, c1, c2)
228
229        kw = dict(
230            onupdate="X",
231            ondelete="Y",
232            use_alter=True,
233            name="f1",
234            deferrable="Z",
235            initially="Q",
236            link_to_name=True,
237        )
238
239        fk1 = ForeignKey(c1, **kw)
240        fk2 = ForeignKeyConstraint((c1,), (c2,), **kw)
241
242        t1.append_constraint(fk2)
243        fk1c = fk1._copy()
244        fk2c = fk2._copy()
245
246        for k in kw:
247            eq_(getattr(fk1c, k), kw[k])
248            eq_(getattr(fk2c, k), kw[k])
249
250    def test_check_constraint_copy(self):
251        def r(x):
252            return x
253
254        c = CheckConstraint(
255            "foo bar",
256            name="name",
257            initially=True,
258            deferrable=True,
259            _create_rule=r,
260        )
261        c2 = c._copy()
262        eq_(c2.name, "name")
263        eq_(str(c2.sqltext), "foo bar")
264        eq_(c2.initially, True)
265        eq_(c2.deferrable, True)
266        assert c2._create_rule is r
267
268    def test_col_replace_w_constraint(self):
269        m = MetaData()
270        a = Table("a", m, Column("id", Integer, primary_key=True))
271
272        aid = Column("a_id", ForeignKey("a.id"))
273        b = Table("b", m, aid)
274        b.append_column(aid)
275
276        assert b.c.a_id.references(a.c.id)
277        eq_(len(b.constraints), 2)
278
279    def test_fk_construct(self):
280        c1 = Column("foo", Integer)
281        c2 = Column("bar", Integer)
282        m = MetaData()
283        t1 = Table("t", m, c1, c2)
284        fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1)
285        assert fk1 in t1.constraints
286
287    def test_fk_constraint_col_collection_w_table(self):
288        c1 = Column("foo", Integer)
289        c2 = Column("bar", Integer)
290        m = MetaData()
291        t1 = Table("t", m, c1, c2)
292        fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1)
293        eq_(dict(fk1.columns), {"foo": c1})
294
295    def test_fk_constraint_col_collection_no_table(self):
296        fk1 = ForeignKeyConstraint(("foo", "bat"), ("bar", "hoho"))
297        eq_(dict(fk1.columns), {})
298        eq_(fk1.column_keys, ["foo", "bat"])
299        eq_(fk1._col_description, "foo, bat")
300        eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]})
301
302    def test_fk_constraint_col_collection_no_table_real_cols(self):
303        c1 = Column("foo", Integer)
304        c2 = Column("bar", Integer)
305        fk1 = ForeignKeyConstraint((c1,), (c2,))
306        eq_(dict(fk1.columns), {})
307        eq_(fk1.column_keys, ["foo"])
308        eq_(fk1._col_description, "foo")
309        eq_(fk1._elements, {"foo": fk1.elements[0]})
310
311    def test_fk_constraint_col_collection_added_to_table(self):
312        c1 = Column("foo", Integer)
313        m = MetaData()
314        fk1 = ForeignKeyConstraint(("foo",), ("bar",))
315        Table("t", m, c1, fk1)
316        eq_(dict(fk1.columns), {"foo": c1})
317        eq_(fk1._elements, {"foo": fk1.elements[0]})
318
319    def test_fk_constraint_col_collection_via_fk(self):
320        fk = ForeignKey("bar")
321        c1 = Column("foo", Integer, fk)
322        m = MetaData()
323        t1 = Table("t", m, c1)
324        fk1 = fk.constraint
325        eq_(fk1.column_keys, ["foo"])
326        assert fk1 in t1.constraints
327        eq_(fk1.column_keys, ["foo"])
328        eq_(dict(fk1.columns), {"foo": c1})
329        eq_(fk1._elements, {"foo": fk})
330
331    def test_fk_no_such_parent_col_error(self):
332        meta = MetaData()
333        a = Table("a", meta, Column("a", Integer))
334        Table("b", meta, Column("b", Integer))
335
336        def go():
337            a.append_constraint(ForeignKeyConstraint(["x"], ["b.b"]))
338
339        assert_raises_message(
340            exc.ArgumentError,
341            "Can't create ForeignKeyConstraint on "
342            "table 'a': no column named 'x' is present.",
343            go,
344        )
345
346    def test_fk_given_non_col(self):
347        not_a_col = bindparam("x")
348        assert_raises_message(
349            exc.ArgumentError,
350            "String column name or Column object for DDL foreign "
351            "key constraint expected, got .*Bind",
352            ForeignKey,
353            not_a_col,
354        )
355
356    def test_fk_given_non_col_clauseelem(self):
357        class Foo(object):
358            def __clause_element__(self):
359                return bindparam("x")
360
361        assert_raises_message(
362            exc.ArgumentError,
363            "String column name or Column object for DDL foreign "
364            "key constraint expected, got .*Foo",
365            ForeignKey,
366            Foo(),
367        )
368
369    def test_fk_given_col_non_table(self):
370        t = Table("t", MetaData(), Column("x", Integer))
371        xa = t.alias().c.x
372        assert_raises_message(
373            exc.ArgumentError,
374            "ForeignKey received Column not bound to a Table, got: .*Alias",
375            ForeignKey,
376            xa,
377        )
378
379    def test_fk_given_col_non_table_clauseelem(self):
380        t = Table("t", MetaData(), Column("x", Integer))
381
382        class Foo(object):
383            def __clause_element__(self):
384                return t.alias().c.x
385
386        assert_raises_message(
387            exc.ArgumentError,
388            "ForeignKey received Column not bound to a Table, got: .*Alias",
389            ForeignKey,
390            Foo(),
391        )
392
393    def test_fk_no_such_target_col_error_upfront(self):
394        meta = MetaData()
395        a = Table("a", meta, Column("a", Integer))
396        Table("b", meta, Column("b", Integer))
397
398        a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"]))
399
400        assert_raises_message(
401            exc.NoReferencedColumnError,
402            "Could not initialize target column for ForeignKey 'b.x' on "
403            "table 'a': table 'b' has no column named 'x'",
404            getattr,
405            list(a.foreign_keys)[0],
406            "column",
407        )
408
409    def test_fk_no_such_target_col_error_delayed(self):
410        meta = MetaData()
411        a = Table("a", meta, Column("a", Integer))
412        a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"]))
413
414        Table("b", meta, Column("b", Integer))
415
416        assert_raises_message(
417            exc.NoReferencedColumnError,
418            "Could not initialize target column for ForeignKey 'b.x' on "
419            "table 'a': table 'b' has no column named 'x'",
420            getattr,
421            list(a.foreign_keys)[0],
422            "column",
423        )
424
425    def test_fk_mismatched_local_remote_cols(self):
426
427        assert_raises_message(
428            exc.ArgumentError,
429            "ForeignKeyConstraint number of constrained columns must "
430            "match the number of referenced columns.",
431            ForeignKeyConstraint,
432            ["a"],
433            ["b.a", "b.b"],
434        )
435
436        assert_raises_message(
437            exc.ArgumentError,
438            "ForeignKeyConstraint number of constrained columns "
439            "must match the number of referenced columns.",
440            ForeignKeyConstraint,
441            ["a", "b"],
442            ["b.a"],
443        )
444
445        assert_raises_message(
446            exc.ArgumentError,
447            "ForeignKeyConstraint with duplicate source column "
448            "references are not supported.",
449            ForeignKeyConstraint,
450            ["a", "a"],
451            ["b.a", "b.b"],
452        )
453
454    def test_pickle_metadata_sequence_restated(self):
455        m1 = MetaData()
456        Table(
457            "a",
458            m1,
459            Column("id", Integer, primary_key=True),
460            Column("x", Integer, Sequence("x_seq")),
461        )
462
463        m2 = pickle.loads(pickle.dumps(m1))
464
465        s2 = Sequence("x_seq")
466        t2 = Table(
467            "a",
468            m2,
469            Column("id", Integer, primary_key=True),
470            Column("x", Integer, s2),
471            extend_existing=True,
472        )
473
474        assert m2._sequences["x_seq"] is t2.c.x.default
475        assert m2._sequences["x_seq"] is s2
476
477    def test_sequence_restated_replaced(self):
478        """Test restatement of Sequence replaces."""
479
480        m1 = MetaData()
481        s1 = Sequence("x_seq")
482        t = Table("a", m1, Column("x", Integer, s1))
483        assert m1._sequences["x_seq"] is s1
484
485        s2 = Sequence("x_seq")
486        Table("a", m1, Column("x", Integer, s2), extend_existing=True)
487        assert t.c.x.default is s2
488        assert m1._sequences["x_seq"] is s2
489
490    def test_sequence_attach_to_table(self):
491        m1 = MetaData()
492        s1 = Sequence("s")
493        Table("a", m1, Column("x", Integer, s1))
494        assert s1.metadata is m1
495
496    def test_sequence_attach_to_existing_table(self):
497        m1 = MetaData()
498        s1 = Sequence("s")
499        t = Table("a", m1, Column("x", Integer))
500        t.c.x._init_items(s1)
501        assert s1.metadata is m1
502
503    def test_pickle_metadata_sequence_implicit(self):
504        m1 = MetaData()
505        Table(
506            "a",
507            m1,
508            Column("id", Integer, primary_key=True),
509            Column("x", Integer, Sequence("x_seq")),
510        )
511
512        m2 = pickle.loads(pickle.dumps(m1))
513
514        t2 = Table("a", m2, extend_existing=True)
515
516        eq_(m2._sequences, {"x_seq": t2.c.x.default})
517
518    def test_pickle_metadata_schema(self):
519        m1 = MetaData()
520        Table(
521            "a",
522            m1,
523            Column("id", Integer, primary_key=True),
524            Column("x", Integer, Sequence("x_seq")),
525            schema="y",
526        )
527
528        m2 = pickle.loads(pickle.dumps(m1))
529
530        Table("a", m2, schema="y", extend_existing=True)
531
532        eq_(m2._schemas, m1._schemas)
533
534    def test_metadata_schema_arg(self):
535        m1 = MetaData(schema="sch1")
536        m2 = MetaData(schema="sch1", quote_schema=True)
537        m3 = MetaData(schema="sch1", quote_schema=False)
538        m4 = MetaData()
539
540        for (
541            i,
542            (
543                name,
544                metadata,
545                schema_,
546                quote_schema,
547                exp_schema,
548                exp_quote_schema,
549            ),
550        ) in enumerate(
551            [
552                ("t1", m1, None, None, "sch1", None),
553                ("t2", m1, "sch2", None, "sch2", None),
554                ("t3", m1, "sch2", True, "sch2", True),
555                ("t4", m1, "sch1", None, "sch1", None),
556                ("t5", m1, BLANK_SCHEMA, None, None, None),
557                ("t1", m2, None, None, "sch1", True),
558                ("t2", m2, "sch2", None, "sch2", None),
559                ("t3", m2, "sch2", True, "sch2", True),
560                ("t4", m2, "sch1", None, "sch1", None),
561                ("t1", m3, None, None, "sch1", False),
562                ("t2", m3, "sch2", None, "sch2", None),
563                ("t3", m3, "sch2", True, "sch2", True),
564                ("t4", m3, "sch1", None, "sch1", None),
565                ("t1", m4, None, None, None, None),
566                ("t2", m4, "sch2", None, "sch2", None),
567                ("t3", m4, "sch2", True, "sch2", True),
568                ("t4", m4, "sch1", None, "sch1", None),
569                ("t5", m4, BLANK_SCHEMA, None, None, None),
570            ]
571        ):
572            kw = {}
573            if schema_ is not None:
574                kw["schema"] = schema_
575            if quote_schema is not None:
576                kw["quote_schema"] = quote_schema
577            t = Table(name, metadata, **kw)
578            eq_(t.schema, exp_schema, "test %d, table schema" % i)
579            eq_(
580                t.schema.quote if t.schema is not None else None,
581                exp_quote_schema,
582                "test %d, table quote_schema" % i,
583            )
584            seq = Sequence(name, metadata=metadata, **kw)
585            eq_(seq.schema, exp_schema, "test %d, seq schema" % i)
586            eq_(
587                seq.schema.quote if seq.schema is not None else None,
588                exp_quote_schema,
589                "test %d, seq quote_schema" % i,
590            )
591
592    def test_manual_dependencies(self):
593        meta = MetaData()
594        a = Table("a", meta, Column("foo", Integer))
595        b = Table("b", meta, Column("foo", Integer))
596        c = Table("c", meta, Column("foo", Integer))
597        d = Table("d", meta, Column("foo", Integer))
598        e = Table("e", meta, Column("foo", Integer))
599
600        e.add_is_dependent_on(c)
601        a.add_is_dependent_on(b)
602        b.add_is_dependent_on(d)
603        e.add_is_dependent_on(b)
604        c.add_is_dependent_on(a)
605        eq_(meta.sorted_tables, [d, b, a, c, e])
606
607    def test_deterministic_order(self):
608        meta = MetaData()
609        a = Table("a", meta, Column("foo", Integer))
610        b = Table("b", meta, Column("foo", Integer))
611        c = Table("c", meta, Column("foo", Integer))
612        d = Table("d", meta, Column("foo", Integer))
613        e = Table("e", meta, Column("foo", Integer))
614
615        e.add_is_dependent_on(c)
616        a.add_is_dependent_on(b)
617        eq_(meta.sorted_tables, [b, c, d, a, e])
618
619    def test_fks_deterministic_order(self):
620        meta = MetaData()
621        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
622        b = Table("b", meta, Column("foo", Integer))
623        c = Table("c", meta, Column("foo", Integer))
624        d = Table("d", meta, Column("foo", Integer))
625        e = Table("e", meta, Column("foo", Integer, ForeignKey("c.foo")))
626
627        eq_(meta.sorted_tables, [b, c, d, a, e])
628
629    def test_cycles_fks_warning_one(self):
630        meta = MetaData()
631        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
632        b = Table("b", meta, Column("foo", Integer, ForeignKey("d.foo")))
633        c = Table("c", meta, Column("foo", Integer, ForeignKey("b.foo")))
634        d = Table("d", meta, Column("foo", Integer, ForeignKey("c.foo")))
635        e = Table("e", meta, Column("foo", Integer))
636
637        with testing.expect_warnings(
638            "Cannot correctly sort tables; there are unresolvable cycles "
639            'between tables "b, c, d", which is usually caused by mutually '
640            "dependent foreign key constraints.  "
641            "Foreign key constraints involving these tables will not be "
642            "considered"
643        ):
644            eq_(meta.sorted_tables, [b, c, d, e, a])
645
646    def test_cycles_fks_warning_two(self):
647        meta = MetaData()
648        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
649        b = Table("b", meta, Column("foo", Integer, ForeignKey("a.foo")))
650        c = Table("c", meta, Column("foo", Integer, ForeignKey("e.foo")))
651        d = Table("d", meta, Column("foo", Integer))
652        e = Table("e", meta, Column("foo", Integer, ForeignKey("d.foo")))
653
654        with testing.expect_warnings(
655            "Cannot correctly sort tables; there are unresolvable cycles "
656            'between tables "a, b", which is usually caused by mutually '
657            "dependent foreign key constraints.  "
658            "Foreign key constraints involving these tables will not be "
659            "considered"
660        ):
661            eq_(meta.sorted_tables, [a, b, d, e, c])
662
663    def test_cycles_fks_fks_delivered_separately(self):
664        meta = MetaData()
665        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
666        b = Table("b", meta, Column("foo", Integer, ForeignKey("a.foo")))
667        c = Table("c", meta, Column("foo", Integer, ForeignKey("e.foo")))
668        d = Table("d", meta, Column("foo", Integer))
669        e = Table("e", meta, Column("foo", Integer, ForeignKey("d.foo")))
670
671        results = schema.sort_tables_and_constraints(
672            sorted(meta.tables.values(), key=lambda t: t.key)
673        )
674        results[-1] = (None, set(results[-1][-1]))
675        eq_(
676            results,
677            [
678                (a, set()),
679                (b, set()),
680                (d, {fk.constraint for fk in d.foreign_keys}),
681                (e, {fk.constraint for fk in e.foreign_keys}),
682                (c, {fk.constraint for fk in c.foreign_keys}),
683                (
684                    None,
685                    {fk.constraint for fk in a.foreign_keys}.union(
686                        fk.constraint for fk in b.foreign_keys
687                    ),
688                ),
689            ],
690        )
691
692    def test_cycles_fks_usealter(self):
693        meta = MetaData()
694        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
695        b = Table(
696            "b",
697            meta,
698            Column("foo", Integer, ForeignKey("d.foo", use_alter=True)),
699        )
700        c = Table("c", meta, Column("foo", Integer, ForeignKey("b.foo")))
701        d = Table("d", meta, Column("foo", Integer, ForeignKey("c.foo")))
702        e = Table("e", meta, Column("foo", Integer))
703
704        eq_(meta.sorted_tables, [b, e, a, c, d])
705
706    def test_nonexistent(self):
707        assert_raises(
708            tsa.exc.NoSuchTableError,
709            Table,
710            "fake_table",
711            MetaData(),
712            autoload_with=testing.db,
713        )
714
715    def test_assorted_repr(self):
716        t1 = Table("foo", MetaData(), Column("x", Integer))
717        i1 = Index("bar", t1.c.x)
718        ck = schema.CheckConstraint("x > y", name="someconstraint")
719
720        for const, exp in (
721            (Sequence("my_seq"), "Sequence('my_seq')"),
722            (Sequence("my_seq", start=5), "Sequence('my_seq', start=5)"),
723            (Column("foo", Integer), "Column('foo', Integer(), table=None)"),
724            (
725                Column(
726                    "foo",
727                    Integer,
728                    primary_key=True,
729                    nullable=False,
730                    onupdate=1,
731                    default=42,
732                    server_default="42",
733                    comment="foo",
734                ),
735                "Column('foo', Integer(), table=None, primary_key=True, "
736                "nullable=False, onupdate=%s, default=%s, server_default=%s, "
737                "comment='foo')"
738                % (
739                    ColumnDefault(1),
740                    ColumnDefault(42),
741                    DefaultClause("42"),
742                ),
743            ),
744            (
745                Table("bar", MetaData(), Column("x", String)),
746                "Table('bar', MetaData(), "
747                "Column('x', String(), table=<bar>), schema=None)",
748            ),
749            (
750                schema.DefaultGenerator(for_update=True),
751                "DefaultGenerator(for_update=True)",
752            ),
753            (schema.Index("bar", "c"), "Index('bar', 'c')"),
754            (i1, "Index('bar', Column('x', Integer(), table=<foo>))"),
755            (schema.FetchedValue(), "FetchedValue()"),
756            (
757                ck,
758                "CheckConstraint("
759                "%s"
760                ", name='someconstraint')" % repr(ck.sqltext),
761            ),
762            (ColumnDefault(("foo", "bar")), "ColumnDefault(('foo', 'bar'))"),
763        ):
764            eq_(repr(const), exp)
765
766
767class ToMetaDataTest(fixtures.TestBase, AssertsCompiledSQL, ComparesTables):
768    @testing.requires.check_constraints
769    def test_copy(self):
770        # TODO: modernize this test for 2.0
771
772        from sqlalchemy.testing.schema import Table
773
774        meta = MetaData()
775
776        table = Table(
777            "mytable",
778            meta,
779            Column("myid", Integer, Sequence("foo_id_seq"), primary_key=True),
780            Column("name", String(40), nullable=True),
781            Column(
782                "foo",
783                String(40),
784                nullable=False,
785                server_default="x",
786                server_onupdate="q",
787            ),
788            Column(
789                "bar", String(40), nullable=False, default="y", onupdate="z"
790            ),
791            Column(
792                "description", String(30), CheckConstraint("description='hi'")
793            ),
794            UniqueConstraint("name"),
795            test_needs_fk=True,
796        )
797
798        table2 = Table(
799            "othertable",
800            meta,
801            Column("id", Integer, Sequence("foo_seq"), primary_key=True),
802            Column("myid", Integer, ForeignKey("mytable.myid")),
803            test_needs_fk=True,
804        )
805
806        table3 = Table(
807            "has_comments",
808            meta,
809            Column("foo", Integer, comment="some column"),
810            comment="table comment",
811        )
812
813        def test_to_metadata():
814            meta2 = MetaData()
815            table_c = table.to_metadata(meta2)
816            table2_c = table2.to_metadata(meta2)
817            table3_c = table3.to_metadata(meta2)
818            return (table_c, table2_c, table3_c)
819
820        def test_pickle():
821            meta.bind = testing.db
822            meta2 = pickle.loads(pickle.dumps(meta))
823            assert meta2.bind is None
824            pickle.loads(pickle.dumps(meta2))
825            return (
826                meta2.tables["mytable"],
827                meta2.tables["othertable"],
828                meta2.tables["has_comments"],
829            )
830
831        def test_pickle_via_reflect():
832            # this is the most common use case, pickling the results of a
833            # database reflection
834            meta2 = MetaData()
835            t1 = Table("mytable", meta2, autoload_with=testing.db)
836            Table("othertable", meta2, autoload_with=testing.db)
837            Table("has_comments", meta2, autoload_with=testing.db)
838            meta3 = pickle.loads(pickle.dumps(meta2))
839            assert meta3.bind is None
840            assert meta3.tables["mytable"] is not t1
841
842            return (
843                meta3.tables["mytable"],
844                meta3.tables["othertable"],
845                meta3.tables["has_comments"],
846            )
847
848        meta.create_all(testing.db)
849        try:
850            for test, has_constraints, reflect in (
851                (test_to_metadata, True, False),
852                (test_pickle, True, False),
853                (test_pickle_via_reflect, False, True),
854            ):
855                table_c, table2_c, table3_c = test()
856                self.assert_tables_equal(table, table_c)
857                self.assert_tables_equal(table2, table2_c)
858                assert table is not table_c
859                assert table.primary_key is not table_c.primary_key
860                assert (
861                    list(table2_c.c.myid.foreign_keys)[0].column
862                    is table_c.c.myid
863                )
864                assert (
865                    list(table2_c.c.myid.foreign_keys)[0].column
866                    is not table.c.myid
867                )
868                assert "x" in str(table_c.c.foo.server_default.arg)
869                if not reflect:
870                    assert isinstance(table_c.c.myid.default, Sequence)
871                    assert str(table_c.c.foo.server_onupdate.arg) == "q"
872                    assert str(table_c.c.bar.default.arg) == "y"
873                    assert (
874                        getattr(
875                            table_c.c.bar.onupdate.arg,
876                            "arg",
877                            table_c.c.bar.onupdate.arg,
878                        )
879                        == "z"
880                    )
881                    assert isinstance(table2_c.c.id.default, Sequence)
882
883                # constraints don't get reflected for any dialect right
884                # now
885
886                if has_constraints:
887                    for c in table_c.c.description.constraints:
888                        if isinstance(c, CheckConstraint):
889                            break
890                    else:
891                        assert False
892                    assert str(c.sqltext) == "description='hi'"
893                    for c in table_c.constraints:
894                        if isinstance(c, UniqueConstraint):
895                            break
896                    else:
897                        assert False
898                    assert c.columns.contains_column(table_c.c.name)
899                    assert not c.columns.contains_column(table.c.name)
900
901                if testing.requires.comment_reflection.enabled:
902                    eq_(table3_c.comment, "table comment")
903                    eq_(table3_c.c.foo.comment, "some column")
904
905        finally:
906            meta.drop_all(testing.db)
907
908    def test_col_key_fk_parent(self):
909        # test #2643
910        m1 = MetaData()
911        a = Table("a", m1, Column("x", Integer))
912        b = Table("b", m1, Column("x", Integer, ForeignKey("a.x"), key="y"))
913        assert b.c.y.references(a.c.x)
914
915        m2 = MetaData()
916        b2 = b.to_metadata(m2)
917        a2 = a.to_metadata(m2)
918        assert b2.c.y.references(a2.c.x)
919
920    def test_column_collection_constraint_w_ad_hoc_columns(self):
921        """Test ColumnCollectionConstraint that has columns that aren't
922        part of the Table.
923
924        """
925        meta = MetaData()
926
927        uq1 = UniqueConstraint(literal_column("some_name"))
928        cc1 = CheckConstraint(literal_column("some_name") > 5)
929        table = Table(
930            "mytable",
931            meta,
932            Column("myid", Integer, primary_key=True),
933            Column("name", String(40), nullable=True),
934            uq1,
935            cc1,
936        )
937
938        self.assert_compile(
939            schema.AddConstraint(uq1),
940            "ALTER TABLE mytable ADD UNIQUE (some_name)",
941            dialect="default",
942        )
943        self.assert_compile(
944            schema.AddConstraint(cc1),
945            "ALTER TABLE mytable ADD CHECK (some_name > 5)",
946            dialect="default",
947        )
948        meta2 = MetaData()
949        table2 = table.to_metadata(meta2)
950        uq2 = [
951            c for c in table2.constraints if isinstance(c, UniqueConstraint)
952        ][0]
953        cc2 = [
954            c for c in table2.constraints if isinstance(c, CheckConstraint)
955        ][0]
956        self.assert_compile(
957            schema.AddConstraint(uq2),
958            "ALTER TABLE mytable ADD UNIQUE (some_name)",
959            dialect="default",
960        )
961        self.assert_compile(
962            schema.AddConstraint(cc2),
963            "ALTER TABLE mytable ADD CHECK (some_name > 5)",
964            dialect="default",
965        )
966
967    def test_change_schema(self):
968        meta = MetaData()
969
970        table = Table(
971            "mytable",
972            meta,
973            Column("myid", Integer, primary_key=True),
974            Column("name", String(40), nullable=True),
975            Column(
976                "description", String(30), CheckConstraint("description='hi'")
977            ),
978            UniqueConstraint("name"),
979        )
980
981        table2 = Table(
982            "othertable",
983            meta,
984            Column("id", Integer, primary_key=True),
985            Column("myid", Integer, ForeignKey("mytable.myid")),
986        )
987
988        meta2 = MetaData()
989        table_c = table.to_metadata(meta2, schema="someschema")
990        table2_c = table2.to_metadata(meta2, schema="someschema")
991
992        eq_(
993            str(table_c.join(table2_c).onclause),
994            str(table_c.c.myid == table2_c.c.myid),
995        )
996        eq_(
997            str(table_c.join(table2_c).onclause),
998            "someschema.mytable.myid = someschema.othertable.myid",
999        )
1000
1001    def test_retain_table_schema(self):
1002        meta = MetaData()
1003
1004        table = Table(
1005            "mytable",
1006            meta,
1007            Column("myid", Integer, primary_key=True),
1008            Column("name", String(40), nullable=True),
1009            Column(
1010                "description", String(30), CheckConstraint("description='hi'")
1011            ),
1012            UniqueConstraint("name"),
1013            schema="myschema",
1014        )
1015
1016        table2 = Table(
1017            "othertable",
1018            meta,
1019            Column("id", Integer, primary_key=True),
1020            Column("myid", Integer, ForeignKey("myschema.mytable.myid")),
1021            schema="myschema",
1022        )
1023
1024        meta2 = MetaData()
1025        table_c = table.to_metadata(meta2)
1026        table2_c = table2.to_metadata(meta2)
1027
1028        eq_(
1029            str(table_c.join(table2_c).onclause),
1030            str(table_c.c.myid == table2_c.c.myid),
1031        )
1032        eq_(
1033            str(table_c.join(table2_c).onclause),
1034            "myschema.mytable.myid = myschema.othertable.myid",
1035        )
1036
1037    def test_change_name_retain_metadata(self):
1038        meta = MetaData()
1039
1040        table = Table(
1041            "mytable",
1042            meta,
1043            Column("myid", Integer, primary_key=True),
1044            Column("name", String(40), nullable=True),
1045            Column(
1046                "description", String(30), CheckConstraint("description='hi'")
1047            ),
1048            UniqueConstraint("name"),
1049            schema="myschema",
1050        )
1051
1052        table2 = table.to_metadata(table.metadata, name="newtable")
1053        table3 = table.to_metadata(
1054            table.metadata, schema="newschema", name="newtable"
1055        )
1056
1057        assert table.metadata is table2.metadata
1058        assert table.metadata is table3.metadata
1059        eq_(
1060            (table.name, table2.name, table3.name),
1061            ("mytable", "newtable", "newtable"),
1062        )
1063        eq_(
1064            (table.key, table2.key, table3.key),
1065            ("myschema.mytable", "myschema.newtable", "newschema.newtable"),
1066        )
1067
1068    def test_change_name_change_metadata(self):
1069        meta = MetaData()
1070        meta2 = MetaData()
1071
1072        table = Table(
1073            "mytable",
1074            meta,
1075            Column("myid", Integer, primary_key=True),
1076            Column("name", String(40), nullable=True),
1077            Column(
1078                "description", String(30), CheckConstraint("description='hi'")
1079            ),
1080            UniqueConstraint("name"),
1081            schema="myschema",
1082        )
1083
1084        table2 = table.to_metadata(meta2, name="newtable")
1085
1086        assert table.metadata is not table2.metadata
1087        eq_((table.name, table2.name), ("mytable", "newtable"))
1088        eq_((table.key, table2.key), ("myschema.mytable", "myschema.newtable"))
1089
1090    def test_change_name_selfref_fk_moves(self):
1091        meta = MetaData()
1092
1093        referenced = Table(
1094            "ref", meta, Column("id", Integer, primary_key=True)
1095        )
1096        table = Table(
1097            "mytable",
1098            meta,
1099            Column("id", Integer, primary_key=True),
1100            Column("parent_id", ForeignKey("mytable.id")),
1101            Column("ref_id", ForeignKey("ref.id")),
1102        )
1103
1104        table2 = table.to_metadata(table.metadata, name="newtable")
1105        assert table.metadata is table2.metadata
1106        assert table2.c.ref_id.references(referenced.c.id)
1107        assert table2.c.parent_id.references(table2.c.id)
1108
1109    def test_change_name_selfref_fk_moves_w_schema(self):
1110        meta = MetaData()
1111
1112        referenced = Table(
1113            "ref", meta, Column("id", Integer, primary_key=True)
1114        )
1115        table = Table(
1116            "mytable",
1117            meta,
1118            Column("id", Integer, primary_key=True),
1119            Column("parent_id", ForeignKey("mytable.id")),
1120            Column("ref_id", ForeignKey("ref.id")),
1121        )
1122
1123        table2 = table.to_metadata(
1124            table.metadata, name="newtable", schema="newschema"
1125        )
1126        ref2 = referenced.to_metadata(table.metadata, schema="newschema")
1127        assert table.metadata is table2.metadata
1128        assert table2.c.ref_id.references(ref2.c.id)
1129        assert table2.c.parent_id.references(table2.c.id)
1130
1131    def _assert_fk(self, t2, schema, expected, referred_schema_fn=None):
1132        m2 = MetaData()
1133        existing_schema = t2.schema
1134        if schema:
1135            t2c = t2.to_metadata(
1136                m2, schema=schema, referred_schema_fn=referred_schema_fn
1137            )
1138            eq_(t2c.schema, schema)
1139        else:
1140            t2c = t2.to_metadata(m2, referred_schema_fn=referred_schema_fn)
1141            eq_(t2c.schema, existing_schema)
1142        eq_(list(t2c.c.y.foreign_keys)[0]._get_colspec(), expected)
1143
1144    def test_fk_has_schema_string_retain_schema(self):
1145        m = MetaData()
1146
1147        t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x")))
1148        self._assert_fk(t2, None, "q.t1.x")
1149
1150        Table("t1", m, Column("x", Integer), schema="q")
1151        self._assert_fk(t2, None, "q.t1.x")
1152
1153    def test_fk_has_schema_string_new_schema(self):
1154        m = MetaData()
1155
1156        t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x")))
1157        self._assert_fk(t2, "z", "q.t1.x")
1158
1159        Table("t1", m, Column("x", Integer), schema="q")
1160        self._assert_fk(t2, "z", "q.t1.x")
1161
1162    def test_fk_has_schema_col_retain_schema(self):
1163        m = MetaData()
1164
1165        t1 = Table("t1", m, Column("x", Integer), schema="q")
1166        t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x)))
1167
1168        self._assert_fk(t2, "z", "q.t1.x")
1169
1170    def test_fk_has_schema_col_new_schema(self):
1171        m = MetaData()
1172
1173        t1 = Table("t1", m, Column("x", Integer), schema="q")
1174        t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x)))
1175
1176        self._assert_fk(t2, "z", "q.t1.x")
1177
1178    def test_fk_and_referent_has_same_schema_string_retain_schema(self):
1179        m = MetaData()
1180
1181        t2 = Table(
1182            "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q"
1183        )
1184
1185        self._assert_fk(t2, None, "q.t1.x")
1186
1187        Table("t1", m, Column("x", Integer), schema="q")
1188        self._assert_fk(t2, None, "q.t1.x")
1189
1190    def test_fk_and_referent_has_same_schema_string_new_schema(self):
1191        m = MetaData()
1192
1193        t2 = Table(
1194            "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q"
1195        )
1196
1197        self._assert_fk(t2, "z", "z.t1.x")
1198
1199        Table("t1", m, Column("x", Integer), schema="q")
1200        self._assert_fk(t2, "z", "z.t1.x")
1201
1202    def test_fk_and_referent_has_same_schema_col_retain_schema(self):
1203        m = MetaData()
1204
1205        t1 = Table("t1", m, Column("x", Integer), schema="q")
1206        t2 = Table(
1207            "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
1208        )
1209        self._assert_fk(t2, None, "q.t1.x")
1210
1211    def test_fk_and_referent_has_same_schema_col_new_schema(self):
1212        m = MetaData()
1213
1214        t1 = Table("t1", m, Column("x", Integer), schema="q")
1215        t2 = Table(
1216            "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
1217        )
1218        self._assert_fk(t2, "z", "z.t1.x")
1219
1220    def test_fk_and_referent_has_diff_schema_string_retain_schema(self):
1221        m = MetaData()
1222
1223        t2 = Table(
1224            "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q"
1225        )
1226
1227        self._assert_fk(t2, None, "p.t1.x")
1228
1229        Table("t1", m, Column("x", Integer), schema="p")
1230        self._assert_fk(t2, None, "p.t1.x")
1231
1232    def test_fk_and_referent_has_diff_schema_string_new_schema(self):
1233        m = MetaData()
1234
1235        t2 = Table(
1236            "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q"
1237        )
1238
1239        self._assert_fk(t2, "z", "p.t1.x")
1240
1241        Table("t1", m, Column("x", Integer), schema="p")
1242        self._assert_fk(t2, "z", "p.t1.x")
1243
1244    def test_fk_and_referent_has_diff_schema_col_retain_schema(self):
1245        m = MetaData()
1246
1247        t1 = Table("t1", m, Column("x", Integer), schema="p")
1248        t2 = Table(
1249            "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
1250        )
1251        self._assert_fk(t2, None, "p.t1.x")
1252
1253    def test_fk_and_referent_has_diff_schema_col_new_schema(self):
1254        m = MetaData()
1255
1256        t1 = Table("t1", m, Column("x", Integer), schema="p")
1257        t2 = Table(
1258            "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
1259        )
1260        self._assert_fk(t2, "z", "p.t1.x")
1261
1262    def test_fk_custom_system(self):
1263        m = MetaData()
1264        t2 = Table(
1265            "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q"
1266        )
1267
1268        def ref_fn(table, to_schema, constraint, referred_schema):
1269            assert table is t2
1270            eq_(to_schema, "z")
1271            eq_(referred_schema, "p")
1272            return "h"
1273
1274        self._assert_fk(t2, "z", "h.t1.x", referred_schema_fn=ref_fn)
1275
1276    def test_copy_info(self):
1277        m = MetaData()
1278        fk = ForeignKey("t2.id")
1279        c = Column("c", Integer, fk)
1280        ck = CheckConstraint("c > 5")
1281        t = Table("t", m, c, ck)
1282
1283        m.info["minfo"] = True
1284        fk.info["fkinfo"] = True
1285        c.info["cinfo"] = True
1286        ck.info["ckinfo"] = True
1287        t.info["tinfo"] = True
1288        t.primary_key.info["pkinfo"] = True
1289        fkc = [
1290            const
1291            for const in t.constraints
1292            if isinstance(const, ForeignKeyConstraint)
1293        ][0]
1294        fkc.info["fkcinfo"] = True
1295
1296        m2 = MetaData()
1297        t2 = t.to_metadata(m2)
1298
1299        m.info["minfo"] = False
1300        fk.info["fkinfo"] = False
1301        c.info["cinfo"] = False
1302        ck.info["ckinfo"] = False
1303        t.primary_key.info["pkinfo"] = False
1304        fkc.info["fkcinfo"] = False
1305
1306        eq_(m2.info, {})
1307        eq_(t2.info, {"tinfo": True})
1308        eq_(t2.c.c.info, {"cinfo": True})
1309        eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True})
1310        eq_(t2.primary_key.info, {"pkinfo": True})
1311
1312        fkc2 = [
1313            const
1314            for const in t2.constraints
1315            if isinstance(const, ForeignKeyConstraint)
1316        ][0]
1317        eq_(fkc2.info, {"fkcinfo": True})
1318
1319        ck2 = [
1320            const
1321            for const in t2.constraints
1322            if isinstance(const, CheckConstraint)
1323        ][0]
1324        eq_(ck2.info, {"ckinfo": True})
1325
1326    def test_dialect_kwargs(self):
1327        meta = MetaData()
1328
1329        table = Table(
1330            "mytable",
1331            meta,
1332            Column("myid", Integer, primary_key=True),
1333            mysql_engine="InnoDB",
1334        )
1335
1336        meta2 = MetaData()
1337        table_c = table.to_metadata(meta2)
1338
1339        eq_(table.kwargs, {"mysql_engine": "InnoDB"})
1340
1341        eq_(table.kwargs, table_c.kwargs)
1342
1343    def test_indexes(self):
1344        meta = MetaData()
1345
1346        table = Table(
1347            "mytable",
1348            meta,
1349            Column("id", Integer, primary_key=True),
1350            Column("data1", Integer, index=True),
1351            Column("data2", Integer),
1352            Index("text", text("data1 + 1")),
1353        )
1354        Index("multi", table.c.data1, table.c.data2)
1355        Index("func", func.abs(table.c.data1))
1356        Index("multi-func", table.c.data1, func.abs(table.c.data2))
1357
1358        meta2 = MetaData()
1359        table_c = table.to_metadata(meta2)
1360
1361        def _get_key(i):
1362            return (
1363                [i.name, i.unique]
1364                + sorted(i.kwargs.items())
1365                + [str(col) for col in i.expressions]
1366            )
1367
1368        eq_(
1369            sorted([_get_key(i) for i in table.indexes]),
1370            sorted([_get_key(i) for i in table_c.indexes]),
1371        )
1372
1373    def test_indexes_with_col_redefine(self):
1374        meta = MetaData()
1375
1376        table = Table(
1377            "mytable",
1378            meta,
1379            Column("id", Integer, primary_key=True),
1380            Column("data1", Integer),
1381            Column("data2", Integer),
1382            Index("text", text("data1 + 1")),
1383        )
1384        Index("multi", table.c.data1, table.c.data2)
1385        Index("func", func.abs(table.c.data1))
1386        Index("multi-func", table.c.data1, func.abs(table.c.data2))
1387
1388        table = Table(
1389            "mytable",
1390            meta,
1391            Column("data1", Integer),
1392            Column("data2", Integer),
1393            extend_existing=True,
1394        )
1395
1396        meta2 = MetaData()
1397        table_c = table.to_metadata(meta2)
1398
1399        def _get_key(i):
1400            return (
1401                [i.name, i.unique]
1402                + sorted(i.kwargs.items())
1403                + [str(col) for col in i.expressions]
1404            )
1405
1406        eq_(
1407            sorted([_get_key(i) for i in table.indexes]),
1408            sorted([_get_key(i) for i in table_c.indexes]),
1409        )
1410
1411    @emits_warning("Table '.+' already exists within the given MetaData")
1412    def test_already_exists(self):
1413
1414        meta1 = MetaData()
1415        table1 = Table(
1416            "mytable", meta1, Column("myid", Integer, primary_key=True)
1417        )
1418        meta2 = MetaData()
1419        table2 = Table(
1420            "mytable", meta2, Column("yourid", Integer, primary_key=True)
1421        )
1422
1423        table_c = table1.to_metadata(meta2)
1424        table_d = table2.to_metadata(meta2)
1425
1426        # d'oh!
1427        assert table_c is table_d
1428
1429    def test_default_schema_metadata(self):
1430        meta = MetaData(schema="myschema")
1431
1432        table = Table(
1433            "mytable",
1434            meta,
1435            Column("myid", Integer, primary_key=True),
1436            Column("name", String(40), nullable=True),
1437            Column(
1438                "description", String(30), CheckConstraint("description='hi'")
1439            ),
1440            UniqueConstraint("name"),
1441        )
1442
1443        table2 = Table(
1444            "othertable",
1445            meta,
1446            Column("id", Integer, primary_key=True),
1447            Column("myid", Integer, ForeignKey("myschema.mytable.myid")),
1448        )
1449
1450        meta2 = MetaData(schema="someschema")
1451        table_c = table.to_metadata(meta2, schema=None)
1452        table2_c = table2.to_metadata(meta2, schema=None)
1453
1454        eq_(
1455            str(table_c.join(table2_c).onclause),
1456            str(table_c.c.myid == table2_c.c.myid),
1457        )
1458        eq_(
1459            str(table_c.join(table2_c).onclause),
1460            "someschema.mytable.myid = someschema.othertable.myid",
1461        )
1462
1463    def test_strip_schema(self):
1464        meta = MetaData()
1465
1466        table = Table(
1467            "mytable",
1468            meta,
1469            Column("myid", Integer, primary_key=True),
1470            Column("name", String(40), nullable=True),
1471            Column(
1472                "description", String(30), CheckConstraint("description='hi'")
1473            ),
1474            UniqueConstraint("name"),
1475        )
1476
1477        table2 = Table(
1478            "othertable",
1479            meta,
1480            Column("id", Integer, primary_key=True),
1481            Column("myid", Integer, ForeignKey("mytable.myid")),
1482        )
1483
1484        meta2 = MetaData()
1485        table_c = table.to_metadata(meta2, schema=None)
1486        table2_c = table2.to_metadata(meta2, schema=None)
1487
1488        eq_(
1489            str(table_c.join(table2_c).onclause),
1490            str(table_c.c.myid == table2_c.c.myid),
1491        )
1492        eq_(
1493            str(table_c.join(table2_c).onclause),
1494            "mytable.myid = othertable.myid",
1495        )
1496
1497    def test_unique_true_flag(self):
1498        meta = MetaData()
1499
1500        table = Table("mytable", meta, Column("x", Integer, unique=True))
1501
1502        m2 = MetaData()
1503
1504        t2 = table.to_metadata(m2)
1505
1506        eq_(
1507            len(
1508                [
1509                    const
1510                    for const in t2.constraints
1511                    if isinstance(const, UniqueConstraint)
1512                ]
1513            ),
1514            1,
1515        )
1516
1517    def test_index_true_flag(self):
1518        meta = MetaData()
1519
1520        table = Table("mytable", meta, Column("x", Integer, index=True))
1521
1522        m2 = MetaData()
1523
1524        t2 = table.to_metadata(m2)
1525
1526        eq_(len(t2.indexes), 1)
1527
1528
1529class InfoTest(fixtures.TestBase):
1530    def test_metadata_info(self):
1531        m1 = MetaData()
1532        eq_(m1.info, {})
1533
1534        m1 = MetaData(info={"foo": "bar"})
1535        eq_(m1.info, {"foo": "bar"})
1536
1537    def test_foreignkey_constraint_info(self):
1538        fkc = ForeignKeyConstraint(["a"], ["b"], name="bar")
1539        eq_(fkc.info, {})
1540
1541        fkc = ForeignKeyConstraint(
1542            ["a"], ["b"], name="bar", info={"foo": "bar"}
1543        )
1544        eq_(fkc.info, {"foo": "bar"})
1545
1546    def test_foreignkey_info(self):
1547        fkc = ForeignKey("a")
1548        eq_(fkc.info, {})
1549
1550        fkc = ForeignKey("a", info={"foo": "bar"})
1551        eq_(fkc.info, {"foo": "bar"})
1552
1553    def test_primarykey_constraint_info(self):
1554        pkc = PrimaryKeyConstraint("a", name="x")
1555        eq_(pkc.info, {})
1556
1557        pkc = PrimaryKeyConstraint("a", name="x", info={"foo": "bar"})
1558        eq_(pkc.info, {"foo": "bar"})
1559
1560    def test_unique_constraint_info(self):
1561        uc = UniqueConstraint("a", name="x")
1562        eq_(uc.info, {})
1563
1564        uc = UniqueConstraint("a", name="x", info={"foo": "bar"})
1565        eq_(uc.info, {"foo": "bar"})
1566
1567    def test_check_constraint_info(self):
1568        cc = CheckConstraint("foo=bar", name="x")
1569        eq_(cc.info, {})
1570
1571        cc = CheckConstraint("foo=bar", name="x", info={"foo": "bar"})
1572        eq_(cc.info, {"foo": "bar"})
1573
1574    def test_index_info(self):
1575        ix = Index("x", "a")
1576        eq_(ix.info, {})
1577
1578        ix = Index("x", "a", info={"foo": "bar"})
1579        eq_(ix.info, {"foo": "bar"})
1580
1581    def test_column_info(self):
1582        c = Column("x", Integer)
1583        eq_(c.info, {})
1584
1585        c = Column("x", Integer, info={"foo": "bar"})
1586        eq_(c.info, {"foo": "bar"})
1587
1588    def test_table_info(self):
1589        t = Table("x", MetaData())
1590        eq_(t.info, {})
1591
1592        t = Table("x", MetaData(), info={"foo": "bar"})
1593        eq_(t.info, {"foo": "bar"})
1594
1595
1596class TableTest(fixtures.TestBase, AssertsCompiledSQL):
1597    @testing.requires.temporary_tables
1598    @testing.skip_if("mssql", "different col format")
1599    def test_prefixes(self):
1600        from sqlalchemy import Table
1601
1602        table1 = Table(
1603            "temporary_table_1",
1604            MetaData(),
1605            Column("col1", Integer),
1606            prefixes=["TEMPORARY"],
1607        )
1608
1609        self.assert_compile(
1610            schema.CreateTable(table1),
1611            "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)",
1612        )
1613
1614        table2 = Table(
1615            "temporary_table_2",
1616            MetaData(),
1617            Column("col1", Integer),
1618            prefixes=["VIRTUAL"],
1619        )
1620        self.assert_compile(
1621            schema.CreateTable(table2),
1622            "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)",
1623        )
1624
1625    @testing.combinations((None, []), ((), []), ([], []), (["foo"], ["foo"]))
1626    def test_prefixes_parameter_parsing(self, arg, expected):
1627        """test #6685"""
1628        table = Table("foo", MetaData(), Column("bar", Integer), prefixes=arg)
1629        eq_(table._prefixes, expected)
1630
1631    def test_table_info(self):
1632        metadata = MetaData()
1633        t1 = Table("foo", metadata, info={"x": "y"})
1634        t2 = Table("bar", metadata, info={})
1635        t3 = Table("bat", metadata)
1636        assert t1.info == {"x": "y"}
1637        assert t2.info == {}
1638        assert t3.info == {}
1639        for t in (t1, t2, t3):
1640            t.info["bar"] = "zip"
1641            assert t.info["bar"] == "zip"
1642
1643    def test_invalid_objects(self):
1644        assert_raises_message(
1645            tsa.exc.ArgumentError,
1646            "'SchemaItem' object, such as a 'Column' or a "
1647            "'Constraint' expected, got <.*ColumnClause at .*; q>",
1648            Table,
1649            "asdf",
1650            MetaData(),
1651            tsa.column("q", Integer),
1652        )
1653
1654        assert_raises_message(
1655            tsa.exc.ArgumentError,
1656            r"'SchemaItem' object, such as a 'Column' or a "
1657            r"'Constraint' expected, got String\(\)",
1658            Table,
1659            "asdf",
1660            MetaData(),
1661            String(),
1662        )
1663
1664        assert_raises_message(
1665            tsa.exc.ArgumentError,
1666            "'SchemaItem' object, such as a 'Column' or a "
1667            "'Constraint' expected, got 12",
1668            Table,
1669            "asdf",
1670            MetaData(),
1671            12,
1672        )
1673
1674    def test_reset_exported_passes(self):
1675
1676        m = MetaData()
1677
1678        t = Table("t", m, Column("foo", Integer))
1679        eq_(list(t.c), [t.c.foo])
1680
1681        t._reset_exported()
1682
1683        eq_(list(t.c), [t.c.foo])
1684
1685    def test_foreign_key_constraints_collection(self):
1686        metadata = MetaData()
1687        t1 = Table("foo", metadata, Column("a", Integer))
1688        eq_(t1.foreign_key_constraints, set())
1689
1690        fk1 = ForeignKey("q.id")
1691        fk2 = ForeignKey("j.id")
1692        fk3 = ForeignKeyConstraint(["b", "c"], ["r.x", "r.y"])
1693
1694        t1.append_column(Column("b", Integer, fk1))
1695        eq_(t1.foreign_key_constraints, set([fk1.constraint]))
1696
1697        t1.append_column(Column("c", Integer, fk2))
1698        eq_(t1.foreign_key_constraints, set([fk1.constraint, fk2.constraint]))
1699
1700        t1.append_constraint(fk3)
1701        eq_(
1702            t1.foreign_key_constraints,
1703            set([fk1.constraint, fk2.constraint, fk3]),
1704        )
1705
1706    def test_c_immutable(self):
1707        m = MetaData()
1708        t1 = Table("t", m, Column("x", Integer), Column("y", Integer))
1709        assert_raises(TypeError, t1.c.extend, [Column("z", Integer)])
1710
1711        def assign():
1712            t1.c["z"] = Column("z", Integer)
1713
1714        assert_raises(TypeError, assign)
1715
1716        def assign2():
1717            t1.c.z = Column("z", Integer)
1718
1719        assert_raises(TypeError, assign2)
1720
1721    def test_c_mutate_after_unpickle(self):
1722        m = MetaData()
1723
1724        y = Column("y", Integer)
1725        t1 = Table("t", m, Column("x", Integer), y)
1726
1727        # note we are testing immutable column collection here
1728        t2 = pickle.loads(pickle.dumps(t1))
1729        z = Column("z", Integer)
1730        g = Column("g", Integer)
1731        t2.append_column(z)
1732
1733        is_(t1.c.contains_column(y), True)
1734        is_(t2.c.contains_column(y), False)
1735        y2 = t2.c.y
1736        is_(t2.c.contains_column(y2), True)
1737
1738        is_(t2.c.contains_column(z), True)
1739        is_(t2.c.contains_column(g), False)
1740
1741    def test_table_ctor_duplicated_column_name(self):
1742        def go():
1743            return Table(
1744                "t",
1745                MetaData(),
1746                Column("a", Integer),
1747                Column("col", Integer),
1748                Column("col", String),
1749            )
1750
1751        with testing.expect_deprecated(
1752            "A column with name 'col' is already present in table 't'",
1753        ):
1754            t = go()
1755        is_true(isinstance(t.c.col.type, String))
1756        # when it will raise
1757        # with testing.expect_raises_message(
1758        #     exc.ArgumentError,
1759        #     "A column with name 'col' is already present in table 't'",
1760        # ):
1761        #     go()
1762
1763    def test_append_column_existing_name(self):
1764        t = Table("t", MetaData(), Column("col", Integer))
1765
1766        with testing.expect_deprecated(
1767            "A column with name 'col' is already present in table 't'",
1768        ):
1769            t.append_column(Column("col", String))
1770        is_true(isinstance(t.c.col.type, String))
1771        # when it will raise
1772        # col = t.c.col
1773        # with testing.expect_raises_message(
1774        #     exc.ArgumentError,
1775        #     "A column with name 'col' is already present in table 't'",
1776        # ):
1777        #     t.append_column(Column("col", String))
1778        # is_true(t.c.col is col)
1779
1780    def test_append_column_replace_existing(self):
1781        t = Table("t", MetaData(), Column("col", Integer))
1782        t.append_column(Column("col", String), replace_existing=True)
1783        is_true(isinstance(t.c.col.type, String))
1784
1785    def test_autoincrement_replace(self):
1786        m = MetaData()
1787
1788        t = Table("t", m, Column("id", Integer, primary_key=True))
1789
1790        is_(t._autoincrement_column, t.c.id)
1791
1792        t = Table(
1793            "t",
1794            m,
1795            Column("id", Integer, primary_key=True),
1796            extend_existing=True,
1797        )
1798        is_(t._autoincrement_column, t.c.id)
1799
1800    def test_pk_args_standalone(self):
1801        m = MetaData()
1802        t = Table(
1803            "t",
1804            m,
1805            Column("x", Integer, primary_key=True),
1806            PrimaryKeyConstraint(mssql_clustered=True),
1807        )
1808        eq_(list(t.primary_key), [t.c.x])
1809        eq_(t.primary_key.dialect_kwargs, {"mssql_clustered": True})
1810
1811    def test_pk_cols_sets_flags(self):
1812        m = MetaData()
1813        t = Table(
1814            "t",
1815            m,
1816            Column("x", Integer),
1817            Column("y", Integer),
1818            Column("z", Integer),
1819            PrimaryKeyConstraint("x", "y"),
1820        )
1821        eq_(t.c.x.primary_key, True)
1822        eq_(t.c.y.primary_key, True)
1823        eq_(t.c.z.primary_key, False)
1824
1825    def test_pk_col_mismatch_one(self):
1826        m = MetaData()
1827        assert_raises_message(
1828            exc.SAWarning,
1829            "Table 't' specifies columns 'x' as primary_key=True, "
1830            "not matching locally specified columns 'q'",
1831            Table,
1832            "t",
1833            m,
1834            Column("x", Integer, primary_key=True),
1835            Column("q", Integer),
1836            PrimaryKeyConstraint("q"),
1837        )
1838
1839    def test_pk_col_mismatch_two(self):
1840        m = MetaData()
1841        assert_raises_message(
1842            exc.SAWarning,
1843            "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, "
1844            "not matching locally specified columns 'b', 'c'",
1845            Table,
1846            "t",
1847            m,
1848            Column("a", Integer, primary_key=True),
1849            Column("b", Integer, primary_key=True),
1850            Column("c", Integer, primary_key=True),
1851            PrimaryKeyConstraint("b", "c"),
1852        )
1853
1854    @testing.emits_warning("Table 't'")
1855    def test_pk_col_mismatch_three(self):
1856        m = MetaData()
1857        t = Table(
1858            "t",
1859            m,
1860            Column("x", Integer, primary_key=True),
1861            Column("q", Integer),
1862            PrimaryKeyConstraint("q"),
1863        )
1864        eq_(list(t.primary_key), [t.c.q])
1865
1866    @testing.emits_warning("Table 't'")
1867    def test_pk_col_mismatch_four(self):
1868        m = MetaData()
1869        t = Table(
1870            "t",
1871            m,
1872            Column("a", Integer, primary_key=True),
1873            Column("b", Integer, primary_key=True),
1874            Column("c", Integer, primary_key=True),
1875            PrimaryKeyConstraint("b", "c"),
1876        )
1877        eq_(list(t.primary_key), [t.c.b, t.c.c])
1878
1879    def test_pk_always_flips_nullable(self):
1880        m = MetaData()
1881
1882        t1 = Table("t1", m, Column("x", Integer), PrimaryKeyConstraint("x"))
1883
1884        t2 = Table("t2", m, Column("x", Integer, primary_key=True))
1885
1886        eq_(list(t1.primary_key), [t1.c.x])
1887
1888        eq_(list(t2.primary_key), [t2.c.x])
1889
1890        assert t1.c.x.primary_key
1891        assert t2.c.x.primary_key
1892
1893        assert not t2.c.x.nullable
1894        assert not t1.c.x.nullable
1895
1896    def test_pk_can_be_nullable(self):
1897        m = MetaData()
1898
1899        t1 = Table(
1900            "t1",
1901            m,
1902            Column("x", Integer, nullable=True),
1903            PrimaryKeyConstraint("x"),
1904        )
1905
1906        t2 = Table(
1907            "t2", m, Column("x", Integer, primary_key=True, nullable=True)
1908        )
1909
1910        eq_(list(t1.primary_key), [t1.c.x])
1911
1912        eq_(list(t2.primary_key), [t2.c.x])
1913
1914        assert t1.c.x.primary_key
1915        assert t2.c.x.primary_key
1916
1917        assert t2.c.x.nullable
1918        assert t1.c.x.nullable
1919
1920    def test_must_exist(self):
1921        with testing.expect_raises_message(
1922            exc.InvalidRequestError, "Table 'foo' not defined"
1923        ):
1924            Table("foo", MetaData(), must_exist=True)
1925
1926    @testing.combinations(
1927        ("comment", ("A", "B", "A")),
1928        ("implicit_returning", (True, False, True)),
1929        ("info", ({"A": 1}, {"A": 2}, {"A": 1})),
1930    )
1931    def test_extend_attributes(self, attrib, attrib_values):
1932        """
1933        ensure `extend_existing` is compatible with simple attributes
1934        """
1935        metadata = MetaData()
1936        for counter, _attrib_value in enumerate(attrib_values):
1937            _extend_existing = True if (counter > 0) else False
1938            _kwargs = {
1939                "extend_existing": _extend_existing,
1940                attrib: _attrib_value,
1941            }
1942            table_a = Table(
1943                "a",
1944                metadata,
1945                Column("foo", String, primary_key=True),
1946                **_kwargs
1947            )
1948            eq_(getattr(table_a, attrib), _attrib_value)
1949            eq_(getattr(metadata.tables["a"], attrib), _attrib_value)
1950
1951
1952class PKAutoIncrementTest(fixtures.TestBase):
1953    def test_multi_integer_no_autoinc(self):
1954        pk = PrimaryKeyConstraint(Column("a", Integer), Column("b", Integer))
1955        t = Table("t", MetaData())
1956        t.append_constraint(pk)
1957
1958        is_(pk._autoincrement_column, None)
1959
1960    def test_multi_integer_multi_autoinc(self):
1961        pk = PrimaryKeyConstraint(
1962            Column("a", Integer, autoincrement=True),
1963            Column("b", Integer, autoincrement=True),
1964        )
1965        t = Table("t", MetaData())
1966        t.append_constraint(pk)
1967
1968        assert_raises_message(
1969            exc.ArgumentError,
1970            "Only one Column may be marked",
1971            lambda: pk._autoincrement_column,
1972        )
1973
1974    def test_single_integer_no_autoinc(self):
1975        pk = PrimaryKeyConstraint(Column("a", Integer))
1976        t = Table("t", MetaData())
1977        t.append_constraint(pk)
1978
1979        is_(pk._autoincrement_column, pk.columns["a"])
1980
1981    def test_single_string_no_autoinc(self):
1982        pk = PrimaryKeyConstraint(Column("a", String))
1983        t = Table("t", MetaData())
1984        t.append_constraint(pk)
1985
1986        is_(pk._autoincrement_column, None)
1987
1988    def test_single_string_illegal_autoinc(self):
1989        t = Table("t", MetaData(), Column("a", String, autoincrement=True))
1990        pk = PrimaryKeyConstraint(t.c.a)
1991        t.append_constraint(pk)
1992
1993        assert_raises_message(
1994            exc.ArgumentError,
1995            "Column type VARCHAR on column 't.a'",
1996            lambda: pk._autoincrement_column,
1997        )
1998
1999    def test_single_integer_default(self):
2000        t = Table(
2001            "t",
2002            MetaData(),
2003            Column("a", Integer, autoincrement=True, default=lambda: 1),
2004        )
2005        pk = PrimaryKeyConstraint(t.c.a)
2006        t.append_constraint(pk)
2007
2008        is_(pk._autoincrement_column, t.c.a)
2009
2010    def test_single_integer_server_default(self):
2011        # new as of 1.1; now that we have three states for autoincrement,
2012        # if the user puts autoincrement=True with a server_default, trust
2013        # them on it
2014        t = Table(
2015            "t",
2016            MetaData(),
2017            Column(
2018                "a", Integer, autoincrement=True, server_default=func.magic()
2019            ),
2020        )
2021        pk = PrimaryKeyConstraint(t.c.a)
2022        t.append_constraint(pk)
2023
2024        is_(pk._autoincrement_column, t.c.a)
2025
2026    def test_implicit_autoinc_but_fks(self):
2027        m = MetaData()
2028        Table("t1", m, Column("id", Integer, primary_key=True))
2029        t2 = Table("t2", MetaData(), Column("a", Integer, ForeignKey("t1.id")))
2030        pk = PrimaryKeyConstraint(t2.c.a)
2031        t2.append_constraint(pk)
2032        is_(pk._autoincrement_column, None)
2033
2034    def test_explicit_autoinc_but_fks(self):
2035        m = MetaData()
2036        Table("t1", m, Column("id", Integer, primary_key=True))
2037        t2 = Table(
2038            "t2",
2039            MetaData(),
2040            Column("a", Integer, ForeignKey("t1.id"), autoincrement=True),
2041        )
2042        pk = PrimaryKeyConstraint(t2.c.a)
2043        t2.append_constraint(pk)
2044        is_(pk._autoincrement_column, t2.c.a)
2045
2046        t3 = Table(
2047            "t3",
2048            MetaData(),
2049            Column(
2050                "a", Integer, ForeignKey("t1.id"), autoincrement="ignore_fk"
2051            ),
2052        )
2053        pk = PrimaryKeyConstraint(t3.c.a)
2054        t3.append_constraint(pk)
2055        is_(pk._autoincrement_column, t3.c.a)
2056
2057    def test_no_kw_args(self):
2058        with expect_raises_message(
2059            TypeError,
2060            r"Table\(\) takes at least two positional-only arguments",
2061            check_context=False,
2062        ):
2063            Table(name="foo", metadata=MetaData())
2064        with expect_raises_message(
2065            TypeError,
2066            r"Table\(\) takes at least two positional-only arguments",
2067            check_context=False,
2068        ):
2069            Table("foo", metadata=MetaData())
2070
2071
2072class SchemaTypeTest(fixtures.TestBase):
2073    __backend__ = True
2074
2075    class TrackEvents(object):
2076        column = None
2077        table = None
2078        evt_targets = ()
2079
2080        def _set_table(self, column, table):
2081            super(SchemaTypeTest.TrackEvents, self)._set_table(column, table)
2082            self.column = column
2083            self.table = table
2084
2085        def _on_table_create(self, target, bind, **kw):
2086            super(SchemaTypeTest.TrackEvents, self)._on_table_create(
2087                target, bind, **kw
2088            )
2089            self.evt_targets += (target,)
2090
2091        def _on_metadata_create(self, target, bind, **kw):
2092            super(SchemaTypeTest.TrackEvents, self)._on_metadata_create(
2093                target, bind, **kw
2094            )
2095            self.evt_targets += (target,)
2096
2097    # TODO: Enum and Boolean put TypeEngine first.  Changing that here
2098    # causes collection-mutate-while-iterated errors in the event system
2099    # since the hooks here call upon the adapted type.  Need to figure out
2100    # why Enum and Boolean don't have this problem.
2101    class MyType(TrackEvents, sqltypes.SchemaType, sqltypes.TypeEngine):
2102        pass
2103
2104    class WrapEnum(TrackEvents, Enum):
2105        pass
2106
2107    class WrapBoolean(TrackEvents, Boolean):
2108        pass
2109
2110    class MyTypeWImpl(MyType):
2111        def _gen_dialect_impl(self, dialect):
2112            return self.adapt(SchemaTypeTest.MyTypeImpl)
2113
2114    class MyTypeImpl(MyTypeWImpl):
2115        pass
2116
2117    class MyTypeDecAndSchema(TypeDecorator, sqltypes.SchemaType):
2118        impl = String()
2119        cache_ok = True
2120
2121        evt_targets = ()
2122
2123        def __init__(self):
2124            TypeDecorator.__init__(self)
2125            sqltypes.SchemaType.__init__(self)
2126
2127        def _on_table_create(self, target, bind, **kw):
2128            self.evt_targets += (target,)
2129
2130        def _on_metadata_create(self, target, bind, **kw):
2131            self.evt_targets += (target,)
2132
2133    def test_before_parent_attach_plain(self):
2134        typ = self.MyType()
2135        self._test_before_parent_attach(typ)
2136
2137    def test_before_parent_attach_typedec_enclosing_schematype(self):
2138        # additional test for [ticket:2919] as part of test for
2139        # [ticket:3832]
2140        # this also serves as the test for [ticket:6152]
2141
2142        class MySchemaType(sqltypes.TypeEngine, sqltypes.SchemaType):
2143            pass
2144
2145        target_typ = MySchemaType()
2146
2147        class MyType(TypeDecorator):
2148            impl = target_typ
2149            cache_ok = True
2150
2151        typ = MyType()
2152        self._test_before_parent_attach(typ, target_typ)
2153
2154    def test_before_parent_attach_array_enclosing_schematype(self):
2155        # test for [ticket:4141] which is the same idea as [ticket:3832]
2156        # for ARRAY
2157
2158        typ = ARRAY(String)
2159
2160        self._test_before_parent_attach(typ)
2161
2162    def test_before_parent_attach_typedec_of_schematype(self):
2163        class MyType(TypeDecorator, sqltypes.SchemaType):
2164            impl = String
2165            cache_ok = True
2166
2167        typ = MyType()
2168        self._test_before_parent_attach(typ)
2169
2170    def test_before_parent_attach_schematype_of_typedec(self):
2171        class MyType(sqltypes.SchemaType, TypeDecorator):
2172            impl = String
2173            cache_ok = True
2174
2175        typ = MyType()
2176        self._test_before_parent_attach(typ)
2177
2178    def test_before_parent_attach_variant_array_schematype(self):
2179
2180        target = Enum("one", "two", "three")
2181        typ = ARRAY(target).with_variant(String(), "other")
2182        self._test_before_parent_attach(typ, evt_target=target)
2183
2184    def _test_before_parent_attach(self, typ, evt_target=None):
2185        canary = mock.Mock()
2186
2187        if evt_target is None:
2188            evt_target = typ
2189
2190        orig_set_parent = evt_target._set_parent
2191        orig_set_parent_w_dispatch = evt_target._set_parent_with_dispatch
2192
2193        def _set_parent(parent, **kw):
2194            orig_set_parent(parent, **kw)
2195            canary._set_parent(parent)
2196
2197        def _set_parent_w_dispatch(parent):
2198            orig_set_parent_w_dispatch(parent)
2199            canary._set_parent_with_dispatch(parent)
2200
2201        with mock.patch.object(evt_target, "_set_parent", _set_parent):
2202            with mock.patch.object(
2203                evt_target, "_set_parent_with_dispatch", _set_parent_w_dispatch
2204            ):
2205                event.listen(evt_target, "before_parent_attach", canary.go)
2206
2207                c = Column("q", typ)
2208
2209        eq_(
2210            canary.mock_calls,
2211            [
2212                mock.call.go(evt_target, c),
2213                mock.call._set_parent(c),
2214                mock.call._set_parent_with_dispatch(c),
2215            ],
2216        )
2217
2218    def test_independent_schema(self):
2219        m = MetaData()
2220        type_ = self.MyType(schema="q")
2221        t1 = Table("x", m, Column("y", type_), schema="z")
2222        eq_(t1.c.y.type.schema, "q")
2223
2224    def test_inherit_schema_from_metadata(self):
2225        """test #6373"""
2226        m = MetaData(schema="q")
2227        type_ = self.MyType(metadata=m)
2228        t1 = Table("x", m, Column("y", type_), schema="z")
2229        eq_(t1.c.y.type.schema, "q")
2230
2231    def test_inherit_schema_from_table_override_metadata(self):
2232        """test #6373"""
2233        m = MetaData(schema="q")
2234        type_ = self.MyType(metadata=m, inherit_schema=True)
2235        t1 = Table("x", m, Column("y", type_), schema="z")
2236        eq_(t1.c.y.type.schema, "z")
2237
2238    def test_inherit_schema_from_metadata_override_explicit(self):
2239        """test #6373"""
2240        m = MetaData(schema="q")
2241        type_ = self.MyType(schema="e", metadata=m)
2242        t1 = Table("x", m, Column("y", type_), schema="z")
2243        eq_(t1.c.y.type.schema, "e")
2244
2245    def test_inherit_schema(self):
2246        m = MetaData()
2247        type_ = self.MyType(schema="q", inherit_schema=True)
2248        t1 = Table("x", m, Column("y", type_), schema="z")
2249        eq_(t1.c.y.type.schema, "z")
2250
2251    def test_independent_schema_enum(self):
2252        m = MetaData()
2253        type_ = sqltypes.Enum("a", schema="q")
2254        t1 = Table("x", m, Column("y", type_), schema="z")
2255        eq_(t1.c.y.type.schema, "q")
2256
2257    def test_inherit_schema_enum(self):
2258        m = MetaData()
2259        type_ = sqltypes.Enum("a", "b", "c", schema="q", inherit_schema=True)
2260        t1 = Table("x", m, Column("y", type_), schema="z")
2261        eq_(t1.c.y.type.schema, "z")
2262
2263    def test_to_metadata_copy_type(self):
2264        m1 = MetaData()
2265
2266        type_ = self.MyType()
2267        t1 = Table("x", m1, Column("y", type_))
2268
2269        m2 = MetaData()
2270        t2 = t1.to_metadata(m2)
2271
2272        # metadata isn't set
2273        is_(t2.c.y.type.metadata, None)
2274
2275        # our test type sets table, though
2276        is_(t2.c.y.type.table, t2)
2277
2278    def test_to_metadata_copy_decorated(self):
2279        class MyDecorated(TypeDecorator):
2280            impl = self.MyType
2281            cache_ok = True
2282
2283        m1 = MetaData()
2284
2285        type_ = MyDecorated(schema="z")
2286        t1 = Table("x", m1, Column("y", type_))
2287
2288        m2 = MetaData()
2289        t2 = t1.to_metadata(m2)
2290        eq_(t2.c.y.type.schema, "z")
2291
2292    def test_to_metadata_independent_schema(self):
2293        m1 = MetaData()
2294
2295        type_ = self.MyType()
2296        t1 = Table("x", m1, Column("y", type_))
2297
2298        m2 = MetaData()
2299        t2 = t1.to_metadata(m2, schema="bar")
2300
2301        eq_(t2.c.y.type.schema, None)
2302
2303    def test_to_metadata_inherit_schema(self):
2304        m1 = MetaData()
2305
2306        type_ = self.MyType(inherit_schema=True)
2307        t1 = Table("x", m1, Column("y", type_))
2308
2309        m2 = MetaData()
2310        t2 = t1.to_metadata(m2, schema="bar")
2311
2312        eq_(t1.c.y.type.schema, None)
2313        eq_(t2.c.y.type.schema, "bar")
2314
2315    def test_to_metadata_independent_events(self):
2316        m1 = MetaData()
2317
2318        type_ = self.MyType()
2319        t1 = Table("x", m1, Column("y", type_))
2320
2321        m2 = MetaData()
2322        t2 = t1.to_metadata(m2)
2323
2324        t1.dispatch.before_create(t1, testing.db)
2325        eq_(t1.c.y.type.evt_targets, (t1,))
2326        eq_(t2.c.y.type.evt_targets, ())
2327
2328        t2.dispatch.before_create(t2, testing.db)
2329        t2.dispatch.before_create(t2, testing.db)
2330        eq_(t1.c.y.type.evt_targets, (t1,))
2331        eq_(t2.c.y.type.evt_targets, (t2, t2))
2332
2333    def test_enum_column_copy_transfers_events(self):
2334        m = MetaData()
2335
2336        type_ = self.WrapEnum("a", "b", "c", name="foo")
2337        y = Column("y", type_)
2338        y_copy = y._copy()
2339        t1 = Table("x", m, y_copy)
2340
2341        is_true(y_copy.type._create_events)
2342
2343        # for PostgreSQL, this will emit CREATE TYPE
2344        m.dispatch.before_create(t1, testing.db)
2345        try:
2346            eq_(t1.c.y.type.evt_targets, (t1,))
2347        finally:
2348            # do the drop so that PostgreSQL emits DROP TYPE
2349            m.dispatch.after_drop(t1, testing.db)
2350
2351    def test_enum_nonnative_column_copy_transfers_events(self):
2352        m = MetaData()
2353
2354        type_ = self.WrapEnum("a", "b", "c", name="foo", native_enum=False)
2355        y = Column("y", type_)
2356        y_copy = y._copy()
2357        t1 = Table("x", m, y_copy)
2358
2359        is_true(y_copy.type._create_events)
2360
2361        m.dispatch.before_create(t1, testing.db)
2362        eq_(t1.c.y.type.evt_targets, (t1,))
2363
2364    def test_enum_nonnative_column_copy_transfers_constraintpref(self):
2365        m = MetaData()
2366
2367        type_ = self.WrapEnum(
2368            "a",
2369            "b",
2370            "c",
2371            name="foo",
2372            native_enum=False,
2373            create_constraint=False,
2374        )
2375        y = Column("y", type_)
2376        y_copy = y._copy()
2377        Table("x", m, y_copy)
2378
2379        is_false(y_copy.type.create_constraint)
2380
2381    def test_boolean_column_copy_transfers_events(self):
2382        m = MetaData()
2383
2384        type_ = self.WrapBoolean()
2385        y = Column("y", type_)
2386        y_copy = y._copy()
2387        Table("x", m, y_copy)
2388
2389        is_true(y_copy.type._create_events)
2390
2391    def test_boolean_nonnative_column_copy_transfers_constraintpref(self):
2392        m = MetaData()
2393
2394        type_ = self.WrapBoolean(create_constraint=False)
2395        y = Column("y", type_)
2396        y_copy = y._copy()
2397        Table("x", m, y_copy)
2398
2399        is_false(y_copy.type.create_constraint)
2400
2401    def test_metadata_dispatch_no_new_impl(self):
2402        m1 = MetaData()
2403        typ = self.MyType(metadata=m1)
2404        m1.dispatch.before_create(m1, testing.db)
2405        eq_(typ.evt_targets, (m1,))
2406
2407        dialect_impl = typ.dialect_impl(testing.db.dialect)
2408        eq_(dialect_impl.evt_targets, ())
2409
2410    def test_metadata_dispatch_new_impl(self):
2411        m1 = MetaData()
2412        typ = self.MyTypeWImpl(metadata=m1)
2413        m1.dispatch.before_create(m1, testing.db)
2414        eq_(typ.evt_targets, (m1,))
2415
2416        dialect_impl = typ.dialect_impl(testing.db.dialect)
2417        eq_(dialect_impl.evt_targets, (m1,))
2418
2419    def test_table_dispatch_decorator_schematype(self):
2420        m1 = MetaData()
2421        typ = self.MyTypeDecAndSchema()
2422        t1 = Table("t1", m1, Column("x", typ))
2423        m1.dispatch.before_create(t1, testing.db)
2424        eq_(typ.evt_targets, (t1,))
2425
2426    def test_table_dispatch_no_new_impl(self):
2427        m1 = MetaData()
2428        typ = self.MyType()
2429        t1 = Table("t1", m1, Column("x", typ))
2430        m1.dispatch.before_create(t1, testing.db)
2431        eq_(typ.evt_targets, (t1,))
2432
2433        dialect_impl = typ.dialect_impl(testing.db.dialect)
2434        eq_(dialect_impl.evt_targets, ())
2435
2436    def test_table_dispatch_new_impl(self):
2437        m1 = MetaData()
2438        typ = self.MyTypeWImpl()
2439        t1 = Table("t1", m1, Column("x", typ))
2440        m1.dispatch.before_create(t1, testing.db)
2441        eq_(typ.evt_targets, (t1,))
2442
2443        dialect_impl = typ.dialect_impl(testing.db.dialect)
2444        eq_(dialect_impl.evt_targets, (t1,))
2445
2446    def test_create_metadata_bound_no_crash(self):
2447        m1 = MetaData()
2448        self.MyType(metadata=m1)
2449
2450        m1.create_all(testing.db)
2451
2452    def test_boolean_constraint_type_doesnt_double(self):
2453        m1 = MetaData()
2454
2455        t1 = Table("x", m1, Column("flag", Boolean(create_constraint=True)))
2456        eq_(
2457            len([c for c in t1.constraints if isinstance(c, CheckConstraint)]),
2458            1,
2459        )
2460        m2 = MetaData()
2461        t2 = t1.to_metadata(m2)
2462
2463        eq_(
2464            len([c for c in t2.constraints if isinstance(c, CheckConstraint)]),
2465            1,
2466        )
2467
2468    def test_enum_constraint_type_doesnt_double(self):
2469        m1 = MetaData()
2470
2471        t1 = Table(
2472            "x",
2473            m1,
2474            Column("flag", Enum("a", "b", "c", create_constraint=True)),
2475        )
2476        eq_(
2477            len([c for c in t1.constraints if isinstance(c, CheckConstraint)]),
2478            1,
2479        )
2480        m2 = MetaData()
2481        t2 = t1.to_metadata(m2)
2482
2483        eq_(
2484            len([c for c in t2.constraints if isinstance(c, CheckConstraint)]),
2485            1,
2486        )
2487
2488
2489class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
2490    def test_default_schema_metadata_fk(self):
2491        m = MetaData(schema="foo")
2492        t1 = Table("t1", m, Column("x", Integer))
2493        t2 = Table("t2", m, Column("x", Integer, ForeignKey("t1.x")))
2494        assert t2.c.x.references(t1.c.x)
2495
2496    def test_ad_hoc_schema_equiv_fk(self):
2497        m = MetaData()
2498        t1 = Table("t1", m, Column("x", Integer), schema="foo")
2499        t2 = Table(
2500            "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="foo"
2501        )
2502        assert_raises(
2503            exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x)
2504        )
2505
2506    def test_default_schema_metadata_fk_alt_remote(self):
2507        m = MetaData(schema="foo")
2508        t1 = Table("t1", m, Column("x", Integer))
2509        t2 = Table(
2510            "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="bar"
2511        )
2512        assert t2.c.x.references(t1.c.x)
2513
2514    def test_default_schema_metadata_fk_alt_local_raises(self):
2515        m = MetaData(schema="foo")
2516        t1 = Table("t1", m, Column("x", Integer), schema="bar")
2517        t2 = Table("t2", m, Column("x", Integer, ForeignKey("t1.x")))
2518        assert_raises(
2519            exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x)
2520        )
2521
2522    def test_default_schema_metadata_fk_alt_local(self):
2523        m = MetaData(schema="foo")
2524        t1 = Table("t1", m, Column("x", Integer), schema="bar")
2525        t2 = Table("t2", m, Column("x", Integer, ForeignKey("bar.t1.x")))
2526        assert t2.c.x.references(t1.c.x)
2527
2528    def test_create_drop_schema(self):
2529
2530        self.assert_compile(
2531            schema.CreateSchema("sa_schema"), "CREATE SCHEMA sa_schema"
2532        )
2533        self.assert_compile(
2534            schema.DropSchema("sa_schema"), "DROP SCHEMA sa_schema"
2535        )
2536        self.assert_compile(
2537            schema.DropSchema("sa_schema", cascade=True),
2538            "DROP SCHEMA sa_schema CASCADE",
2539        )
2540
2541    def test_iteration(self):
2542        metadata = MetaData()
2543        table1 = Table(
2544            "table1",
2545            metadata,
2546            Column("col1", Integer, primary_key=True),
2547            schema="someschema",
2548        )
2549        table2 = Table(
2550            "table2",
2551            metadata,
2552            Column("col1", Integer, primary_key=True),
2553            Column("col2", Integer, ForeignKey("someschema.table1.col1")),
2554            schema="someschema",
2555        )
2556
2557        t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
2558        t2 = str(schema.CreateTable(table2).compile(bind=testing.db))
2559        if testing.db.dialect.preparer(testing.db.dialect).omit_schema:
2560            assert t1.index("CREATE TABLE table1") > -1
2561            assert t2.index("CREATE TABLE table2") > -1
2562        else:
2563            assert t1.index("CREATE TABLE someschema.table1") > -1
2564            assert t2.index("CREATE TABLE someschema.table2") > -1
2565
2566
2567class UseExistingTest(fixtures.TablesTest):
2568    @classmethod
2569    def define_tables(cls, metadata):
2570        Table(
2571            "users",
2572            metadata,
2573            Column("id", Integer, primary_key=True),
2574            Column("name", String(30)),
2575        )
2576
2577    @testing.fixture
2578    def existing_meta(self):
2579        meta2 = MetaData()
2580        Table("users", meta2, autoload_with=testing.db)
2581        return meta2
2582
2583    @testing.fixture
2584    def empty_meta(self):
2585        return MetaData()
2586
2587    def test_exception_no_flags(self, existing_meta):
2588        def go():
2589            Table(
2590                "users",
2591                existing_meta,
2592                Column("name", Unicode),
2593                autoload_with=testing.db,
2594            )
2595
2596        assert_raises_message(
2597            exc.InvalidRequestError,
2598            "Table 'users' is already defined for this " "MetaData instance.",
2599            go,
2600        )
2601
2602    def test_keep_plus_existing_raises(self, existing_meta):
2603        assert_raises(
2604            exc.ArgumentError,
2605            Table,
2606            "users",
2607            existing_meta,
2608            keep_existing=True,
2609            extend_existing=True,
2610        )
2611
2612    def test_keep_existing_no_dupe_constraints(self, empty_meta):
2613        users = Table(
2614            "users",
2615            empty_meta,
2616            Column("id", Integer),
2617            Column("name", Unicode),
2618            UniqueConstraint("name"),
2619            keep_existing=True,
2620        )
2621        assert "name" in users.c
2622        assert "id" in users.c
2623        eq_(len(users.constraints), 2)
2624
2625        u2 = Table(
2626            "users",
2627            empty_meta,
2628            Column("id", Integer),
2629            Column("name", Unicode),
2630            UniqueConstraint("name"),
2631            keep_existing=True,
2632        )
2633        eq_(len(u2.constraints), 2)
2634
2635    def test_extend_existing_dupes_constraints(self, empty_meta):
2636        users = Table(
2637            "users",
2638            empty_meta,
2639            Column("id", Integer),
2640            Column("name", Unicode),
2641            UniqueConstraint("name"),
2642            extend_existing=True,
2643        )
2644        assert "name" in users.c
2645        assert "id" in users.c
2646        eq_(len(users.constraints), 2)
2647
2648        u2 = Table(
2649            "users",
2650            empty_meta,
2651            Column("id", Integer),
2652            Column("name", Unicode),
2653            UniqueConstraint("name"),
2654            extend_existing=True,
2655        )
2656        # constraint got duped
2657        eq_(len(u2.constraints), 3)
2658
2659    def test_autoload_replace_column(self, empty_meta):
2660        users = Table(
2661            "users",
2662            empty_meta,
2663            Column("name", Unicode),
2664            autoload_with=testing.db,
2665        )
2666        assert isinstance(users.c.name.type, Unicode)
2667
2668    def test_keep_existing_coltype(self, existing_meta):
2669        users = Table(
2670            "users",
2671            existing_meta,
2672            Column("name", Unicode),
2673            autoload_with=testing.db,
2674            keep_existing=True,
2675        )
2676        assert not isinstance(users.c.name.type, Unicode)
2677
2678    def test_keep_existing_quote(self, existing_meta):
2679        users = Table(
2680            "users",
2681            existing_meta,
2682            quote=True,
2683            autoload_with=testing.db,
2684            keep_existing=True,
2685        )
2686        assert not users.name.quote
2687
2688    def test_keep_existing_add_column(self, existing_meta):
2689        users = Table(
2690            "users",
2691            existing_meta,
2692            Column("foo", Integer),
2693            autoload_with=testing.db,
2694            keep_existing=True,
2695        )
2696        assert "foo" not in users.c
2697
2698    def test_keep_existing_coltype_no_orig(self, empty_meta):
2699        users = Table(
2700            "users",
2701            empty_meta,
2702            Column("name", Unicode),
2703            autoload_with=testing.db,
2704            keep_existing=True,
2705        )
2706        assert isinstance(users.c.name.type, Unicode)
2707
2708    @testing.skip_if(
2709        lambda: testing.db.dialect.requires_name_normalize,
2710        "test depends on lowercase as case insensitive",
2711    )
2712    def test_keep_existing_quote_no_orig(self, empty_meta):
2713        users = Table(
2714            "users",
2715            empty_meta,
2716            quote=True,
2717            autoload_with=testing.db,
2718            keep_existing=True,
2719        )
2720        assert users.name.quote
2721
2722    def test_keep_existing_add_column_no_orig(self, empty_meta):
2723        users = Table(
2724            "users",
2725            empty_meta,
2726            Column("foo", Integer),
2727            autoload_with=testing.db,
2728            keep_existing=True,
2729        )
2730        assert "foo" in users.c
2731
2732    def test_keep_existing_coltype_no_reflection(self, existing_meta):
2733        users = Table(
2734            "users", existing_meta, Column("name", Unicode), keep_existing=True
2735        )
2736        assert not isinstance(users.c.name.type, Unicode)
2737
2738    def test_keep_existing_quote_no_reflection(self, existing_meta):
2739        users = Table("users", existing_meta, quote=True, keep_existing=True)
2740        assert not users.name.quote
2741
2742    def test_keep_existing_add_column_no_reflection(self, existing_meta):
2743        users = Table(
2744            "users", existing_meta, Column("foo", Integer), keep_existing=True
2745        )
2746        assert "foo" not in users.c
2747
2748    def test_extend_existing_coltype(self, existing_meta):
2749        users = Table(
2750            "users",
2751            existing_meta,
2752            Column("name", Unicode),
2753            autoload_with=testing.db,
2754            extend_existing=True,
2755        )
2756        assert isinstance(users.c.name.type, Unicode)
2757
2758    def test_extend_existing_quote(self, existing_meta):
2759        assert_raises_message(
2760            tsa.exc.ArgumentError,
2761            "Can't redefine 'quote' or 'quote_schema' arguments",
2762            Table,
2763            "users",
2764            existing_meta,
2765            quote=True,
2766            autoload_with=testing.db,
2767            extend_existing=True,
2768        )
2769
2770    def test_extend_existing_add_column(self, existing_meta):
2771        users = Table(
2772            "users",
2773            existing_meta,
2774            Column("foo", Integer),
2775            autoload_with=testing.db,
2776            extend_existing=True,
2777        )
2778        assert "foo" in users.c
2779
2780    def test_extend_existing_coltype_no_orig(self, empty_meta):
2781        users = Table(
2782            "users",
2783            empty_meta,
2784            Column("name", Unicode),
2785            autoload_with=testing.db,
2786            extend_existing=True,
2787        )
2788        assert isinstance(users.c.name.type, Unicode)
2789
2790    @testing.skip_if(
2791        lambda: testing.db.dialect.requires_name_normalize,
2792        "test depends on lowercase as case insensitive",
2793    )
2794    def test_extend_existing_quote_no_orig(self, empty_meta):
2795        users = Table(
2796            "users",
2797            empty_meta,
2798            quote=True,
2799            autoload_with=testing.db,
2800            extend_existing=True,
2801        )
2802        assert users.name.quote
2803
2804    def test_extend_existing_add_column_no_orig(self, empty_meta):
2805        users = Table(
2806            "users",
2807            empty_meta,
2808            Column("foo", Integer),
2809            autoload_with=testing.db,
2810            extend_existing=True,
2811        )
2812        assert "foo" in users.c
2813
2814    def test_extend_existing_coltype_no_reflection(self, existing_meta):
2815        users = Table(
2816            "users",
2817            existing_meta,
2818            Column("name", Unicode),
2819            extend_existing=True,
2820        )
2821        assert isinstance(users.c.name.type, Unicode)
2822
2823    def test_extend_existing_quote_no_reflection(self, existing_meta):
2824        assert_raises_message(
2825            tsa.exc.ArgumentError,
2826            "Can't redefine 'quote' or 'quote_schema' arguments",
2827            Table,
2828            "users",
2829            existing_meta,
2830            quote=True,
2831            extend_existing=True,
2832        )
2833
2834    def test_extend_existing_add_column_no_reflection(self, existing_meta):
2835        users = Table(
2836            "users",
2837            existing_meta,
2838            Column("foo", Integer),
2839            extend_existing=True,
2840        )
2841        assert "foo" in users.c
2842
2843
2844class ConstraintTest(fixtures.TestBase):
2845    def _single_fixture(self):
2846        m = MetaData()
2847
2848        t1 = Table("t1", m, Column("a", Integer), Column("b", Integer))
2849
2850        t2 = Table("t2", m, Column("a", Integer, ForeignKey("t1.a")))
2851
2852        t3 = Table("t3", m, Column("a", Integer))
2853        return t1, t2, t3
2854
2855    def _assert_index_col_x(self, t, i, columns=True):
2856        eq_(t.indexes, set([i]))
2857        if columns:
2858            eq_(list(i.columns), [t.c.x])
2859        else:
2860            eq_(list(i.columns), [])
2861        assert i.table is t
2862
2863    def test_separate_decl_columns(self):
2864        m = MetaData()
2865        t = Table("t", m, Column("x", Integer))
2866        i = Index("i", t.c.x)
2867        self._assert_index_col_x(t, i)
2868
2869    def test_separate_decl_columns_functional(self):
2870        m = MetaData()
2871        t = Table("t", m, Column("x", Integer))
2872        i = Index("i", func.foo(t.c.x))
2873        self._assert_index_col_x(t, i)
2874
2875    def test_index_no_cols_private_table_arg(self):
2876        m = MetaData()
2877        t = Table("t", m, Column("x", Integer))
2878        i = Index("i", _table=t)
2879        is_(i.table, t)
2880        eq_(list(i.columns), [])
2881
2882    def test_index_w_cols_private_table_arg(self):
2883        m = MetaData()
2884        t = Table("t", m, Column("x", Integer))
2885        i = Index("i", t.c.x, _table=t)
2886        is_(i.table, t)
2887
2888        eq_(list(i.columns), [t.c.x])
2889
2890    def test_inline_decl_columns(self):
2891        m = MetaData()
2892        c = Column("x", Integer)
2893        i = Index("i", c)
2894        t = Table("t", m, c, i)
2895        self._assert_index_col_x(t, i)
2896
2897    def test_inline_decl_columns_functional(self):
2898        m = MetaData()
2899        c = Column("x", Integer)
2900        i = Index("i", func.foo(c))
2901        t = Table("t", m, c, i)
2902        self._assert_index_col_x(t, i)
2903
2904    def test_inline_decl_string(self):
2905        m = MetaData()
2906        i = Index("i", "x")
2907        t = Table("t", m, Column("x", Integer), i)
2908        self._assert_index_col_x(t, i)
2909
2910    def test_inline_decl_textonly(self):
2911        m = MetaData()
2912        i = Index("i", text("foobar(x)"))
2913        t = Table("t", m, Column("x", Integer), i)
2914        self._assert_index_col_x(t, i, columns=False)
2915
2916    def test_separate_decl_textonly(self):
2917        m = MetaData()
2918        i = Index("i", text("foobar(x)"))
2919        t = Table("t", m, Column("x", Integer))
2920        t.append_constraint(i)
2921        self._assert_index_col_x(t, i, columns=False)
2922
2923    def test_unnamed_column_exception(self):
2924        # this can occur in some declarative situations
2925        c = Column(Integer)
2926        idx = Index("q", c)
2927        m = MetaData()
2928        t = Table("t", m, Column("q"))
2929        assert_raises_message(
2930            exc.ArgumentError,
2931            "Can't add unnamed column to column collection",
2932            t.append_constraint,
2933            idx,
2934        )
2935
2936    def test_non_attached_col_plus_string_expr(self):
2937        # another one that declarative can lead towards
2938        metadata = MetaData()
2939
2940        t1 = Table("a", metadata, Column("id", Integer))
2941
2942        c2 = Column("x", Integer)
2943
2944        # if we do it here, no problem
2945        # t1.append_column(c2)
2946
2947        idx = Index("foo", c2, desc("foo"))
2948
2949        t1.append_column(c2)
2950
2951        self._assert_index_col_x(t1, idx, columns=True)
2952
2953    def test_column_associated_w_lowercase_table(self):
2954        from sqlalchemy import table
2955
2956        c = Column("x", Integer)
2957        table("foo", c)
2958        idx = Index("q", c)
2959        is_(idx.table, None)  # lower-case-T table doesn't have indexes
2960
2961    def test_clauseelement_extraction_one(self):
2962        t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
2963
2964        class MyThing(object):
2965            def __clause_element__(self):
2966                return t.c.x + 5
2967
2968        idx = Index("foo", MyThing())
2969        self._assert_index_col_x(t, idx)
2970
2971    def test_clauseelement_extraction_two(self):
2972        t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
2973
2974        class MyThing(object):
2975            def __clause_element__(self):
2976                return t.c.x + 5
2977
2978        idx = Index("bar", MyThing(), t.c.y)
2979
2980        eq_(set(t.indexes), set([idx]))
2981
2982    def test_clauseelement_extraction_three(self):
2983        t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
2984
2985        expr1 = t.c.x + 5
2986
2987        class MyThing(object):
2988            def __clause_element__(self):
2989                return expr1
2990
2991        idx = Index("bar", MyThing(), t.c.y)
2992
2993        is_true(idx.expressions[0].compare(expr1))
2994        is_(idx.expressions[1], t.c.y)
2995
2996    def test_table_references(self):
2997        t1, t2, t3 = self._single_fixture()
2998        assert list(t2.c.a.foreign_keys)[0].references(t1)
2999        assert not list(t2.c.a.foreign_keys)[0].references(t3)
3000
3001    def test_column_references(self):
3002        t1, t2, t3 = self._single_fixture()
3003        assert t2.c.a.references(t1.c.a)
3004        assert not t2.c.a.references(t3.c.a)
3005        assert not t2.c.a.references(t1.c.b)
3006
3007    def test_column_references_derived(self):
3008        t1, t2, t3 = self._single_fixture()
3009        s1 = tsa.select(tsa.select(t1).alias()).subquery()
3010        assert t2.c.a.references(s1.c.a)
3011        assert not t2.c.a.references(s1.c.b)
3012
3013    def test_copy_doesnt_reference(self):
3014        t1, t2, t3 = self._single_fixture()
3015        a2 = t2.c.a._copy()
3016        assert not a2.references(t1.c.a)
3017        assert not a2.references(t1.c.b)
3018
3019    def test_derived_column_references(self):
3020        t1, t2, t3 = self._single_fixture()
3021        s1 = tsa.select(tsa.select(t2).alias()).subquery()
3022        assert s1.c.a.references(t1.c.a)
3023        assert not s1.c.a.references(t1.c.b)
3024
3025    def test_referred_table_accessor(self):
3026        t1, t2, t3 = self._single_fixture()
3027        fkc = list(t2.foreign_key_constraints)[0]
3028        is_(fkc.referred_table, t1)
3029
3030    def test_referred_table_accessor_not_available(self):
3031        t1 = Table("t", MetaData(), Column("x", ForeignKey("q.id")))
3032        fkc = list(t1.foreign_key_constraints)[0]
3033        assert_raises_message(
3034            exc.InvalidRequestError,
3035            "Foreign key associated with column 't.x' could not find "
3036            "table 'q' with which to generate a foreign key to target "
3037            "column 'id'",
3038            getattr,
3039            fkc,
3040            "referred_table",
3041        )
3042
3043    def test_related_column_not_present_atfirst_ok(self):
3044        m = MetaData()
3045        base_table = Table("base", m, Column("id", Integer, primary_key=True))
3046        fk = ForeignKey("base.q")
3047        derived_table = Table(
3048            "derived", m, Column("id", None, fk, primary_key=True)
3049        )
3050
3051        base_table.append_column(Column("q", Integer))
3052        assert fk.column is base_table.c.q
3053        assert isinstance(derived_table.c.id.type, Integer)
3054
3055    def test_related_column_not_present_atfirst_ok_onname(self):
3056        m = MetaData()
3057        base_table = Table("base", m, Column("id", Integer, primary_key=True))
3058        fk = ForeignKey("base.q", link_to_name=True)
3059        derived_table = Table(
3060            "derived", m, Column("id", None, fk, primary_key=True)
3061        )
3062
3063        base_table.append_column(Column("q", Integer, key="zz"))
3064        assert fk.column is base_table.c.zz
3065        assert isinstance(derived_table.c.id.type, Integer)
3066
3067    def test_related_column_not_present_atfirst_ok_linktoname_conflict(self):
3068        m = MetaData()
3069        base_table = Table("base", m, Column("id", Integer, primary_key=True))
3070        fk = ForeignKey("base.q", link_to_name=True)
3071        derived_table = Table(
3072            "derived", m, Column("id", None, fk, primary_key=True)
3073        )
3074
3075        base_table.append_column(Column("zz", Integer, key="q"))
3076        base_table.append_column(Column("q", Integer, key="zz"))
3077        assert fk.column is base_table.c.zz
3078        assert isinstance(derived_table.c.id.type, Integer)
3079
3080    def test_invalid_composite_fk_check_strings(self):
3081        m = MetaData()
3082
3083        assert_raises_message(
3084            exc.ArgumentError,
3085            r"ForeignKeyConstraint on t1\(x, y\) refers to "
3086            "multiple remote tables: t2 and t3",
3087            Table,
3088            "t1",
3089            m,
3090            Column("x", Integer),
3091            Column("y", Integer),
3092            ForeignKeyConstraint(["x", "y"], ["t2.x", "t3.y"]),
3093        )
3094
3095    def test_invalid_composite_fk_check_columns(self):
3096        m = MetaData()
3097
3098        t2 = Table("t2", m, Column("x", Integer))
3099        t3 = Table("t3", m, Column("y", Integer))
3100
3101        assert_raises_message(
3102            exc.ArgumentError,
3103            r"ForeignKeyConstraint on t1\(x, y\) refers to "
3104            "multiple remote tables: t2 and t3",
3105            Table,
3106            "t1",
3107            m,
3108            Column("x", Integer),
3109            Column("y", Integer),
3110            ForeignKeyConstraint(["x", "y"], [t2.c.x, t3.c.y]),
3111        )
3112
3113    def test_invalid_composite_fk_check_columns_notattached(self):
3114        m = MetaData()
3115        x = Column("x", Integer)
3116        y = Column("y", Integer)
3117
3118        # no error is raised for this one right now.
3119        # which is a minor bug.
3120        Table(
3121            "t1",
3122            m,
3123            Column("x", Integer),
3124            Column("y", Integer),
3125            ForeignKeyConstraint(["x", "y"], [x, y]),
3126        )
3127
3128        Table("t2", m, x)
3129        Table("t3", m, y)
3130
3131    def test_constraint_copied_to_proxy_ok(self):
3132        m = MetaData()
3133        Table("t1", m, Column("id", Integer, primary_key=True))
3134        t2 = Table(
3135            "t2",
3136            m,
3137            Column("id", Integer, ForeignKey("t1.id"), primary_key=True),
3138        )
3139
3140        s = tsa.select(t2).subquery()
3141        t2fk = list(t2.c.id.foreign_keys)[0]
3142        sfk = list(s.c.id.foreign_keys)[0]
3143
3144        # the two FKs share the ForeignKeyConstraint
3145        is_(t2fk.constraint, sfk.constraint)
3146
3147        # but the ForeignKeyConstraint isn't
3148        # aware of the select's FK
3149        eq_(t2fk.constraint.elements, [t2fk])
3150
3151    def test_type_propagate_composite_fk_string(self):
3152        metadata = MetaData()
3153        Table(
3154            "a",
3155            metadata,
3156            Column("key1", Integer, primary_key=True),
3157            Column("key2", String(40), primary_key=True),
3158        )
3159
3160        b = Table(
3161            "b",
3162            metadata,
3163            Column("a_key1", None),
3164            Column("a_key2", None),
3165            Column("id", Integer, primary_key=True),
3166            ForeignKeyConstraint(["a_key1", "a_key2"], ["a.key1", "a.key2"]),
3167        )
3168
3169        assert isinstance(b.c.a_key1.type, Integer)
3170        assert isinstance(b.c.a_key2.type, String)
3171
3172    def test_type_propagate_composite_fk_col(self):
3173        metadata = MetaData()
3174        a = Table(
3175            "a",
3176            metadata,
3177            Column("key1", Integer, primary_key=True),
3178            Column("key2", String(40), primary_key=True),
3179        )
3180
3181        b = Table(
3182            "b",
3183            metadata,
3184            Column("a_key1", None),
3185            Column("a_key2", None),
3186            Column("id", Integer, primary_key=True),
3187            ForeignKeyConstraint(["a_key1", "a_key2"], [a.c.key1, a.c.key2]),
3188        )
3189
3190        assert isinstance(b.c.a_key1.type, Integer)
3191        assert isinstance(b.c.a_key2.type, String)
3192
3193    def test_type_propagate_standalone_fk_string(self):
3194        metadata = MetaData()
3195        Table("a", metadata, Column("key1", Integer, primary_key=True))
3196
3197        b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))
3198
3199        assert isinstance(b.c.a_key1.type, Integer)
3200
3201    def test_type_propagate_standalone_fk_col(self):
3202        metadata = MetaData()
3203        a = Table("a", metadata, Column("key1", Integer, primary_key=True))
3204
3205        b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1)))
3206
3207        assert isinstance(b.c.a_key1.type, Integer)
3208
3209    def test_type_propagate_chained_string_source_first(self):
3210        metadata = MetaData()
3211        Table("a", metadata, Column("key1", Integer, primary_key=True))
3212
3213        b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))
3214
3215        c = Table(
3216            "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1"))
3217        )
3218
3219        assert isinstance(b.c.a_key1.type, Integer)
3220        assert isinstance(c.c.b_key1.type, Integer)
3221
3222    def test_type_propagate_chained_string_source_last(self):
3223        metadata = MetaData()
3224
3225        b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))
3226
3227        c = Table(
3228            "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1"))
3229        )
3230
3231        Table("a", metadata, Column("key1", Integer, primary_key=True))
3232
3233        assert isinstance(b.c.a_key1.type, Integer)
3234        assert isinstance(c.c.b_key1.type, Integer)
3235
3236    def test_type_propagate_chained_string_source_last_onname(self):
3237        metadata = MetaData()
3238
3239        b = Table(
3240            "b",
3241            metadata,
3242            Column(
3243                "a_key1",
3244                None,
3245                ForeignKey("a.key1", link_to_name=True),
3246                key="ak1",
3247            ),
3248        )
3249
3250        c = Table(
3251            "c",
3252            metadata,
3253            Column(
3254                "b_key1",
3255                None,
3256                ForeignKey("b.a_key1", link_to_name=True),
3257                key="bk1",
3258            ),
3259        )
3260
3261        Table(
3262            "a", metadata, Column("key1", Integer, primary_key=True, key="ak1")
3263        )
3264
3265        assert isinstance(b.c.ak1.type, Integer)
3266        assert isinstance(c.c.bk1.type, Integer)
3267
3268    def test_type_propagate_chained_string_source_last_onname_conflict(self):
3269        metadata = MetaData()
3270
3271        b = Table(
3272            "b",
3273            metadata,
3274            # b.c.key1 -> a.c.key1 -> String
3275            Column(
3276                "ak1",
3277                None,
3278                ForeignKey("a.key1", link_to_name=False),
3279                key="key1",
3280            ),
3281            # b.c.ak1 -> a.c.ak1 -> Integer
3282            Column(
3283                "a_key1",
3284                None,
3285                ForeignKey("a.key1", link_to_name=True),
3286                key="ak1",
3287            ),
3288        )
3289
3290        c = Table(
3291            "c",
3292            metadata,
3293            # c.c.b_key1 -> b.c.ak1 -> Integer
3294            Column("b_key1", None, ForeignKey("b.ak1", link_to_name=False)),
3295            # c.c.b_ak1 -> b.c.ak1
3296            Column("b_ak1", None, ForeignKey("b.ak1", link_to_name=True)),
3297        )
3298
3299        Table(
3300            "a",
3301            metadata,
3302            # a.c.key1
3303            Column("ak1", String, key="key1"),
3304            # a.c.ak1
3305            Column("key1", Integer, primary_key=True, key="ak1"),
3306        )
3307
3308        assert isinstance(b.c.key1.type, String)
3309        assert isinstance(b.c.ak1.type, Integer)
3310
3311        assert isinstance(c.c.b_ak1.type, String)
3312        assert isinstance(c.c.b_key1.type, Integer)
3313
3314    def test_type_propagate_chained_col_orig_first(self):
3315        metadata = MetaData()
3316        a = Table("a", metadata, Column("key1", Integer, primary_key=True))
3317
3318        b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1)))
3319
3320        c = Table(
3321            "c", metadata, Column("b_key1", None, ForeignKey(b.c.a_key1))
3322        )
3323
3324        assert isinstance(b.c.a_key1.type, Integer)
3325        assert isinstance(c.c.b_key1.type, Integer)
3326
3327    def test_column_accessor_col(self):
3328        c1 = Column("x", Integer)
3329        fk = ForeignKey(c1)
3330        is_(fk.column, c1)
3331
3332    def test_column_accessor_clause_element(self):
3333        c1 = Column("x", Integer)
3334
3335        class CThing(object):
3336            def __init__(self, c):
3337                self.c = c
3338
3339            def __clause_element__(self):
3340                return self.c
3341
3342        fk = ForeignKey(CThing(c1))
3343        is_(fk.column, c1)
3344
3345    def test_column_accessor_string_no_parent(self):
3346        fk = ForeignKey("sometable.somecol")
3347        assert_raises_message(
3348            exc.InvalidRequestError,
3349            "this ForeignKey object does not yet have a parent "
3350            "Column associated with it.",
3351            getattr,
3352            fk,
3353            "column",
3354        )
3355
3356    def test_column_accessor_string_no_parent_table(self):
3357        fk = ForeignKey("sometable.somecol")
3358        Column("x", fk)
3359        assert_raises_message(
3360            exc.InvalidRequestError,
3361            "this ForeignKey's parent column is not yet "
3362            "associated with a Table.",
3363            getattr,
3364            fk,
3365            "column",
3366        )
3367
3368    def test_column_accessor_string_no_target_table(self):
3369        fk = ForeignKey("sometable.somecol")
3370        c1 = Column("x", fk)
3371        Table("t", MetaData(), c1)
3372        assert_raises_message(
3373            exc.NoReferencedTableError,
3374            "Foreign key associated with column 't.x' could not find "
3375            "table 'sometable' with which to generate a "
3376            "foreign key to target column 'somecol'",
3377            getattr,
3378            fk,
3379            "column",
3380        )
3381
3382    def test_column_accessor_string_no_target_column(self):
3383        fk = ForeignKey("sometable.somecol")
3384        c1 = Column("x", fk)
3385        m = MetaData()
3386        Table("t", m, c1)
3387        Table("sometable", m, Column("notsomecol", Integer))
3388        assert_raises_message(
3389            exc.NoReferencedColumnError,
3390            "Could not initialize target column for ForeignKey "
3391            "'sometable.somecol' on table 't': "
3392            "table 'sometable' has no column named 'somecol'",
3393            getattr,
3394            fk,
3395            "column",
3396        )
3397
3398    def test_remove_table_fk_bookkeeping(self):
3399        metadata = MetaData()
3400        fk = ForeignKey("t1.x")
3401        t2 = Table("t2", metadata, Column("y", Integer, fk))
3402        t3 = Table("t3", metadata, Column("y", Integer, ForeignKey("t1.x")))
3403
3404        assert t2.key in metadata.tables
3405        assert ("t1", "x") in metadata._fk_memos
3406
3407        metadata.remove(t2)
3408
3409        # key is removed
3410        assert t2.key not in metadata.tables
3411
3412        # the memo for the FK is still there
3413        assert ("t1", "x") in metadata._fk_memos
3414
3415        # fk is not in the collection
3416        assert fk not in metadata._fk_memos[("t1", "x")]
3417
3418        # make the referenced table
3419        t1 = Table("t1", metadata, Column("x", Integer))
3420
3421        # t2 tells us exactly what's wrong
3422        assert_raises_message(
3423            exc.InvalidRequestError,
3424            "Table t2 is no longer associated with its parent MetaData",
3425            getattr,
3426            fk,
3427            "column",
3428        )
3429
3430        # t3 is unaffected
3431        assert t3.c.y.references(t1.c.x)
3432
3433        # remove twice OK
3434        metadata.remove(t2)
3435
3436    def test_double_fk_usage_raises(self):
3437        f = ForeignKey("b.id")
3438
3439        Column("x", Integer, f)
3440        assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
3441
3442    def test_auto_append_constraint(self):
3443        m = MetaData()
3444
3445        t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
3446
3447        t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
3448
3449        for c in (
3450            UniqueConstraint(t.c.a),
3451            CheckConstraint(t.c.a > 5),
3452            ForeignKeyConstraint([t.c.a], [t2.c.a]),
3453            PrimaryKeyConstraint(t.c.a),
3454        ):
3455            assert c in t.constraints
3456            t.append_constraint(c)
3457            assert c in t.constraints
3458
3459        c = Index("foo", t.c.a)
3460        assert c in t.indexes
3461
3462    def test_auto_append_lowercase_table(self):
3463        from sqlalchemy import table, column
3464
3465        t = table("t", column("a"))
3466        t2 = table("t2", column("a"))
3467        for c in (
3468            UniqueConstraint(t.c.a),
3469            CheckConstraint(t.c.a > 5),
3470            ForeignKeyConstraint([t.c.a], [t2.c.a]),
3471            PrimaryKeyConstraint(t.c.a),
3472            Index("foo", t.c.a),
3473        ):
3474            assert True
3475
3476    def test_to_metadata_ok(self):
3477        m = MetaData()
3478
3479        t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
3480
3481        t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
3482
3483        UniqueConstraint(t.c.a)
3484        CheckConstraint(t.c.a > 5)
3485        ForeignKeyConstraint([t.c.a], [t2.c.a])
3486        PrimaryKeyConstraint(t.c.a)
3487
3488        m2 = MetaData()
3489
3490        t3 = t.to_metadata(m2)
3491
3492        eq_(len(t3.constraints), 4)
3493
3494        for c in t3.constraints:
3495            assert c.table is t3
3496
3497    def test_check_constraint_copy(self):
3498        m = MetaData()
3499        t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
3500        ck = CheckConstraint(t.c.a > 5)
3501        ck2 = ck._copy()
3502        assert ck in t.constraints
3503        assert ck2 not in t.constraints
3504
3505    def test_ambig_check_constraint_auto_append(self):
3506        m = MetaData()
3507
3508        t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
3509
3510        t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
3511        c = CheckConstraint(t.c.a > t2.c.b)
3512        assert c not in t.constraints
3513        assert c not in t2.constraints
3514
3515    def test_auto_append_ck_on_col_attach_one(self):
3516        m = MetaData()
3517
3518        a = Column("a", Integer)
3519        b = Column("b", Integer)
3520        ck = CheckConstraint(a > b)
3521
3522        t = Table("tbl", m, a, b)
3523        assert ck in t.constraints
3524
3525    def test_auto_append_ck_on_col_attach_two(self):
3526        m = MetaData()
3527
3528        a = Column("a", Integer)
3529        b = Column("b", Integer)
3530        c = Column("c", Integer)
3531        ck = CheckConstraint(a > b + c)
3532
3533        t = Table("tbl", m, a)
3534        assert ck not in t.constraints
3535
3536        t.append_column(b)
3537        assert ck not in t.constraints
3538
3539        t.append_column(c)
3540        assert ck in t.constraints
3541
3542    def test_auto_append_ck_on_col_attach_three(self):
3543        m = MetaData()
3544
3545        a = Column("a", Integer)
3546        b = Column("b", Integer)
3547        c = Column("c", Integer)
3548        ck = CheckConstraint(a > b + c)
3549
3550        t = Table("tbl", m, a)
3551        assert ck not in t.constraints
3552
3553        t.append_column(b)
3554        assert ck not in t.constraints
3555
3556        t2 = Table("t2", m)
3557        t2.append_column(c)
3558
3559        # two different tables, so CheckConstraint does nothing.
3560        assert ck not in t.constraints
3561
3562    def test_auto_append_uq_on_col_attach_one(self):
3563        m = MetaData()
3564
3565        a = Column("a", Integer)
3566        b = Column("b", Integer)
3567        uq = UniqueConstraint(a, b)
3568
3569        t = Table("tbl", m, a, b)
3570        assert uq in t.constraints
3571
3572    def test_auto_append_uq_on_col_attach_two(self):
3573        m = MetaData()
3574
3575        a = Column("a", Integer)
3576        b = Column("b", Integer)
3577        c = Column("c", Integer)
3578        uq = UniqueConstraint(a, b, c)
3579
3580        t = Table("tbl", m, a)
3581        assert uq not in t.constraints
3582
3583        t.append_column(b)
3584        assert uq not in t.constraints
3585
3586        t.append_column(c)
3587        assert uq in t.constraints
3588
3589    def test_auto_append_uq_on_col_attach_three(self):
3590        m = MetaData()
3591
3592        a = Column("a", Integer)
3593        b = Column("b", Integer)
3594        c = Column("c", Integer)
3595        uq = UniqueConstraint(a, b, c)
3596
3597        t = Table("tbl", m, a)
3598        assert uq not in t.constraints
3599
3600        t.append_column(b)
3601        assert uq not in t.constraints
3602
3603        t2 = Table("t2", m)
3604
3605        # two different tables, so UniqueConstraint raises
3606        assert_raises_message(
3607            exc.ArgumentError,
3608            r"Column\(s\) 't2\.c' are not part of table 'tbl'\.",
3609            t2.append_column,
3610            c,
3611        )
3612
3613    def test_auto_append_uq_on_col_attach_four(self):
3614        """Test that a uniqueconstraint that names Column and string names
3615        won't autoattach using deferred column attachment.
3616
3617        """
3618        m = MetaData()
3619
3620        a = Column("a", Integer)
3621        b = Column("b", Integer)
3622        c = Column("c", Integer)
3623        uq = UniqueConstraint(a, "b", "c")
3624
3625        t = Table("tbl", m, a)
3626        assert uq not in t.constraints
3627
3628        t.append_column(b)
3629        assert uq not in t.constraints
3630
3631        t.append_column(c)
3632
3633        # we don't track events for previously unknown columns
3634        # named 'c' to be attached
3635        assert uq not in t.constraints
3636
3637        t.append_constraint(uq)
3638
3639        assert uq in t.constraints
3640
3641        eq_(
3642            [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)],
3643            [uq],
3644        )
3645
3646    def test_auto_append_uq_on_col_attach_five(self):
3647        """Test that a uniqueconstraint that names Column and string names
3648        *will* autoattach if the table has all those names up front.
3649
3650        """
3651        m = MetaData()
3652
3653        a = Column("a", Integer)
3654        b = Column("b", Integer)
3655        c = Column("c", Integer)
3656
3657        t = Table("tbl", m, a, c, b)
3658
3659        uq = UniqueConstraint(a, "b", "c")
3660
3661        assert uq in t.constraints
3662
3663        t.append_constraint(uq)
3664
3665        assert uq in t.constraints
3666
3667        eq_(
3668            [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)],
3669            [uq],
3670        )
3671
3672    def test_index_asserts_cols_standalone(self):
3673        metadata = MetaData()
3674
3675        t1 = Table("t1", metadata, Column("x", Integer))
3676        t2 = Table("t2", metadata, Column("y", Integer))
3677        assert_raises_message(
3678            exc.ArgumentError,
3679            r"Column\(s\) 't2.y' are not part of table 't1'.",
3680            Index,
3681            "bar",
3682            t1.c.x,
3683            t2.c.y,
3684        )
3685
3686    def test_index_asserts_cols_inline(self):
3687        metadata = MetaData()
3688
3689        t1 = Table("t1", metadata, Column("x", Integer))
3690        assert_raises_message(
3691            exc.ArgumentError,
3692            "Index 'bar' is against table 't1', and "
3693            "cannot be associated with table 't2'.",
3694            Table,
3695            "t2",
3696            metadata,
3697            Column("y", Integer),
3698            Index("bar", t1.c.x),
3699        )
3700
3701    def test_raise_index_nonexistent_name(self):
3702        m = MetaData()
3703        # the KeyError isn't ideal here, a nicer message
3704        # perhaps
3705        assert_raises(
3706            KeyError, Table, "t", m, Column("x", Integer), Index("foo", "q")
3707        )
3708
3709    def test_raise_not_a_column(self):
3710        assert_raises(exc.ArgumentError, Index, "foo", 5)
3711
3712    def test_raise_expr_no_column(self):
3713        idx = Index("foo", func.lower(5))
3714
3715        assert_raises_message(
3716            exc.CompileError,
3717            "Index 'foo' is not associated with any table.",
3718            schema.CreateIndex(idx).compile,
3719            dialect=testing.db.dialect,
3720        )
3721        assert_raises_message(
3722            exc.CompileError,
3723            "Index 'foo' is not associated with any table.",
3724            schema.CreateIndex(idx).compile,
3725        )
3726
3727    def test_no_warning_w_no_columns(self):
3728        idx = Index(name="foo")
3729
3730        assert_raises_message(
3731            exc.CompileError,
3732            "Index 'foo' is not associated with any table.",
3733            schema.CreateIndex(idx).compile,
3734            dialect=testing.db.dialect,
3735        )
3736        assert_raises_message(
3737            exc.CompileError,
3738            "Index 'foo' is not associated with any table.",
3739            schema.CreateIndex(idx).compile,
3740        )
3741
3742    def test_raise_clauseelement_not_a_column(self):
3743        m = MetaData()
3744        t2 = Table("t2", m, Column("x", Integer))
3745
3746        class SomeClass(object):
3747            def __clause_element__(self):
3748                return t2
3749
3750        assert_raises_message(
3751            exc.ArgumentError,
3752            r"String column name or column expression for DDL constraint "
3753            r"expected, got .*SomeClass",
3754            Index,
3755            "foo",
3756            SomeClass(),
3757        )
3758
3759    @testing.fixture
3760    def no_pickle_annotated(self):
3761        class NoPickle(object):
3762            def __reduce__(self):
3763                raise NotImplementedError()
3764
3765        class ClauseElement(operators.ColumnOperators):
3766            def __init__(self, col):
3767                self.col = col._annotate({"bar": NoPickle()})
3768
3769            def __clause_element__(self):
3770                return self.col
3771
3772            def operate(self, op, *other, **kwargs):
3773                return self.col.operate(op, *other, **kwargs)
3774
3775        m = MetaData()
3776        t = Table("t", m, Column("q", Integer))
3777        return t, ClauseElement(t.c.q)
3778
3779    def test_pickle_fk_annotated_col(self, no_pickle_annotated):
3780
3781        t, q_col = no_pickle_annotated
3782
3783        t2 = Table("t2", t.metadata, Column("p", ForeignKey(q_col)))
3784        assert t2.c.p.references(t.c.q)
3785
3786        m2 = pickle.loads(pickle.dumps(t.metadata))
3787
3788        m2_t, m2_t2 = m2.tables["t"], m2.tables["t2"]
3789
3790        is_true(m2_t2.c.p.references(m2_t.c.q))
3791
3792    def test_pickle_uq_annotated_col(self, no_pickle_annotated):
3793        t, q_col = no_pickle_annotated
3794
3795        t.append_constraint(UniqueConstraint(q_col))
3796
3797        m2 = pickle.loads(pickle.dumps(t.metadata))
3798
3799        const = [
3800            c
3801            for c in m2.tables["t"].constraints
3802            if isinstance(c, UniqueConstraint)
3803        ][0]
3804
3805        is_true(const.columns[0].compare(t.c.q))
3806
3807    def test_pickle_idx_expr_annotated_col(self, no_pickle_annotated):
3808        t, q_col = no_pickle_annotated
3809
3810        expr = q_col > 5
3811        t.append_constraint(Index("conditional_index", expr))
3812
3813        m2 = pickle.loads(pickle.dumps(t.metadata))
3814
3815        const = list(m2.tables["t"].indexes)[0]
3816
3817        is_true(const.expressions[0].compare(expr))
3818
3819    def test_pickle_ck_binary_annotated_col(self, no_pickle_annotated):
3820        t, q_col = no_pickle_annotated
3821
3822        ck = CheckConstraint(q_col > 5)
3823        t.append_constraint(ck)
3824
3825        m2 = pickle.loads(pickle.dumps(t.metadata))
3826        const = [
3827            c
3828            for c in m2.tables["t"].constraints
3829            if isinstance(c, CheckConstraint)
3830        ][0]
3831        is_true(const.sqltext.compare(ck.sqltext))
3832
3833
3834class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
3835
3836    """Test Column() construction."""
3837
3838    __dialect__ = "default"
3839
3840    def columns(self):
3841        return [
3842            Column(Integer),
3843            Column("b", Integer),
3844            Column(Integer),
3845            Column("d", Integer),
3846            Column(Integer, name="e"),
3847            Column(type_=Integer),
3848            Column(Integer()),
3849            Column("h", Integer()),
3850            Column(type_=Integer()),
3851        ]
3852
3853    def test_basic(self):
3854        c = self.columns()
3855
3856        for i, v in ((0, "a"), (2, "c"), (5, "f"), (6, "g"), (8, "i")):
3857            c[i].name = v
3858            c[i].key = v
3859        del i, v
3860
3861        tbl = Table("table", MetaData(), *c)
3862
3863        for i, col in enumerate(tbl.c):
3864            assert col.name == c[i].name
3865
3866    def test_name_none(self):
3867
3868        c = Column(Integer)
3869        assert_raises_message(
3870            exc.ArgumentError,
3871            "Column must be constructed with a non-blank name or assign a "
3872            "non-blank .name ",
3873            Table,
3874            "t",
3875            MetaData(),
3876            c,
3877        )
3878
3879    def test_name_blank(self):
3880
3881        c = Column("", Integer)
3882        assert_raises_message(
3883            exc.ArgumentError,
3884            "Column must be constructed with a non-blank name or assign a "
3885            "non-blank .name ",
3886            Table,
3887            "t",
3888            MetaData(),
3889            c,
3890        )
3891
3892    def test_no_shared_column_schema(self):
3893        c = Column("x", Integer)
3894        Table("t", MetaData(), c)
3895
3896        assert_raises_message(
3897            exc.ArgumentError,
3898            "Column object 'x' already assigned to Table 't'",
3899            Table,
3900            "q",
3901            MetaData(),
3902            c,
3903        )
3904
3905    def test_no_shared_column_sql(self):
3906        c = column("x", Integer)
3907        table("t", c)
3908
3909        assert_raises_message(
3910            exc.ArgumentError,
3911            "column object 'x' already assigned to table 't'",
3912            table,
3913            "q",
3914            c,
3915        )
3916
3917    def test_incomplete_key(self):
3918        c = Column(Integer)
3919        assert c.name is None
3920        assert c.key is None
3921
3922        c.name = "named"
3923        Table("t", MetaData(), c)
3924
3925        assert c.name == "named"
3926        assert c.name == c.key
3927
3928    def test_unique_index_flags_default_to_none(self):
3929        c = Column(Integer)
3930        eq_(c.unique, None)
3931        eq_(c.index, None)
3932
3933        c = Column("c", Integer, index=True)
3934        eq_(c.unique, None)
3935        eq_(c.index, True)
3936
3937        t = Table("t", MetaData(), c)
3938        eq_(list(t.indexes)[0].unique, False)
3939
3940        c = Column(Integer, unique=True)
3941        eq_(c.unique, True)
3942        eq_(c.index, None)
3943
3944        c = Column("c", Integer, index=True, unique=True)
3945        eq_(c.unique, True)
3946        eq_(c.index, True)
3947
3948        t = Table("t", MetaData(), c)
3949        eq_(list(t.indexes)[0].unique, True)
3950
3951    def test_bogus(self):
3952        assert_raises(exc.ArgumentError, Column, "foo", name="bar")
3953        assert_raises(
3954            exc.ArgumentError, Column, "foo", Integer, type_=Integer()
3955        )
3956
3957    def test_custom_subclass_proxy(self):
3958        """test proxy generation of a Column subclass, can be compiled."""
3959
3960        from sqlalchemy.schema import Column
3961        from sqlalchemy.ext.compiler import compiles
3962        from sqlalchemy.sql import select
3963
3964        class MyColumn(Column):
3965            def _constructor(self, name, type_, **kw):
3966                kw["name"] = name
3967                return MyColumn(type_, **kw)
3968
3969            def __init__(self, type_, **kw):
3970                Column.__init__(self, type_, **kw)
3971
3972            def my_goofy_thing(self):
3973                return "hi"
3974
3975        @compiles(MyColumn)
3976        def goofy(element, compiler, **kw):
3977            s = compiler.visit_column(element, **kw)
3978            return s + "-"
3979
3980        id_ = MyColumn(Integer, primary_key=True)
3981        id_.name = "id"
3982        name = MyColumn(String)
3983        name.name = "name"
3984        t1 = Table("foo", MetaData(), id_, name)
3985
3986        # goofy thing
3987        eq_(t1.c.name.my_goofy_thing(), "hi")
3988
3989        # create proxy
3990        s = select(t1.select().alias())
3991
3992        # proxy has goofy thing
3993        eq_(s.subquery().c.name.my_goofy_thing(), "hi")
3994
3995        # compile works
3996        self.assert_compile(
3997            select(t1.select().alias()),
3998            "SELECT anon_1.id-, anon_1.name- FROM "
3999            "(SELECT foo.id- AS id, foo.name- AS name "
4000            "FROM foo) AS anon_1",
4001        )
4002
4003    def test_custom_subclass_proxy_typeerror(self):
4004        from sqlalchemy.schema import Column
4005        from sqlalchemy.sql import select
4006
4007        class MyColumn(Column):
4008            def __init__(self, type_, **kw):
4009                Column.__init__(self, type_, **kw)
4010
4011        id_ = MyColumn(Integer, primary_key=True)
4012        id_.name = "id"
4013        name = MyColumn(String)
4014        name.name = "name"
4015        t1 = Table("foo", MetaData(), id_, name)
4016        assert_raises_message(
4017            TypeError,
4018            "Could not create a copy of this <class "
4019            "'test.sql.test_metadata..*MyColumn'> "
4020            "object.  Ensure the class includes a _constructor()",
4021            getattr,
4022            select(t1.select().alias()).subquery(),
4023            "c",
4024        )
4025
4026    def test_custom_create(self):
4027        from sqlalchemy.ext.compiler import compiles, deregister
4028
4029        @compiles(schema.CreateColumn)
4030        def compile_(element, compiler, **kw):
4031            column = element.element
4032
4033            if "special" not in column.info:
4034                return compiler.visit_create_column(element, **kw)
4035
4036            text = "%s SPECIAL DIRECTIVE %s" % (
4037                column.name,
4038                compiler.type_compiler.process(column.type),
4039            )
4040            default = compiler.get_column_default_string(column)
4041            if default is not None:
4042                text += " DEFAULT " + default
4043
4044            if not column.nullable:
4045                text += " NOT NULL"
4046
4047            if column.constraints:
4048                text += " ".join(
4049                    compiler.process(const) for const in column.constraints
4050                )
4051            return text
4052
4053        t = Table(
4054            "mytable",
4055            MetaData(),
4056            Column("x", Integer, info={"special": True}, primary_key=True),
4057            Column("y", String(50)),
4058            Column("z", String(20), info={"special": True}),
4059        )
4060
4061        self.assert_compile(
4062            schema.CreateTable(t),
4063            "CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER "
4064            "NOT NULL, y VARCHAR(50), "
4065            "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))",
4066        )
4067
4068        deregister(schema.CreateColumn)
4069
4070
4071class ColumnDefaultsTest(fixtures.TestBase):
4072
4073    """test assignment of default fixures to columns"""
4074
4075    def _fixture(self, *arg, **kw):
4076        return Column("x", Integer, *arg, **kw)
4077
4078    def test_server_default_positional(self):
4079        target = schema.DefaultClause("y")
4080        c = self._fixture(target)
4081        assert c.server_default is target
4082        assert target.column is c
4083
4084    def test_onupdate_default_not_server_default_one(self):
4085        target1 = schema.DefaultClause("y")
4086        target2 = schema.DefaultClause("z")
4087
4088        c = self._fixture(server_default=target1, server_onupdate=target2)
4089        eq_(c.server_default.arg, "y")
4090        eq_(c.server_onupdate.arg, "z")
4091
4092    def test_onupdate_default_not_server_default_two(self):
4093        target1 = schema.DefaultClause("y", for_update=True)
4094        target2 = schema.DefaultClause("z", for_update=True)
4095
4096        c = self._fixture(server_default=target1, server_onupdate=target2)
4097        eq_(c.server_default.arg, "y")
4098        eq_(c.server_onupdate.arg, "z")
4099
4100    def test_onupdate_default_not_server_default_three(self):
4101        target1 = schema.DefaultClause("y", for_update=False)
4102        target2 = schema.DefaultClause("z", for_update=True)
4103
4104        c = self._fixture(target1, target2)
4105        eq_(c.server_default.arg, "y")
4106        eq_(c.server_onupdate.arg, "z")
4107
4108    def test_onupdate_default_not_server_default_four(self):
4109        target1 = schema.DefaultClause("y", for_update=False)
4110
4111        c = self._fixture(server_onupdate=target1)
4112        is_(c.server_default, None)
4113        eq_(c.server_onupdate.arg, "y")
4114
4115    def test_server_default_keyword_as_schemaitem(self):
4116        target = schema.DefaultClause("y")
4117        c = self._fixture(server_default=target)
4118        assert c.server_default is target
4119        assert target.column is c
4120
4121    def test_server_default_keyword_as_clause(self):
4122        target = "y"
4123        c = self._fixture(server_default=target)
4124        assert c.server_default.arg == target
4125        assert c.server_default.column is c
4126
4127    def test_server_default_onupdate_positional(self):
4128        target = schema.DefaultClause("y", for_update=True)
4129        c = self._fixture(target)
4130        assert c.server_onupdate is target
4131        assert target.column is c
4132
4133    def test_server_default_onupdate_keyword_as_schemaitem(self):
4134        target = schema.DefaultClause("y", for_update=True)
4135        c = self._fixture(server_onupdate=target)
4136        assert c.server_onupdate is target
4137        assert target.column is c
4138
4139    def test_server_default_onupdate_keyword_as_clause(self):
4140        target = "y"
4141        c = self._fixture(server_onupdate=target)
4142        assert c.server_onupdate.arg == target
4143        assert c.server_onupdate.column is c
4144
4145    def test_column_default_positional(self):
4146        target = schema.ColumnDefault("y")
4147        c = self._fixture(target)
4148        assert c.default is target
4149        assert target.column is c
4150
4151    def test_column_default_keyword_as_schemaitem(self):
4152        target = schema.ColumnDefault("y")
4153        c = self._fixture(default=target)
4154        assert c.default is target
4155        assert target.column is c
4156
4157    def test_column_default_keyword_as_clause(self):
4158        target = "y"
4159        c = self._fixture(default=target)
4160        assert c.default.arg == target
4161        assert c.default.column is c
4162
4163    def test_column_default_onupdate_positional(self):
4164        target = schema.ColumnDefault("y", for_update=True)
4165        c = self._fixture(target)
4166        assert c.onupdate is target
4167        assert target.column is c
4168
4169    def test_column_default_onupdate_keyword_as_schemaitem(self):
4170        target = schema.ColumnDefault("y", for_update=True)
4171        c = self._fixture(onupdate=target)
4172        assert c.onupdate is target
4173        assert target.column is c
4174
4175    def test_column_default_onupdate_keyword_as_clause(self):
4176        target = "y"
4177        c = self._fixture(onupdate=target)
4178        assert c.onupdate.arg == target
4179        assert c.onupdate.column is c
4180
4181
4182class ColumnOptionsTest(fixtures.TestBase):
4183    def test_default_generators(self):
4184        g1, g2 = Sequence("foo_id_seq"), ColumnDefault("f5")
4185        assert Column(String, default=g1).default is g1
4186        assert Column(String, onupdate=g1).onupdate is g1
4187        assert Column(String, default=g2).default is g2
4188        assert Column(String, onupdate=g2).onupdate is g2
4189
4190    def _null_type_no_error(self, col):
4191        c_str = str(schema.CreateColumn(col).compile())
4192        assert "NULL" in c_str
4193
4194    def _no_name_error(self, col):
4195        assert_raises_message(
4196            exc.ArgumentError,
4197            "Column must be constructed with a non-blank name or "
4198            "assign a non-blank .name",
4199            Table,
4200            "t",
4201            MetaData(),
4202            col,
4203        )
4204
4205    def _no_error(self, col):
4206        m = MetaData()
4207        Table("bar", m, Column("id", Integer))
4208        t = Table("t", m, col)
4209        schema.CreateTable(t).compile()
4210
4211    def test_argument_signatures(self):
4212        self._no_name_error(Column())
4213        self._null_type_no_error(Column("foo"))
4214        self._no_name_error(Column(default="foo"))
4215
4216        self._no_name_error(Column(Sequence("a")))
4217        self._null_type_no_error(Column("foo", default="foo"))
4218
4219        self._null_type_no_error(Column("foo", Sequence("a")))
4220
4221        self._no_name_error(Column(ForeignKey("bar.id")))
4222
4223        self._no_error(Column("foo", ForeignKey("bar.id")))
4224
4225        self._no_name_error(Column(ForeignKey("bar.id"), default="foo"))
4226
4227        self._no_name_error(Column(ForeignKey("bar.id"), Sequence("a")))
4228        self._no_error(Column("foo", ForeignKey("bar.id"), default="foo"))
4229        self._no_error(Column("foo", ForeignKey("bar.id"), Sequence("a")))
4230
4231    def test_column_info(self):
4232
4233        c1 = Column("foo", String, info={"x": "y"})
4234        c2 = Column("bar", String, info={})
4235        c3 = Column("bat", String)
4236        assert c1.info == {"x": "y"}
4237        assert c2.info == {}
4238        assert c3.info == {}
4239
4240        for c in (c1, c2, c3):
4241            c.info["bar"] = "zip"
4242            assert c.info["bar"] == "zip"
4243
4244
4245class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
4246    def test_all_events(self):
4247        canary = []
4248
4249        def before_attach(obj, parent):
4250            canary.append(
4251                "%s->%s" % (obj.__class__.__name__, parent.__class__.__name__)
4252            )
4253
4254        def after_attach(obj, parent):
4255            canary.append("%s->%s" % (obj.__class__.__name__, parent))
4256
4257        self.event_listen(
4258            schema.SchemaItem, "before_parent_attach", before_attach
4259        )
4260        self.event_listen(
4261            schema.SchemaItem, "after_parent_attach", after_attach
4262        )
4263
4264        m = MetaData()
4265        Table(
4266            "t1",
4267            m,
4268            Column("id", Integer, Sequence("foo_id"), primary_key=True),
4269            Column("bar", String, ForeignKey("t2.id")),
4270        )
4271        Table("t2", m, Column("id", Integer, primary_key=True))
4272
4273        eq_(
4274            canary,
4275            [
4276                "Sequence->Column",
4277                "Sequence->id",
4278                "ForeignKey->Column",
4279                "ForeignKey->bar",
4280                "Table->MetaData",
4281                "PrimaryKeyConstraint->Table",
4282                "PrimaryKeyConstraint->t1",
4283                "Column->Table",
4284                "Column->t1",
4285                "Column->Table",
4286                "Column->t1",
4287                "ForeignKeyConstraint->Table",
4288                "ForeignKeyConstraint->t1",
4289                "Table->MetaData()",
4290                "Table->MetaData",
4291                "PrimaryKeyConstraint->Table",
4292                "PrimaryKeyConstraint->t2",
4293                "Column->Table",
4294                "Column->t2",
4295                "Table->MetaData()",
4296            ],
4297        )
4298
4299    def test_events_per_constraint(self):
4300        canary = []
4301
4302        def evt(target):
4303            def before_attach(obj, parent):
4304                canary.append(
4305                    "%s->%s" % (target.__name__, parent.__class__.__name__)
4306                )
4307
4308            def after_attach(obj, parent):
4309                assert hasattr(obj, "name")  # so we can change it
4310                canary.append("%s->%s" % (target.__name__, parent))
4311
4312            self.event_listen(target, "before_parent_attach", before_attach)
4313            self.event_listen(target, "after_parent_attach", after_attach)
4314
4315        for target in [
4316            schema.ForeignKeyConstraint,
4317            schema.PrimaryKeyConstraint,
4318            schema.UniqueConstraint,
4319            schema.CheckConstraint,
4320            schema.Index,
4321        ]:
4322            evt(target)
4323
4324        m = MetaData()
4325        Table(
4326            "t1",
4327            m,
4328            Column("id", Integer, Sequence("foo_id"), primary_key=True),
4329            Column("bar", String, ForeignKey("t2.id"), index=True),
4330            Column("bat", Integer, unique=True),
4331        )
4332        Table(
4333            "t2",
4334            m,
4335            Column("id", Integer, primary_key=True),
4336            Column("bar", Integer),
4337            Column("bat", Integer),
4338            CheckConstraint("bar>5"),
4339            UniqueConstraint("bar", "bat"),
4340            Index(None, "bar", "bat"),
4341        )
4342        eq_(
4343            canary,
4344            [
4345                "PrimaryKeyConstraint->Table",
4346                "PrimaryKeyConstraint->t1",
4347                "Index->Table",
4348                "Index->t1",
4349                "ForeignKeyConstraint->Table",
4350                "ForeignKeyConstraint->t1",
4351                "UniqueConstraint->Table",
4352                "UniqueConstraint->t1",
4353                "PrimaryKeyConstraint->Table",
4354                "PrimaryKeyConstraint->t2",
4355                "CheckConstraint->Table",
4356                "CheckConstraint->t2",
4357                "UniqueConstraint->Table",
4358                "UniqueConstraint->t2",
4359                "Index->Table",
4360                "Index->t2",
4361            ],
4362        )
4363
4364
4365class DialectKWArgTest(fixtures.TestBase):
4366    @contextmanager
4367    def _fixture(self):
4368        from sqlalchemy.engine.default import DefaultDialect
4369
4370        class ParticipatingDialect(DefaultDialect):
4371            construct_arguments = [
4372                (schema.Index, {"x": 5, "y": False, "z_one": None}),
4373                (schema.ForeignKeyConstraint, {"foobar": False}),
4374            ]
4375
4376        class ParticipatingDialect2(DefaultDialect):
4377            construct_arguments = [
4378                (schema.Index, {"x": 9, "y": True, "pp": "default"}),
4379                (schema.Table, {"*": None}),
4380            ]
4381
4382        class NonParticipatingDialect(DefaultDialect):
4383            construct_arguments = None
4384
4385        def load(dialect_name):
4386            if dialect_name == "participating":
4387                return ParticipatingDialect
4388            elif dialect_name == "participating2":
4389                return ParticipatingDialect2
4390            elif dialect_name == "nonparticipating":
4391                return NonParticipatingDialect
4392            else:
4393                raise exc.NoSuchModuleError("no dialect %r" % dialect_name)
4394
4395        with mock.patch("sqlalchemy.dialects.registry.load", load):
4396            yield
4397
4398    def teardown_test(self):
4399        Index._kw_registry.clear()
4400
4401    def test_participating(self):
4402        with self._fixture():
4403            idx = Index("a", "b", "c", participating_y=True)
4404            eq_(
4405                idx.dialect_options,
4406                {"participating": {"x": 5, "y": True, "z_one": None}},
4407            )
4408            eq_(idx.dialect_kwargs, {"participating_y": True})
4409
4410    def test_nonparticipating(self):
4411        with self._fixture():
4412            idx = Index(
4413                "a", "b", "c", nonparticipating_y=True, nonparticipating_q=5
4414            )
4415            eq_(
4416                idx.dialect_kwargs,
4417                {"nonparticipating_y": True, "nonparticipating_q": 5},
4418            )
4419
4420    def test_bad_kwarg_raise(self):
4421        with self._fixture():
4422            assert_raises_message(
4423                TypeError,
4424                "Additional arguments should be named "
4425                "<dialectname>_<argument>, got 'foobar'",
4426                Index,
4427                "a",
4428                "b",
4429                "c",
4430                foobar=True,
4431            )
4432
4433    def test_unknown_dialect_warning(self):
4434        with self._fixture():
4435            with testing.expect_warnings(
4436                "Can't validate argument 'unknown_y'; can't locate "
4437                "any SQLAlchemy dialect named 'unknown'",
4438            ):
4439                Index("a", "b", "c", unknown_y=True)
4440
4441    def test_participating_bad_kw(self):
4442        with self._fixture():
4443            assert_raises_message(
4444                exc.ArgumentError,
4445                "Argument 'participating_q_p_x' is not accepted by dialect "
4446                "'participating' on behalf of "
4447                "<class 'sqlalchemy.sql.schema.Index'>",
4448                Index,
4449                "a",
4450                "b",
4451                "c",
4452                participating_q_p_x=8,
4453            )
4454
4455    def test_participating_unknown_schema_item(self):
4456        with self._fixture():
4457            # the dialect doesn't include UniqueConstraint in
4458            # its registry at all.
4459            assert_raises_message(
4460                exc.ArgumentError,
4461                "Argument 'participating_q_p_x' is not accepted by dialect "
4462                "'participating' on behalf of "
4463                "<class 'sqlalchemy.sql.schema.UniqueConstraint'>",
4464                UniqueConstraint,
4465                "a",
4466                "b",
4467                participating_q_p_x=8,
4468            )
4469
4470    @testing.emits_warning("Can't validate")
4471    def test_unknown_dialect_warning_still_populates(self):
4472        with self._fixture():
4473            idx = Index("a", "b", "c", unknown_y=True)
4474            eq_(idx.dialect_kwargs, {"unknown_y": True})  # still populates
4475
4476    @testing.emits_warning("Can't validate")
4477    def test_unknown_dialect_warning_still_populates_multiple(self):
4478        with self._fixture():
4479            idx = Index(
4480                "a",
4481                "b",
4482                "c",
4483                unknown_y=True,
4484                unknown_z=5,
4485                otherunknown_foo="bar",
4486                participating_y=8,
4487            )
4488            eq_(
4489                idx.dialect_options,
4490                {
4491                    "unknown": {"y": True, "z": 5, "*": None},
4492                    "otherunknown": {"foo": "bar", "*": None},
4493                    "participating": {"x": 5, "y": 8, "z_one": None},
4494                },
4495            )
4496            eq_(
4497                idx.dialect_kwargs,
4498                {
4499                    "unknown_z": 5,
4500                    "participating_y": 8,
4501                    "unknown_y": True,
4502                    "otherunknown_foo": "bar",
4503                },
4504            )  # still populates
4505
4506    def test_combined(self):
4507        with self._fixture():
4508            idx = Index(
4509                "a", "b", "c", participating_x=7, nonparticipating_y=True
4510            )
4511
4512            eq_(
4513                idx.dialect_options,
4514                {
4515                    "participating": {"y": False, "x": 7, "z_one": None},
4516                    "nonparticipating": {"y": True, "*": None},
4517                },
4518            )
4519            eq_(
4520                idx.dialect_kwargs,
4521                {"participating_x": 7, "nonparticipating_y": True},
4522            )
4523
4524    def test_multiple_participating(self):
4525        with self._fixture():
4526            idx = Index(
4527                "a",
4528                "b",
4529                "c",
4530                participating_x=7,
4531                participating2_x=15,
4532                participating2_y="lazy",
4533            )
4534            eq_(
4535                idx.dialect_options,
4536                {
4537                    "participating": {"x": 7, "y": False, "z_one": None},
4538                    "participating2": {"x": 15, "y": "lazy", "pp": "default"},
4539                },
4540            )
4541            eq_(
4542                idx.dialect_kwargs,
4543                {
4544                    "participating_x": 7,
4545                    "participating2_x": 15,
4546                    "participating2_y": "lazy",
4547                },
4548            )
4549
4550    def test_foreign_key_propagate(self):
4551        with self._fixture():
4552            m = MetaData()
4553            fk = ForeignKey("t2.id", participating_foobar=True)
4554            t = Table("t", m, Column("id", Integer, fk))
4555            fkc = [
4556                c for c in t.constraints if isinstance(c, ForeignKeyConstraint)
4557            ][0]
4558            eq_(fkc.dialect_kwargs, {"participating_foobar": True})
4559
4560    def test_foreign_key_propagate_exceptions_delayed(self):
4561        with self._fixture():
4562            m = MetaData()
4563            fk = ForeignKey("t2.id", participating_fake=True)
4564            c1 = Column("id", Integer, fk)
4565            assert_raises_message(
4566                exc.ArgumentError,
4567                "Argument 'participating_fake' is not accepted by "
4568                "dialect 'participating' on behalf of "
4569                "<class 'sqlalchemy.sql.schema.ForeignKeyConstraint'>",
4570                Table,
4571                "t",
4572                m,
4573                c1,
4574            )
4575
4576    def test_wildcard(self):
4577        with self._fixture():
4578            m = MetaData()
4579            t = Table(
4580                "x",
4581                m,
4582                Column("x", Integer),
4583                participating2_xyz="foo",
4584                participating2_engine="InnoDB",
4585            )
4586            eq_(
4587                t.dialect_kwargs,
4588                {
4589                    "participating2_xyz": "foo",
4590                    "participating2_engine": "InnoDB",
4591                },
4592            )
4593
4594    def test_uninit_wildcard(self):
4595        with self._fixture():
4596            m = MetaData()
4597            t = Table("x", m, Column("x", Integer))
4598            eq_(t.dialect_options["participating2"], {"*": None})
4599            eq_(t.dialect_kwargs, {})
4600
4601    def test_not_contains_wildcard(self):
4602        with self._fixture():
4603            m = MetaData()
4604            t = Table("x", m, Column("x", Integer))
4605            assert "foobar" not in t.dialect_options["participating2"]
4606
4607    def test_contains_wildcard(self):
4608        with self._fixture():
4609            m = MetaData()
4610            t = Table("x", m, Column("x", Integer), participating2_foobar=5)
4611            assert "foobar" in t.dialect_options["participating2"]
4612
4613    def test_update(self):
4614        with self._fixture():
4615            idx = Index("a", "b", "c", participating_x=20)
4616            eq_(idx.dialect_kwargs, {"participating_x": 20})
4617            idx._validate_dialect_kwargs(
4618                {"participating_x": 25, "participating_z_one": "default"}
4619            )
4620            eq_(
4621                idx.dialect_options,
4622                {"participating": {"x": 25, "y": False, "z_one": "default"}},
4623            )
4624            eq_(
4625                idx.dialect_kwargs,
4626                {"participating_x": 25, "participating_z_one": "default"},
4627            )
4628
4629            idx._validate_dialect_kwargs(
4630                {"participating_x": 25, "participating_z_one": "default"}
4631            )
4632
4633            eq_(
4634                idx.dialect_options,
4635                {"participating": {"x": 25, "y": False, "z_one": "default"}},
4636            )
4637            eq_(
4638                idx.dialect_kwargs,
4639                {"participating_x": 25, "participating_z_one": "default"},
4640            )
4641
4642            idx._validate_dialect_kwargs(
4643                {"participating_y": True, "participating2_y": "p2y"}
4644            )
4645            eq_(
4646                idx.dialect_options,
4647                {
4648                    "participating": {"x": 25, "y": True, "z_one": "default"},
4649                    "participating2": {"y": "p2y", "pp": "default", "x": 9},
4650                },
4651            )
4652            eq_(
4653                idx.dialect_kwargs,
4654                {
4655                    "participating_x": 25,
4656                    "participating_y": True,
4657                    "participating2_y": "p2y",
4658                    "participating_z_one": "default",
4659                },
4660            )
4661
4662    def test_key_error_kwargs_no_dialect(self):
4663        with self._fixture():
4664            idx = Index("a", "b", "c")
4665            assert_raises(KeyError, idx.kwargs.__getitem__, "foo_bar")
4666
4667    def test_key_error_kwargs_no_underscore(self):
4668        with self._fixture():
4669            idx = Index("a", "b", "c")
4670            assert_raises(KeyError, idx.kwargs.__getitem__, "foobar")
4671
4672    def test_key_error_kwargs_no_argument(self):
4673        with self._fixture():
4674            idx = Index("a", "b", "c")
4675            assert_raises(
4676                KeyError, idx.kwargs.__getitem__, "participating_asdmfq34098"
4677            )
4678
4679            assert_raises(
4680                KeyError,
4681                idx.kwargs.__getitem__,
4682                "nonparticipating_asdmfq34098",
4683            )
4684
4685    def test_key_error_dialect_options(self):
4686        with self._fixture():
4687            idx = Index("a", "b", "c")
4688            assert_raises(
4689                KeyError,
4690                idx.dialect_options["participating"].__getitem__,
4691                "asdfaso890",
4692            )
4693
4694            assert_raises(
4695                KeyError,
4696                idx.dialect_options["nonparticipating"].__getitem__,
4697                "asdfaso890",
4698            )
4699
4700    def test_ad_hoc_participating_via_opt(self):
4701        with self._fixture():
4702            idx = Index("a", "b", "c")
4703            idx.dialect_options["participating"]["foobar"] = 5
4704
4705            eq_(idx.dialect_options["participating"]["foobar"], 5)
4706            eq_(idx.kwargs["participating_foobar"], 5)
4707
4708    def test_ad_hoc_nonparticipating_via_opt(self):
4709        with self._fixture():
4710            idx = Index("a", "b", "c")
4711            idx.dialect_options["nonparticipating"]["foobar"] = 5
4712
4713            eq_(idx.dialect_options["nonparticipating"]["foobar"], 5)
4714            eq_(idx.kwargs["nonparticipating_foobar"], 5)
4715
4716    def test_ad_hoc_participating_via_kwargs(self):
4717        with self._fixture():
4718            idx = Index("a", "b", "c")
4719            idx.kwargs["participating_foobar"] = 5
4720
4721            eq_(idx.dialect_options["participating"]["foobar"], 5)
4722            eq_(idx.kwargs["participating_foobar"], 5)
4723
4724    def test_ad_hoc_nonparticipating_via_kwargs(self):
4725        with self._fixture():
4726            idx = Index("a", "b", "c")
4727            idx.kwargs["nonparticipating_foobar"] = 5
4728
4729            eq_(idx.dialect_options["nonparticipating"]["foobar"], 5)
4730            eq_(idx.kwargs["nonparticipating_foobar"], 5)
4731
4732    def test_ad_hoc_via_kwargs_invalid_key(self):
4733        with self._fixture():
4734            idx = Index("a", "b", "c")
4735            assert_raises_message(
4736                exc.ArgumentError,
4737                "Keys must be of the form <dialectname>_<argname>",
4738                idx.kwargs.__setitem__,
4739                "foobar",
4740                5,
4741            )
4742
4743    def test_ad_hoc_via_kwargs_invalid_dialect(self):
4744        with self._fixture():
4745            idx = Index("a", "b", "c")
4746            assert_raises_message(
4747                exc.ArgumentError,
4748                "no dialect 'nonexistent'",
4749                idx.kwargs.__setitem__,
4750                "nonexistent_foobar",
4751                5,
4752            )
4753
4754    def test_add_new_arguments_participating(self):
4755        with self._fixture():
4756            Index.argument_for("participating", "xyzqpr", False)
4757
4758            idx = Index("a", "b", "c", participating_xyzqpr=True)
4759
4760            eq_(idx.kwargs["participating_xyzqpr"], True)
4761
4762            idx = Index("a", "b", "c")
4763            eq_(idx.dialect_options["participating"]["xyzqpr"], False)
4764
4765    def test_add_new_arguments_participating_no_existing(self):
4766        with self._fixture():
4767            PrimaryKeyConstraint.argument_for("participating", "xyzqpr", False)
4768
4769            pk = PrimaryKeyConstraint("a", "b", "c", participating_xyzqpr=True)
4770
4771            eq_(pk.kwargs["participating_xyzqpr"], True)
4772
4773            pk = PrimaryKeyConstraint("a", "b", "c")
4774            eq_(pk.dialect_options["participating"]["xyzqpr"], False)
4775
4776    def test_add_new_arguments_nonparticipating(self):
4777        with self._fixture():
4778            assert_raises_message(
4779                exc.ArgumentError,
4780                "Dialect 'nonparticipating' does have keyword-argument "
4781                "validation and defaults enabled configured",
4782                Index.argument_for,
4783                "nonparticipating",
4784                "xyzqpr",
4785                False,
4786            )
4787
4788    def test_add_new_arguments_invalid_dialect(self):
4789        with self._fixture():
4790            assert_raises_message(
4791                exc.ArgumentError,
4792                "no dialect 'nonexistent'",
4793                Index.argument_for,
4794                "nonexistent",
4795                "foobar",
4796                5,
4797            )
4798
4799
4800class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
4801    __dialect__ = "default"
4802
4803    def _fixture(self, naming_convention, table_schema=None):
4804        m1 = MetaData(naming_convention=naming_convention)
4805
4806        u1 = Table(
4807            "user",
4808            m1,
4809            Column("id", Integer, primary_key=True),
4810            Column("version", Integer, primary_key=True),
4811            Column("data", String(30)),
4812            Column("Data2", String(30), key="data2"),
4813            Column("Data3", String(30), key="data3"),
4814            schema=table_schema,
4815        )
4816
4817        return u1
4818
4819    def _colliding_name_fixture(self, naming_convention, id_flags):
4820        m1 = MetaData(naming_convention=naming_convention)
4821
4822        t1 = Table(
4823            "foo",
4824            m1,
4825            Column("id", Integer, **id_flags),
4826            Column("foo_id", Integer),
4827        )
4828        return t1
4829
4830    def test_colliding_col_label_from_index_flag(self):
4831        t1 = self._colliding_name_fixture(
4832            {"ix": "ix_%(column_0_label)s"}, {"index": True}
4833        )
4834
4835        idx = list(t1.indexes)[0]
4836
4837        # name is generated up front.  alembic really prefers this
4838        eq_(idx.name, "ix_foo_id")
4839        self.assert_compile(
4840            CreateIndex(idx), "CREATE INDEX ix_foo_id ON foo (id)"
4841        )
4842
4843    def test_colliding_col_label_from_unique_flag(self):
4844        t1 = self._colliding_name_fixture(
4845            {"uq": "uq_%(column_0_label)s"}, {"unique": True}
4846        )
4847
4848        const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
4849        uq = const[0]
4850
4851        # name is generated up front.  alembic really prefers this
4852        eq_(uq.name, "uq_foo_id")
4853
4854        self.assert_compile(
4855            AddConstraint(uq),
4856            "ALTER TABLE foo ADD CONSTRAINT uq_foo_id UNIQUE (id)",
4857        )
4858
4859    def test_colliding_col_label_from_index_obj(self):
4860        t1 = self._colliding_name_fixture({"ix": "ix_%(column_0_label)s"}, {})
4861
4862        idx = Index(None, t1.c.id)
4863        is_(idx, list(t1.indexes)[0])
4864        eq_(idx.name, "ix_foo_id")
4865        self.assert_compile(
4866            CreateIndex(idx), "CREATE INDEX ix_foo_id ON foo (id)"
4867        )
4868
4869    def test_colliding_col_label_from_unique_obj(self):
4870        t1 = self._colliding_name_fixture({"uq": "uq_%(column_0_label)s"}, {})
4871        uq = UniqueConstraint(t1.c.id)
4872        const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
4873        is_(const[0], uq)
4874        eq_(const[0].name, "uq_foo_id")
4875        self.assert_compile(
4876            AddConstraint(const[0]),
4877            "ALTER TABLE foo ADD CONSTRAINT uq_foo_id UNIQUE (id)",
4878        )
4879
4880    def test_colliding_col_label_from_index_flag_no_conv(self):
4881        t1 = self._colliding_name_fixture({"ck": "foo"}, {"index": True})
4882
4883        idx = list(t1.indexes)[0]
4884
4885        # this behavior needs to fail, as of #4911 since we are testing it,
4886        # ensure it raises a CompileError.  In #4289 we may want to revisit
4887        # this in some way, most likely specifically to Postgresql only.
4888        assert_raises_message(
4889            exc.CompileError,
4890            "CREATE INDEX requires that the index have a name",
4891            CreateIndex(idx).compile,
4892        )
4893
4894        assert_raises_message(
4895            exc.CompileError,
4896            "DROP INDEX requires that the index have a name",
4897            DropIndex(idx).compile,
4898        )
4899
4900    def test_colliding_col_label_from_unique_flag_no_conv(self):
4901        t1 = self._colliding_name_fixture({"ck": "foo"}, {"unique": True})
4902
4903        const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
4904        is_(const[0].name, None)
4905
4906        self.assert_compile(
4907            AddConstraint(const[0]), "ALTER TABLE foo ADD UNIQUE (id)"
4908        )
4909
4910    @testing.combinations(
4911        ("nopk",),
4912        ("column",),
4913        ("constraint",),
4914        ("explicit_name",),
4915        argnames="pktype",
4916    )
4917    @testing.combinations(
4918        ("pk_%(table_name)s", "pk_t1"),
4919        ("pk_%(column_0_name)s", "pk_x"),
4920        ("pk_%(column_0_N_name)s", "pk_x_y"),
4921        ("pk_%(column_0_N_label)s", "pk_t1_x_t1_y"),
4922        ("%(column_0_name)s", "x"),
4923        ("%(column_0N_name)s", "xy"),
4924        argnames="conv, expected_name",
4925    )
4926    def test_pk_conventions(self, conv, expected_name, pktype):
4927        m1 = MetaData(naming_convention={"pk": conv})
4928
4929        if pktype == "column":
4930            t1 = Table(
4931                "t1",
4932                m1,
4933                Column("x", Integer, primary_key=True),
4934                Column("y", Integer, primary_key=True),
4935            )
4936        elif pktype == "constraint":
4937            t1 = Table(
4938                "t1",
4939                m1,
4940                Column("x", Integer),
4941                Column("y", Integer),
4942                PrimaryKeyConstraint("x", "y"),
4943            )
4944        elif pktype == "nopk":
4945            t1 = Table(
4946                "t1",
4947                m1,
4948                Column("x", Integer, nullable=False),
4949                Column("y", Integer, nullable=False),
4950            )
4951            expected_name = None
4952        elif pktype == "explicit_name":
4953            t1 = Table(
4954                "t1",
4955                m1,
4956                Column("x", Integer, primary_key=True),
4957                Column("y", Integer, primary_key=True),
4958                PrimaryKeyConstraint("x", "y", name="myname"),
4959            )
4960            expected_name = "myname"
4961
4962        if expected_name:
4963            eq_(t1.primary_key.name, expected_name)
4964
4965        if pktype == "nopk":
4966            self.assert_compile(
4967                schema.CreateTable(t1),
4968                "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL)",
4969            )
4970        else:
4971            self.assert_compile(
4972                schema.CreateTable(t1),
4973                "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL, "
4974                "CONSTRAINT %s PRIMARY KEY (x, y))" % expected_name,
4975            )
4976
4977    def test_uq_name(self):
4978        u1 = self._fixture(
4979            naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
4980        )
4981        uq = UniqueConstraint(u1.c.data)
4982        eq_(uq.name, "uq_user_data")
4983
4984    def test_uq_conv_name(self):
4985        u1 = self._fixture(
4986            naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
4987        )
4988        uq = UniqueConstraint(u1.c.data, name=naming.conv("myname"))
4989        self.assert_compile(
4990            schema.AddConstraint(uq),
4991            'ALTER TABLE "user" ADD CONSTRAINT myname UNIQUE (data)',
4992            dialect="default",
4993        )
4994
4995    def test_uq_defer_name_convention(self):
4996        u1 = self._fixture(
4997            naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
4998        )
4999        uq = UniqueConstraint(u1.c.data, name=naming._NONE_NAME)
5000        self.assert_compile(
5001            schema.AddConstraint(uq),
5002            'ALTER TABLE "user" ADD CONSTRAINT uq_user_data UNIQUE (data)',
5003            dialect="default",
5004        )
5005
5006    def test_uq_key(self):
5007        u1 = self._fixture(
5008            naming_convention={"uq": "uq_%(table_name)s_%(column_0_key)s"}
5009        )
5010        uq = UniqueConstraint(u1.c.data, u1.c.data2)
5011        eq_(uq.name, "uq_user_data")
5012
5013    def test_uq_label(self):
5014        u1 = self._fixture(
5015            naming_convention={"uq": "uq_%(table_name)s_%(column_0_label)s"}
5016        )
5017        uq = UniqueConstraint(u1.c.data, u1.c.data2)
5018        eq_(uq.name, "uq_user_user_data")
5019
5020    def test_uq_allcols_underscore_name(self):
5021        u1 = self._fixture(
5022            naming_convention={"uq": "uq_%(table_name)s_%(column_0_N_name)s"}
5023        )
5024        uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
5025        eq_(uq.name, "uq_user_data_Data2_Data3")
5026
5027    def test_uq_allcols_merged_name(self):
5028        u1 = self._fixture(
5029            naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"}
5030        )
5031        uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
5032        eq_(uq.name, "uq_user_dataData2Data3")
5033
5034    def test_uq_allcols_merged_key(self):
5035        u1 = self._fixture(
5036            naming_convention={"uq": "uq_%(table_name)s_%(column_0N_key)s"}
5037        )
5038        uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
5039        eq_(uq.name, "uq_user_datadata2data3")
5040
5041    def test_uq_allcols_truncated_name(self):
5042        u1 = self._fixture(
5043            naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"}
5044        )
5045        uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
5046
5047        dialect = default.DefaultDialect()
5048        self.assert_compile(
5049            schema.AddConstraint(uq),
5050            'ALTER TABLE "user" ADD '
5051            'CONSTRAINT "uq_user_dataData2Data3" '
5052            'UNIQUE (data, "Data2", "Data3")',
5053            dialect=dialect,
5054        )
5055
5056        dialect.max_identifier_length = 15
5057        self.assert_compile(
5058            schema.AddConstraint(uq),
5059            'ALTER TABLE "user" ADD '
5060            'CONSTRAINT uq_user_2769 UNIQUE (data, "Data2", "Data3")',
5061            dialect=dialect,
5062        )
5063
5064    def test_fk_allcols_underscore_name(self):
5065        u1 = self._fixture(
5066            naming_convention={
5067                "fk": "fk_%(table_name)s_%(column_0_N_name)s_"
5068                "%(referred_table_name)s_%(referred_column_0_N_name)s"
5069            }
5070        )
5071
5072        m1 = u1.metadata
5073        a1 = Table(
5074            "address",
5075            m1,
5076            Column("id", Integer, primary_key=True),
5077            Column("UserData", String(30), key="user_data"),
5078            Column("UserData2", String(30), key="user_data2"),
5079            Column("UserData3", String(30), key="user_data3"),
5080        )
5081        fk = ForeignKeyConstraint(
5082            ["user_data", "user_data2", "user_data3"],
5083            ["user.data", "user.data2", "user.data3"],
5084        )
5085        a1.append_constraint(fk)
5086        self.assert_compile(
5087            schema.AddConstraint(fk),
5088            "ALTER TABLE address ADD CONSTRAINT "
5089            '"fk_address_UserData_UserData2_UserData3_user_data_Data2_Data3" '
5090            'FOREIGN KEY("UserData", "UserData2", "UserData3") '
5091            'REFERENCES "user" (data, "Data2", "Data3")',
5092            dialect=default.DefaultDialect(),
5093        )
5094
5095    def test_fk_allcols_merged_name(self):
5096        u1 = self._fixture(
5097            naming_convention={
5098                "fk": "fk_%(table_name)s_%(column_0N_name)s_"
5099                "%(referred_table_name)s_%(referred_column_0N_name)s"
5100            }
5101        )
5102
5103        m1 = u1.metadata
5104        a1 = Table(
5105            "address",
5106            m1,
5107            Column("id", Integer, primary_key=True),
5108            Column("UserData", String(30), key="user_data"),
5109            Column("UserData2", String(30), key="user_data2"),
5110            Column("UserData3", String(30), key="user_data3"),
5111        )
5112        fk = ForeignKeyConstraint(
5113            ["user_data", "user_data2", "user_data3"],
5114            ["user.data", "user.data2", "user.data3"],
5115        )
5116        a1.append_constraint(fk)
5117        self.assert_compile(
5118            schema.AddConstraint(fk),
5119            "ALTER TABLE address ADD CONSTRAINT "
5120            '"fk_address_UserDataUserData2UserData3_user_dataData2Data3" '
5121            'FOREIGN KEY("UserData", "UserData2", "UserData3") '
5122            'REFERENCES "user" (data, "Data2", "Data3")',
5123            dialect=default.DefaultDialect(),
5124        )
5125
5126    def test_fk_allcols_truncated_name(self):
5127        u1 = self._fixture(
5128            naming_convention={
5129                "fk": "fk_%(table_name)s_%(column_0N_name)s_"
5130                "%(referred_table_name)s_%(referred_column_0N_name)s"
5131            }
5132        )
5133
5134        m1 = u1.metadata
5135        a1 = Table(
5136            "address",
5137            m1,
5138            Column("id", Integer, primary_key=True),
5139            Column("UserData", String(30), key="user_data"),
5140            Column("UserData2", String(30), key="user_data2"),
5141            Column("UserData3", String(30), key="user_data3"),
5142        )
5143        fk = ForeignKeyConstraint(
5144            ["user_data", "user_data2", "user_data3"],
5145            ["user.data", "user.data2", "user.data3"],
5146        )
5147        a1.append_constraint(fk)
5148
5149        dialect = default.DefaultDialect()
5150        dialect.max_identifier_length = 15
5151        self.assert_compile(
5152            schema.AddConstraint(fk),
5153            "ALTER TABLE address ADD CONSTRAINT "
5154            "fk_addr_f9ff "
5155            'FOREIGN KEY("UserData", "UserData2", "UserData3") '
5156            'REFERENCES "user" (data, "Data2", "Data3")',
5157            dialect=dialect,
5158        )
5159
5160    def test_ix_allcols_truncation(self):
5161        u1 = self._fixture(
5162            naming_convention={"ix": "ix_%(table_name)s_%(column_0N_name)s"}
5163        )
5164        ix = Index(None, u1.c.data, u1.c.data2, u1.c.data3)
5165        dialect = default.DefaultDialect()
5166        dialect.max_identifier_length = 15
5167        self.assert_compile(
5168            schema.CreateIndex(ix),
5169            "CREATE INDEX ix_user_2de9 ON " '"user" (data, "Data2", "Data3")',
5170            dialect=dialect,
5171        )
5172
5173    def test_ix_name(self):
5174        u1 = self._fixture(
5175            naming_convention={"ix": "ix_%(table_name)s_%(column_0_name)s"}
5176        )
5177        ix = Index(None, u1.c.data)
5178        eq_(ix.name, "ix_user_data")
5179
5180    def test_ck_name_required(self):
5181        u1 = self._fixture(
5182            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5183        )
5184        ck = CheckConstraint(u1.c.data == "x", name="mycheck")
5185        eq_(ck.name, "ck_user_mycheck")
5186
5187        assert_raises_message(
5188            exc.InvalidRequestError,
5189            r"Naming convention including %\(constraint_name\)s token "
5190            "requires that constraint is explicitly named.",
5191            CheckConstraint,
5192            u1.c.data == "x",
5193        )
5194
5195    def test_ck_name_deferred_required(self):
5196        u1 = self._fixture(
5197            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5198        )
5199        ck = CheckConstraint(u1.c.data == "x", name=naming._NONE_NAME)
5200
5201        assert_raises_message(
5202            exc.InvalidRequestError,
5203            r"Naming convention including %\(constraint_name\)s token "
5204            "requires that constraint is explicitly named.",
5205            schema.AddConstraint(ck).compile,
5206        )
5207
5208    def test_column_attached_ck_name(self):
5209        m = MetaData(
5210            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5211        )
5212        ck = CheckConstraint("x > 5", name="x1")
5213        Table("t", m, Column("x", ck))
5214        eq_(ck.name, "ck_t_x1")
5215
5216    def test_table_attached_ck_name(self):
5217        m = MetaData(
5218            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5219        )
5220        ck = CheckConstraint("x > 5", name="x1")
5221        Table("t", m, Column("x", Integer), ck)
5222        eq_(ck.name, "ck_t_x1")
5223
5224    def test_uq_name_already_conv(self):
5225        m = MetaData(
5226            naming_convention={
5227                "uq": "uq_%(constraint_name)s_%(column_0_name)s"
5228            }
5229        )
5230
5231        t = Table("mytable", m)
5232        uq = UniqueConstraint(name=naming.conv("my_special_key"))
5233
5234        t.append_constraint(uq)
5235        eq_(uq.name, "my_special_key")
5236
5237    def test_fk_name_schema(self):
5238        u1 = self._fixture(
5239            naming_convention={
5240                "fk": "fk_%(table_name)s_%(column_0_name)s_"
5241                "%(referred_table_name)s_%(referred_column_0_name)s"
5242            },
5243            table_schema="foo",
5244        )
5245        m1 = u1.metadata
5246        a1 = Table(
5247            "address",
5248            m1,
5249            Column("id", Integer, primary_key=True),
5250            Column("user_id", Integer),
5251            Column("user_version_id", Integer),
5252        )
5253        fk = ForeignKeyConstraint(
5254            ["user_id", "user_version_id"], ["foo.user.id", "foo.user.version"]
5255        )
5256        a1.append_constraint(fk)
5257        eq_(fk.name, "fk_address_user_id_user_id")
5258
5259    def test_fk_attrs(self):
5260        u1 = self._fixture(
5261            naming_convention={
5262                "fk": "fk_%(table_name)s_%(column_0_name)s_"
5263                "%(referred_table_name)s_%(referred_column_0_name)s"
5264            }
5265        )
5266        m1 = u1.metadata
5267        a1 = Table(
5268            "address",
5269            m1,
5270            Column("id", Integer, primary_key=True),
5271            Column("user_id", Integer),
5272            Column("user_version_id", Integer),
5273        )
5274        fk = ForeignKeyConstraint(
5275            ["user_id", "user_version_id"], ["user.id", "user.version"]
5276        )
5277        a1.append_constraint(fk)
5278        eq_(fk.name, "fk_address_user_id_user_id")
5279
5280    def test_custom(self):
5281        def key_hash(const, table):
5282            return "HASH_%s" % table.name
5283
5284        u1 = self._fixture(
5285            naming_convention={
5286                "fk": "fk_%(table_name)s_%(key_hash)s",
5287                "key_hash": key_hash,
5288            }
5289        )
5290        m1 = u1.metadata
5291        a1 = Table(
5292            "address",
5293            m1,
5294            Column("id", Integer, primary_key=True),
5295            Column("user_id", Integer),
5296            Column("user_version_id", Integer),
5297        )
5298        fk = ForeignKeyConstraint(
5299            ["user_id", "user_version_id"], ["user.id", "user.version"]
5300        )
5301        a1.append_constraint(fk)
5302        eq_(fk.name, "fk_address_HASH_address")
5303
5304    def test_schematype_ck_name_boolean(self):
5305        m1 = MetaData(
5306            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5307        )
5308
5309        u1 = Table(
5310            "user",
5311            m1,
5312            Column("x", Boolean(name="foo", create_constraint=True)),
5313        )
5314
5315        self.assert_compile(
5316            schema.CreateTable(u1),
5317            'CREATE TABLE "user" ('
5318            "x BOOLEAN, "
5319            "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))"
5320            ")",
5321        )
5322
5323        # test no side effects from first compile
5324        self.assert_compile(
5325            schema.CreateTable(u1),
5326            'CREATE TABLE "user" ('
5327            "x BOOLEAN, "
5328            "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))"
5329            ")",
5330        )
5331
5332    def test_schematype_ck_name_boolean_not_on_name(self):
5333        m1 = MetaData(
5334            naming_convention={"ck": "ck_%(table_name)s_%(column_0_name)s"}
5335        )
5336
5337        u1 = Table("user", m1, Column("x", Boolean(create_constraint=True)))
5338        # constraint is not hit
5339        is_(
5340            [c for c in u1.constraints if isinstance(c, CheckConstraint)][
5341                0
5342            ].name,
5343            _NONE_NAME,
5344        )
5345        # but is hit at compile time
5346        self.assert_compile(
5347            schema.CreateTable(u1),
5348            'CREATE TABLE "user" ('
5349            "x BOOLEAN, "
5350            "CONSTRAINT ck_user_x CHECK (x IN (0, 1))"
5351            ")",
5352        )
5353
5354    def test_schematype_ck_name_enum(self):
5355        m1 = MetaData(
5356            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5357        )
5358
5359        u1 = Table(
5360            "user",
5361            m1,
5362            Column("x", Enum("a", "b", name="foo", create_constraint=True)),
5363        )
5364
5365        self.assert_compile(
5366            schema.CreateTable(u1),
5367            'CREATE TABLE "user" ('
5368            "x VARCHAR(1), "
5369            "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))"
5370            ")",
5371        )
5372
5373        # test no side effects from first compile
5374        self.assert_compile(
5375            schema.CreateTable(u1),
5376            'CREATE TABLE "user" ('
5377            "x VARCHAR(1), "
5378            "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))"
5379            ")",
5380        )
5381
5382    def test_schematype_ck_name_propagate_conv(self):
5383        m1 = MetaData(
5384            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5385        )
5386
5387        u1 = Table(
5388            "user",
5389            m1,
5390            Column(
5391                "x",
5392                Enum(
5393                    "a", "b", name=naming.conv("foo"), create_constraint=True
5394                ),
5395            ),
5396        )
5397        eq_(
5398            [c for c in u1.constraints if isinstance(c, CheckConstraint)][
5399                0
5400            ].name,
5401            "foo",
5402        )
5403        # but is hit at compile time
5404        self.assert_compile(
5405            schema.CreateTable(u1),
5406            'CREATE TABLE "user" ('
5407            "x VARCHAR(1), "
5408            "CONSTRAINT foo CHECK (x IN ('a', 'b'))"
5409            ")",
5410        )
5411
5412    def test_schematype_ck_name_boolean_no_name(self):
5413        m1 = MetaData(
5414            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5415        )
5416
5417        u1 = Table("user", m1, Column("x", Boolean(create_constraint=True)))
5418        # constraint gets special _defer_none_name
5419        is_(
5420            [c for c in u1.constraints if isinstance(c, CheckConstraint)][
5421                0
5422            ].name,
5423            _NONE_NAME,
5424        )
5425        # no issue with native boolean
5426        self.assert_compile(
5427            schema.CreateTable(u1),
5428            'CREATE TABLE "user" (' "x BOOLEAN" ")",
5429            dialect="postgresql",
5430        )
5431
5432        assert_raises_message(
5433            exc.InvalidRequestError,
5434            r"Naming convention including \%\(constraint_name\)s token "
5435            r"requires that constraint is explicitly named.",
5436            schema.CreateTable(u1).compile,
5437            dialect=default.DefaultDialect(),
5438        )
5439
5440    def test_schematype_no_ck_name_boolean_no_name(self):
5441        m1 = MetaData()  # no naming convention
5442
5443        u1 = Table("user", m1, Column("x", Boolean(create_constraint=True)))
5444        # constraint gets special _defer_none_name
5445        is_(
5446            [c for c in u1.constraints if isinstance(c, CheckConstraint)][
5447                0
5448            ].name,
5449            _NONE_NAME,
5450        )
5451
5452        self.assert_compile(
5453            schema.CreateTable(u1),
5454            'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))',
5455        )
5456
5457    def test_ck_constraint_redundant_event(self):
5458        u1 = self._fixture(
5459            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5460        )
5461
5462        ck1 = CheckConstraint(u1.c.version > 3, name="foo")
5463        u1.append_constraint(ck1)
5464        u1.append_constraint(ck1)
5465        u1.append_constraint(ck1)
5466
5467        eq_(ck1.name, "ck_user_foo")
5468
5469    def test_pickle_metadata(self):
5470        m = MetaData(naming_convention={"pk": "%(table_name)s_pk"})
5471
5472        m2 = pickle.loads(pickle.dumps(m))
5473
5474        eq_(m2.naming_convention, {"pk": "%(table_name)s_pk"})
5475
5476        t2a = Table("t2", m, Column("id", Integer, primary_key=True))
5477        t2b = Table("t2", m2, Column("id", Integer, primary_key=True))
5478
5479        eq_(t2a.primary_key.name, t2b.primary_key.name)
5480        eq_(t2b.primary_key.name, "t2_pk")
5481
5482    def test_expression_index(self):
5483        m = MetaData(naming_convention={"ix": "ix_%(column_0_label)s"})
5484        t = Table("t", m, Column("q", Integer), Column("p", Integer))
5485        ix = Index(None, t.c.q + 5)
5486        t.append_constraint(ix)
5487
5488        # huh.  pretty cool
5489        self.assert_compile(
5490            CreateIndex(ix), "CREATE INDEX ix_t_q ON t (q + 5)"
5491        )
5492
5493
5494class CopyDialectOptionsTest(fixtures.TestBase):
5495    @contextmanager
5496    def _fixture(self):
5497        from sqlalchemy.engine.default import DefaultDialect
5498
5499        class CopyDialectOptionsTestDialect(DefaultDialect):
5500            construct_arguments = [
5501                (Table, {"some_table_arg": None}),
5502                (Column, {"some_column_arg": None}),
5503                (Index, {"some_index_arg": None}),
5504                (PrimaryKeyConstraint, {"some_pk_arg": None}),
5505                (UniqueConstraint, {"some_uq_arg": None}),
5506            ]
5507
5508        def load(dialect_name):
5509            if dialect_name == "copydialectoptionstest":
5510                return CopyDialectOptionsTestDialect
5511            else:
5512                raise exc.NoSuchModuleError("no dialect %r" % dialect_name)
5513
5514        with mock.patch("sqlalchemy.dialects.registry.load", load):
5515            yield
5516
5517    @classmethod
5518    def check_dialect_options_(cls, t):
5519        eq_(
5520            t.dialect_kwargs["copydialectoptionstest_some_table_arg"],
5521            "a1",
5522        )
5523        eq_(
5524            t.c.foo.dialect_kwargs["copydialectoptionstest_some_column_arg"],
5525            "a2",
5526        )
5527        eq_(
5528            t.primary_key.dialect_kwargs["copydialectoptionstest_some_pk_arg"],
5529            "a3",
5530        )
5531        eq_(
5532            list(t.indexes)[0].dialect_kwargs[
5533                "copydialectoptionstest_some_index_arg"
5534            ],
5535            "a4",
5536        )
5537        eq_(
5538            list(c for c in t.constraints if isinstance(c, UniqueConstraint))[
5539                0
5540            ].dialect_kwargs["copydialectoptionstest_some_uq_arg"],
5541            "a5",
5542        )
5543
5544    def test_dialect_options_are_copied(self):
5545        with self._fixture():
5546            t1 = Table(
5547                "t",
5548                MetaData(),
5549                Column(
5550                    "foo",
5551                    Integer,
5552                    copydialectoptionstest_some_column_arg="a2",
5553                ),
5554                Column("bar", Integer),
5555                PrimaryKeyConstraint(
5556                    "foo", copydialectoptionstest_some_pk_arg="a3"
5557                ),
5558                UniqueConstraint(
5559                    "bar", copydialectoptionstest_some_uq_arg="a5"
5560                ),
5561                copydialectoptionstest_some_table_arg="a1",
5562            )
5563            Index(
5564                "idx",
5565                t1.c.foo,
5566                copydialectoptionstest_some_index_arg="a4",
5567            )
5568
5569            self.check_dialect_options_(t1)
5570
5571            m2 = MetaData()
5572            t2 = t1.to_metadata(m2)  # make a copy
5573            self.check_dialect_options_(t2)
5574