1from contextlib import contextmanager
2import pickle
3
4import sqlalchemy as tsa
5from sqlalchemy import ARRAY
6from sqlalchemy import bindparam
7from sqlalchemy import BLANK_SCHEMA
8from sqlalchemy import Boolean
9from sqlalchemy import CheckConstraint
10from sqlalchemy import Column
11from sqlalchemy import column
12from sqlalchemy import ColumnDefault
13from sqlalchemy import desc
14from sqlalchemy import Enum
15from sqlalchemy import event
16from sqlalchemy import exc
17from sqlalchemy import ForeignKey
18from sqlalchemy import ForeignKeyConstraint
19from sqlalchemy import func
20from sqlalchemy import Index
21from sqlalchemy import Integer
22from sqlalchemy import MetaData
23from sqlalchemy import PrimaryKeyConstraint
24from sqlalchemy import schema
25from sqlalchemy import Sequence
26from sqlalchemy import String
27from sqlalchemy import Table
28from sqlalchemy import table
29from sqlalchemy import testing
30from sqlalchemy import text
31from sqlalchemy import TypeDecorator
32from sqlalchemy import types as sqltypes
33from sqlalchemy import Unicode
34from sqlalchemy import UniqueConstraint
35from sqlalchemy import util
36from sqlalchemy.engine import default
37from sqlalchemy.schema import AddConstraint
38from sqlalchemy.schema import CreateIndex
39from sqlalchemy.schema import DefaultClause
40from sqlalchemy.schema import DropIndex
41from sqlalchemy.sql import naming
42from sqlalchemy.sql.elements import _NONE_NAME
43from sqlalchemy.sql.elements import literal_column
44from sqlalchemy.testing import assert_raises
45from sqlalchemy.testing import assert_raises_message
46from sqlalchemy.testing import AssertsCompiledSQL
47from sqlalchemy.testing import ComparesTables
48from sqlalchemy.testing import emits_warning
49from sqlalchemy.testing import eq_
50from sqlalchemy.testing import fixtures
51from sqlalchemy.testing import is_
52from sqlalchemy.testing import is_false
53from sqlalchemy.testing import is_true
54from sqlalchemy.testing import mock
55
56
57class MetaDataTest(fixtures.TestBase, ComparesTables):
58    def test_metadata_contains(self):
59        metadata = MetaData()
60        t1 = Table("t1", metadata, Column("x", Integer))
61        t2 = Table("t2", metadata, Column("x", Integer), schema="foo")
62        t3 = Table("t2", MetaData(), Column("x", Integer))
63        t4 = Table("t1", MetaData(), Column("x", Integer), schema="foo")
64
65        assert "t1" in metadata
66        assert "foo.t2" in metadata
67        assert "t2" not in metadata
68        assert "foo.t1" not in metadata
69        assert t1 in metadata
70        assert t2 in metadata
71        assert t3 not in metadata
72        assert t4 not in metadata
73
74    def test_uninitialized_column_copy(self):
75        for col in [
76            Column("foo", String(), nullable=False),
77            Column("baz", String(), unique=True),
78            Column(Integer(), primary_key=True),
79            Column(
80                "bar",
81                Integer(),
82                Sequence("foo_seq"),
83                primary_key=True,
84                key="bar",
85            ),
86            Column(Integer(), ForeignKey("bat.blah"), doc="this is a col"),
87            Column(
88                "bar",
89                Integer(),
90                ForeignKey("bat.blah"),
91                primary_key=True,
92                key="bar",
93            ),
94            Column("bar", Integer(), info={"foo": "bar"}),
95        ]:
96            c2 = col.copy()
97            for attr in (
98                "name",
99                "type",
100                "nullable",
101                "primary_key",
102                "key",
103                "unique",
104                "info",
105                "doc",
106            ):
107                eq_(getattr(col, attr), getattr(c2, attr))
108            eq_(len(col.foreign_keys), len(c2.foreign_keys))
109            if col.default:
110                eq_(c2.default.name, "foo_seq")
111            for a1, a2 in zip(col.foreign_keys, c2.foreign_keys):
112                assert a1 is not a2
113                eq_(a2._colspec, "bat.blah")
114
115    def test_col_subclass_copy(self):
116        class MyColumn(schema.Column):
117            def __init__(self, *args, **kw):
118                self.widget = kw.pop("widget", None)
119                super(MyColumn, self).__init__(*args, **kw)
120
121            def copy(self, *arg, **kw):
122                c = super(MyColumn, self).copy(*arg, **kw)
123                c.widget = self.widget
124                return c
125
126        c1 = MyColumn("foo", Integer, widget="x")
127        c2 = c1.copy()
128        assert isinstance(c2, MyColumn)
129        eq_(c2.widget, "x")
130
131    def test_uninitialized_column_copy_events(self):
132        msgs = []
133
134        def write(c, t):
135            msgs.append("attach %s.%s" % (t.name, c.name))
136
137        c1 = Column("foo", String())
138        m = MetaData()
139        for i in range(3):
140            cx = c1.copy()
141            # as of 0.7, these events no longer copy.  its expected
142            # that listeners will be re-established from the
143            # natural construction of things.
144            cx._on_table_attach(write)
145            Table("foo%d" % i, m, cx)
146        eq_(msgs, ["attach foo0.foo", "attach foo1.foo", "attach foo2.foo"])
147
148    def test_schema_collection_add(self):
149        metadata = MetaData()
150
151        Table("t1", metadata, Column("x", Integer), schema="foo")
152        Table("t2", metadata, Column("x", Integer), schema="bar")
153        Table("t3", metadata, Column("x", Integer))
154
155        eq_(metadata._schemas, set(["foo", "bar"]))
156        eq_(len(metadata.tables), 3)
157
158    def test_schema_collection_remove(self):
159        metadata = MetaData()
160
161        t1 = Table("t1", metadata, Column("x", Integer), schema="foo")
162        Table("t2", metadata, Column("x", Integer), schema="bar")
163        t3 = Table("t3", metadata, Column("x", Integer), schema="bar")
164
165        metadata.remove(t3)
166        eq_(metadata._schemas, set(["foo", "bar"]))
167        eq_(len(metadata.tables), 2)
168
169        metadata.remove(t1)
170        eq_(metadata._schemas, set(["bar"]))
171        eq_(len(metadata.tables), 1)
172
173    def test_schema_collection_remove_all(self):
174        metadata = MetaData()
175
176        Table("t1", metadata, Column("x", Integer), schema="foo")
177        Table("t2", metadata, Column("x", Integer), schema="bar")
178
179        metadata.clear()
180        eq_(metadata._schemas, set())
181        eq_(len(metadata.tables), 0)
182
183    def test_metadata_tables_immutable(self):
184        metadata = MetaData()
185
186        Table("t1", metadata, Column("x", Integer))
187        assert "t1" in metadata.tables
188
189        assert_raises(TypeError, lambda: metadata.tables.pop("t1"))
190
191    @testing.provide_metadata
192    def test_dupe_tables(self):
193        metadata = self.metadata
194        Table(
195            "table1",
196            metadata,
197            Column("col1", Integer, primary_key=True),
198            Column("col2", String(20)),
199        )
200
201        metadata.create_all()
202        Table("table1", metadata, autoload=True)
203
204        def go():
205            Table(
206                "table1",
207                metadata,
208                Column("col1", Integer, primary_key=True),
209                Column("col2", String(20)),
210            )
211
212        assert_raises_message(
213            tsa.exc.InvalidRequestError,
214            "Table 'table1' is already defined for this "
215            "MetaData instance.  Specify 'extend_existing=True' "
216            "to redefine options and columns on an existing "
217            "Table object.",
218            go,
219        )
220
221    def test_fk_copy(self):
222        c1 = Column("foo", Integer)
223        c2 = Column("bar", Integer)
224        m = MetaData()
225        t1 = Table("t", m, c1, c2)
226
227        kw = dict(
228            onupdate="X",
229            ondelete="Y",
230            use_alter=True,
231            name="f1",
232            deferrable="Z",
233            initially="Q",
234            link_to_name=True,
235        )
236
237        fk1 = ForeignKey(c1, **kw)
238        fk2 = ForeignKeyConstraint((c1,), (c2,), **kw)
239
240        t1.append_constraint(fk2)
241        fk1c = fk1.copy()
242        fk2c = fk2.copy()
243
244        for k in kw:
245            eq_(getattr(fk1c, k), kw[k])
246            eq_(getattr(fk2c, k), kw[k])
247
248    def test_check_constraint_copy(self):
249        def r(x):
250            return x
251
252        c = CheckConstraint(
253            "foo bar",
254            name="name",
255            initially=True,
256            deferrable=True,
257            _create_rule=r,
258        )
259        c2 = c.copy()
260        eq_(c2.name, "name")
261        eq_(str(c2.sqltext), "foo bar")
262        eq_(c2.initially, True)
263        eq_(c2.deferrable, True)
264        assert c2._create_rule is r
265
266    def test_col_replace_w_constraint(self):
267        m = MetaData()
268        a = Table("a", m, Column("id", Integer, primary_key=True))
269
270        aid = Column("a_id", ForeignKey("a.id"))
271        b = Table("b", m, aid)
272        b.append_column(aid)
273
274        assert b.c.a_id.references(a.c.id)
275        eq_(len(b.constraints), 2)
276
277    def test_fk_construct(self):
278        c1 = Column("foo", Integer)
279        c2 = Column("bar", Integer)
280        m = MetaData()
281        t1 = Table("t", m, c1, c2)
282        fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1)
283        assert fk1 in t1.constraints
284
285    def test_fk_constraint_col_collection_w_table(self):
286        c1 = Column("foo", Integer)
287        c2 = Column("bar", Integer)
288        m = MetaData()
289        t1 = Table("t", m, c1, c2)
290        fk1 = ForeignKeyConstraint(("foo",), ("bar",), table=t1)
291        eq_(dict(fk1.columns), {"foo": c1})
292
293    def test_fk_constraint_col_collection_no_table(self):
294        fk1 = ForeignKeyConstraint(("foo", "bat"), ("bar", "hoho"))
295        eq_(dict(fk1.columns), {})
296        eq_(fk1.column_keys, ["foo", "bat"])
297        eq_(fk1._col_description, "foo, bat")
298        eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]})
299
300    def test_fk_constraint_col_collection_no_table_real_cols(self):
301        c1 = Column("foo", Integer)
302        c2 = Column("bar", Integer)
303        fk1 = ForeignKeyConstraint((c1,), (c2,))
304        eq_(dict(fk1.columns), {})
305        eq_(fk1.column_keys, ["foo"])
306        eq_(fk1._col_description, "foo")
307        eq_(fk1._elements, {"foo": fk1.elements[0]})
308
309    def test_fk_constraint_col_collection_added_to_table(self):
310        c1 = Column("foo", Integer)
311        m = MetaData()
312        fk1 = ForeignKeyConstraint(("foo",), ("bar",))
313        Table("t", m, c1, fk1)
314        eq_(dict(fk1.columns), {"foo": c1})
315        eq_(fk1._elements, {"foo": fk1.elements[0]})
316
317    def test_fk_constraint_col_collection_via_fk(self):
318        fk = ForeignKey("bar")
319        c1 = Column("foo", Integer, fk)
320        m = MetaData()
321        t1 = Table("t", m, c1)
322        fk1 = fk.constraint
323        eq_(fk1.column_keys, ["foo"])
324        assert fk1 in t1.constraints
325        eq_(fk1.column_keys, ["foo"])
326        eq_(dict(fk1.columns), {"foo": c1})
327        eq_(fk1._elements, {"foo": fk})
328
329    def test_fk_no_such_parent_col_error(self):
330        meta = MetaData()
331        a = Table("a", meta, Column("a", Integer))
332        Table("b", meta, Column("b", Integer))
333
334        def go():
335            a.append_constraint(ForeignKeyConstraint(["x"], ["b.b"]))
336
337        assert_raises_message(
338            exc.ArgumentError,
339            "Can't create ForeignKeyConstraint on "
340            "table 'a': no column named 'x' is present.",
341            go,
342        )
343
344    def test_fk_given_non_col(self):
345        not_a_col = bindparam("x")
346        assert_raises_message(
347            exc.ArgumentError,
348            "String, Column, or Column-bound argument expected, got Bind",
349            ForeignKey,
350            not_a_col,
351        )
352
353    def test_fk_given_non_col_clauseelem(self):
354        class Foo(object):
355            def __clause_element__(self):
356                return bindparam("x")
357
358        assert_raises_message(
359            exc.ArgumentError,
360            "String, Column, or Column-bound argument expected, got Bind",
361            ForeignKey,
362            Foo(),
363        )
364
365    def test_fk_given_col_non_table(self):
366        t = Table("t", MetaData(), Column("x", Integer))
367        xa = t.alias().c.x
368        assert_raises_message(
369            exc.ArgumentError,
370            "ForeignKey received Column not bound to a Table, got: .*Alias",
371            ForeignKey,
372            xa,
373        )
374
375    def test_fk_given_col_non_table_clauseelem(self):
376        t = Table("t", MetaData(), Column("x", Integer))
377
378        class Foo(object):
379            def __clause_element__(self):
380                return t.alias().c.x
381
382        assert_raises_message(
383            exc.ArgumentError,
384            "ForeignKey received Column not bound to a Table, got: .*Alias",
385            ForeignKey,
386            Foo(),
387        )
388
389    def test_fk_no_such_target_col_error_upfront(self):
390        meta = MetaData()
391        a = Table("a", meta, Column("a", Integer))
392        Table("b", meta, Column("b", Integer))
393
394        a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"]))
395
396        assert_raises_message(
397            exc.NoReferencedColumnError,
398            "Could not initialize target column for ForeignKey 'b.x' on "
399            "table 'a': table 'b' has no column named 'x'",
400            getattr,
401            list(a.foreign_keys)[0],
402            "column",
403        )
404
405    def test_fk_no_such_target_col_error_delayed(self):
406        meta = MetaData()
407        a = Table("a", meta, Column("a", Integer))
408        a.append_constraint(ForeignKeyConstraint(["a"], ["b.x"]))
409
410        Table("b", meta, Column("b", Integer))
411
412        assert_raises_message(
413            exc.NoReferencedColumnError,
414            "Could not initialize target column for ForeignKey 'b.x' on "
415            "table 'a': table 'b' has no column named 'x'",
416            getattr,
417            list(a.foreign_keys)[0],
418            "column",
419        )
420
421    def test_fk_mismatched_local_remote_cols(self):
422
423        assert_raises_message(
424            exc.ArgumentError,
425            "ForeignKeyConstraint number of constrained columns must "
426            "match the number of referenced columns.",
427            ForeignKeyConstraint,
428            ["a"],
429            ["b.a", "b.b"],
430        )
431
432        assert_raises_message(
433            exc.ArgumentError,
434            "ForeignKeyConstraint number of constrained columns "
435            "must match the number of referenced columns.",
436            ForeignKeyConstraint,
437            ["a", "b"],
438            ["b.a"],
439        )
440
441        assert_raises_message(
442            exc.ArgumentError,
443            "ForeignKeyConstraint with duplicate source column "
444            "references are not supported.",
445            ForeignKeyConstraint,
446            ["a", "a"],
447            ["b.a", "b.b"],
448        )
449
450    def test_pickle_metadata_sequence_restated(self):
451        m1 = MetaData()
452        Table(
453            "a",
454            m1,
455            Column("id", Integer, primary_key=True),
456            Column("x", Integer, Sequence("x_seq")),
457        )
458
459        m2 = pickle.loads(pickle.dumps(m1))
460
461        s2 = Sequence("x_seq")
462        t2 = Table(
463            "a",
464            m2,
465            Column("id", Integer, primary_key=True),
466            Column("x", Integer, s2),
467            extend_existing=True,
468        )
469
470        assert m2._sequences["x_seq"] is t2.c.x.default
471        assert m2._sequences["x_seq"] is s2
472
473    def test_sequence_restated_replaced(self):
474        """Test restatement of Sequence replaces."""
475
476        m1 = MetaData()
477        s1 = Sequence("x_seq")
478        t = Table("a", m1, Column("x", Integer, s1))
479        assert m1._sequences["x_seq"] is s1
480
481        s2 = Sequence("x_seq")
482        Table("a", m1, Column("x", Integer, s2), extend_existing=True)
483        assert t.c.x.default is s2
484        assert m1._sequences["x_seq"] is s2
485
486    def test_sequence_attach_to_table(self):
487        m1 = MetaData()
488        s1 = Sequence("s")
489        Table("a", m1, Column("x", Integer, s1))
490        assert s1.metadata is m1
491
492    def test_sequence_attach_to_existing_table(self):
493        m1 = MetaData()
494        s1 = Sequence("s")
495        t = Table("a", m1, Column("x", Integer))
496        t.c.x._init_items(s1)
497        assert s1.metadata is m1
498
499    def test_pickle_metadata_sequence_implicit(self):
500        m1 = MetaData()
501        Table(
502            "a",
503            m1,
504            Column("id", Integer, primary_key=True),
505            Column("x", Integer, Sequence("x_seq")),
506        )
507
508        m2 = pickle.loads(pickle.dumps(m1))
509
510        t2 = Table("a", m2, extend_existing=True)
511
512        eq_(m2._sequences, {"x_seq": t2.c.x.default})
513
514    def test_pickle_metadata_schema(self):
515        m1 = MetaData()
516        Table(
517            "a",
518            m1,
519            Column("id", Integer, primary_key=True),
520            Column("x", Integer, Sequence("x_seq")),
521            schema="y",
522        )
523
524        m2 = pickle.loads(pickle.dumps(m1))
525
526        Table("a", m2, schema="y", extend_existing=True)
527
528        eq_(m2._schemas, m1._schemas)
529
530    def test_metadata_schema_arg(self):
531        m1 = MetaData(schema="sch1")
532        m2 = MetaData(schema="sch1", quote_schema=True)
533        m3 = MetaData(schema="sch1", quote_schema=False)
534        m4 = MetaData()
535
536        for (
537            i,
538            (
539                name,
540                metadata,
541                schema_,
542                quote_schema,
543                exp_schema,
544                exp_quote_schema,
545            ),
546        ) in enumerate(
547            [
548                ("t1", m1, None, None, "sch1", None),
549                ("t2", m1, "sch2", None, "sch2", None),
550                ("t3", m1, "sch2", True, "sch2", True),
551                ("t4", m1, "sch1", None, "sch1", None),
552                ("t5", m1, BLANK_SCHEMA, None, None, None),
553                ("t1", m2, None, None, "sch1", True),
554                ("t2", m2, "sch2", None, "sch2", None),
555                ("t3", m2, "sch2", True, "sch2", True),
556                ("t4", m2, "sch1", None, "sch1", None),
557                ("t1", m3, None, None, "sch1", False),
558                ("t2", m3, "sch2", None, "sch2", None),
559                ("t3", m3, "sch2", True, "sch2", True),
560                ("t4", m3, "sch1", None, "sch1", None),
561                ("t1", m4, None, None, None, None),
562                ("t2", m4, "sch2", None, "sch2", None),
563                ("t3", m4, "sch2", True, "sch2", True),
564                ("t4", m4, "sch1", None, "sch1", None),
565                ("t5", m4, BLANK_SCHEMA, None, None, None),
566            ]
567        ):
568            kw = {}
569            if schema_ is not None:
570                kw["schema"] = schema_
571            if quote_schema is not None:
572                kw["quote_schema"] = quote_schema
573            t = Table(name, metadata, **kw)
574            eq_(t.schema, exp_schema, "test %d, table schema" % i)
575            eq_(
576                t.schema.quote if t.schema is not None else None,
577                exp_quote_schema,
578                "test %d, table quote_schema" % i,
579            )
580            seq = Sequence(name, metadata=metadata, **kw)
581            eq_(seq.schema, exp_schema, "test %d, seq schema" % i)
582            eq_(
583                seq.schema.quote if seq.schema is not None else None,
584                exp_quote_schema,
585                "test %d, seq quote_schema" % i,
586            )
587
588    def test_manual_dependencies(self):
589        meta = MetaData()
590        a = Table("a", meta, Column("foo", Integer))
591        b = Table("b", meta, Column("foo", Integer))
592        c = Table("c", meta, Column("foo", Integer))
593        d = Table("d", meta, Column("foo", Integer))
594        e = Table("e", meta, Column("foo", Integer))
595
596        e.add_is_dependent_on(c)
597        a.add_is_dependent_on(b)
598        b.add_is_dependent_on(d)
599        e.add_is_dependent_on(b)
600        c.add_is_dependent_on(a)
601        eq_(meta.sorted_tables, [d, b, a, c, e])
602
603    def test_deterministic_order(self):
604        meta = MetaData()
605        a = Table("a", meta, Column("foo", Integer))
606        b = Table("b", meta, Column("foo", Integer))
607        c = Table("c", meta, Column("foo", Integer))
608        d = Table("d", meta, Column("foo", Integer))
609        e = Table("e", meta, Column("foo", Integer))
610
611        e.add_is_dependent_on(c)
612        a.add_is_dependent_on(b)
613        eq_(meta.sorted_tables, [b, c, d, a, e])
614
615    def test_fks_deterministic_order(self):
616        meta = MetaData()
617        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
618        b = Table("b", meta, Column("foo", Integer))
619        c = Table("c", meta, Column("foo", Integer))
620        d = Table("d", meta, Column("foo", Integer))
621        e = Table("e", meta, Column("foo", Integer, ForeignKey("c.foo")))
622
623        eq_(meta.sorted_tables, [b, c, d, a, e])
624
625    def test_cycles_fks_warning_one(self):
626        meta = MetaData()
627        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
628        b = Table("b", meta, Column("foo", Integer, ForeignKey("d.foo")))
629        c = Table("c", meta, Column("foo", Integer, ForeignKey("b.foo")))
630        d = Table("d", meta, Column("foo", Integer, ForeignKey("c.foo")))
631        e = Table("e", meta, Column("foo", Integer))
632
633        with testing.expect_warnings(
634            "Cannot correctly sort tables; there are unresolvable cycles "
635            'between tables "b, c, d", which is usually caused by mutually '
636            "dependent foreign key constraints.  "
637            "Foreign key constraints involving these tables will not be "
638            "considered"
639        ):
640            eq_(meta.sorted_tables, [b, c, d, e, a])
641
642    def test_cycles_fks_warning_two(self):
643        meta = MetaData()
644        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
645        b = Table("b", meta, Column("foo", Integer, ForeignKey("a.foo")))
646        c = Table("c", meta, Column("foo", Integer, ForeignKey("e.foo")))
647        d = Table("d", meta, Column("foo", Integer))
648        e = Table("e", meta, Column("foo", Integer, ForeignKey("d.foo")))
649
650        with testing.expect_warnings(
651            "Cannot correctly sort tables; there are unresolvable cycles "
652            'between tables "a, b", which is usually caused by mutually '
653            "dependent foreign key constraints.  "
654            "Foreign key constraints involving these tables will not be "
655            "considered"
656        ):
657            eq_(meta.sorted_tables, [a, b, d, e, c])
658
659    def test_cycles_fks_fks_delivered_separately(self):
660        meta = MetaData()
661        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
662        b = Table("b", meta, Column("foo", Integer, ForeignKey("a.foo")))
663        c = Table("c", meta, Column("foo", Integer, ForeignKey("e.foo")))
664        d = Table("d", meta, Column("foo", Integer))
665        e = Table("e", meta, Column("foo", Integer, ForeignKey("d.foo")))
666
667        results = schema.sort_tables_and_constraints(
668            sorted(meta.tables.values(), key=lambda t: t.key)
669        )
670        results[-1] = (None, set(results[-1][-1]))
671        eq_(
672            results,
673            [
674                (a, set()),
675                (b, set()),
676                (d, {fk.constraint for fk in d.foreign_keys}),
677                (e, {fk.constraint for fk in e.foreign_keys}),
678                (c, {fk.constraint for fk in c.foreign_keys}),
679                (
680                    None,
681                    {fk.constraint for fk in a.foreign_keys}.union(
682                        fk.constraint for fk in b.foreign_keys
683                    ),
684                ),
685            ],
686        )
687
688    def test_cycles_fks_usealter(self):
689        meta = MetaData()
690        a = Table("a", meta, Column("foo", Integer, ForeignKey("b.foo")))
691        b = Table(
692            "b",
693            meta,
694            Column("foo", Integer, ForeignKey("d.foo", use_alter=True)),
695        )
696        c = Table("c", meta, Column("foo", Integer, ForeignKey("b.foo")))
697        d = Table("d", meta, Column("foo", Integer, ForeignKey("c.foo")))
698        e = Table("e", meta, Column("foo", Integer))
699
700        eq_(meta.sorted_tables, [b, e, a, c, d])
701
702    def test_nonexistent(self):
703        assert_raises(
704            tsa.exc.NoSuchTableError,
705            Table,
706            "fake_table",
707            MetaData(testing.db),
708            autoload=True,
709        )
710
711    def test_assorted_repr(self):
712        t1 = Table("foo", MetaData(), Column("x", Integer))
713        i1 = Index("bar", t1.c.x)
714        ck = schema.CheckConstraint("x > y", name="someconstraint")
715
716        for const, exp in (
717            (Sequence("my_seq"), "Sequence('my_seq')"),
718            (Sequence("my_seq", start=5), "Sequence('my_seq', start=5)"),
719            (Column("foo", Integer), "Column('foo', Integer(), table=None)"),
720            (
721                Column(
722                    "foo",
723                    Integer,
724                    primary_key=True,
725                    nullable=False,
726                    onupdate=1,
727                    default=42,
728                    server_default="42",
729                    comment="foo",
730                ),
731                "Column('foo', Integer(), table=None, primary_key=True, "
732                "nullable=False, onupdate=%s, default=%s, server_default=%s, "
733                "comment='foo')"
734                % (
735                    ColumnDefault(1),
736                    ColumnDefault(42),
737                    DefaultClause("42"),
738                ),
739            ),
740            (
741                Table("bar", MetaData(), Column("x", String)),
742                "Table('bar', MetaData(bind=None), "
743                "Column('x', String(), table=<bar>), schema=None)",
744            ),
745            (
746                schema.DefaultGenerator(for_update=True),
747                "DefaultGenerator(for_update=True)",
748            ),
749            (schema.Index("bar", "c"), "Index('bar', 'c')"),
750            (i1, "Index('bar', Column('x', Integer(), table=<foo>))"),
751            (schema.FetchedValue(), "FetchedValue()"),
752            (
753                ck,
754                "CheckConstraint("
755                "%s"
756                ", name='someconstraint')" % repr(ck.sqltext),
757            ),
758            (ColumnDefault(("foo", "bar")), "ColumnDefault(('foo', 'bar'))"),
759        ):
760            eq_(repr(const), exp)
761
762
763class ToMetaDataTest(fixtures.TestBase, AssertsCompiledSQL, ComparesTables):
764    @testing.requires.check_constraints
765    def test_copy(self):
766        # TODO: modernize this test
767
768        from sqlalchemy.testing.schema import Table
769
770        meta = MetaData()
771
772        table = Table(
773            "mytable",
774            meta,
775            Column("myid", Integer, Sequence("foo_id_seq"), primary_key=True),
776            Column("name", String(40), nullable=True),
777            Column(
778                "foo",
779                String(40),
780                nullable=False,
781                server_default="x",
782                server_onupdate="q",
783            ),
784            Column(
785                "bar", String(40), nullable=False, default="y", onupdate="z"
786            ),
787            Column(
788                "description", String(30), CheckConstraint("description='hi'")
789            ),
790            UniqueConstraint("name"),
791            test_needs_fk=True,
792        )
793
794        table2 = Table(
795            "othertable",
796            meta,
797            Column("id", Integer, Sequence("foo_seq"), primary_key=True),
798            Column("myid", Integer, ForeignKey("mytable.myid")),
799            test_needs_fk=True,
800        )
801
802        table3 = Table(
803            "has_comments",
804            meta,
805            Column("foo", Integer, comment="some column"),
806            comment="table comment",
807        )
808
809        def test_to_metadata():
810            meta2 = MetaData()
811            table_c = table.tometadata(meta2)
812            table2_c = table2.tometadata(meta2)
813            table3_c = table3.tometadata(meta2)
814            return (table_c, table2_c, table3_c)
815
816        def test_pickle():
817            meta.bind = testing.db
818            meta2 = pickle.loads(pickle.dumps(meta))
819            assert meta2.bind is None
820            pickle.loads(pickle.dumps(meta2))
821            return (
822                meta2.tables["mytable"],
823                meta2.tables["othertable"],
824                meta2.tables["has_comments"],
825            )
826
827        def test_pickle_via_reflect():
828            # this is the most common use case, pickling the results of a
829            # database reflection
830            meta2 = MetaData(bind=testing.db)
831            t1 = Table("mytable", meta2, autoload=True)
832            Table("othertable", meta2, autoload=True)
833            Table("has_comments", meta2, autoload=True)
834            meta3 = pickle.loads(pickle.dumps(meta2))
835            assert meta3.bind is None
836            assert meta3.tables["mytable"] is not t1
837
838            return (
839                meta3.tables["mytable"],
840                meta3.tables["othertable"],
841                meta3.tables["has_comments"],
842            )
843
844        meta.create_all(testing.db)
845        try:
846            for test, has_constraints, reflect in (
847                (test_to_metadata, True, False),
848                (test_pickle, True, False),
849                (test_pickle_via_reflect, False, True),
850            ):
851                table_c, table2_c, table3_c = test()
852                self.assert_tables_equal(table, table_c)
853                self.assert_tables_equal(table2, table2_c)
854                assert table is not table_c
855                assert table.primary_key is not table_c.primary_key
856                assert (
857                    list(table2_c.c.myid.foreign_keys)[0].column
858                    is table_c.c.myid
859                )
860                assert (
861                    list(table2_c.c.myid.foreign_keys)[0].column
862                    is not table.c.myid
863                )
864                assert "x" in str(table_c.c.foo.server_default.arg)
865                if not reflect:
866                    assert isinstance(table_c.c.myid.default, Sequence)
867                    assert str(table_c.c.foo.server_onupdate.arg) == "q"
868                    assert str(table_c.c.bar.default.arg) == "y"
869                    assert (
870                        getattr(
871                            table_c.c.bar.onupdate.arg,
872                            "arg",
873                            table_c.c.bar.onupdate.arg,
874                        )
875                        == "z"
876                    )
877                    assert isinstance(table2_c.c.id.default, Sequence)
878
879                # constraints don't get reflected for any dialect right
880                # now
881
882                if has_constraints:
883                    for c in table_c.c.description.constraints:
884                        if isinstance(c, CheckConstraint):
885                            break
886                    else:
887                        assert False
888                    assert str(c.sqltext) == "description='hi'"
889                    for c in table_c.constraints:
890                        if isinstance(c, UniqueConstraint):
891                            break
892                    else:
893                        assert False
894                    assert c.columns.contains_column(table_c.c.name)
895                    assert not c.columns.contains_column(table.c.name)
896
897                if testing.requires.comment_reflection.enabled:
898                    eq_(table3_c.comment, "table comment")
899                    eq_(table3_c.c.foo.comment, "some column")
900
901        finally:
902            meta.drop_all(testing.db)
903
904    def test_col_key_fk_parent(self):
905        # test #2643
906        m1 = MetaData()
907        a = Table("a", m1, Column("x", Integer))
908        b = Table("b", m1, Column("x", Integer, ForeignKey("a.x"), key="y"))
909        assert b.c.y.references(a.c.x)
910
911        m2 = MetaData()
912        b2 = b.tometadata(m2)
913        a2 = a.tometadata(m2)
914        assert b2.c.y.references(a2.c.x)
915
916    def test_column_collection_constraint_w_ad_hoc_columns(self):
917        """Test ColumnCollectionConstraint that has columns that aren't
918        part of the Table.
919
920        """
921        meta = MetaData()
922
923        uq1 = UniqueConstraint(literal_column("some_name"))
924        cc1 = CheckConstraint(literal_column("some_name") > 5)
925        table = Table(
926            "mytable",
927            meta,
928            Column("myid", Integer, primary_key=True),
929            Column("name", String(40), nullable=True),
930            uq1,
931            cc1,
932        )
933
934        self.assert_compile(
935            schema.AddConstraint(uq1),
936            "ALTER TABLE mytable ADD UNIQUE (some_name)",
937            dialect="default",
938        )
939        self.assert_compile(
940            schema.AddConstraint(cc1),
941            "ALTER TABLE mytable ADD CHECK (some_name > 5)",
942            dialect="default",
943        )
944        meta2 = MetaData()
945        table2 = table.tometadata(meta2)
946        uq2 = [
947            c for c in table2.constraints if isinstance(c, UniqueConstraint)
948        ][0]
949        cc2 = [
950            c for c in table2.constraints if isinstance(c, CheckConstraint)
951        ][0]
952        self.assert_compile(
953            schema.AddConstraint(uq2),
954            "ALTER TABLE mytable ADD UNIQUE (some_name)",
955            dialect="default",
956        )
957        self.assert_compile(
958            schema.AddConstraint(cc2),
959            "ALTER TABLE mytable ADD CHECK (some_name > 5)",
960            dialect="default",
961        )
962
963    def test_change_schema(self):
964        meta = MetaData()
965
966        table = Table(
967            "mytable",
968            meta,
969            Column("myid", Integer, primary_key=True),
970            Column("name", String(40), nullable=True),
971            Column(
972                "description", String(30), CheckConstraint("description='hi'")
973            ),
974            UniqueConstraint("name"),
975        )
976
977        table2 = Table(
978            "othertable",
979            meta,
980            Column("id", Integer, primary_key=True),
981            Column("myid", Integer, ForeignKey("mytable.myid")),
982        )
983
984        meta2 = MetaData()
985        table_c = table.tometadata(meta2, schema="someschema")
986        table2_c = table2.tometadata(meta2, schema="someschema")
987
988        eq_(
989            str(table_c.join(table2_c).onclause),
990            str(table_c.c.myid == table2_c.c.myid),
991        )
992        eq_(
993            str(table_c.join(table2_c).onclause),
994            "someschema.mytable.myid = someschema.othertable.myid",
995        )
996
997    def test_retain_table_schema(self):
998        meta = MetaData()
999
1000        table = Table(
1001            "mytable",
1002            meta,
1003            Column("myid", Integer, primary_key=True),
1004            Column("name", String(40), nullable=True),
1005            Column(
1006                "description", String(30), CheckConstraint("description='hi'")
1007            ),
1008            UniqueConstraint("name"),
1009            schema="myschema",
1010        )
1011
1012        table2 = Table(
1013            "othertable",
1014            meta,
1015            Column("id", Integer, primary_key=True),
1016            Column("myid", Integer, ForeignKey("myschema.mytable.myid")),
1017            schema="myschema",
1018        )
1019
1020        meta2 = MetaData()
1021        table_c = table.tometadata(meta2)
1022        table2_c = table2.tometadata(meta2)
1023
1024        eq_(
1025            str(table_c.join(table2_c).onclause),
1026            str(table_c.c.myid == table2_c.c.myid),
1027        )
1028        eq_(
1029            str(table_c.join(table2_c).onclause),
1030            "myschema.mytable.myid = myschema.othertable.myid",
1031        )
1032
1033    def test_change_name_retain_metadata(self):
1034        meta = MetaData()
1035
1036        table = Table(
1037            "mytable",
1038            meta,
1039            Column("myid", Integer, primary_key=True),
1040            Column("name", String(40), nullable=True),
1041            Column(
1042                "description", String(30), CheckConstraint("description='hi'")
1043            ),
1044            UniqueConstraint("name"),
1045            schema="myschema",
1046        )
1047
1048        table2 = table.tometadata(table.metadata, name="newtable")
1049        table3 = table.tometadata(
1050            table.metadata, schema="newschema", name="newtable"
1051        )
1052
1053        assert table.metadata is table2.metadata
1054        assert table.metadata is table3.metadata
1055        eq_(
1056            (table.name, table2.name, table3.name),
1057            ("mytable", "newtable", "newtable"),
1058        )
1059        eq_(
1060            (table.key, table2.key, table3.key),
1061            ("myschema.mytable", "myschema.newtable", "newschema.newtable"),
1062        )
1063
1064    def test_change_name_change_metadata(self):
1065        meta = MetaData()
1066        meta2 = MetaData()
1067
1068        table = Table(
1069            "mytable",
1070            meta,
1071            Column("myid", Integer, primary_key=True),
1072            Column("name", String(40), nullable=True),
1073            Column(
1074                "description", String(30), CheckConstraint("description='hi'")
1075            ),
1076            UniqueConstraint("name"),
1077            schema="myschema",
1078        )
1079
1080        table2 = table.tometadata(meta2, name="newtable")
1081
1082        assert table.metadata is not table2.metadata
1083        eq_((table.name, table2.name), ("mytable", "newtable"))
1084        eq_((table.key, table2.key), ("myschema.mytable", "myschema.newtable"))
1085
1086    def test_change_name_selfref_fk_moves(self):
1087        meta = MetaData()
1088
1089        referenced = Table(
1090            "ref", meta, Column("id", Integer, primary_key=True)
1091        )
1092        table = Table(
1093            "mytable",
1094            meta,
1095            Column("id", Integer, primary_key=True),
1096            Column("parent_id", ForeignKey("mytable.id")),
1097            Column("ref_id", ForeignKey("ref.id")),
1098        )
1099
1100        table2 = table.tometadata(table.metadata, name="newtable")
1101        assert table.metadata is table2.metadata
1102        assert table2.c.ref_id.references(referenced.c.id)
1103        assert table2.c.parent_id.references(table2.c.id)
1104
1105    def test_change_name_selfref_fk_moves_w_schema(self):
1106        meta = MetaData()
1107
1108        referenced = Table(
1109            "ref", meta, Column("id", Integer, primary_key=True)
1110        )
1111        table = Table(
1112            "mytable",
1113            meta,
1114            Column("id", Integer, primary_key=True),
1115            Column("parent_id", ForeignKey("mytable.id")),
1116            Column("ref_id", ForeignKey("ref.id")),
1117        )
1118
1119        table2 = table.tometadata(
1120            table.metadata, name="newtable", schema="newschema"
1121        )
1122        ref2 = referenced.tometadata(table.metadata, schema="newschema")
1123        assert table.metadata is table2.metadata
1124        assert table2.c.ref_id.references(ref2.c.id)
1125        assert table2.c.parent_id.references(table2.c.id)
1126
1127    def _assert_fk(self, t2, schema, expected, referred_schema_fn=None):
1128        m2 = MetaData()
1129        existing_schema = t2.schema
1130        if schema:
1131            t2c = t2.tometadata(
1132                m2, schema=schema, referred_schema_fn=referred_schema_fn
1133            )
1134            eq_(t2c.schema, schema)
1135        else:
1136            t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn)
1137            eq_(t2c.schema, existing_schema)
1138        eq_(list(t2c.c.y.foreign_keys)[0]._get_colspec(), expected)
1139
1140    def test_fk_has_schema_string_retain_schema(self):
1141        m = MetaData()
1142
1143        t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x")))
1144        self._assert_fk(t2, None, "q.t1.x")
1145
1146        Table("t1", m, Column("x", Integer), schema="q")
1147        self._assert_fk(t2, None, "q.t1.x")
1148
1149    def test_fk_has_schema_string_new_schema(self):
1150        m = MetaData()
1151
1152        t2 = Table("t2", m, Column("y", Integer, ForeignKey("q.t1.x")))
1153        self._assert_fk(t2, "z", "q.t1.x")
1154
1155        Table("t1", m, Column("x", Integer), schema="q")
1156        self._assert_fk(t2, "z", "q.t1.x")
1157
1158    def test_fk_has_schema_col_retain_schema(self):
1159        m = MetaData()
1160
1161        t1 = Table("t1", m, Column("x", Integer), schema="q")
1162        t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x)))
1163
1164        self._assert_fk(t2, "z", "q.t1.x")
1165
1166    def test_fk_has_schema_col_new_schema(self):
1167        m = MetaData()
1168
1169        t1 = Table("t1", m, Column("x", Integer), schema="q")
1170        t2 = Table("t2", m, Column("y", Integer, ForeignKey(t1.c.x)))
1171
1172        self._assert_fk(t2, "z", "q.t1.x")
1173
1174    def test_fk_and_referent_has_same_schema_string_retain_schema(self):
1175        m = MetaData()
1176
1177        t2 = Table(
1178            "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q"
1179        )
1180
1181        self._assert_fk(t2, None, "q.t1.x")
1182
1183        Table("t1", m, Column("x", Integer), schema="q")
1184        self._assert_fk(t2, None, "q.t1.x")
1185
1186    def test_fk_and_referent_has_same_schema_string_new_schema(self):
1187        m = MetaData()
1188
1189        t2 = Table(
1190            "t2", m, Column("y", Integer, ForeignKey("q.t1.x")), schema="q"
1191        )
1192
1193        self._assert_fk(t2, "z", "z.t1.x")
1194
1195        Table("t1", m, Column("x", Integer), schema="q")
1196        self._assert_fk(t2, "z", "z.t1.x")
1197
1198    def test_fk_and_referent_has_same_schema_col_retain_schema(self):
1199        m = MetaData()
1200
1201        t1 = Table("t1", m, Column("x", Integer), schema="q")
1202        t2 = Table(
1203            "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
1204        )
1205        self._assert_fk(t2, None, "q.t1.x")
1206
1207    def test_fk_and_referent_has_same_schema_col_new_schema(self):
1208        m = MetaData()
1209
1210        t1 = Table("t1", m, Column("x", Integer), schema="q")
1211        t2 = Table(
1212            "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
1213        )
1214        self._assert_fk(t2, "z", "z.t1.x")
1215
1216    def test_fk_and_referent_has_diff_schema_string_retain_schema(self):
1217        m = MetaData()
1218
1219        t2 = Table(
1220            "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q"
1221        )
1222
1223        self._assert_fk(t2, None, "p.t1.x")
1224
1225        Table("t1", m, Column("x", Integer), schema="p")
1226        self._assert_fk(t2, None, "p.t1.x")
1227
1228    def test_fk_and_referent_has_diff_schema_string_new_schema(self):
1229        m = MetaData()
1230
1231        t2 = Table(
1232            "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q"
1233        )
1234
1235        self._assert_fk(t2, "z", "p.t1.x")
1236
1237        Table("t1", m, Column("x", Integer), schema="p")
1238        self._assert_fk(t2, "z", "p.t1.x")
1239
1240    def test_fk_and_referent_has_diff_schema_col_retain_schema(self):
1241        m = MetaData()
1242
1243        t1 = Table("t1", m, Column("x", Integer), schema="p")
1244        t2 = Table(
1245            "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
1246        )
1247        self._assert_fk(t2, None, "p.t1.x")
1248
1249    def test_fk_and_referent_has_diff_schema_col_new_schema(self):
1250        m = MetaData()
1251
1252        t1 = Table("t1", m, Column("x", Integer), schema="p")
1253        t2 = Table(
1254            "t2", m, Column("y", Integer, ForeignKey(t1.c.x)), schema="q"
1255        )
1256        self._assert_fk(t2, "z", "p.t1.x")
1257
1258    def test_fk_custom_system(self):
1259        m = MetaData()
1260        t2 = Table(
1261            "t2", m, Column("y", Integer, ForeignKey("p.t1.x")), schema="q"
1262        )
1263
1264        def ref_fn(table, to_schema, constraint, referred_schema):
1265            assert table is t2
1266            eq_(to_schema, "z")
1267            eq_(referred_schema, "p")
1268            return "h"
1269
1270        self._assert_fk(t2, "z", "h.t1.x", referred_schema_fn=ref_fn)
1271
1272    def test_copy_info(self):
1273        m = MetaData()
1274        fk = ForeignKey("t2.id")
1275        c = Column("c", Integer, fk)
1276        ck = CheckConstraint("c > 5")
1277        t = Table("t", m, c, ck)
1278
1279        m.info["minfo"] = True
1280        fk.info["fkinfo"] = True
1281        c.info["cinfo"] = True
1282        ck.info["ckinfo"] = True
1283        t.info["tinfo"] = True
1284        t.primary_key.info["pkinfo"] = True
1285        fkc = [
1286            const
1287            for const in t.constraints
1288            if isinstance(const, ForeignKeyConstraint)
1289        ][0]
1290        fkc.info["fkcinfo"] = True
1291
1292        m2 = MetaData()
1293        t2 = t.tometadata(m2)
1294
1295        m.info["minfo"] = False
1296        fk.info["fkinfo"] = False
1297        c.info["cinfo"] = False
1298        ck.info["ckinfo"] = False
1299        t.primary_key.info["pkinfo"] = False
1300        fkc.info["fkcinfo"] = False
1301
1302        eq_(m2.info, {})
1303        eq_(t2.info, {"tinfo": True})
1304        eq_(t2.c.c.info, {"cinfo": True})
1305        eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True})
1306        eq_(t2.primary_key.info, {"pkinfo": True})
1307
1308        fkc2 = [
1309            const
1310            for const in t2.constraints
1311            if isinstance(const, ForeignKeyConstraint)
1312        ][0]
1313        eq_(fkc2.info, {"fkcinfo": True})
1314
1315        ck2 = [
1316            const
1317            for const in t2.constraints
1318            if isinstance(const, CheckConstraint)
1319        ][0]
1320        eq_(ck2.info, {"ckinfo": True})
1321
1322    def test_dialect_kwargs(self):
1323        meta = MetaData()
1324
1325        table = Table(
1326            "mytable",
1327            meta,
1328            Column("myid", Integer, primary_key=True),
1329            mysql_engine="InnoDB",
1330        )
1331
1332        meta2 = MetaData()
1333        table_c = table.tometadata(meta2)
1334
1335        eq_(table.kwargs, {"mysql_engine": "InnoDB"})
1336
1337        eq_(table.kwargs, table_c.kwargs)
1338
1339    def test_indexes(self):
1340        meta = MetaData()
1341
1342        table = Table(
1343            "mytable",
1344            meta,
1345            Column("id", Integer, primary_key=True),
1346            Column("data1", Integer, index=True),
1347            Column("data2", Integer),
1348            Index("text", text("data1 + 1")),
1349        )
1350        Index("multi", table.c.data1, table.c.data2)
1351        Index("func", func.abs(table.c.data1))
1352        Index("multi-func", table.c.data1, func.abs(table.c.data2))
1353
1354        meta2 = MetaData()
1355        table_c = table.tometadata(meta2)
1356
1357        def _get_key(i):
1358            return (
1359                [i.name, i.unique]
1360                + sorted(i.kwargs.items())
1361                + [str(col) for col in i.expressions]
1362            )
1363
1364        eq_(
1365            sorted([_get_key(i) for i in table.indexes]),
1366            sorted([_get_key(i) for i in table_c.indexes]),
1367        )
1368
1369    def test_indexes_with_col_redefine(self):
1370        meta = MetaData()
1371
1372        table = Table(
1373            "mytable",
1374            meta,
1375            Column("id", Integer, primary_key=True),
1376            Column("data1", Integer),
1377            Column("data2", Integer),
1378            Index("text", text("data1 + 1")),
1379        )
1380        Index("multi", table.c.data1, table.c.data2)
1381        Index("func", func.abs(table.c.data1))
1382        Index("multi-func", table.c.data1, func.abs(table.c.data2))
1383
1384        table = Table(
1385            "mytable",
1386            meta,
1387            Column("data1", Integer),
1388            Column("data2", Integer),
1389            extend_existing=True,
1390        )
1391
1392        meta2 = MetaData()
1393        table_c = table.tometadata(meta2)
1394
1395        def _get_key(i):
1396            return (
1397                [i.name, i.unique]
1398                + sorted(i.kwargs.items())
1399                + [str(col) for col in i.expressions]
1400            )
1401
1402        eq_(
1403            sorted([_get_key(i) for i in table.indexes]),
1404            sorted([_get_key(i) for i in table_c.indexes]),
1405        )
1406
1407    @emits_warning("Table '.+' already exists within the given MetaData")
1408    def test_already_exists(self):
1409
1410        meta1 = MetaData()
1411        table1 = Table(
1412            "mytable", meta1, Column("myid", Integer, primary_key=True)
1413        )
1414        meta2 = MetaData()
1415        table2 = Table(
1416            "mytable", meta2, Column("yourid", Integer, primary_key=True)
1417        )
1418
1419        table_c = table1.tometadata(meta2)
1420        table_d = table2.tometadata(meta2)
1421
1422        # d'oh!
1423        assert table_c is table_d
1424
1425    def test_default_schema_metadata(self):
1426        meta = MetaData(schema="myschema")
1427
1428        table = Table(
1429            "mytable",
1430            meta,
1431            Column("myid", Integer, primary_key=True),
1432            Column("name", String(40), nullable=True),
1433            Column(
1434                "description", String(30), CheckConstraint("description='hi'")
1435            ),
1436            UniqueConstraint("name"),
1437        )
1438
1439        table2 = Table(
1440            "othertable",
1441            meta,
1442            Column("id", Integer, primary_key=True),
1443            Column("myid", Integer, ForeignKey("myschema.mytable.myid")),
1444        )
1445
1446        meta2 = MetaData(schema="someschema")
1447        table_c = table.tometadata(meta2, schema=None)
1448        table2_c = table2.tometadata(meta2, schema=None)
1449
1450        eq_(
1451            str(table_c.join(table2_c).onclause),
1452            str(table_c.c.myid == table2_c.c.myid),
1453        )
1454        eq_(
1455            str(table_c.join(table2_c).onclause),
1456            "someschema.mytable.myid = someschema.othertable.myid",
1457        )
1458
1459    def test_strip_schema(self):
1460        meta = MetaData()
1461
1462        table = Table(
1463            "mytable",
1464            meta,
1465            Column("myid", Integer, primary_key=True),
1466            Column("name", String(40), nullable=True),
1467            Column(
1468                "description", String(30), CheckConstraint("description='hi'")
1469            ),
1470            UniqueConstraint("name"),
1471        )
1472
1473        table2 = Table(
1474            "othertable",
1475            meta,
1476            Column("id", Integer, primary_key=True),
1477            Column("myid", Integer, ForeignKey("mytable.myid")),
1478        )
1479
1480        meta2 = MetaData()
1481        table_c = table.tometadata(meta2, schema=None)
1482        table2_c = table2.tometadata(meta2, schema=None)
1483
1484        eq_(
1485            str(table_c.join(table2_c).onclause),
1486            str(table_c.c.myid == table2_c.c.myid),
1487        )
1488        eq_(
1489            str(table_c.join(table2_c).onclause),
1490            "mytable.myid = othertable.myid",
1491        )
1492
1493    def test_unique_true_flag(self):
1494        meta = MetaData()
1495
1496        table = Table("mytable", meta, Column("x", Integer, unique=True))
1497
1498        m2 = MetaData()
1499
1500        t2 = table.tometadata(m2)
1501
1502        eq_(
1503            len(
1504                [
1505                    const
1506                    for const in t2.constraints
1507                    if isinstance(const, UniqueConstraint)
1508                ]
1509            ),
1510            1,
1511        )
1512
1513    def test_index_true_flag(self):
1514        meta = MetaData()
1515
1516        table = Table("mytable", meta, Column("x", Integer, index=True))
1517
1518        m2 = MetaData()
1519
1520        t2 = table.tometadata(m2)
1521
1522        eq_(len(t2.indexes), 1)
1523
1524
1525class InfoTest(fixtures.TestBase):
1526    def test_metadata_info(self):
1527        m1 = MetaData()
1528        eq_(m1.info, {})
1529
1530        m1 = MetaData(info={"foo": "bar"})
1531        eq_(m1.info, {"foo": "bar"})
1532
1533    def test_foreignkey_constraint_info(self):
1534        fkc = ForeignKeyConstraint(["a"], ["b"], name="bar")
1535        eq_(fkc.info, {})
1536
1537        fkc = ForeignKeyConstraint(
1538            ["a"], ["b"], name="bar", info={"foo": "bar"}
1539        )
1540        eq_(fkc.info, {"foo": "bar"})
1541
1542    def test_foreignkey_info(self):
1543        fkc = ForeignKey("a")
1544        eq_(fkc.info, {})
1545
1546        fkc = ForeignKey("a", info={"foo": "bar"})
1547        eq_(fkc.info, {"foo": "bar"})
1548
1549    def test_primarykey_constraint_info(self):
1550        pkc = PrimaryKeyConstraint("a", name="x")
1551        eq_(pkc.info, {})
1552
1553        pkc = PrimaryKeyConstraint("a", name="x", info={"foo": "bar"})
1554        eq_(pkc.info, {"foo": "bar"})
1555
1556    def test_unique_constraint_info(self):
1557        uc = UniqueConstraint("a", name="x")
1558        eq_(uc.info, {})
1559
1560        uc = UniqueConstraint("a", name="x", info={"foo": "bar"})
1561        eq_(uc.info, {"foo": "bar"})
1562
1563    def test_check_constraint_info(self):
1564        cc = CheckConstraint("foo=bar", name="x")
1565        eq_(cc.info, {})
1566
1567        cc = CheckConstraint("foo=bar", name="x", info={"foo": "bar"})
1568        eq_(cc.info, {"foo": "bar"})
1569
1570    def test_index_info(self):
1571        ix = Index("x", "a")
1572        eq_(ix.info, {})
1573
1574        ix = Index("x", "a", info={"foo": "bar"})
1575        eq_(ix.info, {"foo": "bar"})
1576
1577    def test_column_info(self):
1578        c = Column("x", Integer)
1579        eq_(c.info, {})
1580
1581        c = Column("x", Integer, info={"foo": "bar"})
1582        eq_(c.info, {"foo": "bar"})
1583
1584    def test_table_info(self):
1585        t = Table("x", MetaData())
1586        eq_(t.info, {})
1587
1588        t = Table("x", MetaData(), info={"foo": "bar"})
1589        eq_(t.info, {"foo": "bar"})
1590
1591
1592class TableTest(fixtures.TestBase, AssertsCompiledSQL):
1593    @testing.requires.temporary_tables
1594    @testing.skip_if("mssql", "different col format")
1595    def test_prefixes(self):
1596        from sqlalchemy import Table
1597
1598        table1 = Table(
1599            "temporary_table_1",
1600            MetaData(),
1601            Column("col1", Integer),
1602            prefixes=["TEMPORARY"],
1603        )
1604
1605        self.assert_compile(
1606            schema.CreateTable(table1),
1607            "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)",
1608        )
1609
1610        table2 = Table(
1611            "temporary_table_2",
1612            MetaData(),
1613            Column("col1", Integer),
1614            prefixes=["VIRTUAL"],
1615        )
1616        self.assert_compile(
1617            schema.CreateTable(table2),
1618            "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)",
1619        )
1620
1621    def test_table_info(self):
1622        metadata = MetaData()
1623        t1 = Table("foo", metadata, info={"x": "y"})
1624        t2 = Table("bar", metadata, info={})
1625        t3 = Table("bat", metadata)
1626        assert t1.info == {"x": "y"}
1627        assert t2.info == {}
1628        assert t3.info == {}
1629        for t in (t1, t2, t3):
1630            t.info["bar"] = "zip"
1631            assert t.info["bar"] == "zip"
1632
1633    def test_invalid_objects(self):
1634        assert_raises_message(
1635            tsa.exc.ArgumentError,
1636            "'SchemaItem' object, such as a 'Column' or a "
1637            "'Constraint' expected, got <.*ColumnClause at .*; q>",
1638            Table,
1639            "asdf",
1640            MetaData(),
1641            tsa.column("q", Integer),
1642        )
1643
1644        assert_raises_message(
1645            tsa.exc.ArgumentError,
1646            r"'SchemaItem' object, such as a 'Column' or a "
1647            r"'Constraint' expected, got String\(\)",
1648            Table,
1649            "asdf",
1650            MetaData(),
1651            String(),
1652        )
1653
1654        assert_raises_message(
1655            tsa.exc.ArgumentError,
1656            "'SchemaItem' object, such as a 'Column' or a "
1657            "'Constraint' expected, got 12",
1658            Table,
1659            "asdf",
1660            MetaData(),
1661            12,
1662        )
1663
1664    def test_reset_exported_passes(self):
1665
1666        m = MetaData()
1667
1668        t = Table("t", m, Column("foo", Integer))
1669        eq_(list(t.c), [t.c.foo])
1670
1671        t._reset_exported()
1672
1673        eq_(list(t.c), [t.c.foo])
1674
1675    def test_foreign_key_constraints_collection(self):
1676        metadata = MetaData()
1677        t1 = Table("foo", metadata, Column("a", Integer))
1678        eq_(t1.foreign_key_constraints, set())
1679
1680        fk1 = ForeignKey("q.id")
1681        fk2 = ForeignKey("j.id")
1682        fk3 = ForeignKeyConstraint(["b", "c"], ["r.x", "r.y"])
1683
1684        t1.append_column(Column("b", Integer, fk1))
1685        eq_(t1.foreign_key_constraints, set([fk1.constraint]))
1686
1687        t1.append_column(Column("c", Integer, fk2))
1688        eq_(t1.foreign_key_constraints, set([fk1.constraint, fk2.constraint]))
1689
1690        t1.append_constraint(fk3)
1691        eq_(
1692            t1.foreign_key_constraints,
1693            set([fk1.constraint, fk2.constraint, fk3]),
1694        )
1695
1696    def test_c_immutable(self):
1697        m = MetaData()
1698        t1 = Table("t", m, Column("x", Integer), Column("y", Integer))
1699        assert_raises(TypeError, t1.c.extend, [Column("z", Integer)])
1700
1701        def assign():
1702            t1.c["z"] = Column("z", Integer)
1703
1704        assert_raises(TypeError, assign)
1705
1706        def assign2():
1707            t1.c.z = Column("z", Integer)
1708
1709        assert_raises(TypeError, assign2)
1710
1711    def test_c_mutate_after_unpickle(self):
1712        m = MetaData()
1713
1714        y = Column("y", Integer)
1715        t1 = Table("t", m, Column("x", Integer), y)
1716
1717        t2 = pickle.loads(pickle.dumps(t1))
1718        z = Column("z", Integer)
1719        g = Column("g", Integer)
1720        t2.append_column(z)
1721
1722        is_(t1.c.contains_column(y), True)
1723        is_(t2.c.contains_column(y), False)
1724        y2 = t2.c.y
1725        is_(t2.c.contains_column(y2), True)
1726
1727        is_(t2.c.contains_column(z), True)
1728        is_(t2.c.contains_column(g), False)
1729
1730    def test_autoincrement_replace(self):
1731        m = MetaData()
1732
1733        t = Table("t", m, Column("id", Integer, primary_key=True))
1734
1735        is_(t._autoincrement_column, t.c.id)
1736
1737        t = Table(
1738            "t",
1739            m,
1740            Column("id", Integer, primary_key=True),
1741            extend_existing=True,
1742        )
1743        is_(t._autoincrement_column, t.c.id)
1744
1745    def test_pk_args_standalone(self):
1746        m = MetaData()
1747        t = Table(
1748            "t",
1749            m,
1750            Column("x", Integer, primary_key=True),
1751            PrimaryKeyConstraint(mssql_clustered=True),
1752        )
1753        eq_(list(t.primary_key), [t.c.x])
1754        eq_(t.primary_key.dialect_kwargs, {"mssql_clustered": True})
1755
1756    def test_pk_cols_sets_flags(self):
1757        m = MetaData()
1758        t = Table(
1759            "t",
1760            m,
1761            Column("x", Integer),
1762            Column("y", Integer),
1763            Column("z", Integer),
1764            PrimaryKeyConstraint("x", "y"),
1765        )
1766        eq_(t.c.x.primary_key, True)
1767        eq_(t.c.y.primary_key, True)
1768        eq_(t.c.z.primary_key, False)
1769
1770    def test_pk_col_mismatch_one(self):
1771        m = MetaData()
1772        assert_raises_message(
1773            exc.SAWarning,
1774            "Table 't' specifies columns 'x' as primary_key=True, "
1775            "not matching locally specified columns 'q'",
1776            Table,
1777            "t",
1778            m,
1779            Column("x", Integer, primary_key=True),
1780            Column("q", Integer),
1781            PrimaryKeyConstraint("q"),
1782        )
1783
1784    def test_pk_col_mismatch_two(self):
1785        m = MetaData()
1786        assert_raises_message(
1787            exc.SAWarning,
1788            "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, "
1789            "not matching locally specified columns 'b', 'c'",
1790            Table,
1791            "t",
1792            m,
1793            Column("a", Integer, primary_key=True),
1794            Column("b", Integer, primary_key=True),
1795            Column("c", Integer, primary_key=True),
1796            PrimaryKeyConstraint("b", "c"),
1797        )
1798
1799    @testing.emits_warning("Table 't'")
1800    def test_pk_col_mismatch_three(self):
1801        m = MetaData()
1802        t = Table(
1803            "t",
1804            m,
1805            Column("x", Integer, primary_key=True),
1806            Column("q", Integer),
1807            PrimaryKeyConstraint("q"),
1808        )
1809        eq_(list(t.primary_key), [t.c.q])
1810
1811    @testing.emits_warning("Table 't'")
1812    def test_pk_col_mismatch_four(self):
1813        m = MetaData()
1814        t = Table(
1815            "t",
1816            m,
1817            Column("a", Integer, primary_key=True),
1818            Column("b", Integer, primary_key=True),
1819            Column("c", Integer, primary_key=True),
1820            PrimaryKeyConstraint("b", "c"),
1821        )
1822        eq_(list(t.primary_key), [t.c.b, t.c.c])
1823
1824    def test_pk_always_flips_nullable(self):
1825        m = MetaData()
1826
1827        t1 = Table("t1", m, Column("x", Integer), PrimaryKeyConstraint("x"))
1828
1829        t2 = Table("t2", m, Column("x", Integer, primary_key=True))
1830
1831        eq_(list(t1.primary_key), [t1.c.x])
1832
1833        eq_(list(t2.primary_key), [t2.c.x])
1834
1835        assert t1.c.x.primary_key
1836        assert t2.c.x.primary_key
1837
1838        assert not t2.c.x.nullable
1839        assert not t1.c.x.nullable
1840
1841
1842class PKAutoIncrementTest(fixtures.TestBase):
1843    def test_multi_integer_no_autoinc(self):
1844        pk = PrimaryKeyConstraint(Column("a", Integer), Column("b", Integer))
1845        t = Table("t", MetaData())
1846        t.append_constraint(pk)
1847
1848        is_(pk._autoincrement_column, None)
1849
1850    def test_multi_integer_multi_autoinc(self):
1851        pk = PrimaryKeyConstraint(
1852            Column("a", Integer, autoincrement=True),
1853            Column("b", Integer, autoincrement=True),
1854        )
1855        t = Table("t", MetaData())
1856        t.append_constraint(pk)
1857
1858        assert_raises_message(
1859            exc.ArgumentError,
1860            "Only one Column may be marked",
1861            lambda: pk._autoincrement_column,
1862        )
1863
1864    def test_single_integer_no_autoinc(self):
1865        pk = PrimaryKeyConstraint(Column("a", Integer))
1866        t = Table("t", MetaData())
1867        t.append_constraint(pk)
1868
1869        is_(pk._autoincrement_column, pk.columns["a"])
1870
1871    def test_single_string_no_autoinc(self):
1872        pk = PrimaryKeyConstraint(Column("a", String))
1873        t = Table("t", MetaData())
1874        t.append_constraint(pk)
1875
1876        is_(pk._autoincrement_column, None)
1877
1878    def test_single_string_illegal_autoinc(self):
1879        t = Table("t", MetaData(), Column("a", String, autoincrement=True))
1880        pk = PrimaryKeyConstraint(t.c.a)
1881        t.append_constraint(pk)
1882
1883        assert_raises_message(
1884            exc.ArgumentError,
1885            "Column type VARCHAR on column 't.a'",
1886            lambda: pk._autoincrement_column,
1887        )
1888
1889    def test_single_integer_default(self):
1890        t = Table(
1891            "t",
1892            MetaData(),
1893            Column("a", Integer, autoincrement=True, default=lambda: 1),
1894        )
1895        pk = PrimaryKeyConstraint(t.c.a)
1896        t.append_constraint(pk)
1897
1898        is_(pk._autoincrement_column, t.c.a)
1899
1900    def test_single_integer_server_default(self):
1901        # new as of 1.1; now that we have three states for autoincrement,
1902        # if the user puts autoincrement=True with a server_default, trust
1903        # them on it
1904        t = Table(
1905            "t",
1906            MetaData(),
1907            Column(
1908                "a", Integer, autoincrement=True, server_default=func.magic()
1909            ),
1910        )
1911        pk = PrimaryKeyConstraint(t.c.a)
1912        t.append_constraint(pk)
1913
1914        is_(pk._autoincrement_column, t.c.a)
1915
1916    def test_implicit_autoinc_but_fks(self):
1917        m = MetaData()
1918        Table("t1", m, Column("id", Integer, primary_key=True))
1919        t2 = Table("t2", MetaData(), Column("a", Integer, ForeignKey("t1.id")))
1920        pk = PrimaryKeyConstraint(t2.c.a)
1921        t2.append_constraint(pk)
1922        is_(pk._autoincrement_column, None)
1923
1924    def test_explicit_autoinc_but_fks(self):
1925        m = MetaData()
1926        Table("t1", m, Column("id", Integer, primary_key=True))
1927        t2 = Table(
1928            "t2",
1929            MetaData(),
1930            Column("a", Integer, ForeignKey("t1.id"), autoincrement=True),
1931        )
1932        pk = PrimaryKeyConstraint(t2.c.a)
1933        t2.append_constraint(pk)
1934        is_(pk._autoincrement_column, t2.c.a)
1935
1936        t3 = Table(
1937            "t3",
1938            MetaData(),
1939            Column(
1940                "a", Integer, ForeignKey("t1.id"), autoincrement="ignore_fk"
1941            ),
1942        )
1943        pk = PrimaryKeyConstraint(t3.c.a)
1944        t3.append_constraint(pk)
1945        is_(pk._autoincrement_column, t3.c.a)
1946
1947
1948class SchemaTypeTest(fixtures.TestBase):
1949    __backend__ = True
1950
1951    class TrackEvents(object):
1952        column = None
1953        table = None
1954        evt_targets = ()
1955
1956        def _set_table(self, column, table):
1957            super(SchemaTypeTest.TrackEvents, self)._set_table(column, table)
1958            self.column = column
1959            self.table = table
1960
1961        def _on_table_create(self, target, bind, **kw):
1962            super(SchemaTypeTest.TrackEvents, self)._on_table_create(
1963                target, bind, **kw
1964            )
1965            self.evt_targets += (target,)
1966
1967        def _on_metadata_create(self, target, bind, **kw):
1968            super(SchemaTypeTest.TrackEvents, self)._on_metadata_create(
1969                target, bind, **kw
1970            )
1971            self.evt_targets += (target,)
1972
1973    # TODO: Enum and Boolean put TypeEngine first.  Changing that here
1974    # causes collection-mutate-while-iterated errors in the event system
1975    # since the hooks here call upon the adapted type.  Need to figure out
1976    # why Enum and Boolean don't have this problem.
1977    class MyType(TrackEvents, sqltypes.SchemaType, sqltypes.TypeEngine):
1978        pass
1979
1980    class WrapEnum(TrackEvents, Enum):
1981        pass
1982
1983    class WrapBoolean(TrackEvents, Boolean):
1984        pass
1985
1986    class MyTypeWImpl(MyType):
1987        def _gen_dialect_impl(self, dialect):
1988            return self.adapt(SchemaTypeTest.MyTypeImpl)
1989
1990    class MyTypeImpl(MyTypeWImpl):
1991        pass
1992
1993    class MyTypeDecAndSchema(TypeDecorator, sqltypes.SchemaType):
1994        impl = String()
1995
1996        evt_targets = ()
1997
1998        def __init__(self):
1999            TypeDecorator.__init__(self)
2000            sqltypes.SchemaType.__init__(self)
2001
2002        def _on_table_create(self, target, bind, **kw):
2003            self.evt_targets += (target,)
2004
2005        def _on_metadata_create(self, target, bind, **kw):
2006            self.evt_targets += (target,)
2007
2008    def test_before_parent_attach_plain(self):
2009        typ = self.MyType()
2010        self._test_before_parent_attach(typ)
2011
2012    def test_before_parent_attach_typedec_enclosing_schematype(self):
2013        # additional test for [ticket:2919] as part of test for
2014        # [ticket:3832]
2015        # this also serves as the test for [ticket:6152]
2016
2017        class MySchemaType(sqltypes.TypeEngine, sqltypes.SchemaType):
2018            pass
2019
2020        target_typ = MySchemaType()
2021
2022        class MyType(TypeDecorator):
2023            impl = target_typ
2024
2025        typ = MyType()
2026        self._test_before_parent_attach(typ, target_typ)
2027
2028    def test_before_parent_attach_array_enclosing_schematype(self):
2029        # test for [ticket:4141] which is the same idea as [ticket:3832]
2030        # for ARRAY
2031
2032        typ = ARRAY(String)
2033
2034        self._test_before_parent_attach(typ)
2035
2036    def test_before_parent_attach_typedec_of_schematype(self):
2037        class MyType(TypeDecorator, sqltypes.SchemaType):
2038            impl = String
2039
2040        typ = MyType()
2041        self._test_before_parent_attach(typ)
2042
2043    def test_before_parent_attach_schematype_of_typedec(self):
2044        class MyType(sqltypes.SchemaType, TypeDecorator):
2045            impl = String
2046
2047        typ = MyType()
2048        self._test_before_parent_attach(typ)
2049
2050    def _test_before_parent_attach(self, typ, evt_target=None):
2051        canary = mock.Mock()
2052
2053        if evt_target is None:
2054            evt_target = typ
2055
2056        orig_set_parent = evt_target._set_parent
2057        orig_set_parent_w_dispatch = evt_target._set_parent_with_dispatch
2058
2059        def _set_parent(parent, **kw):
2060            orig_set_parent(parent, **kw)
2061            canary._set_parent(parent)
2062
2063        def _set_parent_w_dispatch(parent):
2064            orig_set_parent_w_dispatch(parent)
2065            canary._set_parent_with_dispatch(parent)
2066
2067        with mock.patch.object(evt_target, "_set_parent", _set_parent):
2068            with mock.patch.object(
2069                evt_target, "_set_parent_with_dispatch", _set_parent_w_dispatch
2070            ):
2071                event.listen(evt_target, "before_parent_attach", canary.go)
2072
2073                c = Column("q", typ)
2074
2075        eq_(
2076            canary.mock_calls,
2077            [
2078                mock.call.go(evt_target, c),
2079                mock.call._set_parent(c),
2080                mock.call._set_parent_with_dispatch(c),
2081            ],
2082        )
2083
2084    def test_independent_schema(self):
2085        m = MetaData()
2086        type_ = self.MyType(schema="q")
2087        t1 = Table("x", m, Column("y", type_), schema="z")
2088        eq_(t1.c.y.type.schema, "q")
2089
2090    def test_inherit_schema(self):
2091        m = MetaData()
2092        type_ = self.MyType(schema="q", inherit_schema=True)
2093        t1 = Table("x", m, Column("y", type_), schema="z")
2094        eq_(t1.c.y.type.schema, "z")
2095
2096    def test_independent_schema_enum(self):
2097        m = MetaData()
2098        type_ = sqltypes.Enum("a", schema="q")
2099        t1 = Table("x", m, Column("y", type_), schema="z")
2100        eq_(t1.c.y.type.schema, "q")
2101
2102    def test_inherit_schema_enum(self):
2103        m = MetaData()
2104        type_ = sqltypes.Enum("a", "b", "c", schema="q", inherit_schema=True)
2105        t1 = Table("x", m, Column("y", type_), schema="z")
2106        eq_(t1.c.y.type.schema, "z")
2107
2108    def test_tometadata_copy_type(self):
2109        m1 = MetaData()
2110
2111        type_ = self.MyType()
2112        t1 = Table("x", m1, Column("y", type_))
2113
2114        m2 = MetaData()
2115        t2 = t1.tometadata(m2)
2116
2117        # metadata isn't set
2118        is_(t2.c.y.type.metadata, None)
2119
2120        # our test type sets table, though
2121        is_(t2.c.y.type.table, t2)
2122
2123    def test_tometadata_copy_decorated(self):
2124        class MyDecorated(TypeDecorator):
2125            impl = self.MyType
2126
2127        m1 = MetaData()
2128
2129        type_ = MyDecorated(schema="z")
2130        t1 = Table("x", m1, Column("y", type_))
2131
2132        m2 = MetaData()
2133        t2 = t1.tometadata(m2)
2134        eq_(t2.c.y.type.schema, "z")
2135
2136    def test_tometadata_independent_schema(self):
2137        m1 = MetaData()
2138
2139        type_ = self.MyType()
2140        t1 = Table("x", m1, Column("y", type_))
2141
2142        m2 = MetaData()
2143        t2 = t1.tometadata(m2, schema="bar")
2144
2145        eq_(t2.c.y.type.schema, None)
2146
2147    def test_tometadata_inherit_schema(self):
2148        m1 = MetaData()
2149
2150        type_ = self.MyType(inherit_schema=True)
2151        t1 = Table("x", m1, Column("y", type_))
2152
2153        m2 = MetaData()
2154        t2 = t1.tometadata(m2, schema="bar")
2155
2156        eq_(t1.c.y.type.schema, None)
2157        eq_(t2.c.y.type.schema, "bar")
2158
2159    def test_tometadata_independent_events(self):
2160        m1 = MetaData()
2161
2162        type_ = self.MyType()
2163        t1 = Table("x", m1, Column("y", type_))
2164
2165        m2 = MetaData()
2166        t2 = t1.tometadata(m2)
2167
2168        t1.dispatch.before_create(t1, testing.db)
2169        eq_(t1.c.y.type.evt_targets, (t1,))
2170        eq_(t2.c.y.type.evt_targets, ())
2171
2172        t2.dispatch.before_create(t2, testing.db)
2173        t2.dispatch.before_create(t2, testing.db)
2174        eq_(t1.c.y.type.evt_targets, (t1,))
2175        eq_(t2.c.y.type.evt_targets, (t2, t2))
2176
2177    def test_enum_column_copy_transfers_events(self):
2178        m = MetaData()
2179
2180        type_ = self.WrapEnum("a", "b", "c", name="foo")
2181        y = Column("y", type_)
2182        y_copy = y.copy()
2183        t1 = Table("x", m, y_copy)
2184
2185        is_true(y_copy.type._create_events)
2186
2187        # for PostgreSQL, this will emit CREATE TYPE
2188        m.dispatch.before_create(t1, testing.db)
2189        try:
2190            eq_(t1.c.y.type.evt_targets, (t1,))
2191        finally:
2192            # do the drop so that PostgreSQL emits DROP TYPE
2193            m.dispatch.after_drop(t1, testing.db)
2194
2195    def test_enum_nonnative_column_copy_transfers_events(self):
2196        m = MetaData()
2197
2198        type_ = self.WrapEnum("a", "b", "c", name="foo", native_enum=False)
2199        y = Column("y", type_)
2200        y_copy = y.copy()
2201        t1 = Table("x", m, y_copy)
2202
2203        is_true(y_copy.type._create_events)
2204
2205        m.dispatch.before_create(t1, testing.db)
2206        eq_(t1.c.y.type.evt_targets, (t1,))
2207
2208    def test_enum_nonnative_column_copy_transfers_constraintpref(self):
2209        m = MetaData()
2210
2211        type_ = self.WrapEnum(
2212            "a",
2213            "b",
2214            "c",
2215            name="foo",
2216            native_enum=False,
2217            create_constraint=False,
2218        )
2219        y = Column("y", type_)
2220        y_copy = y.copy()
2221        Table("x", m, y_copy)
2222
2223        is_false(y_copy.type.create_constraint)
2224
2225    def test_boolean_column_copy_transfers_events(self):
2226        m = MetaData()
2227
2228        type_ = self.WrapBoolean()
2229        y = Column("y", type_)
2230        y_copy = y.copy()
2231        Table("x", m, y_copy)
2232
2233        is_true(y_copy.type._create_events)
2234
2235    def test_boolean_nonnative_column_copy_transfers_constraintpref(self):
2236        m = MetaData()
2237
2238        type_ = self.WrapBoolean(create_constraint=False)
2239        y = Column("y", type_)
2240        y_copy = y.copy()
2241        Table("x", m, y_copy)
2242
2243        is_false(y_copy.type.create_constraint)
2244
2245    def test_metadata_dispatch_no_new_impl(self):
2246        m1 = MetaData()
2247        typ = self.MyType(metadata=m1)
2248        m1.dispatch.before_create(m1, testing.db)
2249        eq_(typ.evt_targets, (m1,))
2250
2251        dialect_impl = typ.dialect_impl(testing.db.dialect)
2252        eq_(dialect_impl.evt_targets, ())
2253
2254    def test_metadata_dispatch_new_impl(self):
2255        m1 = MetaData()
2256        typ = self.MyTypeWImpl(metadata=m1)
2257        m1.dispatch.before_create(m1, testing.db)
2258        eq_(typ.evt_targets, (m1,))
2259
2260        dialect_impl = typ.dialect_impl(testing.db.dialect)
2261        eq_(dialect_impl.evt_targets, (m1,))
2262
2263    def test_table_dispatch_decorator_schematype(self):
2264        m1 = MetaData()
2265        typ = self.MyTypeDecAndSchema()
2266        t1 = Table("t1", m1, Column("x", typ))
2267        m1.dispatch.before_create(t1, testing.db)
2268        eq_(typ.evt_targets, (t1,))
2269
2270    def test_table_dispatch_no_new_impl(self):
2271        m1 = MetaData()
2272        typ = self.MyType()
2273        t1 = Table("t1", m1, Column("x", typ))
2274        m1.dispatch.before_create(t1, testing.db)
2275        eq_(typ.evt_targets, (t1,))
2276
2277        dialect_impl = typ.dialect_impl(testing.db.dialect)
2278        eq_(dialect_impl.evt_targets, ())
2279
2280    def test_table_dispatch_new_impl(self):
2281        m1 = MetaData()
2282        typ = self.MyTypeWImpl()
2283        t1 = Table("t1", m1, Column("x", typ))
2284        m1.dispatch.before_create(t1, testing.db)
2285        eq_(typ.evt_targets, (t1,))
2286
2287        dialect_impl = typ.dialect_impl(testing.db.dialect)
2288        eq_(dialect_impl.evt_targets, (t1,))
2289
2290    def test_create_metadata_bound_no_crash(self):
2291        m1 = MetaData()
2292        self.MyType(metadata=m1)
2293
2294        m1.create_all(testing.db)
2295
2296    def test_boolean_constraint_type_doesnt_double(self):
2297        m1 = MetaData()
2298
2299        t1 = Table("x", m1, Column("flag", Boolean()))
2300        eq_(
2301            len([c for c in t1.constraints if isinstance(c, CheckConstraint)]),
2302            1,
2303        )
2304        m2 = MetaData()
2305        t2 = t1.tometadata(m2)
2306
2307        eq_(
2308            len([c for c in t2.constraints if isinstance(c, CheckConstraint)]),
2309            1,
2310        )
2311
2312    def test_enum_constraint_type_doesnt_double(self):
2313        m1 = MetaData()
2314
2315        t1 = Table("x", m1, Column("flag", Enum("a", "b", "c")))
2316        eq_(
2317            len([c for c in t1.constraints if isinstance(c, CheckConstraint)]),
2318            1,
2319        )
2320        m2 = MetaData()
2321        t2 = t1.tometadata(m2)
2322
2323        eq_(
2324            len([c for c in t2.constraints if isinstance(c, CheckConstraint)]),
2325            1,
2326        )
2327
2328
2329class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
2330    def test_default_schema_metadata_fk(self):
2331        m = MetaData(schema="foo")
2332        t1 = Table("t1", m, Column("x", Integer))
2333        t2 = Table("t2", m, Column("x", Integer, ForeignKey("t1.x")))
2334        assert t2.c.x.references(t1.c.x)
2335
2336    def test_ad_hoc_schema_equiv_fk(self):
2337        m = MetaData()
2338        t1 = Table("t1", m, Column("x", Integer), schema="foo")
2339        t2 = Table(
2340            "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="foo"
2341        )
2342        assert_raises(
2343            exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x)
2344        )
2345
2346    def test_default_schema_metadata_fk_alt_remote(self):
2347        m = MetaData(schema="foo")
2348        t1 = Table("t1", m, Column("x", Integer))
2349        t2 = Table(
2350            "t2", m, Column("x", Integer, ForeignKey("t1.x")), schema="bar"
2351        )
2352        assert t2.c.x.references(t1.c.x)
2353
2354    def test_default_schema_metadata_fk_alt_local_raises(self):
2355        m = MetaData(schema="foo")
2356        t1 = Table("t1", m, Column("x", Integer), schema="bar")
2357        t2 = Table("t2", m, Column("x", Integer, ForeignKey("t1.x")))
2358        assert_raises(
2359            exc.NoReferencedTableError, lambda: t2.c.x.references(t1.c.x)
2360        )
2361
2362    def test_default_schema_metadata_fk_alt_local(self):
2363        m = MetaData(schema="foo")
2364        t1 = Table("t1", m, Column("x", Integer), schema="bar")
2365        t2 = Table("t2", m, Column("x", Integer, ForeignKey("bar.t1.x")))
2366        assert t2.c.x.references(t1.c.x)
2367
2368    def test_create_drop_schema(self):
2369
2370        self.assert_compile(
2371            schema.CreateSchema("sa_schema"), "CREATE SCHEMA sa_schema"
2372        )
2373        self.assert_compile(
2374            schema.DropSchema("sa_schema"), "DROP SCHEMA sa_schema"
2375        )
2376        self.assert_compile(
2377            schema.DropSchema("sa_schema", cascade=True),
2378            "DROP SCHEMA sa_schema CASCADE",
2379        )
2380
2381    def test_iteration(self):
2382        metadata = MetaData()
2383        table1 = Table(
2384            "table1",
2385            metadata,
2386            Column("col1", Integer, primary_key=True),
2387            schema="someschema",
2388        )
2389        table2 = Table(
2390            "table2",
2391            metadata,
2392            Column("col1", Integer, primary_key=True),
2393            Column("col2", Integer, ForeignKey("someschema.table1.col1")),
2394            schema="someschema",
2395        )
2396
2397        t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
2398        t2 = str(schema.CreateTable(table2).compile(bind=testing.db))
2399        if testing.db.dialect.preparer(testing.db.dialect).omit_schema:
2400            assert t1.index("CREATE TABLE table1") > -1
2401            assert t2.index("CREATE TABLE table2") > -1
2402        else:
2403            assert t1.index("CREATE TABLE someschema.table1") > -1
2404            assert t2.index("CREATE TABLE someschema.table2") > -1
2405
2406
2407class UseExistingTest(fixtures.TablesTest):
2408    @classmethod
2409    def define_tables(cls, metadata):
2410        Table(
2411            "users",
2412            metadata,
2413            Column("id", Integer, primary_key=True),
2414            Column("name", String(30)),
2415        )
2416
2417    def _useexisting_fixture(self):
2418        meta2 = MetaData(testing.db)
2419        Table("users", meta2, autoload=True)
2420        return meta2
2421
2422    def _notexisting_fixture(self):
2423        return MetaData(testing.db)
2424
2425    def test_exception_no_flags(self):
2426        meta2 = self._useexisting_fixture()
2427
2428        def go():
2429            Table("users", meta2, Column("name", Unicode), autoload=True)
2430
2431        assert_raises_message(
2432            exc.InvalidRequestError,
2433            "Table 'users' is already defined for this " "MetaData instance.",
2434            go,
2435        )
2436
2437    def test_keep_plus_existing_raises(self):
2438        meta2 = self._useexisting_fixture()
2439        assert_raises(
2440            exc.ArgumentError,
2441            Table,
2442            "users",
2443            meta2,
2444            keep_existing=True,
2445            extend_existing=True,
2446        )
2447
2448    def test_keep_existing_no_dupe_constraints(self):
2449        meta2 = self._notexisting_fixture()
2450        users = Table(
2451            "users",
2452            meta2,
2453            Column("id", Integer),
2454            Column("name", Unicode),
2455            UniqueConstraint("name"),
2456            keep_existing=True,
2457        )
2458        assert "name" in users.c
2459        assert "id" in users.c
2460        eq_(len(users.constraints), 2)
2461
2462        u2 = Table(
2463            "users",
2464            meta2,
2465            Column("id", Integer),
2466            Column("name", Unicode),
2467            UniqueConstraint("name"),
2468            keep_existing=True,
2469        )
2470        eq_(len(u2.constraints), 2)
2471
2472    def test_extend_existing_dupes_constraints(self):
2473        meta2 = self._notexisting_fixture()
2474        users = Table(
2475            "users",
2476            meta2,
2477            Column("id", Integer),
2478            Column("name", Unicode),
2479            UniqueConstraint("name"),
2480            extend_existing=True,
2481        )
2482        assert "name" in users.c
2483        assert "id" in users.c
2484        eq_(len(users.constraints), 2)
2485
2486        u2 = Table(
2487            "users",
2488            meta2,
2489            Column("id", Integer),
2490            Column("name", Unicode),
2491            UniqueConstraint("name"),
2492            extend_existing=True,
2493        )
2494        # constraint got duped
2495        eq_(len(u2.constraints), 3)
2496
2497    def test_keep_existing_coltype(self):
2498        meta2 = self._useexisting_fixture()
2499        users = Table(
2500            "users",
2501            meta2,
2502            Column("name", Unicode),
2503            autoload=True,
2504            keep_existing=True,
2505        )
2506        assert not isinstance(users.c.name.type, Unicode)
2507
2508    def test_keep_existing_quote(self):
2509        meta2 = self._useexisting_fixture()
2510        users = Table(
2511            "users", meta2, quote=True, autoload=True, keep_existing=True
2512        )
2513        assert not users.name.quote
2514
2515    def test_keep_existing_add_column(self):
2516        meta2 = self._useexisting_fixture()
2517        users = Table(
2518            "users",
2519            meta2,
2520            Column("foo", Integer),
2521            autoload=True,
2522            keep_existing=True,
2523        )
2524        assert "foo" not in users.c
2525
2526    def test_keep_existing_coltype_no_orig(self):
2527        meta2 = self._notexisting_fixture()
2528        users = Table(
2529            "users",
2530            meta2,
2531            Column("name", Unicode),
2532            autoload=True,
2533            keep_existing=True,
2534        )
2535        assert isinstance(users.c.name.type, Unicode)
2536
2537    @testing.skip_if(
2538        lambda: testing.db.dialect.requires_name_normalize,
2539        "test depends on lowercase as case insensitive",
2540    )
2541    def test_keep_existing_quote_no_orig(self):
2542        meta2 = self._notexisting_fixture()
2543        users = Table(
2544            "users", meta2, quote=True, autoload=True, keep_existing=True
2545        )
2546        assert users.name.quote
2547
2548    def test_keep_existing_add_column_no_orig(self):
2549        meta2 = self._notexisting_fixture()
2550        users = Table(
2551            "users",
2552            meta2,
2553            Column("foo", Integer),
2554            autoload=True,
2555            keep_existing=True,
2556        )
2557        assert "foo" in users.c
2558
2559    def test_keep_existing_coltype_no_reflection(self):
2560        meta2 = self._useexisting_fixture()
2561        users = Table(
2562            "users", meta2, Column("name", Unicode), keep_existing=True
2563        )
2564        assert not isinstance(users.c.name.type, Unicode)
2565
2566    def test_keep_existing_quote_no_reflection(self):
2567        meta2 = self._useexisting_fixture()
2568        users = Table("users", meta2, quote=True, keep_existing=True)
2569        assert not users.name.quote
2570
2571    def test_keep_existing_add_column_no_reflection(self):
2572        meta2 = self._useexisting_fixture()
2573        users = Table(
2574            "users", meta2, Column("foo", Integer), keep_existing=True
2575        )
2576        assert "foo" not in users.c
2577
2578    def test_extend_existing_coltype(self):
2579        meta2 = self._useexisting_fixture()
2580        users = Table(
2581            "users",
2582            meta2,
2583            Column("name", Unicode),
2584            autoload=True,
2585            extend_existing=True,
2586        )
2587        assert isinstance(users.c.name.type, Unicode)
2588
2589    def test_extend_existing_quote(self):
2590        meta2 = self._useexisting_fixture()
2591        assert_raises_message(
2592            tsa.exc.ArgumentError,
2593            "Can't redefine 'quote' or 'quote_schema' arguments",
2594            Table,
2595            "users",
2596            meta2,
2597            quote=True,
2598            autoload=True,
2599            extend_existing=True,
2600        )
2601
2602    def test_extend_existing_add_column(self):
2603        meta2 = self._useexisting_fixture()
2604        users = Table(
2605            "users",
2606            meta2,
2607            Column("foo", Integer),
2608            autoload=True,
2609            extend_existing=True,
2610        )
2611        assert "foo" in users.c
2612
2613    def test_extend_existing_coltype_no_orig(self):
2614        meta2 = self._notexisting_fixture()
2615        users = Table(
2616            "users",
2617            meta2,
2618            Column("name", Unicode),
2619            autoload=True,
2620            extend_existing=True,
2621        )
2622        assert isinstance(users.c.name.type, Unicode)
2623
2624    @testing.skip_if(
2625        lambda: testing.db.dialect.requires_name_normalize,
2626        "test depends on lowercase as case insensitive",
2627    )
2628    def test_extend_existing_quote_no_orig(self):
2629        meta2 = self._notexisting_fixture()
2630        users = Table(
2631            "users", meta2, quote=True, autoload=True, extend_existing=True
2632        )
2633        assert users.name.quote
2634
2635    def test_extend_existing_add_column_no_orig(self):
2636        meta2 = self._notexisting_fixture()
2637        users = Table(
2638            "users",
2639            meta2,
2640            Column("foo", Integer),
2641            autoload=True,
2642            extend_existing=True,
2643        )
2644        assert "foo" in users.c
2645
2646    def test_extend_existing_coltype_no_reflection(self):
2647        meta2 = self._useexisting_fixture()
2648        users = Table(
2649            "users", meta2, Column("name", Unicode), extend_existing=True
2650        )
2651        assert isinstance(users.c.name.type, Unicode)
2652
2653    def test_extend_existing_quote_no_reflection(self):
2654        meta2 = self._useexisting_fixture()
2655        assert_raises_message(
2656            tsa.exc.ArgumentError,
2657            "Can't redefine 'quote' or 'quote_schema' arguments",
2658            Table,
2659            "users",
2660            meta2,
2661            quote=True,
2662            extend_existing=True,
2663        )
2664
2665    def test_extend_existing_add_column_no_reflection(self):
2666        meta2 = self._useexisting_fixture()
2667        users = Table(
2668            "users", meta2, Column("foo", Integer), extend_existing=True
2669        )
2670        assert "foo" in users.c
2671
2672
2673class ConstraintTest(fixtures.TestBase):
2674    def _single_fixture(self):
2675        m = MetaData()
2676
2677        t1 = Table("t1", m, Column("a", Integer), Column("b", Integer))
2678
2679        t2 = Table("t2", m, Column("a", Integer, ForeignKey("t1.a")))
2680
2681        t3 = Table("t3", m, Column("a", Integer))
2682        return t1, t2, t3
2683
2684    def _assert_index_col_x(self, t, i, columns=True):
2685        eq_(t.indexes, set([i]))
2686        if columns:
2687            eq_(list(i.columns), [t.c.x])
2688        else:
2689            eq_(list(i.columns), [])
2690        assert i.table is t
2691
2692    def test_separate_decl_columns(self):
2693        m = MetaData()
2694        t = Table("t", m, Column("x", Integer))
2695        i = Index("i", t.c.x)
2696        self._assert_index_col_x(t, i)
2697
2698    def test_separate_decl_columns_functional(self):
2699        m = MetaData()
2700        t = Table("t", m, Column("x", Integer))
2701        i = Index("i", func.foo(t.c.x))
2702        self._assert_index_col_x(t, i)
2703
2704    def test_index_no_cols_private_table_arg(self):
2705        m = MetaData()
2706        t = Table("t", m, Column("x", Integer))
2707        i = Index("i", _table=t)
2708        is_(i.table, t)
2709        eq_(list(i.columns), [])
2710
2711    def test_index_w_cols_private_table_arg(self):
2712        m = MetaData()
2713        t = Table("t", m, Column("x", Integer))
2714        i = Index("i", t.c.x, _table=t)
2715        is_(i.table, t)
2716
2717        eq_(i.columns, [t.c.x])
2718
2719    def test_inline_decl_columns(self):
2720        m = MetaData()
2721        c = Column("x", Integer)
2722        i = Index("i", c)
2723        t = Table("t", m, c, i)
2724        self._assert_index_col_x(t, i)
2725
2726    def test_inline_decl_columns_functional(self):
2727        m = MetaData()
2728        c = Column("x", Integer)
2729        i = Index("i", func.foo(c))
2730        t = Table("t", m, c, i)
2731        self._assert_index_col_x(t, i)
2732
2733    def test_inline_decl_string(self):
2734        m = MetaData()
2735        i = Index("i", "x")
2736        t = Table("t", m, Column("x", Integer), i)
2737        self._assert_index_col_x(t, i)
2738
2739    def test_inline_decl_textonly(self):
2740        m = MetaData()
2741        i = Index("i", text("foobar(x)"))
2742        t = Table("t", m, Column("x", Integer), i)
2743        self._assert_index_col_x(t, i, columns=False)
2744
2745    def test_separate_decl_textonly(self):
2746        m = MetaData()
2747        i = Index("i", text("foobar(x)"))
2748        t = Table("t", m, Column("x", Integer))
2749        t.append_constraint(i)
2750        self._assert_index_col_x(t, i, columns=False)
2751
2752    def test_unnamed_column_exception(self):
2753        # this can occur in some declarative situations
2754        c = Column(Integer)
2755        idx = Index("q", c)
2756        m = MetaData()
2757        t = Table("t", m, Column("q"))
2758        assert_raises_message(
2759            exc.ArgumentError,
2760            "Can't add unnamed column to column collection",
2761            t.append_constraint,
2762            idx,
2763        )
2764
2765    def test_non_attached_col_plus_string_expr(self):
2766        # another one that declarative can lead towards
2767        metadata = MetaData()
2768
2769        t1 = Table("a", metadata, Column("id", Integer))
2770
2771        c2 = Column("x", Integer)
2772
2773        # if we do it here, no problem
2774        # t1.append_column(c2)
2775
2776        idx = Index("foo", c2, desc("foo"))
2777
2778        t1.append_column(c2)
2779
2780        self._assert_index_col_x(t1, idx, columns=True)
2781
2782    def test_column_associated_w_lowercase_table(self):
2783        from sqlalchemy import table
2784
2785        c = Column("x", Integer)
2786        table("foo", c)
2787        idx = Index("q", c)
2788        is_(idx.table, None)  # lower-case-T table doesn't have indexes
2789
2790    def test_clauseelement_extraction_one(self):
2791        t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
2792
2793        class MyThing(object):
2794            def __clause_element__(self):
2795                return t.c.x + 5
2796
2797        idx = Index("foo", MyThing())
2798        self._assert_index_col_x(t, idx)
2799
2800    def test_clauseelement_extraction_two(self):
2801        t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
2802
2803        class MyThing(object):
2804            def __clause_element__(self):
2805                return t.c.x + 5
2806
2807        idx = Index("bar", MyThing(), t.c.y)
2808
2809        eq_(set(t.indexes), set([idx]))
2810
2811    def test_clauseelement_extraction_three(self):
2812        t = Table("t", MetaData(), Column("x", Integer), Column("y", Integer))
2813
2814        expr1 = t.c.x + 5
2815
2816        class MyThing(object):
2817            def __clause_element__(self):
2818                return expr1
2819
2820        idx = Index("bar", MyThing(), t.c.y)
2821
2822        is_(idx.expressions[0], expr1)
2823        is_(idx.expressions[1], t.c.y)
2824
2825    def test_table_references(self):
2826        t1, t2, t3 = self._single_fixture()
2827        assert list(t2.c.a.foreign_keys)[0].references(t1)
2828        assert not list(t2.c.a.foreign_keys)[0].references(t3)
2829
2830    def test_column_references(self):
2831        t1, t2, t3 = self._single_fixture()
2832        assert t2.c.a.references(t1.c.a)
2833        assert not t2.c.a.references(t3.c.a)
2834        assert not t2.c.a.references(t1.c.b)
2835
2836    def test_column_references_derived(self):
2837        t1, t2, t3 = self._single_fixture()
2838        s1 = tsa.select([tsa.select([t1]).alias()])
2839        assert t2.c.a.references(s1.c.a)
2840        assert not t2.c.a.references(s1.c.b)
2841
2842    def test_copy_doesnt_reference(self):
2843        t1, t2, t3 = self._single_fixture()
2844        a2 = t2.c.a.copy()
2845        assert not a2.references(t1.c.a)
2846        assert not a2.references(t1.c.b)
2847
2848    def test_derived_column_references(self):
2849        t1, t2, t3 = self._single_fixture()
2850        s1 = tsa.select([tsa.select([t2]).alias()])
2851        assert s1.c.a.references(t1.c.a)
2852        assert not s1.c.a.references(t1.c.b)
2853
2854    def test_referred_table_accessor(self):
2855        t1, t2, t3 = self._single_fixture()
2856        fkc = list(t2.foreign_key_constraints)[0]
2857        is_(fkc.referred_table, t1)
2858
2859    def test_referred_table_accessor_not_available(self):
2860        t1 = Table("t", MetaData(), Column("x", ForeignKey("q.id")))
2861        fkc = list(t1.foreign_key_constraints)[0]
2862        assert_raises_message(
2863            exc.InvalidRequestError,
2864            "Foreign key associated with column 't.x' could not find "
2865            "table 'q' with which to generate a foreign key to target "
2866            "column 'id'",
2867            getattr,
2868            fkc,
2869            "referred_table",
2870        )
2871
2872    def test_related_column_not_present_atfirst_ok(self):
2873        m = MetaData()
2874        base_table = Table("base", m, Column("id", Integer, primary_key=True))
2875        fk = ForeignKey("base.q")
2876        derived_table = Table(
2877            "derived", m, Column("id", None, fk, primary_key=True)
2878        )
2879
2880        base_table.append_column(Column("q", Integer))
2881        assert fk.column is base_table.c.q
2882        assert isinstance(derived_table.c.id.type, Integer)
2883
2884    def test_related_column_not_present_atfirst_ok_onname(self):
2885        m = MetaData()
2886        base_table = Table("base", m, Column("id", Integer, primary_key=True))
2887        fk = ForeignKey("base.q", link_to_name=True)
2888        derived_table = Table(
2889            "derived", m, Column("id", None, fk, primary_key=True)
2890        )
2891
2892        base_table.append_column(Column("q", Integer, key="zz"))
2893        assert fk.column is base_table.c.zz
2894        assert isinstance(derived_table.c.id.type, Integer)
2895
2896    def test_related_column_not_present_atfirst_ok_linktoname_conflict(self):
2897        m = MetaData()
2898        base_table = Table("base", m, Column("id", Integer, primary_key=True))
2899        fk = ForeignKey("base.q", link_to_name=True)
2900        derived_table = Table(
2901            "derived", m, Column("id", None, fk, primary_key=True)
2902        )
2903
2904        base_table.append_column(Column("zz", Integer, key="q"))
2905        base_table.append_column(Column("q", Integer, key="zz"))
2906        assert fk.column is base_table.c.zz
2907        assert isinstance(derived_table.c.id.type, Integer)
2908
2909    def test_invalid_composite_fk_check_strings(self):
2910        m = MetaData()
2911
2912        assert_raises_message(
2913            exc.ArgumentError,
2914            r"ForeignKeyConstraint on t1\(x, y\) refers to "
2915            "multiple remote tables: t2 and t3",
2916            Table,
2917            "t1",
2918            m,
2919            Column("x", Integer),
2920            Column("y", Integer),
2921            ForeignKeyConstraint(["x", "y"], ["t2.x", "t3.y"]),
2922        )
2923
2924    def test_invalid_composite_fk_check_columns(self):
2925        m = MetaData()
2926
2927        t2 = Table("t2", m, Column("x", Integer))
2928        t3 = Table("t3", m, Column("y", Integer))
2929
2930        assert_raises_message(
2931            exc.ArgumentError,
2932            r"ForeignKeyConstraint on t1\(x, y\) refers to "
2933            "multiple remote tables: t2 and t3",
2934            Table,
2935            "t1",
2936            m,
2937            Column("x", Integer),
2938            Column("y", Integer),
2939            ForeignKeyConstraint(["x", "y"], [t2.c.x, t3.c.y]),
2940        )
2941
2942    def test_invalid_composite_fk_check_columns_notattached(self):
2943        m = MetaData()
2944        x = Column("x", Integer)
2945        y = Column("y", Integer)
2946
2947        # no error is raised for this one right now.
2948        # which is a minor bug.
2949        Table(
2950            "t1",
2951            m,
2952            Column("x", Integer),
2953            Column("y", Integer),
2954            ForeignKeyConstraint(["x", "y"], [x, y]),
2955        )
2956
2957        Table("t2", m, x)
2958        Table("t3", m, y)
2959
2960    def test_constraint_copied_to_proxy_ok(self):
2961        m = MetaData()
2962        Table("t1", m, Column("id", Integer, primary_key=True))
2963        t2 = Table(
2964            "t2",
2965            m,
2966            Column("id", Integer, ForeignKey("t1.id"), primary_key=True),
2967        )
2968
2969        s = tsa.select([t2])
2970        t2fk = list(t2.c.id.foreign_keys)[0]
2971        sfk = list(s.c.id.foreign_keys)[0]
2972
2973        # the two FKs share the ForeignKeyConstraint
2974        is_(t2fk.constraint, sfk.constraint)
2975
2976        # but the ForeignKeyConstraint isn't
2977        # aware of the select's FK
2978        eq_(t2fk.constraint.elements, [t2fk])
2979
2980    def test_type_propagate_composite_fk_string(self):
2981        metadata = MetaData()
2982        Table(
2983            "a",
2984            metadata,
2985            Column("key1", Integer, primary_key=True),
2986            Column("key2", String(40), primary_key=True),
2987        )
2988
2989        b = Table(
2990            "b",
2991            metadata,
2992            Column("a_key1", None),
2993            Column("a_key2", None),
2994            Column("id", Integer, primary_key=True),
2995            ForeignKeyConstraint(["a_key1", "a_key2"], ["a.key1", "a.key2"]),
2996        )
2997
2998        assert isinstance(b.c.a_key1.type, Integer)
2999        assert isinstance(b.c.a_key2.type, String)
3000
3001    def test_type_propagate_composite_fk_col(self):
3002        metadata = MetaData()
3003        a = Table(
3004            "a",
3005            metadata,
3006            Column("key1", Integer, primary_key=True),
3007            Column("key2", String(40), primary_key=True),
3008        )
3009
3010        b = Table(
3011            "b",
3012            metadata,
3013            Column("a_key1", None),
3014            Column("a_key2", None),
3015            Column("id", Integer, primary_key=True),
3016            ForeignKeyConstraint(["a_key1", "a_key2"], [a.c.key1, a.c.key2]),
3017        )
3018
3019        assert isinstance(b.c.a_key1.type, Integer)
3020        assert isinstance(b.c.a_key2.type, String)
3021
3022    def test_type_propagate_standalone_fk_string(self):
3023        metadata = MetaData()
3024        Table("a", metadata, Column("key1", Integer, primary_key=True))
3025
3026        b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))
3027
3028        assert isinstance(b.c.a_key1.type, Integer)
3029
3030    def test_type_propagate_standalone_fk_col(self):
3031        metadata = MetaData()
3032        a = Table("a", metadata, Column("key1", Integer, primary_key=True))
3033
3034        b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1)))
3035
3036        assert isinstance(b.c.a_key1.type, Integer)
3037
3038    def test_type_propagate_chained_string_source_first(self):
3039        metadata = MetaData()
3040        Table("a", metadata, Column("key1", Integer, primary_key=True))
3041
3042        b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))
3043
3044        c = Table(
3045            "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1"))
3046        )
3047
3048        assert isinstance(b.c.a_key1.type, Integer)
3049        assert isinstance(c.c.b_key1.type, Integer)
3050
3051    def test_type_propagate_chained_string_source_last(self):
3052        metadata = MetaData()
3053
3054        b = Table("b", metadata, Column("a_key1", None, ForeignKey("a.key1")))
3055
3056        c = Table(
3057            "c", metadata, Column("b_key1", None, ForeignKey("b.a_key1"))
3058        )
3059
3060        Table("a", metadata, Column("key1", Integer, primary_key=True))
3061
3062        assert isinstance(b.c.a_key1.type, Integer)
3063        assert isinstance(c.c.b_key1.type, Integer)
3064
3065    def test_type_propagate_chained_string_source_last_onname(self):
3066        metadata = MetaData()
3067
3068        b = Table(
3069            "b",
3070            metadata,
3071            Column(
3072                "a_key1",
3073                None,
3074                ForeignKey("a.key1", link_to_name=True),
3075                key="ak1",
3076            ),
3077        )
3078
3079        c = Table(
3080            "c",
3081            metadata,
3082            Column(
3083                "b_key1",
3084                None,
3085                ForeignKey("b.a_key1", link_to_name=True),
3086                key="bk1",
3087            ),
3088        )
3089
3090        Table(
3091            "a", metadata, Column("key1", Integer, primary_key=True, key="ak1")
3092        )
3093
3094        assert isinstance(b.c.ak1.type, Integer)
3095        assert isinstance(c.c.bk1.type, Integer)
3096
3097    def test_type_propagate_chained_string_source_last_onname_conflict(self):
3098        metadata = MetaData()
3099
3100        b = Table(
3101            "b",
3102            metadata,
3103            # b.c.key1 -> a.c.key1 -> String
3104            Column(
3105                "ak1",
3106                None,
3107                ForeignKey("a.key1", link_to_name=False),
3108                key="key1",
3109            ),
3110            # b.c.ak1 -> a.c.ak1 -> Integer
3111            Column(
3112                "a_key1",
3113                None,
3114                ForeignKey("a.key1", link_to_name=True),
3115                key="ak1",
3116            ),
3117        )
3118
3119        c = Table(
3120            "c",
3121            metadata,
3122            # c.c.b_key1 -> b.c.ak1 -> Integer
3123            Column("b_key1", None, ForeignKey("b.ak1", link_to_name=False)),
3124            # c.c.b_ak1 -> b.c.ak1
3125            Column("b_ak1", None, ForeignKey("b.ak1", link_to_name=True)),
3126        )
3127
3128        Table(
3129            "a",
3130            metadata,
3131            # a.c.key1
3132            Column("ak1", String, key="key1"),
3133            # a.c.ak1
3134            Column("key1", Integer, primary_key=True, key="ak1"),
3135        )
3136
3137        assert isinstance(b.c.key1.type, String)
3138        assert isinstance(b.c.ak1.type, Integer)
3139
3140        assert isinstance(c.c.b_ak1.type, String)
3141        assert isinstance(c.c.b_key1.type, Integer)
3142
3143    def test_type_propagate_chained_col_orig_first(self):
3144        metadata = MetaData()
3145        a = Table("a", metadata, Column("key1", Integer, primary_key=True))
3146
3147        b = Table("b", metadata, Column("a_key1", None, ForeignKey(a.c.key1)))
3148
3149        c = Table(
3150            "c", metadata, Column("b_key1", None, ForeignKey(b.c.a_key1))
3151        )
3152
3153        assert isinstance(b.c.a_key1.type, Integer)
3154        assert isinstance(c.c.b_key1.type, Integer)
3155
3156    def test_column_accessor_col(self):
3157        c1 = Column("x", Integer)
3158        fk = ForeignKey(c1)
3159        is_(fk.column, c1)
3160
3161    def test_column_accessor_clause_element(self):
3162        c1 = Column("x", Integer)
3163
3164        class CThing(object):
3165            def __init__(self, c):
3166                self.c = c
3167
3168            def __clause_element__(self):
3169                return self.c
3170
3171        fk = ForeignKey(CThing(c1))
3172        is_(fk.column, c1)
3173
3174    def test_column_accessor_string_no_parent(self):
3175        fk = ForeignKey("sometable.somecol")
3176        assert_raises_message(
3177            exc.InvalidRequestError,
3178            "this ForeignKey object does not yet have a parent "
3179            "Column associated with it.",
3180            getattr,
3181            fk,
3182            "column",
3183        )
3184
3185    def test_column_accessor_string_no_parent_table(self):
3186        fk = ForeignKey("sometable.somecol")
3187        Column("x", fk)
3188        assert_raises_message(
3189            exc.InvalidRequestError,
3190            "this ForeignKey's parent column is not yet "
3191            "associated with a Table.",
3192            getattr,
3193            fk,
3194            "column",
3195        )
3196
3197    def test_column_accessor_string_no_target_table(self):
3198        fk = ForeignKey("sometable.somecol")
3199        c1 = Column("x", fk)
3200        Table("t", MetaData(), c1)
3201        assert_raises_message(
3202            exc.NoReferencedTableError,
3203            "Foreign key associated with column 't.x' could not find "
3204            "table 'sometable' with which to generate a "
3205            "foreign key to target column 'somecol'",
3206            getattr,
3207            fk,
3208            "column",
3209        )
3210
3211    def test_column_accessor_string_no_target_column(self):
3212        fk = ForeignKey("sometable.somecol")
3213        c1 = Column("x", fk)
3214        m = MetaData()
3215        Table("t", m, c1)
3216        Table("sometable", m, Column("notsomecol", Integer))
3217        assert_raises_message(
3218            exc.NoReferencedColumnError,
3219            "Could not initialize target column for ForeignKey "
3220            "'sometable.somecol' on table 't': "
3221            "table 'sometable' has no column named 'somecol'",
3222            getattr,
3223            fk,
3224            "column",
3225        )
3226
3227    def test_remove_table_fk_bookkeeping(self):
3228        metadata = MetaData()
3229        fk = ForeignKey("t1.x")
3230        t2 = Table("t2", metadata, Column("y", Integer, fk))
3231        t3 = Table("t3", metadata, Column("y", Integer, ForeignKey("t1.x")))
3232
3233        assert t2.key in metadata.tables
3234        assert ("t1", "x") in metadata._fk_memos
3235
3236        metadata.remove(t2)
3237
3238        # key is removed
3239        assert t2.key not in metadata.tables
3240
3241        # the memo for the FK is still there
3242        assert ("t1", "x") in metadata._fk_memos
3243
3244        # fk is not in the collection
3245        assert fk not in metadata._fk_memos[("t1", "x")]
3246
3247        # make the referenced table
3248        t1 = Table("t1", metadata, Column("x", Integer))
3249
3250        # t2 tells us exactly what's wrong
3251        assert_raises_message(
3252            exc.InvalidRequestError,
3253            "Table t2 is no longer associated with its parent MetaData",
3254            getattr,
3255            fk,
3256            "column",
3257        )
3258
3259        # t3 is unaffected
3260        assert t3.c.y.references(t1.c.x)
3261
3262        # remove twice OK
3263        metadata.remove(t2)
3264
3265    def test_double_fk_usage_raises(self):
3266        f = ForeignKey("b.id")
3267
3268        Column("x", Integer, f)
3269        assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
3270
3271    def test_auto_append_constraint(self):
3272        m = MetaData()
3273
3274        t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
3275
3276        t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
3277
3278        for c in (
3279            UniqueConstraint(t.c.a),
3280            CheckConstraint(t.c.a > 5),
3281            ForeignKeyConstraint([t.c.a], [t2.c.a]),
3282            PrimaryKeyConstraint(t.c.a),
3283        ):
3284            assert c in t.constraints
3285            t.append_constraint(c)
3286            assert c in t.constraints
3287
3288        c = Index("foo", t.c.a)
3289        assert c in t.indexes
3290
3291    def test_auto_append_lowercase_table(self):
3292        from sqlalchemy import table, column
3293
3294        t = table("t", column("a"))
3295        t2 = table("t2", column("a"))
3296        for c in (
3297            UniqueConstraint(t.c.a),
3298            CheckConstraint(t.c.a > 5),
3299            ForeignKeyConstraint([t.c.a], [t2.c.a]),
3300            PrimaryKeyConstraint(t.c.a),
3301            Index("foo", t.c.a),
3302        ):
3303            assert True
3304
3305    def test_tometadata_ok(self):
3306        m = MetaData()
3307
3308        t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
3309
3310        t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
3311
3312        UniqueConstraint(t.c.a)
3313        CheckConstraint(t.c.a > 5)
3314        ForeignKeyConstraint([t.c.a], [t2.c.a])
3315        PrimaryKeyConstraint(t.c.a)
3316
3317        m2 = MetaData()
3318
3319        t3 = t.tometadata(m2)
3320
3321        eq_(len(t3.constraints), 4)
3322
3323        for c in t3.constraints:
3324            assert c.table is t3
3325
3326    def test_check_constraint_copy(self):
3327        m = MetaData()
3328        t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
3329        ck = CheckConstraint(t.c.a > 5)
3330        ck2 = ck.copy()
3331        assert ck in t.constraints
3332        assert ck2 not in t.constraints
3333
3334    def test_ambig_check_constraint_auto_append(self):
3335        m = MetaData()
3336
3337        t = Table("tbl", m, Column("a", Integer), Column("b", Integer))
3338
3339        t2 = Table("t2", m, Column("a", Integer), Column("b", Integer))
3340        c = CheckConstraint(t.c.a > t2.c.b)
3341        assert c not in t.constraints
3342        assert c not in t2.constraints
3343
3344    def test_auto_append_ck_on_col_attach_one(self):
3345        m = MetaData()
3346
3347        a = Column("a", Integer)
3348        b = Column("b", Integer)
3349        ck = CheckConstraint(a > b)
3350
3351        t = Table("tbl", m, a, b)
3352        assert ck in t.constraints
3353
3354    def test_auto_append_ck_on_col_attach_two(self):
3355        m = MetaData()
3356
3357        a = Column("a", Integer)
3358        b = Column("b", Integer)
3359        c = Column("c", Integer)
3360        ck = CheckConstraint(a > b + c)
3361
3362        t = Table("tbl", m, a)
3363        assert ck not in t.constraints
3364
3365        t.append_column(b)
3366        assert ck not in t.constraints
3367
3368        t.append_column(c)
3369        assert ck in t.constraints
3370
3371    def test_auto_append_ck_on_col_attach_three(self):
3372        m = MetaData()
3373
3374        a = Column("a", Integer)
3375        b = Column("b", Integer)
3376        c = Column("c", Integer)
3377        ck = CheckConstraint(a > b + c)
3378
3379        t = Table("tbl", m, a)
3380        assert ck not in t.constraints
3381
3382        t.append_column(b)
3383        assert ck not in t.constraints
3384
3385        t2 = Table("t2", m)
3386        t2.append_column(c)
3387
3388        # two different tables, so CheckConstraint does nothing.
3389        assert ck not in t.constraints
3390
3391    def test_auto_append_uq_on_col_attach_one(self):
3392        m = MetaData()
3393
3394        a = Column("a", Integer)
3395        b = Column("b", Integer)
3396        uq = UniqueConstraint(a, b)
3397
3398        t = Table("tbl", m, a, b)
3399        assert uq in t.constraints
3400
3401    def test_auto_append_uq_on_col_attach_two(self):
3402        m = MetaData()
3403
3404        a = Column("a", Integer)
3405        b = Column("b", Integer)
3406        c = Column("c", Integer)
3407        uq = UniqueConstraint(a, b, c)
3408
3409        t = Table("tbl", m, a)
3410        assert uq not in t.constraints
3411
3412        t.append_column(b)
3413        assert uq not in t.constraints
3414
3415        t.append_column(c)
3416        assert uq in t.constraints
3417
3418    def test_auto_append_uq_on_col_attach_three(self):
3419        m = MetaData()
3420
3421        a = Column("a", Integer)
3422        b = Column("b", Integer)
3423        c = Column("c", Integer)
3424        uq = UniqueConstraint(a, b, c)
3425
3426        t = Table("tbl", m, a)
3427        assert uq not in t.constraints
3428
3429        t.append_column(b)
3430        assert uq not in t.constraints
3431
3432        t2 = Table("t2", m)
3433
3434        # two different tables, so UniqueConstraint raises
3435        assert_raises_message(
3436            exc.ArgumentError,
3437            r"Column\(s\) 't2\.c' are not part of table 'tbl'\.",
3438            t2.append_column,
3439            c,
3440        )
3441
3442    def test_auto_append_uq_on_col_attach_four(self):
3443        """Test that a uniqueconstraint that names Column and string names
3444        won't autoattach using deferred column attachment.
3445
3446        """
3447        m = MetaData()
3448
3449        a = Column("a", Integer)
3450        b = Column("b", Integer)
3451        c = Column("c", Integer)
3452        uq = UniqueConstraint(a, "b", "c")
3453
3454        t = Table("tbl", m, a)
3455        assert uq not in t.constraints
3456
3457        t.append_column(b)
3458        assert uq not in t.constraints
3459
3460        t.append_column(c)
3461
3462        # we don't track events for previously unknown columns
3463        # named 'c' to be attached
3464        assert uq not in t.constraints
3465
3466        t.append_constraint(uq)
3467
3468        assert uq in t.constraints
3469
3470        eq_(
3471            [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)],
3472            [uq],
3473        )
3474
3475    def test_auto_append_uq_on_col_attach_five(self):
3476        """Test that a uniqueconstraint that names Column and string names
3477        *will* autoattach if the table has all those names up front.
3478
3479        """
3480        m = MetaData()
3481
3482        a = Column("a", Integer)
3483        b = Column("b", Integer)
3484        c = Column("c", Integer)
3485
3486        t = Table("tbl", m, a, c, b)
3487
3488        uq = UniqueConstraint(a, "b", "c")
3489
3490        assert uq in t.constraints
3491
3492        t.append_constraint(uq)
3493
3494        assert uq in t.constraints
3495
3496        eq_(
3497            [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)],
3498            [uq],
3499        )
3500
3501    def test_index_asserts_cols_standalone(self):
3502        metadata = MetaData()
3503
3504        t1 = Table("t1", metadata, Column("x", Integer))
3505        t2 = Table("t2", metadata, Column("y", Integer))
3506        assert_raises_message(
3507            exc.ArgumentError,
3508            r"Column\(s\) 't2.y' are not part of table 't1'.",
3509            Index,
3510            "bar",
3511            t1.c.x,
3512            t2.c.y,
3513        )
3514
3515    def test_index_asserts_cols_inline(self):
3516        metadata = MetaData()
3517
3518        t1 = Table("t1", metadata, Column("x", Integer))
3519        assert_raises_message(
3520            exc.ArgumentError,
3521            "Index 'bar' is against table 't1', and "
3522            "cannot be associated with table 't2'.",
3523            Table,
3524            "t2",
3525            metadata,
3526            Column("y", Integer),
3527            Index("bar", t1.c.x),
3528        )
3529
3530    def test_raise_index_nonexistent_name(self):
3531        m = MetaData()
3532        # the KeyError isn't ideal here, a nicer message
3533        # perhaps
3534        assert_raises(
3535            KeyError, Table, "t", m, Column("x", Integer), Index("foo", "q")
3536        )
3537
3538    def test_raise_not_a_column(self):
3539        assert_raises(exc.ArgumentError, Index, "foo", 5)
3540
3541    def test_raise_expr_no_column(self):
3542        idx = Index("foo", func.lower(5))
3543
3544        assert_raises_message(
3545            exc.CompileError,
3546            "Index 'foo' is not associated with any table.",
3547            schema.CreateIndex(idx).compile,
3548            dialect=testing.db.dialect,
3549        )
3550        assert_raises_message(
3551            exc.CompileError,
3552            "Index 'foo' is not associated with any table.",
3553            schema.CreateIndex(idx).compile,
3554        )
3555
3556    def test_no_warning_w_no_columns(self):
3557        idx = Index(name="foo")
3558
3559        assert_raises_message(
3560            exc.CompileError,
3561            "Index 'foo' is not associated with any table.",
3562            schema.CreateIndex(idx).compile,
3563            dialect=testing.db.dialect,
3564        )
3565        assert_raises_message(
3566            exc.CompileError,
3567            "Index 'foo' is not associated with any table.",
3568            schema.CreateIndex(idx).compile,
3569        )
3570
3571    def test_raise_clauseelement_not_a_column(self):
3572        m = MetaData()
3573        t2 = Table("t2", m, Column("x", Integer))
3574
3575        class SomeClass(object):
3576            def __clause_element__(self):
3577                return t2
3578
3579        assert_raises_message(
3580            exc.ArgumentError,
3581            r"Element Table\('t2', .* is not a string name or column element",
3582            Index,
3583            "foo",
3584            SomeClass(),
3585        )
3586
3587
3588class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
3589
3590    """Test Column() construction."""
3591
3592    __dialect__ = "default"
3593
3594    def columns(self):
3595        return [
3596            Column(Integer),
3597            Column("b", Integer),
3598            Column(Integer),
3599            Column("d", Integer),
3600            Column(Integer, name="e"),
3601            Column(type_=Integer),
3602            Column(Integer()),
3603            Column("h", Integer()),
3604            Column(type_=Integer()),
3605        ]
3606
3607    def test_basic(self):
3608        c = self.columns()
3609
3610        for i, v in ((0, "a"), (2, "c"), (5, "f"), (6, "g"), (8, "i")):
3611            c[i].name = v
3612            c[i].key = v
3613        del i, v
3614
3615        tbl = Table("table", MetaData(), *c)
3616
3617        for i, col in enumerate(tbl.c):
3618            assert col.name == c[i].name
3619
3620    def test_name_none(self):
3621
3622        c = Column(Integer)
3623        assert_raises_message(
3624            exc.ArgumentError,
3625            "Column must be constructed with a non-blank name or assign a "
3626            "non-blank .name ",
3627            Table,
3628            "t",
3629            MetaData(),
3630            c,
3631        )
3632
3633    def test_name_blank(self):
3634
3635        c = Column("", Integer)
3636        assert_raises_message(
3637            exc.ArgumentError,
3638            "Column must be constructed with a non-blank name or assign a "
3639            "non-blank .name ",
3640            Table,
3641            "t",
3642            MetaData(),
3643            c,
3644        )
3645
3646    def test_no_shared_column_schema(self):
3647        c = Column("x", Integer)
3648        Table("t", MetaData(), c)
3649
3650        assert_raises_message(
3651            exc.ArgumentError,
3652            "Column object 'x' already assigned to Table 't'",
3653            Table,
3654            "q",
3655            MetaData(),
3656            c,
3657        )
3658
3659    def test_no_shared_column_sql(self):
3660        c = column("x", Integer)
3661        table("t", c)
3662
3663        assert_raises_message(
3664            exc.ArgumentError,
3665            "column object 'x' already assigned to table 't'",
3666            table,
3667            "q",
3668            c,
3669        )
3670
3671    def test_incomplete_key(self):
3672        c = Column(Integer)
3673        assert c.name is None
3674        assert c.key is None
3675
3676        c.name = "named"
3677        Table("t", MetaData(), c)
3678
3679        assert c.name == "named"
3680        assert c.name == c.key
3681
3682    def test_unique_index_flags_default_to_none(self):
3683        c = Column(Integer)
3684        eq_(c.unique, None)
3685        eq_(c.index, None)
3686
3687        c = Column("c", Integer, index=True)
3688        eq_(c.unique, None)
3689        eq_(c.index, True)
3690
3691        t = Table("t", MetaData(), c)
3692        eq_(list(t.indexes)[0].unique, False)
3693
3694        c = Column(Integer, unique=True)
3695        eq_(c.unique, True)
3696        eq_(c.index, None)
3697
3698        c = Column("c", Integer, index=True, unique=True)
3699        eq_(c.unique, True)
3700        eq_(c.index, True)
3701
3702        t = Table("t", MetaData(), c)
3703        eq_(list(t.indexes)[0].unique, True)
3704
3705    def test_bogus(self):
3706        assert_raises(exc.ArgumentError, Column, "foo", name="bar")
3707        assert_raises(
3708            exc.ArgumentError, Column, "foo", Integer, type_=Integer()
3709        )
3710
3711    def test_custom_subclass_proxy(self):
3712        """test proxy generation of a Column subclass, can be compiled."""
3713
3714        from sqlalchemy.schema import Column
3715        from sqlalchemy.ext.compiler import compiles
3716        from sqlalchemy.sql import select
3717
3718        class MyColumn(Column):
3719            def _constructor(self, name, type_, **kw):
3720                kw["name"] = name
3721                return MyColumn(type_, **kw)
3722
3723            def __init__(self, type_, **kw):
3724                Column.__init__(self, type_, **kw)
3725
3726            def my_goofy_thing(self):
3727                return "hi"
3728
3729        @compiles(MyColumn)
3730        def goofy(element, compiler, **kw):
3731            s = compiler.visit_column(element, **kw)
3732            return s + "-"
3733
3734        id_ = MyColumn(Integer, primary_key=True)
3735        id_.name = "id"
3736        name = MyColumn(String)
3737        name.name = "name"
3738        t1 = Table("foo", MetaData(), id_, name)
3739
3740        # goofy thing
3741        eq_(t1.c.name.my_goofy_thing(), "hi")
3742
3743        # create proxy
3744        s = select([t1.select().alias()])
3745
3746        # proxy has goofy thing
3747        eq_(s.c.name.my_goofy_thing(), "hi")
3748
3749        # compile works
3750        self.assert_compile(
3751            select([t1.select().alias()]),
3752            "SELECT anon_1.id-, anon_1.name- FROM "
3753            "(SELECT foo.id- AS id, foo.name- AS name "
3754            "FROM foo) AS anon_1",
3755        )
3756
3757    def test_custom_subclass_proxy_typeerror(self):
3758        from sqlalchemy.schema import Column
3759        from sqlalchemy.sql import select
3760
3761        class MyColumn(Column):
3762            def __init__(self, type_, **kw):
3763                Column.__init__(self, type_, **kw)
3764
3765        id_ = MyColumn(Integer, primary_key=True)
3766        id_.name = "id"
3767        name = MyColumn(String)
3768        name.name = "name"
3769        t1 = Table("foo", MetaData(), id_, name)
3770        assert_raises_message(
3771            TypeError,
3772            "Could not create a copy of this <class "
3773            "'test.sql.test_metadata..*MyColumn'> "
3774            "object.  Ensure the class includes a _constructor()",
3775            getattr,
3776            select([t1.select().alias()]),
3777            "c",
3778        )
3779
3780    def test_custom_create(self):
3781        from sqlalchemy.ext.compiler import compiles, deregister
3782
3783        @compiles(schema.CreateColumn)
3784        def compile_(element, compiler, **kw):
3785            column = element.element
3786
3787            if "special" not in column.info:
3788                return compiler.visit_create_column(element, **kw)
3789
3790            text = "%s SPECIAL DIRECTIVE %s" % (
3791                column.name,
3792                compiler.type_compiler.process(column.type),
3793            )
3794            default = compiler.get_column_default_string(column)
3795            if default is not None:
3796                text += " DEFAULT " + default
3797
3798            if not column.nullable:
3799                text += " NOT NULL"
3800
3801            if column.constraints:
3802                text += " ".join(
3803                    compiler.process(const) for const in column.constraints
3804                )
3805            return text
3806
3807        t = Table(
3808            "mytable",
3809            MetaData(),
3810            Column("x", Integer, info={"special": True}, primary_key=True),
3811            Column("y", String(50)),
3812            Column("z", String(20), info={"special": True}),
3813        )
3814
3815        self.assert_compile(
3816            schema.CreateTable(t),
3817            "CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER "
3818            "NOT NULL, y VARCHAR(50), "
3819            "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))",
3820        )
3821
3822        deregister(schema.CreateColumn)
3823
3824
3825class ColumnDefaultsTest(fixtures.TestBase):
3826
3827    """test assignment of default fixures to columns"""
3828
3829    def _fixture(self, *arg, **kw):
3830        return Column("x", Integer, *arg, **kw)
3831
3832    def test_server_default_positional(self):
3833        target = schema.DefaultClause("y")
3834        c = self._fixture(target)
3835        assert c.server_default is target
3836        assert target.column is c
3837
3838    def test_onupdate_default_not_server_default_one(self):
3839        target1 = schema.DefaultClause("y")
3840        target2 = schema.DefaultClause("z")
3841
3842        c = self._fixture(server_default=target1, server_onupdate=target2)
3843        eq_(c.server_default.arg, "y")
3844        eq_(c.server_onupdate.arg, "z")
3845
3846    def test_onupdate_default_not_server_default_two(self):
3847        target1 = schema.DefaultClause("y", for_update=True)
3848        target2 = schema.DefaultClause("z", for_update=True)
3849
3850        c = self._fixture(server_default=target1, server_onupdate=target2)
3851        eq_(c.server_default.arg, "y")
3852        eq_(c.server_onupdate.arg, "z")
3853
3854    def test_onupdate_default_not_server_default_three(self):
3855        target1 = schema.DefaultClause("y", for_update=False)
3856        target2 = schema.DefaultClause("z", for_update=True)
3857
3858        c = self._fixture(target1, target2)
3859        eq_(c.server_default.arg, "y")
3860        eq_(c.server_onupdate.arg, "z")
3861
3862    def test_onupdate_default_not_server_default_four(self):
3863        target1 = schema.DefaultClause("y", for_update=False)
3864
3865        c = self._fixture(server_onupdate=target1)
3866        is_(c.server_default, None)
3867        eq_(c.server_onupdate.arg, "y")
3868
3869    def test_server_default_keyword_as_schemaitem(self):
3870        target = schema.DefaultClause("y")
3871        c = self._fixture(server_default=target)
3872        assert c.server_default is target
3873        assert target.column is c
3874
3875    def test_server_default_keyword_as_clause(self):
3876        target = "y"
3877        c = self._fixture(server_default=target)
3878        assert c.server_default.arg == target
3879        assert c.server_default.column is c
3880
3881    def test_server_default_onupdate_positional(self):
3882        target = schema.DefaultClause("y", for_update=True)
3883        c = self._fixture(target)
3884        assert c.server_onupdate is target
3885        assert target.column is c
3886
3887    def test_server_default_onupdate_keyword_as_schemaitem(self):
3888        target = schema.DefaultClause("y", for_update=True)
3889        c = self._fixture(server_onupdate=target)
3890        assert c.server_onupdate is target
3891        assert target.column is c
3892
3893    def test_server_default_onupdate_keyword_as_clause(self):
3894        target = "y"
3895        c = self._fixture(server_onupdate=target)
3896        assert c.server_onupdate.arg == target
3897        assert c.server_onupdate.column is c
3898
3899    def test_column_default_positional(self):
3900        target = schema.ColumnDefault("y")
3901        c = self._fixture(target)
3902        assert c.default is target
3903        assert target.column is c
3904
3905    def test_column_default_keyword_as_schemaitem(self):
3906        target = schema.ColumnDefault("y")
3907        c = self._fixture(default=target)
3908        assert c.default is target
3909        assert target.column is c
3910
3911    def test_column_default_keyword_as_clause(self):
3912        target = "y"
3913        c = self._fixture(default=target)
3914        assert c.default.arg == target
3915        assert c.default.column is c
3916
3917    def test_column_default_onupdate_positional(self):
3918        target = schema.ColumnDefault("y", for_update=True)
3919        c = self._fixture(target)
3920        assert c.onupdate is target
3921        assert target.column is c
3922
3923    def test_column_default_onupdate_keyword_as_schemaitem(self):
3924        target = schema.ColumnDefault("y", for_update=True)
3925        c = self._fixture(onupdate=target)
3926        assert c.onupdate is target
3927        assert target.column is c
3928
3929    def test_column_default_onupdate_keyword_as_clause(self):
3930        target = "y"
3931        c = self._fixture(onupdate=target)
3932        assert c.onupdate.arg == target
3933        assert c.onupdate.column is c
3934
3935
3936class ColumnOptionsTest(fixtures.TestBase):
3937    def test_default_generators(self):
3938        g1, g2 = Sequence("foo_id_seq"), ColumnDefault("f5")
3939        assert Column(String, default=g1).default is g1
3940        assert Column(String, onupdate=g1).onupdate is g1
3941        assert Column(String, default=g2).default is g2
3942        assert Column(String, onupdate=g2).onupdate is g2
3943
3944    def _null_type_error(self, col):
3945        t = Table("t", MetaData(), col)
3946        assert_raises_message(
3947            exc.CompileError,
3948            r"\(in table 't', column 'foo'\): Can't generate DDL for NullType",
3949            schema.CreateTable(t).compile,
3950        )
3951
3952    def _no_name_error(self, col):
3953        assert_raises_message(
3954            exc.ArgumentError,
3955            "Column must be constructed with a non-blank name or "
3956            "assign a non-blank .name",
3957            Table,
3958            "t",
3959            MetaData(),
3960            col,
3961        )
3962
3963    def _no_error(self, col):
3964        m = MetaData()
3965        Table("bar", m, Column("id", Integer))
3966        t = Table("t", m, col)
3967        schema.CreateTable(t).compile()
3968
3969    def test_argument_signatures(self):
3970        self._no_name_error(Column())
3971        self._null_type_error(Column("foo"))
3972        self._no_name_error(Column(default="foo"))
3973
3974        self._no_name_error(Column(Sequence("a")))
3975        self._null_type_error(Column("foo", default="foo"))
3976
3977        self._null_type_error(Column("foo", Sequence("a")))
3978
3979        self._no_name_error(Column(ForeignKey("bar.id")))
3980
3981        self._no_error(Column("foo", ForeignKey("bar.id")))
3982
3983        self._no_name_error(Column(ForeignKey("bar.id"), default="foo"))
3984
3985        self._no_name_error(Column(ForeignKey("bar.id"), Sequence("a")))
3986        self._no_error(Column("foo", ForeignKey("bar.id"), default="foo"))
3987        self._no_error(Column("foo", ForeignKey("bar.id"), Sequence("a")))
3988
3989    def test_column_info(self):
3990
3991        c1 = Column("foo", String, info={"x": "y"})
3992        c2 = Column("bar", String, info={})
3993        c3 = Column("bat", String)
3994        assert c1.info == {"x": "y"}
3995        assert c2.info == {}
3996        assert c3.info == {}
3997
3998        for c in (c1, c2, c3):
3999            c.info["bar"] = "zip"
4000            assert c.info["bar"] == "zip"
4001
4002
4003class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
4004    def test_all_events(self):
4005        canary = []
4006
4007        def before_attach(obj, parent):
4008            canary.append(
4009                "%s->%s" % (obj.__class__.__name__, parent.__class__.__name__)
4010            )
4011
4012        def after_attach(obj, parent):
4013            canary.append("%s->%s" % (obj.__class__.__name__, parent))
4014
4015        self.event_listen(
4016            schema.SchemaItem, "before_parent_attach", before_attach
4017        )
4018        self.event_listen(
4019            schema.SchemaItem, "after_parent_attach", after_attach
4020        )
4021
4022        m = MetaData()
4023        Table(
4024            "t1",
4025            m,
4026            Column("id", Integer, Sequence("foo_id"), primary_key=True),
4027            Column("bar", String, ForeignKey("t2.id")),
4028        )
4029        Table("t2", m, Column("id", Integer, primary_key=True))
4030
4031        eq_(
4032            canary,
4033            [
4034                "Sequence->Column",
4035                "Sequence->id",
4036                "ForeignKey->Column",
4037                "ForeignKey->bar",
4038                "Table->MetaData",
4039                "PrimaryKeyConstraint->Table",
4040                "PrimaryKeyConstraint->t1",
4041                "Column->Table",
4042                "Column->t1",
4043                "Column->Table",
4044                "Column->t1",
4045                "ForeignKeyConstraint->Table",
4046                "ForeignKeyConstraint->t1",
4047                "Table->MetaData(bind=None)",
4048                "Table->MetaData",
4049                "PrimaryKeyConstraint->Table",
4050                "PrimaryKeyConstraint->t2",
4051                "Column->Table",
4052                "Column->t2",
4053                "Table->MetaData(bind=None)",
4054            ],
4055        )
4056
4057    def test_events_per_constraint(self):
4058        canary = []
4059
4060        def evt(target):
4061            def before_attach(obj, parent):
4062                canary.append(
4063                    "%s->%s" % (target.__name__, parent.__class__.__name__)
4064                )
4065
4066            def after_attach(obj, parent):
4067                assert hasattr(obj, "name")  # so we can change it
4068                canary.append("%s->%s" % (target.__name__, parent))
4069
4070            self.event_listen(target, "before_parent_attach", before_attach)
4071            self.event_listen(target, "after_parent_attach", after_attach)
4072
4073        for target in [
4074            schema.ForeignKeyConstraint,
4075            schema.PrimaryKeyConstraint,
4076            schema.UniqueConstraint,
4077            schema.CheckConstraint,
4078            schema.Index,
4079        ]:
4080            evt(target)
4081
4082        m = MetaData()
4083        Table(
4084            "t1",
4085            m,
4086            Column("id", Integer, Sequence("foo_id"), primary_key=True),
4087            Column("bar", String, ForeignKey("t2.id"), index=True),
4088            Column("bat", Integer, unique=True),
4089        )
4090        Table(
4091            "t2",
4092            m,
4093            Column("id", Integer, primary_key=True),
4094            Column("bar", Integer),
4095            Column("bat", Integer),
4096            CheckConstraint("bar>5"),
4097            UniqueConstraint("bar", "bat"),
4098            Index(None, "bar", "bat"),
4099        )
4100        eq_(
4101            canary,
4102            [
4103                "PrimaryKeyConstraint->Table",
4104                "PrimaryKeyConstraint->t1",
4105                "Index->Table",
4106                "Index->t1",
4107                "ForeignKeyConstraint->Table",
4108                "ForeignKeyConstraint->t1",
4109                "UniqueConstraint->Table",
4110                "UniqueConstraint->t1",
4111                "PrimaryKeyConstraint->Table",
4112                "PrimaryKeyConstraint->t2",
4113                "CheckConstraint->Table",
4114                "CheckConstraint->t2",
4115                "UniqueConstraint->Table",
4116                "UniqueConstraint->t2",
4117                "Index->Table",
4118                "Index->t2",
4119            ],
4120        )
4121
4122
4123class DialectKWArgTest(fixtures.TestBase):
4124    @contextmanager
4125    def _fixture(self):
4126        from sqlalchemy.engine.default import DefaultDialect
4127
4128        class ParticipatingDialect(DefaultDialect):
4129            construct_arguments = [
4130                (schema.Index, {"x": 5, "y": False, "z_one": None}),
4131                (schema.ForeignKeyConstraint, {"foobar": False}),
4132            ]
4133
4134        class ParticipatingDialect2(DefaultDialect):
4135            construct_arguments = [
4136                (schema.Index, {"x": 9, "y": True, "pp": "default"}),
4137                (schema.Table, {"*": None}),
4138            ]
4139
4140        class NonParticipatingDialect(DefaultDialect):
4141            construct_arguments = None
4142
4143        def load(dialect_name):
4144            if dialect_name == "participating":
4145                return ParticipatingDialect
4146            elif dialect_name == "participating2":
4147                return ParticipatingDialect2
4148            elif dialect_name == "nonparticipating":
4149                return NonParticipatingDialect
4150            else:
4151                raise exc.NoSuchModuleError("no dialect %r" % dialect_name)
4152
4153        with mock.patch("sqlalchemy.dialects.registry.load", load):
4154            yield
4155
4156    def teardown(self):
4157        Index._kw_registry.clear()
4158
4159    def test_participating(self):
4160        with self._fixture():
4161            idx = Index("a", "b", "c", participating_y=True)
4162            eq_(
4163                idx.dialect_options,
4164                {"participating": {"x": 5, "y": True, "z_one": None}},
4165            )
4166            eq_(idx.dialect_kwargs, {"participating_y": True})
4167
4168    def test_nonparticipating(self):
4169        with self._fixture():
4170            idx = Index(
4171                "a", "b", "c", nonparticipating_y=True, nonparticipating_q=5
4172            )
4173            eq_(
4174                idx.dialect_kwargs,
4175                {"nonparticipating_y": True, "nonparticipating_q": 5},
4176            )
4177
4178    def test_bad_kwarg_raise(self):
4179        with self._fixture():
4180            assert_raises_message(
4181                TypeError,
4182                "Additional arguments should be named "
4183                "<dialectname>_<argument>, got 'foobar'",
4184                Index,
4185                "a",
4186                "b",
4187                "c",
4188                foobar=True,
4189            )
4190
4191    def test_unknown_dialect_warning(self):
4192        with self._fixture():
4193            with testing.expect_warnings(
4194                "Can't validate argument 'unknown_y'; can't locate "
4195                "any SQLAlchemy dialect named 'unknown'",
4196            ):
4197                Index("a", "b", "c", unknown_y=True)
4198
4199    def test_participating_bad_kw(self):
4200        with self._fixture():
4201            assert_raises_message(
4202                exc.ArgumentError,
4203                "Argument 'participating_q_p_x' is not accepted by dialect "
4204                "'participating' on behalf of "
4205                "<class 'sqlalchemy.sql.schema.Index'>",
4206                Index,
4207                "a",
4208                "b",
4209                "c",
4210                participating_q_p_x=8,
4211            )
4212
4213    def test_participating_unknown_schema_item(self):
4214        with self._fixture():
4215            # the dialect doesn't include UniqueConstraint in
4216            # its registry at all.
4217            assert_raises_message(
4218                exc.ArgumentError,
4219                "Argument 'participating_q_p_x' is not accepted by dialect "
4220                "'participating' on behalf of "
4221                "<class 'sqlalchemy.sql.schema.UniqueConstraint'>",
4222                UniqueConstraint,
4223                "a",
4224                "b",
4225                participating_q_p_x=8,
4226            )
4227
4228    @testing.emits_warning("Can't validate")
4229    def test_unknown_dialect_warning_still_populates(self):
4230        with self._fixture():
4231            idx = Index("a", "b", "c", unknown_y=True)
4232            eq_(idx.dialect_kwargs, {"unknown_y": True})  # still populates
4233
4234    @testing.emits_warning("Can't validate")
4235    def test_unknown_dialect_warning_still_populates_multiple(self):
4236        with self._fixture():
4237            idx = Index(
4238                "a",
4239                "b",
4240                "c",
4241                unknown_y=True,
4242                unknown_z=5,
4243                otherunknown_foo="bar",
4244                participating_y=8,
4245            )
4246            eq_(
4247                idx.dialect_options,
4248                {
4249                    "unknown": {"y": True, "z": 5, "*": None},
4250                    "otherunknown": {"foo": "bar", "*": None},
4251                    "participating": {"x": 5, "y": 8, "z_one": None},
4252                },
4253            )
4254            eq_(
4255                idx.dialect_kwargs,
4256                {
4257                    "unknown_z": 5,
4258                    "participating_y": 8,
4259                    "unknown_y": True,
4260                    "otherunknown_foo": "bar",
4261                },
4262            )  # still populates
4263
4264    def test_runs_safekwarg(self):
4265
4266        with mock.patch(
4267            "sqlalchemy.util.safe_kwarg", lambda arg: "goofy_%s" % arg
4268        ):
4269            with self._fixture():
4270                idx = Index("a", "b")
4271                idx.kwargs[util.u("participating_x")] = 7
4272
4273                eq_(list(idx.dialect_kwargs), ["goofy_participating_x"])
4274
4275    def test_combined(self):
4276        with self._fixture():
4277            idx = Index(
4278                "a", "b", "c", participating_x=7, nonparticipating_y=True
4279            )
4280
4281            eq_(
4282                idx.dialect_options,
4283                {
4284                    "participating": {"y": False, "x": 7, "z_one": None},
4285                    "nonparticipating": {"y": True, "*": None},
4286                },
4287            )
4288            eq_(
4289                idx.dialect_kwargs,
4290                {"participating_x": 7, "nonparticipating_y": True},
4291            )
4292
4293    def test_multiple_participating(self):
4294        with self._fixture():
4295            idx = Index(
4296                "a",
4297                "b",
4298                "c",
4299                participating_x=7,
4300                participating2_x=15,
4301                participating2_y="lazy",
4302            )
4303            eq_(
4304                idx.dialect_options,
4305                {
4306                    "participating": {"x": 7, "y": False, "z_one": None},
4307                    "participating2": {"x": 15, "y": "lazy", "pp": "default"},
4308                },
4309            )
4310            eq_(
4311                idx.dialect_kwargs,
4312                {
4313                    "participating_x": 7,
4314                    "participating2_x": 15,
4315                    "participating2_y": "lazy",
4316                },
4317            )
4318
4319    def test_foreign_key_propagate(self):
4320        with self._fixture():
4321            m = MetaData()
4322            fk = ForeignKey("t2.id", participating_foobar=True)
4323            t = Table("t", m, Column("id", Integer, fk))
4324            fkc = [
4325                c for c in t.constraints if isinstance(c, ForeignKeyConstraint)
4326            ][0]
4327            eq_(fkc.dialect_kwargs, {"participating_foobar": True})
4328
4329    def test_foreign_key_propagate_exceptions_delayed(self):
4330        with self._fixture():
4331            m = MetaData()
4332            fk = ForeignKey("t2.id", participating_fake=True)
4333            c1 = Column("id", Integer, fk)
4334            assert_raises_message(
4335                exc.ArgumentError,
4336                "Argument 'participating_fake' is not accepted by "
4337                "dialect 'participating' on behalf of "
4338                "<class 'sqlalchemy.sql.schema.ForeignKeyConstraint'>",
4339                Table,
4340                "t",
4341                m,
4342                c1,
4343            )
4344
4345    def test_wildcard(self):
4346        with self._fixture():
4347            m = MetaData()
4348            t = Table(
4349                "x",
4350                m,
4351                Column("x", Integer),
4352                participating2_xyz="foo",
4353                participating2_engine="InnoDB",
4354            )
4355            eq_(
4356                t.dialect_kwargs,
4357                {
4358                    "participating2_xyz": "foo",
4359                    "participating2_engine": "InnoDB",
4360                },
4361            )
4362
4363    def test_uninit_wildcard(self):
4364        with self._fixture():
4365            m = MetaData()
4366            t = Table("x", m, Column("x", Integer))
4367            eq_(t.dialect_options["participating2"], {"*": None})
4368            eq_(t.dialect_kwargs, {})
4369
4370    def test_not_contains_wildcard(self):
4371        with self._fixture():
4372            m = MetaData()
4373            t = Table("x", m, Column("x", Integer))
4374            assert "foobar" not in t.dialect_options["participating2"]
4375
4376    def test_contains_wildcard(self):
4377        with self._fixture():
4378            m = MetaData()
4379            t = Table("x", m, Column("x", Integer), participating2_foobar=5)
4380            assert "foobar" in t.dialect_options["participating2"]
4381
4382    def test_update(self):
4383        with self._fixture():
4384            idx = Index("a", "b", "c", participating_x=20)
4385            eq_(idx.dialect_kwargs, {"participating_x": 20})
4386            idx._validate_dialect_kwargs(
4387                {"participating_x": 25, "participating_z_one": "default"}
4388            )
4389            eq_(
4390                idx.dialect_options,
4391                {"participating": {"x": 25, "y": False, "z_one": "default"}},
4392            )
4393            eq_(
4394                idx.dialect_kwargs,
4395                {"participating_x": 25, "participating_z_one": "default"},
4396            )
4397
4398            idx._validate_dialect_kwargs(
4399                {"participating_x": 25, "participating_z_one": "default"}
4400            )
4401
4402            eq_(
4403                idx.dialect_options,
4404                {"participating": {"x": 25, "y": False, "z_one": "default"}},
4405            )
4406            eq_(
4407                idx.dialect_kwargs,
4408                {"participating_x": 25, "participating_z_one": "default"},
4409            )
4410
4411            idx._validate_dialect_kwargs(
4412                {"participating_y": True, "participating2_y": "p2y"}
4413            )
4414            eq_(
4415                idx.dialect_options,
4416                {
4417                    "participating": {"x": 25, "y": True, "z_one": "default"},
4418                    "participating2": {"y": "p2y", "pp": "default", "x": 9},
4419                },
4420            )
4421            eq_(
4422                idx.dialect_kwargs,
4423                {
4424                    "participating_x": 25,
4425                    "participating_y": True,
4426                    "participating2_y": "p2y",
4427                    "participating_z_one": "default",
4428                },
4429            )
4430
4431    def test_key_error_kwargs_no_dialect(self):
4432        with self._fixture():
4433            idx = Index("a", "b", "c")
4434            assert_raises(KeyError, idx.kwargs.__getitem__, "foo_bar")
4435
4436    def test_key_error_kwargs_no_underscore(self):
4437        with self._fixture():
4438            idx = Index("a", "b", "c")
4439            assert_raises(KeyError, idx.kwargs.__getitem__, "foobar")
4440
4441    def test_key_error_kwargs_no_argument(self):
4442        with self._fixture():
4443            idx = Index("a", "b", "c")
4444            assert_raises(
4445                KeyError, idx.kwargs.__getitem__, "participating_asdmfq34098"
4446            )
4447
4448            assert_raises(
4449                KeyError,
4450                idx.kwargs.__getitem__,
4451                "nonparticipating_asdmfq34098",
4452            )
4453
4454    def test_key_error_dialect_options(self):
4455        with self._fixture():
4456            idx = Index("a", "b", "c")
4457            assert_raises(
4458                KeyError,
4459                idx.dialect_options["participating"].__getitem__,
4460                "asdfaso890",
4461            )
4462
4463            assert_raises(
4464                KeyError,
4465                idx.dialect_options["nonparticipating"].__getitem__,
4466                "asdfaso890",
4467            )
4468
4469    def test_ad_hoc_participating_via_opt(self):
4470        with self._fixture():
4471            idx = Index("a", "b", "c")
4472            idx.dialect_options["participating"]["foobar"] = 5
4473
4474            eq_(idx.dialect_options["participating"]["foobar"], 5)
4475            eq_(idx.kwargs["participating_foobar"], 5)
4476
4477    def test_ad_hoc_nonparticipating_via_opt(self):
4478        with self._fixture():
4479            idx = Index("a", "b", "c")
4480            idx.dialect_options["nonparticipating"]["foobar"] = 5
4481
4482            eq_(idx.dialect_options["nonparticipating"]["foobar"], 5)
4483            eq_(idx.kwargs["nonparticipating_foobar"], 5)
4484
4485    def test_ad_hoc_participating_via_kwargs(self):
4486        with self._fixture():
4487            idx = Index("a", "b", "c")
4488            idx.kwargs["participating_foobar"] = 5
4489
4490            eq_(idx.dialect_options["participating"]["foobar"], 5)
4491            eq_(idx.kwargs["participating_foobar"], 5)
4492
4493    def test_ad_hoc_nonparticipating_via_kwargs(self):
4494        with self._fixture():
4495            idx = Index("a", "b", "c")
4496            idx.kwargs["nonparticipating_foobar"] = 5
4497
4498            eq_(idx.dialect_options["nonparticipating"]["foobar"], 5)
4499            eq_(idx.kwargs["nonparticipating_foobar"], 5)
4500
4501    def test_ad_hoc_via_kwargs_invalid_key(self):
4502        with self._fixture():
4503            idx = Index("a", "b", "c")
4504            assert_raises_message(
4505                exc.ArgumentError,
4506                "Keys must be of the form <dialectname>_<argname>",
4507                idx.kwargs.__setitem__,
4508                "foobar",
4509                5,
4510            )
4511
4512    def test_ad_hoc_via_kwargs_invalid_dialect(self):
4513        with self._fixture():
4514            idx = Index("a", "b", "c")
4515            assert_raises_message(
4516                exc.ArgumentError,
4517                "no dialect 'nonexistent'",
4518                idx.kwargs.__setitem__,
4519                "nonexistent_foobar",
4520                5,
4521            )
4522
4523    def test_add_new_arguments_participating(self):
4524        with self._fixture():
4525            Index.argument_for("participating", "xyzqpr", False)
4526
4527            idx = Index("a", "b", "c", participating_xyzqpr=True)
4528
4529            eq_(idx.kwargs["participating_xyzqpr"], True)
4530
4531            idx = Index("a", "b", "c")
4532            eq_(idx.dialect_options["participating"]["xyzqpr"], False)
4533
4534    def test_add_new_arguments_participating_no_existing(self):
4535        with self._fixture():
4536            PrimaryKeyConstraint.argument_for("participating", "xyzqpr", False)
4537
4538            pk = PrimaryKeyConstraint("a", "b", "c", participating_xyzqpr=True)
4539
4540            eq_(pk.kwargs["participating_xyzqpr"], True)
4541
4542            pk = PrimaryKeyConstraint("a", "b", "c")
4543            eq_(pk.dialect_options["participating"]["xyzqpr"], False)
4544
4545    def test_add_new_arguments_nonparticipating(self):
4546        with self._fixture():
4547            assert_raises_message(
4548                exc.ArgumentError,
4549                "Dialect 'nonparticipating' does have keyword-argument "
4550                "validation and defaults enabled configured",
4551                Index.argument_for,
4552                "nonparticipating",
4553                "xyzqpr",
4554                False,
4555            )
4556
4557    def test_add_new_arguments_invalid_dialect(self):
4558        with self._fixture():
4559            assert_raises_message(
4560                exc.ArgumentError,
4561                "no dialect 'nonexistent'",
4562                Index.argument_for,
4563                "nonexistent",
4564                "foobar",
4565                5,
4566            )
4567
4568
4569class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
4570    __dialect__ = "default"
4571
4572    def _fixture(self, naming_convention, table_schema=None):
4573        m1 = MetaData(naming_convention=naming_convention)
4574
4575        u1 = Table(
4576            "user",
4577            m1,
4578            Column("id", Integer, primary_key=True),
4579            Column("version", Integer, primary_key=True),
4580            Column("data", String(30)),
4581            Column("Data2", String(30), key="data2"),
4582            Column("Data3", String(30), key="data3"),
4583            schema=table_schema,
4584        )
4585
4586        return u1
4587
4588    def _colliding_name_fixture(self, naming_convention, id_flags):
4589        m1 = MetaData(naming_convention=naming_convention)
4590
4591        t1 = Table(
4592            "foo",
4593            m1,
4594            Column("id", Integer, **id_flags),
4595            Column("foo_id", Integer),
4596        )
4597        return t1
4598
4599    def test_colliding_col_label_from_index_flag(self):
4600        t1 = self._colliding_name_fixture(
4601            {"ix": "ix_%(column_0_label)s"}, {"index": True}
4602        )
4603
4604        idx = list(t1.indexes)[0]
4605
4606        # name is generated up front.  alembic really prefers this
4607        eq_(idx.name, "ix_foo_id")
4608        self.assert_compile(
4609            CreateIndex(idx), "CREATE INDEX ix_foo_id ON foo (id)"
4610        )
4611
4612    def test_colliding_col_label_from_unique_flag(self):
4613        t1 = self._colliding_name_fixture(
4614            {"uq": "uq_%(column_0_label)s"}, {"unique": True}
4615        )
4616
4617        const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
4618        uq = const[0]
4619
4620        # name is generated up front.  alembic really prefers this
4621        eq_(uq.name, "uq_foo_id")
4622
4623        self.assert_compile(
4624            AddConstraint(uq),
4625            "ALTER TABLE foo ADD CONSTRAINT uq_foo_id UNIQUE (id)",
4626        )
4627
4628    def test_colliding_col_label_from_index_obj(self):
4629        t1 = self._colliding_name_fixture({"ix": "ix_%(column_0_label)s"}, {})
4630
4631        idx = Index(None, t1.c.id)
4632        is_(idx, list(t1.indexes)[0])
4633        eq_(idx.name, "ix_foo_id")
4634        self.assert_compile(
4635            CreateIndex(idx), "CREATE INDEX ix_foo_id ON foo (id)"
4636        )
4637
4638    def test_colliding_col_label_from_unique_obj(self):
4639        t1 = self._colliding_name_fixture({"uq": "uq_%(column_0_label)s"}, {})
4640        uq = UniqueConstraint(t1.c.id)
4641        const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
4642        is_(const[0], uq)
4643        eq_(const[0].name, "uq_foo_id")
4644        self.assert_compile(
4645            AddConstraint(const[0]),
4646            "ALTER TABLE foo ADD CONSTRAINT uq_foo_id UNIQUE (id)",
4647        )
4648
4649    def test_colliding_col_label_from_index_flag_no_conv(self):
4650        t1 = self._colliding_name_fixture({"ck": "foo"}, {"index": True})
4651
4652        idx = list(t1.indexes)[0]
4653
4654        # this behavior needs to fail, as of #4911 since we are testing it,
4655        # ensure it raises a CompileError.  In #4289 we may want to revisit
4656        # this in some way, most likely specifically to Postgresql only.
4657        assert_raises_message(
4658            exc.CompileError,
4659            "CREATE INDEX requires that the index have a name",
4660            CreateIndex(idx).compile,
4661        )
4662
4663        assert_raises_message(
4664            exc.CompileError,
4665            "DROP INDEX requires that the index have a name",
4666            DropIndex(idx).compile,
4667        )
4668
4669    def test_colliding_col_label_from_unique_flag_no_conv(self):
4670        t1 = self._colliding_name_fixture({"ck": "foo"}, {"unique": True})
4671
4672        const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
4673        is_(const[0].name, None)
4674
4675        self.assert_compile(
4676            AddConstraint(const[0]), "ALTER TABLE foo ADD UNIQUE (id)"
4677        )
4678
4679    @testing.combinations(
4680        ("nopk",),
4681        ("column",),
4682        ("constraint",),
4683        ("explicit_name",),
4684        argnames="pktype",
4685    )
4686    @testing.combinations(
4687        ("pk_%(table_name)s", "pk_t1"),
4688        ("pk_%(column_0_name)s", "pk_x"),
4689        ("pk_%(column_0_N_name)s", "pk_x_y"),
4690        ("pk_%(column_0_N_label)s", "pk_t1_x_t1_y"),
4691        ("%(column_0_name)s", "x"),
4692        ("%(column_0N_name)s", "xy"),
4693        argnames="conv, expected_name",
4694    )
4695    def test_pk_conventions(self, conv, expected_name, pktype):
4696        m1 = MetaData(naming_convention={"pk": conv})
4697
4698        if pktype == "column":
4699            t1 = Table(
4700                "t1",
4701                m1,
4702                Column("x", Integer, primary_key=True),
4703                Column("y", Integer, primary_key=True),
4704            )
4705        elif pktype == "constraint":
4706            t1 = Table(
4707                "t1",
4708                m1,
4709                Column("x", Integer),
4710                Column("y", Integer),
4711                PrimaryKeyConstraint("x", "y"),
4712            )
4713        elif pktype == "nopk":
4714            t1 = Table(
4715                "t1",
4716                m1,
4717                Column("x", Integer, nullable=False),
4718                Column("y", Integer, nullable=False),
4719            )
4720            expected_name = None
4721        elif pktype == "explicit_name":
4722            t1 = Table(
4723                "t1",
4724                m1,
4725                Column("x", Integer, primary_key=True),
4726                Column("y", Integer, primary_key=True),
4727                PrimaryKeyConstraint("x", "y", name="myname"),
4728            )
4729            expected_name = "myname"
4730
4731        if expected_name:
4732            eq_(t1.primary_key.name, expected_name)
4733
4734        if pktype == "nopk":
4735            self.assert_compile(
4736                schema.CreateTable(t1),
4737                "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL)",
4738            )
4739        else:
4740            self.assert_compile(
4741                schema.CreateTable(t1),
4742                "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL, "
4743                "CONSTRAINT %s PRIMARY KEY (x, y))" % expected_name,
4744            )
4745
4746    def test_uq_name(self):
4747        u1 = self._fixture(
4748            naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
4749        )
4750        uq = UniqueConstraint(u1.c.data)
4751        eq_(uq.name, "uq_user_data")
4752
4753    def test_uq_conv_name(self):
4754        u1 = self._fixture(
4755            naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
4756        )
4757        uq = UniqueConstraint(u1.c.data, name=naming.conv("myname"))
4758        self.assert_compile(
4759            schema.AddConstraint(uq),
4760            'ALTER TABLE "user" ADD CONSTRAINT myname UNIQUE (data)',
4761            dialect="default",
4762        )
4763
4764    def test_uq_defer_name_convention(self):
4765        u1 = self._fixture(
4766            naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
4767        )
4768        uq = UniqueConstraint(u1.c.data, name=naming._NONE_NAME)
4769        self.assert_compile(
4770            schema.AddConstraint(uq),
4771            'ALTER TABLE "user" ADD CONSTRAINT uq_user_data UNIQUE (data)',
4772            dialect="default",
4773        )
4774
4775    def test_uq_key(self):
4776        u1 = self._fixture(
4777            naming_convention={"uq": "uq_%(table_name)s_%(column_0_key)s"}
4778        )
4779        uq = UniqueConstraint(u1.c.data, u1.c.data2)
4780        eq_(uq.name, "uq_user_data")
4781
4782    def test_uq_label(self):
4783        u1 = self._fixture(
4784            naming_convention={"uq": "uq_%(table_name)s_%(column_0_label)s"}
4785        )
4786        uq = UniqueConstraint(u1.c.data, u1.c.data2)
4787        eq_(uq.name, "uq_user_user_data")
4788
4789    def test_uq_allcols_underscore_name(self):
4790        u1 = self._fixture(
4791            naming_convention={"uq": "uq_%(table_name)s_%(column_0_N_name)s"}
4792        )
4793        uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
4794        eq_(uq.name, "uq_user_data_Data2_Data3")
4795
4796    def test_uq_allcols_merged_name(self):
4797        u1 = self._fixture(
4798            naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"}
4799        )
4800        uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
4801        eq_(uq.name, "uq_user_dataData2Data3")
4802
4803    def test_uq_allcols_merged_key(self):
4804        u1 = self._fixture(
4805            naming_convention={"uq": "uq_%(table_name)s_%(column_0N_key)s"}
4806        )
4807        uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
4808        eq_(uq.name, "uq_user_datadata2data3")
4809
4810    def test_uq_allcols_truncated_name(self):
4811        u1 = self._fixture(
4812            naming_convention={"uq": "uq_%(table_name)s_%(column_0N_name)s"}
4813        )
4814        uq = UniqueConstraint(u1.c.data, u1.c.data2, u1.c.data3)
4815
4816        dialect = default.DefaultDialect()
4817        self.assert_compile(
4818            schema.AddConstraint(uq),
4819            'ALTER TABLE "user" ADD '
4820            'CONSTRAINT "uq_user_dataData2Data3" '
4821            'UNIQUE (data, "Data2", "Data3")',
4822            dialect=dialect,
4823        )
4824
4825        dialect.max_identifier_length = 15
4826        self.assert_compile(
4827            schema.AddConstraint(uq),
4828            'ALTER TABLE "user" ADD '
4829            'CONSTRAINT uq_user_2769 UNIQUE (data, "Data2", "Data3")',
4830            dialect=dialect,
4831        )
4832
4833    def test_fk_allcols_underscore_name(self):
4834        u1 = self._fixture(
4835            naming_convention={
4836                "fk": "fk_%(table_name)s_%(column_0_N_name)s_"
4837                "%(referred_table_name)s_%(referred_column_0_N_name)s"
4838            }
4839        )
4840
4841        m1 = u1.metadata
4842        a1 = Table(
4843            "address",
4844            m1,
4845            Column("id", Integer, primary_key=True),
4846            Column("UserData", String(30), key="user_data"),
4847            Column("UserData2", String(30), key="user_data2"),
4848            Column("UserData3", String(30), key="user_data3"),
4849        )
4850        fk = ForeignKeyConstraint(
4851            ["user_data", "user_data2", "user_data3"],
4852            ["user.data", "user.data2", "user.data3"],
4853        )
4854        a1.append_constraint(fk)
4855        self.assert_compile(
4856            schema.AddConstraint(fk),
4857            "ALTER TABLE address ADD CONSTRAINT "
4858            '"fk_address_UserData_UserData2_UserData3_user_data_Data2_Data3" '
4859            'FOREIGN KEY("UserData", "UserData2", "UserData3") '
4860            'REFERENCES "user" (data, "Data2", "Data3")',
4861            dialect=default.DefaultDialect(),
4862        )
4863
4864    def test_fk_allcols_merged_name(self):
4865        u1 = self._fixture(
4866            naming_convention={
4867                "fk": "fk_%(table_name)s_%(column_0N_name)s_"
4868                "%(referred_table_name)s_%(referred_column_0N_name)s"
4869            }
4870        )
4871
4872        m1 = u1.metadata
4873        a1 = Table(
4874            "address",
4875            m1,
4876            Column("id", Integer, primary_key=True),
4877            Column("UserData", String(30), key="user_data"),
4878            Column("UserData2", String(30), key="user_data2"),
4879            Column("UserData3", String(30), key="user_data3"),
4880        )
4881        fk = ForeignKeyConstraint(
4882            ["user_data", "user_data2", "user_data3"],
4883            ["user.data", "user.data2", "user.data3"],
4884        )
4885        a1.append_constraint(fk)
4886        self.assert_compile(
4887            schema.AddConstraint(fk),
4888            "ALTER TABLE address ADD CONSTRAINT "
4889            '"fk_address_UserDataUserData2UserData3_user_dataData2Data3" '
4890            'FOREIGN KEY("UserData", "UserData2", "UserData3") '
4891            'REFERENCES "user" (data, "Data2", "Data3")',
4892            dialect=default.DefaultDialect(),
4893        )
4894
4895    def test_fk_allcols_truncated_name(self):
4896        u1 = self._fixture(
4897            naming_convention={
4898                "fk": "fk_%(table_name)s_%(column_0N_name)s_"
4899                "%(referred_table_name)s_%(referred_column_0N_name)s"
4900            }
4901        )
4902
4903        m1 = u1.metadata
4904        a1 = Table(
4905            "address",
4906            m1,
4907            Column("id", Integer, primary_key=True),
4908            Column("UserData", String(30), key="user_data"),
4909            Column("UserData2", String(30), key="user_data2"),
4910            Column("UserData3", String(30), key="user_data3"),
4911        )
4912        fk = ForeignKeyConstraint(
4913            ["user_data", "user_data2", "user_data3"],
4914            ["user.data", "user.data2", "user.data3"],
4915        )
4916        a1.append_constraint(fk)
4917
4918        dialect = default.DefaultDialect()
4919        dialect.max_identifier_length = 15
4920        self.assert_compile(
4921            schema.AddConstraint(fk),
4922            "ALTER TABLE address ADD CONSTRAINT "
4923            "fk_addr_f9ff "
4924            'FOREIGN KEY("UserData", "UserData2", "UserData3") '
4925            'REFERENCES "user" (data, "Data2", "Data3")',
4926            dialect=dialect,
4927        )
4928
4929    def test_ix_allcols_truncation(self):
4930        u1 = self._fixture(
4931            naming_convention={"ix": "ix_%(table_name)s_%(column_0N_name)s"}
4932        )
4933        ix = Index(None, u1.c.data, u1.c.data2, u1.c.data3)
4934        dialect = default.DefaultDialect()
4935        dialect.max_identifier_length = 15
4936        self.assert_compile(
4937            schema.CreateIndex(ix),
4938            "CREATE INDEX ix_user_2de9 ON " '"user" (data, "Data2", "Data3")',
4939            dialect=dialect,
4940        )
4941
4942    def test_ix_name(self):
4943        u1 = self._fixture(
4944            naming_convention={"ix": "ix_%(table_name)s_%(column_0_name)s"}
4945        )
4946        ix = Index(None, u1.c.data)
4947        eq_(ix.name, "ix_user_data")
4948
4949    def test_ck_name_required(self):
4950        u1 = self._fixture(
4951            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
4952        )
4953        ck = CheckConstraint(u1.c.data == "x", name="mycheck")
4954        eq_(ck.name, "ck_user_mycheck")
4955
4956        assert_raises_message(
4957            exc.InvalidRequestError,
4958            r"Naming convention including %\(constraint_name\)s token "
4959            "requires that constraint is explicitly named.",
4960            CheckConstraint,
4961            u1.c.data == "x",
4962        )
4963
4964    def test_ck_name_deferred_required(self):
4965        u1 = self._fixture(
4966            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
4967        )
4968        ck = CheckConstraint(u1.c.data == "x", name=naming._NONE_NAME)
4969
4970        assert_raises_message(
4971            exc.InvalidRequestError,
4972            r"Naming convention including %\(constraint_name\)s token "
4973            "requires that constraint is explicitly named.",
4974            schema.AddConstraint(ck).compile,
4975        )
4976
4977    def test_column_attached_ck_name(self):
4978        m = MetaData(
4979            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
4980        )
4981        ck = CheckConstraint("x > 5", name="x1")
4982        Table("t", m, Column("x", ck))
4983        eq_(ck.name, "ck_t_x1")
4984
4985    def test_table_attached_ck_name(self):
4986        m = MetaData(
4987            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
4988        )
4989        ck = CheckConstraint("x > 5", name="x1")
4990        Table("t", m, Column("x", Integer), ck)
4991        eq_(ck.name, "ck_t_x1")
4992
4993    def test_uq_name_already_conv(self):
4994        m = MetaData(
4995            naming_convention={
4996                "uq": "uq_%(constraint_name)s_%(column_0_name)s"
4997            }
4998        )
4999
5000        t = Table("mytable", m)
5001        uq = UniqueConstraint(name=naming.conv("my_special_key"))
5002
5003        t.append_constraint(uq)
5004        eq_(uq.name, "my_special_key")
5005
5006    def test_fk_name_schema(self):
5007        u1 = self._fixture(
5008            naming_convention={
5009                "fk": "fk_%(table_name)s_%(column_0_name)s_"
5010                "%(referred_table_name)s_%(referred_column_0_name)s"
5011            },
5012            table_schema="foo",
5013        )
5014        m1 = u1.metadata
5015        a1 = Table(
5016            "address",
5017            m1,
5018            Column("id", Integer, primary_key=True),
5019            Column("user_id", Integer),
5020            Column("user_version_id", Integer),
5021        )
5022        fk = ForeignKeyConstraint(
5023            ["user_id", "user_version_id"], ["foo.user.id", "foo.user.version"]
5024        )
5025        a1.append_constraint(fk)
5026        eq_(fk.name, "fk_address_user_id_user_id")
5027
5028    def test_fk_attrs(self):
5029        u1 = self._fixture(
5030            naming_convention={
5031                "fk": "fk_%(table_name)s_%(column_0_name)s_"
5032                "%(referred_table_name)s_%(referred_column_0_name)s"
5033            }
5034        )
5035        m1 = u1.metadata
5036        a1 = Table(
5037            "address",
5038            m1,
5039            Column("id", Integer, primary_key=True),
5040            Column("user_id", Integer),
5041            Column("user_version_id", Integer),
5042        )
5043        fk = ForeignKeyConstraint(
5044            ["user_id", "user_version_id"], ["user.id", "user.version"]
5045        )
5046        a1.append_constraint(fk)
5047        eq_(fk.name, "fk_address_user_id_user_id")
5048
5049    def test_custom(self):
5050        def key_hash(const, table):
5051            return "HASH_%s" % table.name
5052
5053        u1 = self._fixture(
5054            naming_convention={
5055                "fk": "fk_%(table_name)s_%(key_hash)s",
5056                "key_hash": key_hash,
5057            }
5058        )
5059        m1 = u1.metadata
5060        a1 = Table(
5061            "address",
5062            m1,
5063            Column("id", Integer, primary_key=True),
5064            Column("user_id", Integer),
5065            Column("user_version_id", Integer),
5066        )
5067        fk = ForeignKeyConstraint(
5068            ["user_id", "user_version_id"], ["user.id", "user.version"]
5069        )
5070        a1.append_constraint(fk)
5071        eq_(fk.name, "fk_address_HASH_address")
5072
5073    def test_schematype_ck_name_boolean(self):
5074        m1 = MetaData(
5075            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5076        )
5077
5078        u1 = Table(
5079            "user",
5080            m1,
5081            Column("x", Boolean(name="foo")),
5082        )
5083
5084        self.assert_compile(
5085            schema.CreateTable(u1),
5086            'CREATE TABLE "user" ('
5087            "x BOOLEAN, "
5088            "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))"
5089            ")",
5090        )
5091
5092        # test no side effects from first compile
5093        self.assert_compile(
5094            schema.CreateTable(u1),
5095            'CREATE TABLE "user" ('
5096            "x BOOLEAN, "
5097            "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))"
5098            ")",
5099        )
5100
5101    def test_schematype_ck_name_boolean_not_on_name(self):
5102        m1 = MetaData(
5103            naming_convention={"ck": "ck_%(table_name)s_%(column_0_name)s"}
5104        )
5105
5106        u1 = Table("user", m1, Column("x", Boolean()))
5107        # constraint is not hit
5108        is_(
5109            [c for c in u1.constraints if isinstance(c, CheckConstraint)][
5110                0
5111            ].name,
5112            _NONE_NAME,
5113        )
5114        # but is hit at compile time
5115        self.assert_compile(
5116            schema.CreateTable(u1),
5117            'CREATE TABLE "user" ('
5118            "x BOOLEAN, "
5119            "CONSTRAINT ck_user_x CHECK (x IN (0, 1))"
5120            ")",
5121        )
5122
5123    def test_schematype_ck_name_enum(self):
5124        m1 = MetaData(
5125            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5126        )
5127
5128        u1 = Table(
5129            "user",
5130            m1,
5131            Column("x", Enum("a", "b", name="foo")),
5132        )
5133
5134        self.assert_compile(
5135            schema.CreateTable(u1),
5136            'CREATE TABLE "user" ('
5137            "x VARCHAR(1), "
5138            "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))"
5139            ")",
5140        )
5141
5142        # test no side effects from first compile
5143        self.assert_compile(
5144            schema.CreateTable(u1),
5145            'CREATE TABLE "user" ('
5146            "x VARCHAR(1), "
5147            "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))"
5148            ")",
5149        )
5150
5151    def test_schematype_ck_name_propagate_conv(self):
5152        m1 = MetaData(
5153            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5154        )
5155
5156        u1 = Table(
5157            "user", m1, Column("x", Enum("a", "b", name=naming.conv("foo")))
5158        )
5159        eq_(
5160            [c for c in u1.constraints if isinstance(c, CheckConstraint)][
5161                0
5162            ].name,
5163            "foo",
5164        )
5165        # but is hit at compile time
5166        self.assert_compile(
5167            schema.CreateTable(u1),
5168            'CREATE TABLE "user" ('
5169            "x VARCHAR(1), "
5170            "CONSTRAINT foo CHECK (x IN ('a', 'b'))"
5171            ")",
5172        )
5173
5174    def test_schematype_ck_name_boolean_no_name(self):
5175        m1 = MetaData(
5176            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5177        )
5178
5179        u1 = Table("user", m1, Column("x", Boolean()))
5180        # constraint gets special _defer_none_name
5181        is_(
5182            [c for c in u1.constraints if isinstance(c, CheckConstraint)][
5183                0
5184            ].name,
5185            _NONE_NAME,
5186        )
5187        # no issue with native boolean
5188        self.assert_compile(
5189            schema.CreateTable(u1),
5190            'CREATE TABLE "user" (' "x BOOLEAN" ")",
5191            dialect="postgresql",
5192        )
5193
5194        assert_raises_message(
5195            exc.InvalidRequestError,
5196            r"Naming convention including \%\(constraint_name\)s token "
5197            r"requires that constraint is explicitly named.",
5198            schema.CreateTable(u1).compile,
5199            dialect=default.DefaultDialect(),
5200        )
5201
5202    def test_schematype_no_ck_name_boolean_no_name(self):
5203        m1 = MetaData()  # no naming convention
5204
5205        u1 = Table("user", m1, Column("x", Boolean()))
5206        # constraint gets special _defer_none_name
5207        is_(
5208            [c for c in u1.constraints if isinstance(c, CheckConstraint)][
5209                0
5210            ].name,
5211            _NONE_NAME,
5212        )
5213
5214        self.assert_compile(
5215            schema.CreateTable(u1),
5216            'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))',
5217        )
5218
5219    def test_ck_constraint_redundant_event(self):
5220        u1 = self._fixture(
5221            naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
5222        )
5223
5224        ck1 = CheckConstraint(u1.c.version > 3, name="foo")
5225        u1.append_constraint(ck1)
5226        u1.append_constraint(ck1)
5227        u1.append_constraint(ck1)
5228
5229        eq_(ck1.name, "ck_user_foo")
5230
5231    def test_pickle_metadata(self):
5232        m = MetaData(naming_convention={"pk": "%(table_name)s_pk"})
5233
5234        m2 = pickle.loads(pickle.dumps(m))
5235
5236        eq_(m2.naming_convention, {"pk": "%(table_name)s_pk"})
5237
5238        t2a = Table("t2", m, Column("id", Integer, primary_key=True))
5239        t2b = Table("t2", m2, Column("id", Integer, primary_key=True))
5240
5241        eq_(t2a.primary_key.name, t2b.primary_key.name)
5242        eq_(t2b.primary_key.name, "t2_pk")
5243
5244    def test_expression_index(self):
5245        m = MetaData(naming_convention={"ix": "ix_%(column_0_label)s"})
5246        t = Table("t", m, Column("q", Integer), Column("p", Integer))
5247        ix = Index(None, t.c.q + 5)
5248        t.append_constraint(ix)
5249
5250        # huh.  pretty cool
5251        self.assert_compile(
5252            CreateIndex(ix), "CREATE INDEX ix_t_q ON t (q + 5)"
5253        )
5254
5255
5256class CopyDialectOptionsTest(fixtures.TestBase):
5257    @contextmanager
5258    def _fixture(self):
5259        from sqlalchemy.engine.default import DefaultDialect
5260
5261        class CopyDialectOptionsTestDialect(DefaultDialect):
5262            construct_arguments = [
5263                (Table, {"some_table_arg": None}),
5264                (Column, {"some_column_arg": None}),
5265                (Index, {"some_index_arg": None}),
5266                (PrimaryKeyConstraint, {"some_pk_arg": None}),
5267                (UniqueConstraint, {"some_uq_arg": None}),
5268            ]
5269
5270        def load(dialect_name):
5271            if dialect_name == "copydialectoptionstest":
5272                return CopyDialectOptionsTestDialect
5273            else:
5274                raise exc.NoSuchModuleError("no dialect %r" % dialect_name)
5275
5276        with mock.patch("sqlalchemy.dialects.registry.load", load):
5277            yield
5278
5279    @classmethod
5280    def check_dialect_options_(cls, t):
5281        eq_(
5282            t.dialect_kwargs["copydialectoptionstest_some_table_arg"],
5283            "a1",
5284        )
5285        eq_(
5286            t.c.foo.dialect_kwargs["copydialectoptionstest_some_column_arg"],
5287            "a2",
5288        )
5289        eq_(
5290            t.primary_key.dialect_kwargs["copydialectoptionstest_some_pk_arg"],
5291            "a3",
5292        )
5293        eq_(
5294            list(t.indexes)[0].dialect_kwargs[
5295                "copydialectoptionstest_some_index_arg"
5296            ],
5297            "a4",
5298        )
5299        eq_(
5300            list(c for c in t.constraints if isinstance(c, UniqueConstraint))[
5301                0
5302            ].dialect_kwargs["copydialectoptionstest_some_uq_arg"],
5303            "a5",
5304        )
5305
5306    def test_dialect_options_are_copied(self):
5307        with self._fixture():
5308            t1 = Table(
5309                "t",
5310                MetaData(),
5311                Column(
5312                    "foo",
5313                    Integer,
5314                    copydialectoptionstest_some_column_arg="a2",
5315                ),
5316                Column("bar", Integer),
5317                PrimaryKeyConstraint(
5318                    "foo", copydialectoptionstest_some_pk_arg="a3"
5319                ),
5320                UniqueConstraint(
5321                    "bar", copydialectoptionstest_some_uq_arg="a5"
5322                ),
5323                copydialectoptionstest_some_table_arg="a1",
5324            )
5325            Index(
5326                "idx",
5327                t1.c.foo,
5328                copydialectoptionstest_some_index_arg="a4",
5329            )
5330
5331            self.check_dialect_options_(t1)
5332
5333            m2 = MetaData()
5334            t2 = t1.tometadata(m2)  # make a copy
5335            self.check_dialect_options_(t2)
5336