1from sqlalchemy.testing import assert_raises
2from sqlalchemy.testing import assert_raises_message
3from sqlalchemy.testing import emits_warning
4
5import pickle
6from sqlalchemy import Integer, String, UniqueConstraint, \
7    CheckConstraint, ForeignKey, MetaData, Sequence, \
8    ForeignKeyConstraint, PrimaryKeyConstraint, ColumnDefault, Index, event,\
9    events, Unicode, types as sqltypes, bindparam, \
10    Table, Column, Boolean, Enum, func, text, BLANK_SCHEMA
11from sqlalchemy import schema, exc
12from sqlalchemy.sql import elements, naming
13import sqlalchemy as tsa
14from sqlalchemy.testing import fixtures
15from sqlalchemy import testing
16from sqlalchemy.testing import ComparesTables, AssertsCompiledSQL
17from sqlalchemy.testing import eq_, is_, mock
18from contextlib import contextmanager
19from sqlalchemy import util
20
21
class MetaDataTest(fixtures.TestBase, ComparesTables):
    """Tests for MetaData, Table, Column, Sequence and constraint objects.

    The tests cover membership and schema bookkeeping on ``MetaData``,
    copying of columns and constraints, the column collections and error
    messages of ``ForeignKey`` / ``ForeignKeyConstraint``, pickling of
    ``MetaData``, table dependency sorting and ``repr()`` output.
    """

    def test_metadata_contains(self):
        """``in`` on a MetaData accepts both table keys and Table objects."""
        metadata = MetaData()
        t1 = Table('t1', metadata, Column('x', Integer))
        t2 = Table('t2', metadata, Column('x', Integer), schema='foo')
        # same names with the schemas swapped, attached to unrelated
        # MetaData objects; none of these may be reported as contained
        t3 = Table('t2', MetaData(), Column('x', Integer))
        t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo')

        assert "t1" in metadata
        assert "foo.t2" in metadata
        assert "t2" not in metadata
        assert "foo.t1" not in metadata
        assert t1 in metadata
        assert t2 in metadata
        assert t3 not in metadata
        assert t4 not in metadata

    def test_uninitialized_column_copy(self):
        """``Column.copy()`` on table-less columns keeps their settings."""
        for col in [
            Column('foo', String(), nullable=False),
            Column('baz', String(), unique=True),
            Column(Integer(), primary_key=True),
            Column('bar', Integer(), Sequence('foo_seq'), primary_key=True,
                   key='bar'),
            Column(Integer(), ForeignKey('bat.blah'), doc="this is a col"),
            Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True,
                   key='bar'),
            Column('bar', Integer(), info={'foo': 'bar'}),
        ]:
            c2 = col.copy()
            for attr in ('name', 'type', 'nullable',
                         'primary_key', 'key', 'unique', 'info',
                         'doc'):
                eq_(getattr(col, attr), getattr(c2, attr))
            eq_(len(col.foreign_keys), len(c2.foreign_keys))
            # explicit None check; only the Sequence column above carries
            # a default, and the check should not depend on the
            # truthiness of a schema object
            if col.default is not None:
                eq_(c2.default.name, 'foo_seq')
            # foreign keys are copied as new objects, not shared
            for a1, a2 in zip(col.foreign_keys, c2.foreign_keys):
                assert a1 is not a2
                eq_(a2._colspec, 'bat.blah')

    def test_col_subclass_copy(self):
        """``copy()`` on a Column subclass returns the subclass."""
        class MyColumn(schema.Column):

            def __init__(self, *args, **kw):
                self.widget = kw.pop('widget', None)
                super(MyColumn, self).__init__(*args, **kw)

            def copy(self, *arg, **kw):
                c = super(MyColumn, self).copy(*arg, **kw)
                c.widget = self.widget
                return c
        c1 = MyColumn('foo', Integer, widget='x')
        c2 = c1.copy()
        assert isinstance(c2, MyColumn)
        eq_(c2.widget, 'x')

    def test_uninitialized_column_copy_events(self):
        """Attach listeners set on each column copy fire once per table."""
        msgs = []

        def write(c, t):
            msgs.append("attach %s.%s" % (t.name, c.name))
        c1 = Column('foo', String())
        m = MetaData()
        for i in range(3):
            cx = c1.copy()
            # as of 0.7, these events no longer copy.  its expected
            # that listeners will be re-established from the
            # natural construction of things.
            cx._on_table_attach(write)
            Table('foo%d' % i, m, cx)
        eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo'])

    def test_schema_collection_add(self):
        """``MetaData._schemas`` collects the schema names of its tables."""
        metadata = MetaData()

        Table('t1', metadata, Column('x', Integer), schema='foo')
        Table('t2', metadata, Column('x', Integer), schema='bar')
        Table('t3', metadata, Column('x', Integer))

        eq_(metadata._schemas, set(['foo', 'bar']))
        eq_(len(metadata.tables), 3)

    def test_schema_collection_remove(self):
        """A schema name is dropped only with its last remaining table."""
        metadata = MetaData()

        t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
        Table('t2', metadata, Column('x', Integer), schema='bar')
        t3 = Table('t3', metadata, Column('x', Integer), schema='bar')

        # t2 still lives in 'bar', so the schema name stays
        metadata.remove(t3)
        eq_(metadata._schemas, set(['foo', 'bar']))
        eq_(len(metadata.tables), 2)

        # t1 was the only table in 'foo'
        metadata.remove(t1)
        eq_(metadata._schemas, set(['bar']))
        eq_(len(metadata.tables), 1)

    def test_schema_collection_remove_all(self):
        """``MetaData.clear()`` empties both tables and schema names."""
        metadata = MetaData()

        Table('t1', metadata, Column('x', Integer), schema='foo')
        Table('t2', metadata, Column('x', Integer), schema='bar')

        metadata.clear()
        eq_(metadata._schemas, set())
        eq_(len(metadata.tables), 0)

    def test_metadata_tables_immutable(self):
        """``MetaData.tables.pop()`` raises TypeError."""
        metadata = MetaData()

        Table('t1', metadata, Column('x', Integer))
        assert 't1' in metadata.tables

        assert_raises(
            TypeError,
            lambda: metadata.tables.pop('t1')
        )

    @testing.provide_metadata
    def test_dupe_tables(self):
        """Re-declaring columns for an existing table name is an error."""
        metadata = self.metadata
        Table('table1', metadata,
              Column('col1', Integer, primary_key=True),
              Column('col2', String(20)))

        metadata.create_all()
        # naming the table again without columns is accepted
        Table('table1', metadata, autoload=True)

        def go():
            Table('table1', metadata,
                  Column('col1', Integer, primary_key=True),
                  Column('col2', String(20)))
        assert_raises_message(
            tsa.exc.InvalidRequestError,
            "Table 'table1' is already defined for this "
            "MetaData instance.  Specify 'extend_existing=True' "
            "to redefine options and columns on an existing "
            "Table object.",
            go
        )

    def test_fk_copy(self):
        """``copy()`` keeps the keyword options of both FK constructs."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        m = MetaData()
        t1 = Table('t', m, c1, c2)

        kw = dict(onupdate="X",
                  ondelete="Y", use_alter=True, name='f1',
                  deferrable="Z", initially="Q", link_to_name=True)

        fk1 = ForeignKey(c1, **kw)
        fk2 = ForeignKeyConstraint((c1,), (c2,), **kw)

        t1.append_constraint(fk2)
        fk1c = fk1.copy()
        fk2c = fk2.copy()

        for k in kw:
            eq_(getattr(fk1c, k), kw[k])
            eq_(getattr(fk2c, k), kw[k])

    def test_check_constraint_copy(self):
        """``CheckConstraint.copy()`` keeps text, options and create rule."""
        # a named function rather than a lambda bound to a name
        def r(x):
            return x

        c = CheckConstraint("foo bar",
                            name='name',
                            initially=True,
                            deferrable=True,
                            _create_rule=r)
        c2 = c.copy()
        eq_(c2.name, 'name')
        eq_(str(c2.sqltext), "foo bar")
        eq_(c2.initially, True)
        eq_(c2.deferrable, True)
        assert c2._create_rule is r

    def test_col_replace_w_constraint(self):
        """Appending an FK column a second time leaves two constraints."""
        m = MetaData()
        a = Table('a', m, Column('id', Integer, primary_key=True))

        aid = Column('a_id', ForeignKey('a.id'))
        b = Table('b', m, aid)
        # the same Column object is appended to the table again
        b.append_column(aid)

        assert b.c.a_id.references(a.c.id)
        eq_(len(b.constraints), 2)

    def test_fk_construct(self):
        """``table=`` adds the new ForeignKeyConstraint to that table."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        m = MetaData()
        t1 = Table('t', m, c1, c2)
        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
        assert fk1 in t1.constraints

    def test_fk_constraint_col_collection_w_table(self):
        """With ``table=`` given, ``.columns`` holds the table's column."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        m = MetaData()
        t1 = Table('t', m, c1, c2)
        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
        eq_(dict(fk1.columns), {"foo": c1})

    def test_fk_constraint_col_collection_no_table(self):
        """Without a table ``.columns`` is empty; the keys are still known."""
        fk1 = ForeignKeyConstraint(('foo', 'bat'), ('bar', 'hoho'))
        eq_(dict(fk1.columns), {})
        eq_(fk1.column_keys, ['foo', 'bat'])
        eq_(fk1._col_description, 'foo, bat')
        eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]})

    def test_fk_constraint_col_collection_no_table_real_cols(self):
        """Same as the table-less case, built from Column objects."""
        c1 = Column('foo', Integer)
        c2 = Column('bar', Integer)
        fk1 = ForeignKeyConstraint((c1, ), (c2, ))
        eq_(dict(fk1.columns), {})
        eq_(fk1.column_keys, ['foo'])
        eq_(fk1._col_description, 'foo')
        eq_(fk1._elements, {"foo": fk1.elements[0]})

    def test_fk_constraint_col_collection_added_to_table(self):
        """``.columns`` is filled once the constraint is given to a Table."""
        c1 = Column('foo', Integer)
        m = MetaData()
        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ))
        Table('t', m, c1, fk1)
        eq_(dict(fk1.columns), {"foo": c1})
        eq_(fk1._elements, {"foo": fk1.elements[0]})

    def test_fk_constraint_col_collection_via_fk(self):
        """The constraint behind a column-level ForeignKey is populated."""
        fk = ForeignKey('bar')
        c1 = Column('foo', Integer, fk)
        m = MetaData()
        t1 = Table('t', m, c1)
        fk1 = fk.constraint
        # column_keys is checked both before and after the table's
        # constraint collection is consulted
        eq_(fk1.column_keys, ['foo'])
        assert fk1 in t1.constraints
        eq_(fk1.column_keys, ['foo'])
        eq_(dict(fk1.columns), {"foo": c1})
        eq_(fk1._elements, {"foo": fk})

    def test_fk_no_such_parent_col_error(self):
        """An FK constraint on a missing parent column is an ArgumentError."""
        meta = MetaData()
        a = Table('a', meta, Column('a', Integer))
        Table('b', meta, Column('b', Integer))

        def go():
            a.append_constraint(
                ForeignKeyConstraint(['x'], ['b.b'])
            )
        assert_raises_message(
            exc.ArgumentError,
            "Can't create ForeignKeyConstraint on "
            "table 'a': no column named 'x' is present.",
            go
        )

    def test_fk_given_non_col(self):
        """``ForeignKey`` rejects a bind parameter as its target."""
        not_a_col = bindparam('x')
        assert_raises_message(
            exc.ArgumentError,
            "String, Column, or Column-bound argument expected, got Bind",
            ForeignKey, not_a_col
        )

    def test_fk_given_non_col_clauseelem(self):
        """Same rejection when the bind comes via ``__clause_element__``."""
        class Foo(object):

            def __clause_element__(self):
                return bindparam('x')
        assert_raises_message(
            exc.ArgumentError,
            "String, Column, or Column-bound argument expected, got Bind",
            ForeignKey, Foo()
        )

    def test_fk_given_col_non_table(self):
        """``ForeignKey`` rejects a column that belongs to an alias."""
        t = Table('t', MetaData(), Column('x', Integer))
        xa = t.alias().c.x
        assert_raises_message(
            exc.ArgumentError,
            "ForeignKey received Column not bound to a Table, got: .*Alias",
            ForeignKey, xa
        )

    def test_fk_given_col_non_table_clauseelem(self):
        """Same rejection when the column comes via ``__clause_element__``."""
        t = Table('t', MetaData(), Column('x', Integer))

        class Foo(object):

            def __clause_element__(self):
                return t.alias().c.x

        assert_raises_message(
            exc.ArgumentError,
            "ForeignKey received Column not bound to a Table, got: .*Alias",
            ForeignKey, Foo()
        )

    def test_fk_no_such_target_col_error_upfront(self):
        """Missing target column errors out; target table defined first."""
        meta = MetaData()
        a = Table('a', meta, Column('a', Integer))
        Table('b', meta, Column('b', Integer))

        a.append_constraint(ForeignKeyConstraint(['a'], ['b.x']))

        # the error is raised on access of ForeignKey.column
        assert_raises_message(
            exc.NoReferencedColumnError,
            "Could not initialize target column for ForeignKey 'b.x' on "
            "table 'a': table 'b' has no column named 'x'",
            getattr, list(a.foreign_keys)[0], "column"
        )

    def test_fk_no_such_target_col_error_delayed(self):
        """Missing target column errors out; target table defined later."""
        meta = MetaData()
        a = Table('a', meta, Column('a', Integer))
        a.append_constraint(
            ForeignKeyConstraint(['a'], ['b.x']))

        # the referenced table only appears after the constraint exists
        Table('b', meta, Column('b', Integer))

        assert_raises_message(
            exc.NoReferencedColumnError,
            "Could not initialize target column for ForeignKey 'b.x' on "
            "table 'a': table 'b' has no column named 'x'",
            getattr, list(a.foreign_keys)[0], "column"
        )

    def test_pickle_metadata_sequence_restated(self):
        """A Sequence restated on an unpickled MetaData is the one stored."""
        m1 = MetaData()
        Table('a', m1,
              Column('id', Integer, primary_key=True),
              Column('x', Integer, Sequence("x_seq")))

        m2 = pickle.loads(pickle.dumps(m1))

        s2 = Sequence("x_seq")
        t2 = Table('a', m2,
                   Column('id', Integer, primary_key=True),
                   Column('x', Integer, s2),
                   extend_existing=True)

        assert m2._sequences['x_seq'] is t2.c.x.default
        assert m2._sequences['x_seq'] is s2

    def test_sequence_restated_replaced(self):
        """Test restatement of Sequence replaces."""

        m1 = MetaData()
        s1 = Sequence("x_seq")
        t = Table('a', m1,
                  Column('x', Integer, s1)
                  )
        assert m1._sequences['x_seq'] is s1

        s2 = Sequence('x_seq')
        Table('a', m1,
              Column('x', Integer, s2),
              extend_existing=True
              )
        assert t.c.x.default is s2
        assert m1._sequences['x_seq'] is s2

    def test_sequence_attach_to_table(self):
        """A column-level Sequence takes on the table's MetaData."""
        m1 = MetaData()
        s1 = Sequence("s")
        Table('a', m1, Column('x', Integer, s1))
        assert s1.metadata is m1

    def test_sequence_attach_to_existing_table(self):
        """A Sequence added to an attached column gets the MetaData too."""
        m1 = MetaData()
        s1 = Sequence("s")
        t = Table('a', m1, Column('x', Integer))
        t.c.x._init_items(s1)
        assert s1.metadata is m1

    def test_pickle_metadata_sequence_implicit(self):
        """``_sequences`` of an unpickled MetaData maps to the column's."""
        m1 = MetaData()
        Table('a', m1,
              Column('id', Integer, primary_key=True),
              Column('x', Integer, Sequence("x_seq")))

        m2 = pickle.loads(pickle.dumps(m1))

        t2 = Table('a', m2, extend_existing=True)

        eq_(m2._sequences, {'x_seq': t2.c.x.default})

    def test_pickle_metadata_schema(self):
        """``_schemas`` is equal on the original and unpickled MetaData."""
        m1 = MetaData()
        Table('a', m1,
              Column('id', Integer, primary_key=True),
              Column('x', Integer, Sequence("x_seq")),
              schema='y')

        m2 = pickle.loads(pickle.dumps(m1))

        Table('a', m2, schema='y',
              extend_existing=True)

        eq_(m2._schemas, m1._schemas)

    def test_metadata_schema_arg(self):
        """Check schema / quote_schema resolution for Table and Sequence.

        Each row below is ``(name, metadata, schema, quote_schema,
        expected schema, expected schema quote flag)``; ``None`` for
        ``schema`` or ``quote_schema`` means the argument is not passed.
        """
        m1 = MetaData(schema='sch1')
        m2 = MetaData(schema='sch1', quote_schema=True)
        m3 = MetaData(schema='sch1', quote_schema=False)
        m4 = MetaData()

        for i, (name, metadata, schema, quote_schema,
                exp_schema, exp_quote_schema) in enumerate([
                    ('t1', m1, None, None, 'sch1', None),
                    ('t2', m1, 'sch2', None, 'sch2', None),
                    ('t3', m1, 'sch2', True, 'sch2', True),
                    ('t4', m1, 'sch1', None, 'sch1', None),
                    ('t5', m1, BLANK_SCHEMA, None, None, None),
                    ('t1', m2, None, None, 'sch1', True),
                    ('t2', m2, 'sch2', None, 'sch2', None),
                    ('t3', m2, 'sch2', True, 'sch2', True),
                    ('t4', m2, 'sch1', None, 'sch1', None),
                    ('t1', m3, None, None, 'sch1', False),
                    ('t2', m3, 'sch2', None, 'sch2', None),
                    ('t3', m3, 'sch2', True, 'sch2', True),
                    ('t4', m3, 'sch1', None, 'sch1', None),
                    ('t1', m4, None, None, None, None),
                    ('t2', m4, 'sch2', None, 'sch2', None),
                    ('t3', m4, 'sch2', True, 'sch2', True),
                    ('t4', m4, 'sch1', None, 'sch1', None),
                    ('t5', m4, BLANK_SCHEMA, None, None, None),
                ]):
            # only pass the keyword arguments the row actually specifies
            kw = {}
            if schema is not None:
                kw['schema'] = schema
            if quote_schema is not None:
                kw['quote_schema'] = quote_schema
            t = Table(name, metadata, **kw)
            eq_(t.schema, exp_schema, "test %d, table schema" % i)
            eq_(t.schema.quote if t.schema is not None else None,
                exp_quote_schema,
                "test %d, table quote_schema" % i)
            # a Sequence built with the same arguments must agree
            seq = Sequence(name, metadata=metadata, **kw)
            eq_(seq.schema, exp_schema, "test %d, seq schema" % i)
            eq_(seq.schema.quote if seq.schema is not None else None,
                exp_quote_schema,
                "test %d, seq quote_schema" % i)

    def test_manual_dependencies(self):
        """``sorted_tables`` honors ``add_is_dependent_on()`` edges."""
        meta = MetaData()
        a = Table('a', meta, Column('foo', Integer))
        b = Table('b', meta, Column('foo', Integer))
        c = Table('c', meta, Column('foo', Integer))
        d = Table('d', meta, Column('foo', Integer))
        e = Table('e', meta, Column('foo', Integer))

        e.add_is_dependent_on(c)
        a.add_is_dependent_on(b)
        b.add_is_dependent_on(d)
        e.add_is_dependent_on(b)
        c.add_is_dependent_on(a)
        eq_(
            meta.sorted_tables,
            [d, b, a, c, e]
        )

    def test_deterministic_order(self):
        """``sorted_tables`` returns this exact order for two edges."""
        meta = MetaData()
        a = Table('a', meta, Column('foo', Integer))
        b = Table('b', meta, Column('foo', Integer))
        c = Table('c', meta, Column('foo', Integer))
        d = Table('d', meta, Column('foo', Integer))
        e = Table('e', meta, Column('foo', Integer))

        e.add_is_dependent_on(c)
        a.add_is_dependent_on(b)
        eq_(
            meta.sorted_tables,
            [b, c, d, a, e]
        )

    def test_nonexistent(self):
        """Autoloading the table 'fake_table' raises NoSuchTableError."""
        assert_raises(tsa.exc.NoSuchTableError, Table,
                      'fake_table',
                      MetaData(testing.db), autoload=True)

    def test_assorted_repr(self):
        """Check ``repr()`` of a selection of schema constructs."""
        t1 = Table("foo", MetaData(), Column("x", Integer))
        i1 = Index("bar", t1.c.x)
        ck = schema.CheckConstraint("x > y", name="someconstraint")

        # pairs of (construct, expected repr() string)
        for const, exp in (
            (Sequence("my_seq"),
                "Sequence('my_seq')"),
            (Sequence("my_seq", start=5),
                "Sequence('my_seq', start=5)"),
            (Column("foo", Integer),
                "Column('foo', Integer(), table=None)"),
            (Table("bar", MetaData(), Column("x", String)),
                "Table('bar', MetaData(bind=None), "
                "Column('x', String(), table=<bar>), schema=None)"),
            (schema.DefaultGenerator(for_update=True),
                "DefaultGenerator(for_update=True)"),
            (schema.Index("bar", "c"), "Index('bar', 'c')"),
            (i1, "Index('bar', Column('x', Integer(), table=<foo>))"),
            (schema.FetchedValue(), "FetchedValue()"),
            (ck,
             "CheckConstraint("
             "%s"
             ", name='someconstraint')" % repr(ck.sqltext)),
        ):
            eq_(
                repr(const),
                exp
            )
534
535
536class ToMetaDataTest(fixtures.TestBase, ComparesTables):
537
    @testing.requires.check_constraints
    def test_copy(self):
        """Compare tables copied via tometadata(), pickle and reflection.

        Three nested helpers each return copies of ``mytable`` and
        ``othertable``.  Every pair of copies is compared against the
        originals with ``assert_tables_equal`` and then checked for object
        identity, foreign key targets, defaults and constraints.
        """
        from sqlalchemy.testing.schema import Table
        meta = MetaData()

        table = Table(
            'mytable',
            meta,
            Column(
                'myid',
                Integer,
                Sequence('foo_id_seq'),
                primary_key=True),
            Column(
                'name',
                String(40),
                nullable=True),
            Column(
                'foo',
                String(40),
                nullable=False,
                server_default='x',
                server_onupdate='q'),
            Column(
                'bar',
                String(40),
                nullable=False,
                default='y',
                onupdate='z'),
            Column(
                'description',
                String(30),
                CheckConstraint("description='hi'")),
            UniqueConstraint('name'),
            test_needs_fk=True)

        table2 = Table(
            'othertable',
            meta,
            Column(
                'id',
                Integer,
                Sequence('foo_seq'),
                primary_key=True),
            Column(
                'myid',
                Integer,
                ForeignKey('mytable.myid'),
            ),
            test_needs_fk=True)

        # copy both tables into a fresh MetaData with tometadata()
        def test_to_metadata():
            meta2 = MetaData()
            table_c = table.tometadata(meta2)
            table2_c = table2.tometadata(meta2)
            return (table_c, table2_c)

        # round-trip the bound MetaData through pickle; the unpickled
        # copy is asserted to have no bind and is pickled once more
        def test_pickle():
            meta.bind = testing.db
            meta2 = pickle.loads(pickle.dumps(meta))
            assert meta2.bind is None
            pickle.loads(pickle.dumps(meta2))
            return (meta2.tables['mytable'], meta2.tables['othertable'])

        def test_pickle_via_reflect():
            # this is the most common use case, pickling the results of a
            # database reflection
            meta2 = MetaData(bind=testing.db)
            t1 = Table('mytable', meta2, autoload=True)
            Table('othertable', meta2, autoload=True)
            meta3 = pickle.loads(pickle.dumps(meta2))
            assert meta3.bind is None
            assert meta3.tables['mytable'] is not t1

            return (meta3.tables['mytable'], meta3.tables['othertable'])

        # the tables must exist in the database for the reflection helper
        meta.create_all(testing.db)
        try:
            # each entry is (helper, has_constraints, reflect)
            for test, has_constraints, reflect in \
                    (test_to_metadata, True, False), \
                    (test_pickle, True, False), \
                    (test_pickle_via_reflect, False, True):
                table_c, table2_c = test()
                self.assert_tables_equal(table, table_c)
                self.assert_tables_equal(table2, table2_c)
                assert table is not table_c
                assert table.primary_key is not table_c.primary_key
                # the copied FK must point at the copied parent column
                assert list(table2_c.c.myid.foreign_keys)[0].column \
                    is table_c.c.myid
                assert list(table2_c.c.myid.foreign_keys)[0].column \
                    is not table.c.myid
                assert 'x' in str(table_c.c.foo.server_default.arg)
                # the default / onupdate checks are skipped for the
                # reflected copies
                if not reflect:
                    assert isinstance(table_c.c.myid.default, Sequence)
                    assert str(table_c.c.foo.server_onupdate.arg) == 'q'
                    assert str(table_c.c.bar.default.arg) == 'y'
                    assert getattr(table_c.c.bar.onupdate.arg, 'arg',
                                   table_c.c.bar.onupdate.arg) == 'z'
                    assert isinstance(table2_c.c.id.default, Sequence)

                # constraints don't get reflected for any dialect right
                # now

                if has_constraints:
                    # for/else: fail unless a CheckConstraint is found;
                    # "c" stays bound to the match after the break
                    for c in table_c.c.description.constraints:
                        if isinstance(c, CheckConstraint):
                            break
                    else:
                        assert False
                    assert str(c.sqltext) == "description='hi'"
                    # same search, this time for the UniqueConstraint
                    for c in table_c.constraints:
                        if isinstance(c, UniqueConstraint):
                            break
                    else:
                        assert False
                    assert c.columns.contains_column(table_c.c.name)
                    assert not c.columns.contains_column(table.c.name)

        finally:
            # always drop the tables created above
            meta.drop_all(testing.db)
658
659    def test_col_key_fk_parent(self):
660        # test #2643
661        m1 = MetaData()
662        a = Table('a', m1, Column('x', Integer))
663        b = Table('b', m1, Column('x', Integer, ForeignKey('a.x'), key='y'))
664        assert b.c.y.references(a.c.x)
665
666        m2 = MetaData()
667        b2 = b.tometadata(m2)
668        a2 = a.tometadata(m2)
669        assert b2.c.y.references(a2.c.x)
670
671    def test_change_schema(self):
672        meta = MetaData()
673
674        table = Table('mytable', meta,
675                      Column('myid', Integer, primary_key=True),
676                      Column('name', String(40), nullable=True),
677                      Column('description', String(30),
678                             CheckConstraint("description='hi'")),
679                      UniqueConstraint('name'),
680                      )
681
682        table2 = Table('othertable', meta,
683                       Column('id', Integer, primary_key=True),
684                       Column('myid', Integer, ForeignKey('mytable.myid')),
685                       )
686
687        meta2 = MetaData()
688        table_c = table.tometadata(meta2, schema='someschema')
689        table2_c = table2.tometadata(meta2, schema='someschema')
690
691        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
692                                                      == table2_c.c.myid))
693        eq_(str(table_c.join(table2_c).onclause),
694            'someschema.mytable.myid = someschema.othertable.myid')
695
696    def test_retain_table_schema(self):
697        meta = MetaData()
698
699        table = Table('mytable', meta,
700                      Column('myid', Integer, primary_key=True),
701                      Column('name', String(40), nullable=True),
702                      Column('description', String(30),
703                             CheckConstraint("description='hi'")),
704                      UniqueConstraint('name'),
705                      schema='myschema',
706                      )
707
708        table2 = Table(
709            'othertable',
710            meta,
711            Column(
712                'id',
713                Integer,
714                primary_key=True),
715            Column(
716                'myid',
717                Integer,
718                ForeignKey('myschema.mytable.myid')),
719            schema='myschema',
720        )
721
722        meta2 = MetaData()
723        table_c = table.tometadata(meta2)
724        table2_c = table2.tometadata(meta2)
725
726        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
727                                                      == table2_c.c.myid))
728        eq_(str(table_c.join(table2_c).onclause),
729            'myschema.mytable.myid = myschema.othertable.myid')
730
731    def test_change_name_retain_metadata(self):
732        meta = MetaData()
733
734        table = Table('mytable', meta,
735                      Column('myid', Integer, primary_key=True),
736                      Column('name', String(40), nullable=True),
737                      Column('description', String(30),
738                             CheckConstraint("description='hi'")),
739                      UniqueConstraint('name'),
740                      schema='myschema',
741                      )
742
743        table2 = table.tometadata(table.metadata, name='newtable')
744        table3 = table.tometadata(table.metadata, schema='newschema',
745                                  name='newtable')
746
747        assert table.metadata is table2.metadata
748        assert table.metadata is table3.metadata
749        eq_((table.name, table2.name, table3.name),
750            ('mytable', 'newtable', 'newtable'))
751        eq_((table.key, table2.key, table3.key),
752            ('myschema.mytable', 'myschema.newtable', 'newschema.newtable'))
753
754    def test_change_name_change_metadata(self):
755        meta = MetaData()
756        meta2 = MetaData()
757
758        table = Table('mytable', meta,
759                      Column('myid', Integer, primary_key=True),
760                      Column('name', String(40), nullable=True),
761                      Column('description', String(30),
762                             CheckConstraint("description='hi'")),
763                      UniqueConstraint('name'),
764                      schema='myschema',
765                      )
766
767        table2 = table.tometadata(meta2, name='newtable')
768
769        assert table.metadata is not table2.metadata
770        eq_((table.name, table2.name),
771            ('mytable', 'newtable'))
772        eq_((table.key, table2.key),
773            ('myschema.mytable', 'myschema.newtable'))
774
775    def test_change_name_selfref_fk_moves(self):
776        meta = MetaData()
777
778        referenced = Table('ref', meta,
779                           Column('id', Integer, primary_key=True),
780                           )
781        table = Table('mytable', meta,
782                      Column('id', Integer, primary_key=True),
783                      Column('parent_id', ForeignKey('mytable.id')),
784                      Column('ref_id', ForeignKey('ref.id'))
785                      )
786
787        table2 = table.tometadata(table.metadata, name='newtable')
788        assert table.metadata is table2.metadata
789        assert table2.c.ref_id.references(referenced.c.id)
790        assert table2.c.parent_id.references(table2.c.id)
791
792    def test_change_name_selfref_fk_moves_w_schema(self):
793        meta = MetaData()
794
795        referenced = Table('ref', meta,
796                           Column('id', Integer, primary_key=True),
797                           )
798        table = Table('mytable', meta,
799                      Column('id', Integer, primary_key=True),
800                      Column('parent_id', ForeignKey('mytable.id')),
801                      Column('ref_id', ForeignKey('ref.id'))
802                      )
803
804        table2 = table.tometadata(
805            table.metadata, name='newtable', schema='newschema')
806        ref2 = referenced.tometadata(table.metadata, schema='newschema')
807        assert table.metadata is table2.metadata
808        assert table2.c.ref_id.references(ref2.c.id)
809        assert table2.c.parent_id.references(table2.c.id)
810
811    def _assert_fk(self, t2, schema, expected, referred_schema_fn=None):
812        m2 = MetaData()
813        existing_schema = t2.schema
814        if schema:
815            t2c = t2.tometadata(m2, schema=schema,
816                                referred_schema_fn=referred_schema_fn)
817            eq_(t2c.schema, schema)
818        else:
819            t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn)
820            eq_(t2c.schema, existing_schema)
821        eq_(list(t2c.c.y.foreign_keys)[0]._get_colspec(), expected)
822
823    def test_fk_has_schema_string_retain_schema(self):
824        m = MetaData()
825
826        t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
827        self._assert_fk(t2, None, "q.t1.x")
828
829        Table('t1', m, Column('x', Integer), schema='q')
830        self._assert_fk(t2, None, "q.t1.x")
831
832    def test_fk_has_schema_string_new_schema(self):
833        m = MetaData()
834
835        t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
836        self._assert_fk(t2, "z", "q.t1.x")
837
838        Table('t1', m, Column('x', Integer), schema='q')
839        self._assert_fk(t2, "z", "q.t1.x")
840
841    def test_fk_has_schema_col_retain_schema(self):
842        m = MetaData()
843
844        t1 = Table('t1', m, Column('x', Integer), schema='q')
845        t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))
846
847        self._assert_fk(t2, "z", "q.t1.x")
848
849    def test_fk_has_schema_col_new_schema(self):
850        m = MetaData()
851
852        t1 = Table('t1', m, Column('x', Integer), schema='q')
853        t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))
854
855        self._assert_fk(t2, "z", "q.t1.x")
856
857    def test_fk_and_referent_has_same_schema_string_retain_schema(self):
858        m = MetaData()
859
860        t2 = Table('t2', m, Column('y', Integer,
861                                   ForeignKey('q.t1.x')), schema="q")
862
863        self._assert_fk(t2, None, "q.t1.x")
864
865        Table('t1', m, Column('x', Integer), schema='q')
866        self._assert_fk(t2, None, "q.t1.x")
867
868    def test_fk_and_referent_has_same_schema_string_new_schema(self):
869        m = MetaData()
870
871        t2 = Table('t2', m, Column('y', Integer,
872                                   ForeignKey('q.t1.x')), schema="q")
873
874        self._assert_fk(t2, "z", "z.t1.x")
875
876        Table('t1', m, Column('x', Integer), schema='q')
877        self._assert_fk(t2, "z", "z.t1.x")
878
879    def test_fk_and_referent_has_same_schema_col_retain_schema(self):
880        m = MetaData()
881
882        t1 = Table('t1', m, Column('x', Integer), schema='q')
883        t2 = Table('t2', m, Column('y', Integer,
884                                   ForeignKey(t1.c.x)), schema='q')
885        self._assert_fk(t2, None, "q.t1.x")
886
887    def test_fk_and_referent_has_same_schema_col_new_schema(self):
888        m = MetaData()
889
890        t1 = Table('t1', m, Column('x', Integer), schema='q')
891        t2 = Table('t2', m, Column('y', Integer,
892                                   ForeignKey(t1.c.x)), schema='q')
893        self._assert_fk(t2, 'z', "z.t1.x")
894
895    def test_fk_and_referent_has_diff_schema_string_retain_schema(self):
896        m = MetaData()
897
898        t2 = Table('t2', m, Column('y', Integer,
899                                   ForeignKey('p.t1.x')), schema="q")
900
901        self._assert_fk(t2, None, "p.t1.x")
902
903        Table('t1', m, Column('x', Integer), schema='p')
904        self._assert_fk(t2, None, "p.t1.x")
905
906    def test_fk_and_referent_has_diff_schema_string_new_schema(self):
907        m = MetaData()
908
909        t2 = Table('t2', m, Column('y', Integer,
910                                   ForeignKey('p.t1.x')), schema="q")
911
912        self._assert_fk(t2, "z", "p.t1.x")
913
914        Table('t1', m, Column('x', Integer), schema='p')
915        self._assert_fk(t2, "z", "p.t1.x")
916
917    def test_fk_and_referent_has_diff_schema_col_retain_schema(self):
918        m = MetaData()
919
920        t1 = Table('t1', m, Column('x', Integer), schema='p')
921        t2 = Table('t2', m, Column('y', Integer,
922                                   ForeignKey(t1.c.x)), schema='q')
923        self._assert_fk(t2, None, "p.t1.x")
924
925    def test_fk_and_referent_has_diff_schema_col_new_schema(self):
926        m = MetaData()
927
928        t1 = Table('t1', m, Column('x', Integer), schema='p')
929        t2 = Table('t2', m, Column('y', Integer,
930                                   ForeignKey(t1.c.x)), schema='q')
931        self._assert_fk(t2, 'z', "p.t1.x")
932
933    def test_fk_custom_system(self):
934        m = MetaData()
935        t2 = Table('t2', m, Column('y', Integer,
936                                   ForeignKey('p.t1.x')), schema='q')
937
938        def ref_fn(table, to_schema, constraint, referred_schema):
939            assert table is t2
940            eq_(to_schema, "z")
941            eq_(referred_schema, "p")
942            return "h"
943        self._assert_fk(t2, 'z', "h.t1.x", referred_schema_fn=ref_fn)
944
945    def test_copy_info(self):
946        m = MetaData()
947        fk = ForeignKey('t2.id')
948        c = Column('c', Integer, fk)
949        ck = CheckConstraint('c > 5')
950        t = Table('t', m, c, ck)
951
952        m.info['minfo'] = True
953        fk.info['fkinfo'] = True
954        c.info['cinfo'] = True
955        ck.info['ckinfo'] = True
956        t.info['tinfo'] = True
957        t.primary_key.info['pkinfo'] = True
958        fkc = [const for const in t.constraints if
959               isinstance(const, ForeignKeyConstraint)][0]
960        fkc.info['fkcinfo'] = True
961
962        m2 = MetaData()
963        t2 = t.tometadata(m2)
964
965        m.info['minfo'] = False
966        fk.info['fkinfo'] = False
967        c.info['cinfo'] = False
968        ck.info['ckinfo'] = False
969        t.primary_key.info['pkinfo'] = False
970        fkc.info['fkcinfo'] = False
971
972        eq_(m2.info, {})
973        eq_(t2.info, {"tinfo": True})
974        eq_(t2.c.c.info, {"cinfo": True})
975        eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True})
976        eq_(t2.primary_key.info, {"pkinfo": True})
977
978        fkc2 = [const for const in t2.constraints
979                if isinstance(const, ForeignKeyConstraint)][0]
980        eq_(fkc2.info, {"fkcinfo": True})
981
982        ck2 = [const for const in
983               t2.constraints if isinstance(const, CheckConstraint)][0]
984        eq_(ck2.info, {"ckinfo": True})
985
986    def test_dialect_kwargs(self):
987        meta = MetaData()
988
989        table = Table('mytable', meta,
990                      Column('myid', Integer, primary_key=True),
991                      mysql_engine='InnoDB',
992                      )
993
994        meta2 = MetaData()
995        table_c = table.tometadata(meta2)
996
997        eq_(table.kwargs, {"mysql_engine": "InnoDB"})
998
999        eq_(table.kwargs, table_c.kwargs)
1000
1001    def test_indexes(self):
1002        meta = MetaData()
1003
1004        table = Table('mytable', meta,
1005                      Column('id', Integer, primary_key=True),
1006                      Column('data1', Integer, index=True),
1007                      Column('data2', Integer),
1008                      )
1009        Index('multi', table.c.data1, table.c.data2),
1010
1011        meta2 = MetaData()
1012        table_c = table.tometadata(meta2)
1013
1014        def _get_key(i):
1015            return [i.name, i.unique] + \
1016                sorted(i.kwargs.items()) + \
1017                list(i.columns.keys())
1018
1019        eq_(
1020            sorted([_get_key(i) for i in table.indexes]),
1021            sorted([_get_key(i) for i in table_c.indexes])
1022        )
1023
1024    @emits_warning("Table '.+' already exists within the given MetaData")
1025    def test_already_exists(self):
1026
1027        meta1 = MetaData()
1028        table1 = Table('mytable', meta1,
1029                       Column('myid', Integer, primary_key=True),
1030                       )
1031        meta2 = MetaData()
1032        table2 = Table('mytable', meta2,
1033                       Column('yourid', Integer, primary_key=True),
1034                       )
1035
1036        table_c = table1.tometadata(meta2)
1037        table_d = table2.tometadata(meta2)
1038
1039        # d'oh!
1040        assert table_c is table_d
1041
1042    def test_default_schema_metadata(self):
1043        meta = MetaData(schema='myschema')
1044
1045        table = Table(
1046            'mytable',
1047            meta,
1048            Column(
1049                'myid',
1050                Integer,
1051                primary_key=True),
1052            Column(
1053                'name',
1054                String(40),
1055                nullable=True),
1056            Column(
1057                'description',
1058                String(30),
1059                CheckConstraint("description='hi'")),
1060            UniqueConstraint('name'),
1061        )
1062
1063        table2 = Table(
1064            'othertable', meta, Column(
1065                'id', Integer, primary_key=True), Column(
1066                'myid', Integer, ForeignKey('myschema.mytable.myid')), )
1067
1068        meta2 = MetaData(schema='someschema')
1069        table_c = table.tometadata(meta2, schema=None)
1070        table2_c = table2.tometadata(meta2, schema=None)
1071
1072        eq_(str(table_c.join(table2_c).onclause),
1073            str(table_c.c.myid == table2_c.c.myid))
1074        eq_(str(table_c.join(table2_c).onclause),
1075            "someschema.mytable.myid = someschema.othertable.myid")
1076
1077    def test_strip_schema(self):
1078        meta = MetaData()
1079
1080        table = Table('mytable', meta,
1081                      Column('myid', Integer, primary_key=True),
1082                      Column('name', String(40), nullable=True),
1083                      Column('description', String(30),
1084                             CheckConstraint("description='hi'")),
1085                      UniqueConstraint('name'),
1086                      )
1087
1088        table2 = Table('othertable', meta,
1089                       Column('id', Integer, primary_key=True),
1090                       Column('myid', Integer, ForeignKey('mytable.myid')),
1091                       )
1092
1093        meta2 = MetaData()
1094        table_c = table.tometadata(meta2, schema=None)
1095        table2_c = table2.tometadata(meta2, schema=None)
1096
1097        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
1098                                                      == table2_c.c.myid))
1099        eq_(str(table_c.join(table2_c).onclause),
1100            'mytable.myid = othertable.myid')
1101
1102    def test_unique_true_flag(self):
1103        meta = MetaData()
1104
1105        table = Table('mytable', meta, Column('x', Integer, unique=True))
1106
1107        m2 = MetaData()
1108
1109        t2 = table.tometadata(m2)
1110
1111        eq_(
1112            len([
1113                const for const
1114                in t2.constraints
1115                if isinstance(const, UniqueConstraint)]),
1116            1
1117        )
1118
1119    def test_index_true_flag(self):
1120        meta = MetaData()
1121
1122        table = Table('mytable', meta, Column('x', Integer, index=True))
1123
1124        m2 = MetaData()
1125
1126        t2 = table.tometadata(m2)
1127
1128        eq_(len(t2.indexes), 1)
1129
1130
class InfoTest(fixtures.TestBase):
    """For each kind of schema item: ``info`` is an empty dict by default
    and holds the dict given via the ``info`` constructor argument."""

    def test_metadata_info(self):
        plain = MetaData()
        eq_(plain.info, {})

        with_info = MetaData(info={"foo": "bar"})
        eq_(with_info.info, {"foo": "bar"})

    def test_foreignkey_constraint_info(self):
        plain = ForeignKeyConstraint(['a'], ['b'], name='bar')
        eq_(plain.info, {})

        with_info = ForeignKeyConstraint(
            ['a'], ['b'], name='bar', info={"foo": "bar"})
        eq_(with_info.info, {"foo": "bar"})

    def test_foreignkey_info(self):
        plain = ForeignKey('a')
        eq_(plain.info, {})

        with_info = ForeignKey('a', info={"foo": "bar"})
        eq_(with_info.info, {"foo": "bar"})

    def test_primarykey_constraint_info(self):
        plain = PrimaryKeyConstraint('a', name='x')
        eq_(plain.info, {})

        with_info = PrimaryKeyConstraint('a', name='x', info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_unique_constraint_info(self):
        plain = UniqueConstraint('a', name='x')
        eq_(plain.info, {})

        with_info = UniqueConstraint('a', name='x', info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_check_constraint_info(self):
        plain = CheckConstraint('foo=bar', name='x')
        eq_(plain.info, {})

        with_info = CheckConstraint(
            'foo=bar', name='x', info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_index_info(self):
        plain = Index('x', 'a')
        eq_(plain.info, {})

        with_info = Index('x', 'a', info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_column_info(self):
        plain = Column('x', Integer)
        eq_(plain.info, {})

        with_info = Column('x', Integer, info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_table_info(self):
        plain = Table('x', MetaData())
        eq_(plain.info, {})

        with_info = Table('x', MetaData(), info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})
1195
1196
class TableTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests of Table construction: prefixes, ``info``, the
    ``foreign_key_constraints`` collection, immutability of ``.c`` and
    primary key handling."""

    @testing.requires.temporary_tables
    @testing.skip_if('mssql', 'different col format')
    def test_prefixes(self):
        """``prefixes`` are rendered between CREATE and TABLE."""
        # uses the module-level ``Table``; the former function-scope
        # re-import of the same name was redundant
        table1 = Table("temporary_table_1", MetaData(),
                       Column("col1", Integer),
                       prefixes=["TEMPORARY"])

        self.assert_compile(
            schema.CreateTable(table1),
            "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)"
        )

        table2 = Table("temporary_table_2", MetaData(),
                       Column("col1", Integer),
                       prefixes=["VIRTUAL"])
        self.assert_compile(
            schema.CreateTable(table2),
            "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)"
        )

    def test_table_info(self):
        """``info`` is usable and writable whether it was given, given
        empty, or omitted."""
        metadata = MetaData()
        t1 = Table('foo', metadata, info={'x': 'y'})
        t2 = Table('bar', metadata, info={})
        t3 = Table('bat', metadata)
        assert t1.info == {'x': 'y'}
        assert t2.info == {}
        assert t3.info == {}
        for t in (t1, t2, t3):
            t.info['bar'] = 'zip'
            assert t.info['bar'] == 'zip'

    def test_foreign_key_constraints_collection(self):
        """``foreign_key_constraints`` grows as FK-bearing columns and an
        explicit ForeignKeyConstraint are appended."""
        metadata = MetaData()
        t1 = Table('foo', metadata, Column('a', Integer))
        eq_(t1.foreign_key_constraints, set())

        fk1 = ForeignKey('q.id')
        fk2 = ForeignKey('j.id')
        fk3 = ForeignKeyConstraint(['b', 'c'], ['r.x', 'r.y'])

        t1.append_column(Column('b', Integer, fk1))
        eq_(
            t1.foreign_key_constraints,
            set([fk1.constraint]))

        t1.append_column(Column('c', Integer, fk2))
        eq_(
            t1.foreign_key_constraints,
            set([fk1.constraint, fk2.constraint]))

        t1.append_constraint(fk3)
        eq_(
            t1.foreign_key_constraints,
            set([fk1.constraint, fk2.constraint, fk3]))

    def test_c_immutable(self):
        """``extend``, item assignment and attribute assignment on a
        table's ``.c`` collection each raise TypeError."""
        m = MetaData()
        t1 = Table('t', m, Column('x', Integer), Column('y', Integer))
        assert_raises(
            TypeError,
            t1.c.extend, [Column('z', Integer)]
        )

        def assign():
            t1.c['z'] = Column('z', Integer)
        assert_raises(
            TypeError,
            assign
        )

        def assign2():
            t1.c.z = Column('z', Integer)
        assert_raises(
            TypeError,
            assign2
        )

    def test_c_mutate_after_unpickle(self):
        """An unpickled table accepts ``append_column`` and reports
        membership for its own columns only."""
        m = MetaData()

        y = Column('y', Integer)
        t1 = Table('t', m, Column('x', Integer), y)

        t2 = pickle.loads(pickle.dumps(t1))
        z = Column('z', Integer)
        g = Column('g', Integer)
        t2.append_column(z)

        # the original ``y`` object belongs to t1, not to the unpickled t2
        is_(t1.c.contains_column(y), True)
        is_(t2.c.contains_column(y), False)
        y2 = t2.c.y
        is_(t2.c.contains_column(y2), True)

        # ``z`` was appended to t2; ``g`` was never attached anywhere
        is_(t2.c.contains_column(z), True)
        is_(t2.c.contains_column(g), False)

    def test_autoincrement_replace(self):
        """``_autoincrement_column`` is the ``id`` column both initially
        and after redefining the table with ``extend_existing=True``."""
        m = MetaData()

        t = Table('t', m,
                  Column('id', Integer, primary_key=True)
                  )

        is_(t._autoincrement_column, t.c.id)

        t = Table('t', m,
                  Column('id', Integer, primary_key=True),
                  extend_existing=True
                  )
        is_(t._autoincrement_column, t.c.id)

    def test_pk_args_standalone(self):
        """A column-less PrimaryKeyConstraint contributes its dialect
        kwargs while the PK columns come from ``primary_key=True``."""
        m = MetaData()
        t = Table('t', m,
                  Column('x', Integer, primary_key=True),
                  PrimaryKeyConstraint(mssql_clustered=True)
                  )
        eq_(
            list(t.primary_key), [t.c.x]
        )
        eq_(
            t.primary_key.dialect_kwargs, {"mssql_clustered": True}
        )

    def test_pk_cols_sets_flags(self):
        """Columns named in a PrimaryKeyConstraint get
        ``primary_key=True``; other columns do not."""
        m = MetaData()
        t = Table('t', m,
                  Column('x', Integer),
                  Column('y', Integer),
                  Column('z', Integer),
                  PrimaryKeyConstraint('x', 'y')
                  )
        eq_(t.c.x.primary_key, True)
        eq_(t.c.y.primary_key, True)
        eq_(t.c.z.primary_key, False)

    def test_pk_col_mismatch_one(self):
        """A single ``primary_key=True`` column that disagrees with the
        PrimaryKeyConstraint produces the expected SAWarning message."""
        m = MetaData()
        assert_raises_message(
            exc.SAWarning,
            "Table 't' specifies columns 'x' as primary_key=True, "
            "not matching locally specified columns 'q'",
            Table, 't', m,
            Column('x', Integer, primary_key=True),
            Column('q', Integer),
            PrimaryKeyConstraint('q')
        )

    def test_pk_col_mismatch_two(self):
        """Several ``primary_key=True`` columns that disagree with the
        PrimaryKeyConstraint produce the expected SAWarning message."""
        m = MetaData()
        assert_raises_message(
            exc.SAWarning,
            "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, "
            "not matching locally specified columns 'b', 'c'",
            Table, 't', m,
            Column('a', Integer, primary_key=True),
            Column('b', Integer, primary_key=True),
            Column('c', Integer, primary_key=True),
            PrimaryKeyConstraint('b', 'c')
        )

    @testing.emits_warning("Table 't'")
    def test_pk_col_mismatch_three(self):
        """On mismatch, the PrimaryKeyConstraint's column is what ends up
        in the primary key."""
        m = MetaData()
        t = Table('t', m,
                  Column('x', Integer, primary_key=True),
                  Column('q', Integer),
                  PrimaryKeyConstraint('q')
                  )
        eq_(list(t.primary_key), [t.c.q])

    @testing.emits_warning("Table 't'")
    def test_pk_col_mismatch_four(self):
        """On mismatch with several flagged columns, the
        PrimaryKeyConstraint's columns are what end up in the primary
        key."""
        m = MetaData()
        t = Table('t', m,
                  Column('a', Integer, primary_key=True),
                  Column('b', Integer, primary_key=True),
                  Column('c', Integer, primary_key=True),
                  PrimaryKeyConstraint('b', 'c')
                  )
        eq_(list(t.primary_key), [t.c.b, t.c.c])

    def test_pk_always_flips_nullable(self):
        """Primary key columns are non-nullable whether declared through
        a PrimaryKeyConstraint or through ``primary_key=True``."""
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), PrimaryKeyConstraint('x'))

        t2 = Table('t2', m, Column('x', Integer, primary_key=True))

        eq_(list(t1.primary_key), [t1.c.x])

        eq_(list(t2.primary_key), [t2.c.x])

        assert t1.c.x.primary_key
        assert t2.c.x.primary_key

        assert not t2.c.x.nullable
        assert not t1.c.x.nullable
1399
1400
class SchemaTypeTest(fixtures.TestBase):
    """Tests of SchemaType behavior: schema inheritance, copying through
    ``tometadata()``, and create-event dispatch."""

    class MyType(sqltypes.SchemaType, sqltypes.TypeEngine):
        """SchemaType that records the column/table it was attached to and
        every target its create hooks were invoked with."""

        column = None
        table = None
        evt_targets = ()

        def _set_table(self, column, table):
            super(SchemaTypeTest.MyType, self)._set_table(column, table)
            self.column = column
            self.table = table

        def _on_table_create(self, target, bind, **kw):
            super(SchemaTypeTest.MyType, self)._on_table_create(
                target, bind, **kw)
            self.evt_targets += (target,)

        def _on_metadata_create(self, target, bind, **kw):
            super(SchemaTypeTest.MyType, self)._on_metadata_create(
                target, bind, **kw)
            self.evt_targets += (target,)

    class MyTypeWImpl(MyType):
        """Variant whose dialect impl is an adapted ``MyTypeImpl``."""

        def _gen_dialect_impl(self, dialect):
            return self.adapt(SchemaTypeTest.MyTypeImpl)

    class MyTypeImpl(MyTypeWImpl):
        pass

    def test_independent_schema(self):
        """Without ``inherit_schema`` the type keeps its own schema."""
        metadata = MetaData()
        my_type = self.MyType(schema="q")
        table = Table('x', metadata, Column("y", my_type), schema="z")
        eq_(table.c.y.type.schema, "q")

    def test_inherit_schema(self):
        """With ``inherit_schema=True`` the type takes the table's
        schema."""
        metadata = MetaData()
        my_type = self.MyType(schema="q", inherit_schema=True)
        table = Table('x', metadata, Column("y", my_type), schema="z")
        eq_(table.c.y.type.schema, "z")

    def test_independent_schema_enum(self):
        """An Enum without ``inherit_schema`` keeps its own schema."""
        metadata = MetaData()
        enum_type = sqltypes.Enum("a", schema="q")
        table = Table('x', metadata, Column("y", enum_type), schema="z")
        eq_(table.c.y.type.schema, "q")

    def test_inherit_schema_enum(self):
        """An Enum with ``inherit_schema=True`` takes the table's
        schema."""
        metadata = MetaData()
        enum_type = sqltypes.Enum(
            "a", "b", "c", schema="q", inherit_schema=True)
        table = Table('x', metadata, Column("y", enum_type), schema="z")
        eq_(table.c.y.type.schema, "z")

    def test_tometadata_copy_type(self):
        """The copied column's type has no metadata set but is attached
        to the copied table."""
        source_meta = MetaData()

        my_type = self.MyType()
        source = Table('x', source_meta, Column("y", my_type))

        target_meta = MetaData()
        copied = source.tometadata(target_meta)

        # metadata isn't set
        is_(copied.c.y.type.metadata, None)

        # our test type sets table, though
        is_(copied.c.y.type.table, copied)

    def test_tometadata_independent_schema(self):
        """Copying into a new schema leaves a non-inheriting type's
        schema as None."""
        source_meta = MetaData()

        my_type = self.MyType()
        source = Table('x', source_meta, Column("y", my_type))

        target_meta = MetaData()
        copied = source.tometadata(target_meta, schema="bar")

        eq_(copied.c.y.type.schema, None)

    def test_tometadata_inherit_schema(self):
        """Copying into a new schema updates an inheriting type's schema
        on the copy only."""
        source_meta = MetaData()

        my_type = self.MyType(inherit_schema=True)
        source = Table('x', source_meta, Column("y", my_type))

        target_meta = MetaData()
        copied = source.tometadata(target_meta, schema="bar")

        eq_(source.c.y.type.schema, None)
        eq_(copied.c.y.type.schema, "bar")

    def test_tometadata_independent_events(self):
        """Create events on the original and on the copy reach only the
        respective table's own type object."""
        source_meta = MetaData()

        my_type = self.MyType()
        source = Table('x', source_meta, Column("y", my_type))

        target_meta = MetaData()
        copied = source.tometadata(target_meta)

        source.dispatch.before_create(source, testing.db)
        eq_(source.c.y.type.evt_targets, (source,))
        eq_(copied.c.y.type.evt_targets, ())

        copied.dispatch.before_create(copied, testing.db)
        copied.dispatch.before_create(copied, testing.db)
        eq_(source.c.y.type.evt_targets, (source,))
        eq_(copied.c.y.type.evt_targets, (copied, copied))

    def test_metadata_dispatch_no_new_impl(self):
        metadata = MetaData()
        my_type = self.MyType(metadata=metadata)
        metadata.dispatch.before_create(metadata, testing.db)
        eq_(my_type.evt_targets, (metadata, ))

        impl = my_type.dialect_impl(testing.db.dialect)
        eq_(impl.evt_targets, ())

    def test_metadata_dispatch_new_impl(self):
        metadata = MetaData()
        my_type = self.MyTypeWImpl(metadata=metadata)
        metadata.dispatch.before_create(metadata, testing.db)
        eq_(my_type.evt_targets, (metadata, ))

        impl = my_type.dialect_impl(testing.db.dialect)
        eq_(impl.evt_targets, (metadata, ))

    def test_table_dispatch_no_new_impl(self):
        metadata = MetaData()
        my_type = self.MyType()
        table = Table('t1', metadata, Column('x', my_type))
        metadata.dispatch.before_create(table, testing.db)
        eq_(my_type.evt_targets, (table, ))

        impl = my_type.dialect_impl(testing.db.dialect)
        eq_(impl.evt_targets, ())

    def test_table_dispatch_new_impl(self):
        metadata = MetaData()
        my_type = self.MyTypeWImpl()
        table = Table('t1', metadata, Column('x', my_type))
        metadata.dispatch.before_create(table, testing.db)
        eq_(my_type.evt_targets, (table, ))

        impl = my_type.dialect_impl(testing.db.dialect)
        eq_(impl.evt_targets, (table, ))

    def test_create_metadata_bound_no_crash(self):
        """``create_all`` runs with a type attached directly to the
        MetaData."""
        metadata = MetaData()
        self.MyType(metadata=metadata)

        metadata.create_all(testing.db)

    def test_boolean_constraint_type_doesnt_double(self):
        """A Boolean column has one CheckConstraint, before and after
        ``tometadata()``."""
        source_meta = MetaData()

        source = Table('x', source_meta, Column("flag", Boolean()))
        source_checks = [
            c for c in source.constraints
            if isinstance(c, CheckConstraint)
        ]
        eq_(len(source_checks), 1)

        target_meta = MetaData()
        copied = source.tometadata(target_meta)

        copied_checks = [
            c for c in copied.constraints
            if isinstance(c, CheckConstraint)
        ]
        eq_(len(copied_checks), 1)

    def test_enum_constraint_type_doesnt_double(self):
        """An Enum column has one CheckConstraint, before and after
        ``tometadata()``."""
        source_meta = MetaData()

        source = Table('x', source_meta, Column("flag", Enum('a', 'b', 'c')))
        source_checks = [
            c for c in source.constraints
            if isinstance(c, CheckConstraint)
        ]
        eq_(len(source_checks), 1)

        target_meta = MetaData()
        copied = source.tometadata(target_meta)

        copied_checks = [
            c for c in copied.constraints
            if isinstance(c, CheckConstraint)
        ]
        eq_(len(copied_checks), 1)
1594
1595
class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests of schema-qualified table names: FK resolution against a
    MetaData default schema, CREATE/DROP SCHEMA, and CREATE TABLE
    rendering."""

    def test_default_schema_metadata_fk(self):
        """An unqualified FK resolves when both tables use the MetaData's
        default schema."""
        m = MetaData(schema="foo")
        t1 = Table('t1', m, Column('x', Integer))
        t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')))
        assert t2.c.x.references(t1.c.x)

    def test_ad_hoc_schema_equiv_fk(self):
        """With per-table schemas and no MetaData default, an unqualified
        FK raises NoReferencedTableError."""
        m = MetaData()
        t1 = Table('t1', m, Column('x', Integer), schema="foo")
        t2 = Table(
            't2',
            m,
            Column(
                'x',
                Integer,
                ForeignKey('t1.x')),
            schema="foo")
        assert_raises(
            exc.NoReferencedTableError,
            lambda: t2.c.x.references(t1.c.x)
        )

    def test_default_schema_metadata_fk_alt_remote(self):
        """An unqualified FK from a table in another schema resolves to
        the table in the MetaData's default schema."""
        m = MetaData(schema="foo")
        t1 = Table('t1', m, Column('x', Integer))
        t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')),
                   schema="bar")
        assert t2.c.x.references(t1.c.x)

    def test_default_schema_metadata_fk_alt_local_raises(self):
        """An unqualified FK raises NoReferencedTableError when the
        referent lives outside the MetaData's default schema."""
        m = MetaData(schema="foo")
        t1 = Table('t1', m, Column('x', Integer), schema="bar")
        t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')))
        assert_raises(
            exc.NoReferencedTableError,
            lambda: t2.c.x.references(t1.c.x)
        )

    def test_default_schema_metadata_fk_alt_local(self):
        """A schema-qualified FK resolves to a referent outside the
        MetaData's default schema."""
        m = MetaData(schema="foo")
        t1 = Table('t1', m, Column('x', Integer), schema="bar")
        t2 = Table('t2', m, Column('x', Integer, ForeignKey('bar.t1.x')))
        assert t2.c.x.references(t1.c.x)

    def test_create_drop_schema(self):
        """CreateSchema / DropSchema compile to the expected DDL."""

        self.assert_compile(
            schema.CreateSchema("sa_schema"),
            "CREATE SCHEMA sa_schema"
        )
        self.assert_compile(
            schema.DropSchema("sa_schema"),
            "DROP SCHEMA sa_schema"
        )
        self.assert_compile(
            schema.DropSchema("sa_schema", cascade=True),
            "DROP SCHEMA sa_schema CASCADE"
        )

    def test_iteration(self):
        """CREATE TABLE renders the schema-qualified name unless the
        dialect's preparer omits schemas."""
        metadata = MetaData()
        table1 = Table(
            'table1',
            metadata,
            Column(
                'col1',
                Integer,
                primary_key=True),
            schema='someschema')
        table2 = Table(
            'table2',
            metadata,
            Column(
                'col1',
                Integer,
                primary_key=True),
            Column(
                'col2',
                Integer,
                ForeignKey('someschema.table1.col1')),
            schema='someschema')

        t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
        t2 = str(schema.CreateTable(table2).compile(bind=testing.db))
        # substring membership is used rather than ``str.index(...) > -1``:
        # ``index`` never returns -1, it raises ValueError, so that
        # comparison could never be the thing that fails
        if testing.db.dialect.preparer(testing.db.dialect).omit_schema:
            assert "CREATE TABLE table1" in t1
            assert "CREATE TABLE table2" in t2
        else:
            assert "CREATE TABLE someschema.table1" in t1
            assert "CREATE TABLE someschema.table2" in t2
1688
1689
class UseExistingTest(fixtures.TablesTest):

    """Test re-declaring a Table name on a MetaData.

    Covers the ``keep_existing`` and ``extend_existing`` flags, with and
    without ``autoload=True``, against a MetaData that already contains
    ``users`` and against one that does not.
    """

    @classmethod
    def define_tables(cls, metadata):
        """Declare ``users`` with an integer primary key and a name."""
        id_col = Column('id', Integer, primary_key=True)
        name_col = Column('name', String(30))
        Table('users', metadata, id_col, name_col)

    def _useexisting_fixture(self):
        """Return a bound MetaData on which ``users`` was autoloaded."""
        populated = MetaData(testing.db)
        Table('users', populated, autoload=True)
        return populated

    def _notexisting_fixture(self):
        """Return a bound MetaData with no tables declared on it."""
        empty = MetaData(testing.db)
        return empty

    def test_exception_no_flags(self):
        """Re-declaring an existing table with neither flag raises."""
        populated = self._useexisting_fixture()

        def redeclare():
            Table('users', populated,
                  Column('name', Unicode),
                  autoload=True)

        assert_raises_message(
            exc.InvalidRequestError,
            "Table 'users' is already defined for this "
            "MetaData instance.",
            redeclare)

    def test_keep_plus_existing_raises(self):
        """keep_existing and extend_existing are mutually exclusive."""
        populated = self._useexisting_fixture()
        conflicting = dict(keep_existing=True, extend_existing=True)
        assert_raises(
            exc.ArgumentError,
            Table, 'users', populated, **conflicting)

    @testing.uses_deprecated()
    def test_existing_plus_useexisting_raises(self):
        """useexisting and extend_existing together raise as well."""
        populated = self._useexisting_fixture()
        conflicting = dict(useexisting=True, extend_existing=True)
        assert_raises(
            exc.ArgumentError,
            Table, 'users', populated, **conflicting)

    def test_keep_existing_no_dupe_constraints(self):
        """A second keep_existing declaration adds no constraints."""
        empty = self._notexisting_fixture()

        def declare():
            return Table(
                'users', empty,
                Column('id', Integer),
                Column('name', Unicode),
                UniqueConstraint('name'),
                keep_existing=True)

        first = declare()
        for colname in ('name', 'id'):
            assert colname in first.c
        eq_(len(first.constraints), 2)

        second = declare()
        eq_(len(second.constraints), 2)

    def test_extend_existing_dupes_constraints(self):
        """A second extend_existing declaration adds a constraint."""
        empty = self._notexisting_fixture()

        def declare():
            return Table(
                'users', empty,
                Column('id', Integer),
                Column('name', Unicode),
                UniqueConstraint('name'),
                extend_existing=True)

        first = declare()
        for colname in ('name', 'id'):
            assert colname in first.c
        eq_(len(first.constraints), 2)

        second = declare()
        # constraint got duped
        eq_(len(second.constraints), 3)

    def test_keep_existing_coltype(self):
        """keep_existing + autoload: the passed column type is ignored."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            Column('name', Unicode),
            autoload=True, keep_existing=True)
        name_type = redeclared.c.name.type
        assert not isinstance(name_type, Unicode)

    def test_keep_existing_quote(self):
        """keep_existing + autoload: the passed quote flag is ignored."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            quote=True, autoload=True, keep_existing=True)
        assert not redeclared.name.quote

    def test_keep_existing_add_column(self):
        """keep_existing + autoload: a new column is not added."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            Column('foo', Integer),
            autoload=True, keep_existing=True)
        assert "foo" not in redeclared.c

    def test_keep_existing_coltype_no_orig(self):
        """keep_existing, table not present: passed column type is used."""
        empty = self._notexisting_fixture()
        declared = Table(
            'users', empty,
            Column('name', Unicode),
            autoload=True, keep_existing=True)
        name_type = declared.c.name.type
        assert isinstance(name_type, Unicode)

    @testing.skip_if(
        lambda: testing.db.dialect.requires_name_normalize,
        "test depends on lowercase as case insensitive")
    def test_keep_existing_quote_no_orig(self):
        """keep_existing, table not present: the quote flag is applied."""
        empty = self._notexisting_fixture()
        declared = Table(
            'users', empty,
            quote=True, autoload=True, keep_existing=True)
        assert declared.name.quote

    def test_keep_existing_add_column_no_orig(self):
        """keep_existing, table not present: the new column is added."""
        empty = self._notexisting_fixture()
        declared = Table(
            'users', empty,
            Column('foo', Integer),
            autoload=True, keep_existing=True)
        assert "foo" in declared.c

    def test_keep_existing_coltype_no_reflection(self):
        """keep_existing, no autoload: the passed column type is ignored."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            Column('name', Unicode),
            keep_existing=True)
        name_type = redeclared.c.name.type
        assert not isinstance(name_type, Unicode)

    def test_keep_existing_quote_no_reflection(self):
        """keep_existing, no autoload: the passed quote flag is ignored."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            quote=True, keep_existing=True)
        assert not redeclared.name.quote

    def test_keep_existing_add_column_no_reflection(self):
        """keep_existing, no autoload: a new column is not added."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            Column('foo', Integer),
            keep_existing=True)
        assert "foo" not in redeclared.c

    def test_extend_existing_coltype(self):
        """extend_existing + autoload: the passed column type wins."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            Column('name', Unicode),
            autoload=True, extend_existing=True)
        name_type = redeclared.c.name.type
        assert isinstance(name_type, Unicode)

    def test_extend_existing_quote(self):
        """extend_existing + autoload: changing quote raises."""
        populated = self._useexisting_fixture()
        table_kw = dict(quote=True, autoload=True, extend_existing=True)
        assert_raises_message(
            tsa.exc.ArgumentError,
            "Can't redefine 'quote' or 'quote_schema' arguments",
            Table, 'users', populated, **table_kw)

    def test_extend_existing_add_column(self):
        """extend_existing + autoload: the new column is added."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            Column('foo', Integer),
            autoload=True, extend_existing=True)
        assert "foo" in redeclared.c

    def test_extend_existing_coltype_no_orig(self):
        """extend_existing, table not present: passed type is used."""
        empty = self._notexisting_fixture()
        declared = Table(
            'users', empty,
            Column('name', Unicode),
            autoload=True, extend_existing=True)
        name_type = declared.c.name.type
        assert isinstance(name_type, Unicode)

    @testing.skip_if(
        lambda: testing.db.dialect.requires_name_normalize,
        "test depends on lowercase as case insensitive")
    def test_extend_existing_quote_no_orig(self):
        """extend_existing, table not present: quote flag is applied."""
        empty = self._notexisting_fixture()
        declared = Table(
            'users', empty,
            quote=True, autoload=True, extend_existing=True)
        assert declared.name.quote

    def test_extend_existing_add_column_no_orig(self):
        """extend_existing, table not present: the column is added."""
        empty = self._notexisting_fixture()
        declared = Table(
            'users', empty,
            Column('foo', Integer),
            autoload=True, extend_existing=True)
        assert "foo" in declared.c

    def test_extend_existing_coltype_no_reflection(self):
        """extend_existing, no autoload: the passed column type wins."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            Column('name', Unicode),
            extend_existing=True)
        name_type = redeclared.c.name.type
        assert isinstance(name_type, Unicode)

    def test_extend_existing_quote_no_reflection(self):
        """extend_existing, no autoload: changing quote raises."""
        populated = self._useexisting_fixture()
        table_kw = dict(quote=True, extend_existing=True)
        assert_raises_message(
            tsa.exc.ArgumentError,
            "Can't redefine 'quote' or 'quote_schema' arguments",
            Table, 'users', populated, **table_kw)

    def test_extend_existing_add_column_no_reflection(self):
        """extend_existing, no autoload: the new column is added."""
        populated = self._useexisting_fixture()
        redeclared = Table(
            'users', populated,
            Column('foo', Integer),
            extend_existing=True)
        assert "foo" in redeclared.c
1909
class IndexTest(fixtures.TestBase):

    """Test how an Index becomes associated with a Table and its columns."""

    def _assert(self, t, i, columns=True):
        """Check that ``i`` is the only index on ``t``.

        When ``columns`` is true the index is expected to contain exactly
        ``t.c.x``; otherwise it is expected to contain no columns.
        """
        eq_(t.indexes, set([i]))
        expected_columns = [t.c.x] if columns else []
        eq_(list(i.columns), expected_columns)
        assert i.table is t

    def test_separate_decl_columns(self):
        """Index created from a column of an already-built table."""
        metadata = MetaData()
        tbl = Table('t', metadata, Column('x', Integer))
        idx = Index('i', tbl.c.x)
        self._assert(tbl, idx)

    def test_separate_decl_columns_functional(self):
        """Index on a function of a column of an already-built table."""
        metadata = MetaData()
        tbl = Table('t', metadata, Column('x', Integer))
        expr = func.foo(tbl.c.x)
        idx = Index('i', expr)
        self._assert(tbl, idx)

    def test_inline_decl_columns(self):
        """Index built on a free column, both then passed to Table."""
        metadata = MetaData()
        col = Column('x', Integer)
        idx = Index('i', col)
        tbl = Table('t', metadata, col, idx)
        self._assert(tbl, idx)

    def test_inline_decl_columns_functional(self):
        """Functional index on a free column, both passed to Table."""
        metadata = MetaData()
        col = Column('x', Integer)
        expr = func.foo(col)
        idx = Index('i', expr)
        tbl = Table('t', metadata, col, idx)
        self._assert(tbl, idx)

    def test_inline_decl_string(self):
        """Index naming its column by string, passed inline to Table."""
        metadata = MetaData()
        idx = Index('i', "x")
        tbl = Table('t', metadata, Column('x', Integer), idx)
        self._assert(tbl, idx)

    def test_inline_decl_textonly(self):
        """Index on a text() expression, passed inline: no columns."""
        metadata = MetaData()
        idx = Index('i', text("foobar(x)"))
        tbl = Table('t', metadata, Column('x', Integer), idx)
        self._assert(tbl, idx, columns=False)

    def test_separate_decl_textonly(self):
        """Index on a text() expression, appended later: no columns."""
        metadata = MetaData()
        idx = Index('i', text("foobar(x)"))
        tbl = Table('t', metadata, Column('x', Integer))
        tbl.append_constraint(idx)
        self._assert(tbl, idx, columns=False)

    def test_unnamed_column_exception(self):
        """Appending an index over an unnamed column raises."""
        # this can occur in some declarative situations
        nameless = Column(Integer)
        idx = Index('q', nameless)
        metadata = MetaData()
        tbl = Table('t', metadata, Column('q'))
        assert_raises_message(
            exc.ArgumentError,
            "Can't add unnamed column to column collection",
            tbl.append_constraint, idx)
1976
1977
class ConstraintTest(fixtures.TestBase):

    """Test ForeignKey / ForeignKeyConstraint resolution and bookkeeping."""

    def _single_fixture(self):
        """Return three tables on one MetaData.

        ``t1`` has columns a and b, ``t2.a`` is a foreign key to ``t1.a``
        and ``t3`` has an unrelated column a.
        """
        metadata = MetaData()

        parent = Table(
            't1', metadata,
            Column('a', Integer),
            Column('b', Integer))

        child = Table(
            't2', metadata,
            Column('a', Integer, ForeignKey('t1.a')))

        unrelated = Table(
            't3', metadata,
            Column('a', Integer))

        return parent, child, unrelated

    def test_table_references(self):
        """ForeignKey.references() is true only for the target table."""
        parent, child, unrelated = self._single_fixture()
        fk = list(child.c.a.foreign_keys)[0]
        assert fk.references(parent)
        assert not fk.references(unrelated)

    def test_column_references(self):
        """Column.references() is true only for the target column."""
        parent, child, unrelated = self._single_fixture()
        fk_col = child.c.a
        assert fk_col.references(parent.c.a)
        assert not fk_col.references(unrelated.c.a)
        assert not fk_col.references(parent.c.b)

    def test_column_references_derived(self):
        """references() accepts a column of a select over the target."""
        parent, child, unrelated = self._single_fixture()
        inner = tsa.select([parent]).alias()
        derived = tsa.select([inner])
        assert child.c.a.references(derived.c.a)
        assert not child.c.a.references(derived.c.b)

    def test_copy_doesnt_reference(self):
        """A copy() of the FK column references nothing."""
        parent, child, unrelated = self._single_fixture()
        copied = child.c.a.copy()
        assert not copied.references(parent.c.a)
        assert not copied.references(parent.c.b)

    def test_derived_column_references(self):
        """A column of a select over the FK table still references."""
        parent, child, unrelated = self._single_fixture()
        inner = tsa.select([child]).alias()
        derived = tsa.select([inner])
        assert derived.c.a.references(parent.c.a)
        assert not derived.c.a.references(parent.c.b)

    def test_referred_table_accessor(self):
        """ForeignKeyConstraint.referred_table is the target table."""
        parent, child, unrelated = self._single_fixture()
        constraint = list(child.foreign_key_constraints)[0]
        is_(constraint.referred_table, parent)

    def test_referred_table_accessor_not_available(self):
        """referred_table raises while the target table is undeclared."""
        referring = Table(
            't', MetaData(),
            Column('x', ForeignKey('q.id')))
        constraint = list(referring.foreign_key_constraints)[0]
        assert_raises_message(
            exc.InvalidRequestError,
            "Foreign key associated with column 't.x' could not find "
            "table 'q' with which to generate a foreign key to target "
            "column 'id'",
            getattr, constraint, "referred_table")

    def test_related_column_not_present_atfirst_ok(self):
        """An FK resolves once the target column is appended later."""
        metadata = MetaData()
        base = Table(
            "base", metadata,
            Column("id", Integer, primary_key=True))
        fk = ForeignKey('base.q')
        derived = Table(
            "derived", metadata,
            Column("id", None, fk, primary_key=True))

        late_col = Column('q', Integer)
        base.append_column(late_col)
        assert fk.column is base.c.q
        assert isinstance(derived.c.id.type, Integer)

    def test_related_column_not_present_atfirst_ok_onname(self):
        """With link_to_name the late column is matched by name."""
        metadata = MetaData()
        base = Table(
            "base", metadata,
            Column("id", Integer, primary_key=True))
        fk = ForeignKey('base.q', link_to_name=True)
        derived = Table(
            "derived", metadata,
            Column("id", None, fk, primary_key=True))

        late_col = Column('q', Integer, key='zz')
        base.append_column(late_col)
        assert fk.column is base.c.zz
        assert isinstance(derived.c.id.type, Integer)

    def test_related_column_not_present_atfirst_ok_linktoname_conflict(self):
        """link_to_name picks the column named 'q', not the one keyed 'q'."""
        metadata = MetaData()
        base = Table(
            "base", metadata,
            Column("id", Integer, primary_key=True))
        fk = ForeignKey('base.q', link_to_name=True)
        derived = Table(
            "derived", metadata,
            Column("id", None, fk, primary_key=True))

        keyed_q = Column('zz', Integer, key='q')
        named_q = Column('q', Integer, key='zz')
        base.append_column(keyed_q)
        base.append_column(named_q)
        assert fk.column is base.c.zz
        assert isinstance(derived.c.id.type, Integer)

    def test_invalid_composite_fk_check_strings(self):
        """String targets spanning two tables are rejected."""
        metadata = MetaData()

        assert_raises_message(
            exc.ArgumentError,
            r"ForeignKeyConstraint on t1\(x, y\) refers to "
            "multiple remote tables: t2 and t3",
            Table,
            't1', metadata,
            Column('x', Integer),
            Column('y', Integer),
            ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y']))

    def test_invalid_composite_fk_check_columns(self):
        """Column targets spanning two tables are rejected."""
        metadata = MetaData()

        remote_x = Table('t2', metadata, Column('x', Integer))
        remote_y = Table('t3', metadata, Column('y', Integer))

        assert_raises_message(
            exc.ArgumentError,
            r"ForeignKeyConstraint on t1\(x, y\) refers to "
            "multiple remote tables: t2 and t3",
            Table,
            't1', metadata,
            Column('x', Integer),
            Column('y', Integer),
            ForeignKeyConstraint(
                ['x', 'y'], [remote_x.c.x, remote_y.c.y]))

    def test_invalid_composite_fk_check_columns_notattached(self):
        """Unattached column targets are not checked at declaration."""
        metadata = MetaData()
        free_x = Column('x', Integer)
        free_y = Column('y', Integer)

        # no error is raised for this one right now.
        # which is a minor bug.
        Table(
            't1', metadata,
            Column('x', Integer),
            Column('y', Integer),
            ForeignKeyConstraint(['x', 'y'], [free_x, free_y]))

        Table('t2', metadata, free_x)
        Table('t3', metadata, free_y)

    def test_constraint_copied_to_proxy_ok(self):
        """A select's proxied FK shares, but is not listed in, the FKC."""
        metadata = MetaData()
        Table('t1', metadata, Column('id', Integer, primary_key=True))
        referring = Table(
            't2', metadata,
            Column('id', Integer, ForeignKey('t1.id'), primary_key=True))

        stmt = tsa.select([referring])
        table_fk = list(referring.c.id.foreign_keys)[0]
        select_fk = list(stmt.c.id.foreign_keys)[0]

        # the two FKs share the ForeignKeyConstraint
        is_(table_fk.constraint, select_fk.constraint)

        # but the ForeignKeyConstraint isn't
        # aware of the select's FK
        eq_(table_fk.constraint.elements, [table_fk])

    def test_type_propagate_composite_fk_string(self):
        """Composite FK given as strings copies the target types."""
        metadata = MetaData()
        Table(
            'a', metadata,
            Column('key1', Integer, primary_key=True),
            Column('key2', String(40), primary_key=True))

        referring = Table(
            'b', metadata,
            Column('a_key1', None),
            Column('a_key2', None),
            Column('id', Integer, primary_key=True),
            ForeignKeyConstraint(
                ['a_key1', 'a_key2'], ['a.key1', 'a.key2']))

        assert isinstance(referring.c.a_key1.type, Integer)
        assert isinstance(referring.c.a_key2.type, String)

    def test_type_propagate_composite_fk_col(self):
        """Composite FK given as Column objects copies the target types."""
        metadata = MetaData()
        target = Table(
            'a', metadata,
            Column('key1', Integer, primary_key=True),
            Column('key2', String(40), primary_key=True))

        referring = Table(
            'b', metadata,
            Column('a_key1', None),
            Column('a_key2', None),
            Column('id', Integer, primary_key=True),
            ForeignKeyConstraint(
                ['a_key1', 'a_key2'], [target.c.key1, target.c.key2]))

        assert isinstance(referring.c.a_key1.type, Integer)
        assert isinstance(referring.c.a_key2.type, String)

    def test_type_propagate_standalone_fk_string(self):
        """Column-level FK given as a string copies the target type."""
        metadata = MetaData()
        Table(
            'a', metadata,
            Column('key1', Integer, primary_key=True))

        referring = Table(
            'b', metadata,
            Column('a_key1', None, ForeignKey("a.key1")))

        assert isinstance(referring.c.a_key1.type, Integer)

    def test_type_propagate_standalone_fk_col(self):
        """Column-level FK given as a Column copies the target type."""
        metadata = MetaData()
        target = Table(
            'a', metadata,
            Column('key1', Integer, primary_key=True))

        referring = Table(
            'b', metadata,
            Column('a_key1', None, ForeignKey(target.c.key1)))

        assert isinstance(referring.c.a_key1.type, Integer)

    def test_type_propagate_chained_string_source_first(self):
        """Type flows a -> b -> c when the typed table is declared first."""
        metadata = MetaData()
        Table(
            'a', metadata,
            Column('key1', Integer, primary_key=True))

        middle = Table(
            'b', metadata,
            Column('a_key1', None, ForeignKey("a.key1")))

        last = Table(
            'c', metadata,
            Column('b_key1', None, ForeignKey("b.a_key1")))

        assert isinstance(middle.c.a_key1.type, Integer)
        assert isinstance(last.c.b_key1.type, Integer)

    def test_type_propagate_chained_string_source_last(self):
        """Type flows a -> b -> c when the typed table is declared last."""
        metadata = MetaData()

        middle = Table(
            'b', metadata,
            Column('a_key1', None, ForeignKey("a.key1")))

        last = Table(
            'c', metadata,
            Column('b_key1', None, ForeignKey("b.a_key1")))

        Table(
            'a', metadata,
            Column('key1', Integer, primary_key=True))

        assert isinstance(middle.c.a_key1.type, Integer)
        assert isinstance(last.c.b_key1.type, Integer)

    def test_type_propagate_chained_string_source_last_onname(self):
        """As source_last, with link_to_name and distinct column keys."""
        metadata = MetaData()

        middle = Table(
            'b', metadata,
            Column(
                'a_key1', None,
                ForeignKey("a.key1", link_to_name=True),
                key="ak1"))

        last = Table(
            'c', metadata,
            Column(
                'b_key1', None,
                ForeignKey("b.a_key1", link_to_name=True),
                key="bk1"))

        Table(
            'a', metadata,
            Column('key1', Integer, primary_key=True, key='ak1'))

        assert isinstance(middle.c.ak1.type, Integer)
        assert isinstance(last.c.bk1.type, Integer)

    def test_type_propagate_chained_string_source_last_onname_conflict(self):
        """Names and keys are swapped so key- and name-lookups differ."""
        metadata = MetaData()

        middle = Table(
            'b', metadata,
            # b.c.key1 -> a.c.key1 -> String
            Column(
                'ak1', None,
                ForeignKey("a.key1", link_to_name=False),
                key="key1"),
            # b.c.ak1 -> a.c.ak1 -> Integer
            Column(
                'a_key1', None,
                ForeignKey("a.key1", link_to_name=True),
                key="ak1"))

        last = Table(
            'c', metadata,
            # c.c.b_key1 -> b.c.ak1 -> Integer
            Column(
                'b_key1', None,
                ForeignKey("b.ak1", link_to_name=False)),
            # c.c.b_ak1 -> column named 'ak1' in b (b.c.key1) -> String
            Column(
                'b_ak1', None,
                ForeignKey("b.ak1", link_to_name=True)))

        Table(
            'a', metadata,
            # a.c.key1
            Column('ak1', String, key="key1"),
            # a.c.ak1
            Column('key1', Integer, primary_key=True, key='ak1'))

        assert isinstance(middle.c.key1.type, String)
        assert isinstance(middle.c.ak1.type, Integer)

        assert isinstance(last.c.b_ak1.type, String)
        assert isinstance(last.c.b_key1.type, Integer)

    def test_type_propagate_chained_col_orig_first(self):
        """Type flows a -> b -> c when FKs are given as Column objects."""
        metadata = MetaData()
        first = Table(
            'a', metadata,
            Column('key1', Integer, primary_key=True))

        middle = Table(
            'b', metadata,
            Column('a_key1', None, ForeignKey(first.c.key1)))

        last = Table(
            'c', metadata,
            Column('b_key1', None, ForeignKey(middle.c.a_key1)))

        assert isinstance(middle.c.a_key1.type, Integer)
        assert isinstance(last.c.b_key1.type, Integer)

    def test_column_accessor_col(self):
        """ForeignKey(column).column is that same column."""
        target = Column('x', Integer)
        fk = ForeignKey(target)
        is_(fk.column, target)

    def test_column_accessor_clause_element(self):
        """ForeignKey accepts an object exposing __clause_element__()."""
        target = Column('x', Integer)

        class ClauseWrapper(object):

            def __init__(self, wrapped):
                self.wrapped = wrapped

            def __clause_element__(self):
                return self.wrapped

        fk = ForeignKey(ClauseWrapper(target))
        is_(fk.column, target)

    def test_column_accessor_string_no_parent(self):
        """.column raises while the ForeignKey has no parent Column."""
        fk = ForeignKey("sometable.somecol")
        assert_raises_message(
            exc.InvalidRequestError,
            "this ForeignKey object does not yet have a parent "
            "Column associated with it.",
            getattr, fk, "column")

    def test_column_accessor_string_no_parent_table(self):
        """.column raises while the parent Column has no Table."""
        fk = ForeignKey("sometable.somecol")
        Column('x', fk)
        assert_raises_message(
            exc.InvalidRequestError,
            "this ForeignKey's parent column is not yet "
            "associated with a Table.",
            getattr, fk, "column")

    def test_column_accessor_string_no_target_table(self):
        """.column raises NoReferencedTableError without the target."""
        fk = ForeignKey("sometable.somecol")
        parent_col = Column('x', fk)
        Table('t', MetaData(), parent_col)
        assert_raises_message(
            exc.NoReferencedTableError,
            "Foreign key associated with column 't.x' could not find "
            "table 'sometable' with which to generate a "
            "foreign key to target column 'somecol'",
            getattr, fk, "column")

    def test_column_accessor_string_no_target_column(self):
        """.column raises NoReferencedColumnError if the column is absent."""
        fk = ForeignKey("sometable.somecol")
        parent_col = Column('x', fk)
        metadata = MetaData()
        Table('t', metadata, parent_col)
        Table("sometable", metadata, Column('notsomecol', Integer))
        assert_raises_message(
            exc.NoReferencedColumnError,
            "Could not initialize target column for ForeignKey "
            "'sometable.somecol' on table 't': "
            "table 'sometable' has no column named 'somecol'",
            getattr, fk, "column")

    def test_remove_table_fk_bookkeeping(self):
        """MetaData.remove() drops the table's FK from the memo list."""
        metadata = MetaData()
        removed_fk = ForeignKey('t1.x')
        removed = Table(
            't2', metadata,
            Column('y', Integer, removed_fk))
        kept = Table(
            't3', metadata,
            Column('y', Integer, ForeignKey('t1.x')))
        memo_key = ("t1", "x")

        assert removed.key in metadata.tables
        assert memo_key in metadata._fk_memos

        metadata.remove(removed)

        # key is removed
        assert removed.key not in metadata.tables

        # the memo for the FK is still there
        assert memo_key in metadata._fk_memos

        # fk is not in the collection
        assert removed_fk not in metadata._fk_memos[memo_key]

        # make the referenced table
        target = Table('t1', metadata, Column('x', Integer))

        # t2 tells us exactly what's wrong
        assert_raises_message(
            exc.InvalidRequestError,
            "Table t2 is no longer associated with its parent MetaData",
            getattr, removed_fk, "column")

        # t3 is unaffected
        assert kept.c.y.references(target.c.x)

        # remove twice OK
        metadata.remove(removed)
2423
class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):

    """Test Column() construction."""

    __dialect__ = 'default'

    def columns(self):
        """Return nine Integer columns built with varied call signatures.

        The columns at positions 0, 2, 5, 6 and 8 are created without a
        name; the others are named 'b', 'd', 'e' and 'h'.
        """
        return [Column(Integer),
                Column('b', Integer),
                Column(Integer),
                Column('d', Integer),
                Column(Integer, name='e'),
                Column(type_=Integer),
                Column(Integer()),
                Column('h', Integer()),
                Column(type_=Integer())]

    def test_basic(self):
        """Columns named after construction keep order and name in a Table."""
        c = self.columns()

        # give the nameless columns a name and key so the Table accepts them
        for i, v in ((0, 'a'), (2, 'c'), (5, 'f'), (6, 'g'), (8, 'i')):
            c[i].name = v
            c[i].key = v
        del i, v

        tbl = Table('table', MetaData(), *c)

        for i, col in enumerate(tbl.c):
            assert col.name == c[i].name

    def test_name_none(self):
        """A Column with no name cannot be attached to a Table."""

        c = Column(Integer)
        assert_raises_message(
            exc.ArgumentError,
            "Column must be constructed with a non-blank name or assign a "
            "non-blank .name ",
            Table, 't', MetaData(), c)

    def test_name_blank(self):
        """A Column with an empty-string name cannot be attached either."""

        c = Column('', Integer)
        assert_raises_message(
            exc.ArgumentError,
            "Column must be constructed with a non-blank name or assign a "
            "non-blank .name ",
            Table, 't', MetaData(), c)

    def test_dupe_column(self):
        """The same Column object cannot be assigned to a second Table."""
        c = Column('x', Integer)
        Table('t', MetaData(), c)

        assert_raises_message(
            exc.ArgumentError,
            "Column object 'x' already assigned to Table 't'",
            Table, 'q', MetaData(), c)

    def test_incomplete_key(self):
        """Setting only .name before attaching also establishes .key."""
        c = Column(Integer)
        assert c.name is None
        assert c.key is None

        c.name = 'named'
        Table('t', MetaData(), c)

        assert c.name == 'named'
        assert c.name == c.key

    def test_unique_index_flags_default_to_none(self):
        """unique/index default to None; the Index reflects ``unique``."""
        c = Column(Integer)
        eq_(c.unique, None)
        eq_(c.index, None)

        c = Column('c', Integer, index=True)
        eq_(c.unique, None)
        eq_(c.index, True)

        t = Table('t', MetaData(), c)
        eq_(list(t.indexes)[0].unique, False)

        c = Column(Integer, unique=True)
        eq_(c.unique, True)
        eq_(c.index, None)

        c = Column('c', Integer, index=True, unique=True)
        eq_(c.unique, True)
        eq_(c.index, True)

        t = Table('t', MetaData(), c)
        eq_(list(t.indexes)[0].unique, True)

    def test_bogus(self):
        """Giving the name or the type twice raises ArgumentError."""
        assert_raises(exc.ArgumentError, Column, 'foo', name='bar')
        assert_raises(exc.ArgumentError, Column, 'foo', Integer,
                      type_=Integer())

    def test_custom_subclass_proxy(self):
        """test proxy generation of a Column subclass, can be compiled."""

        from sqlalchemy.schema import Column
        from sqlalchemy.ext.compiler import compiles
        from sqlalchemy.sql import select

        class MyColumn(Column):

            def _constructor(self, name, type, **kw):
                kw['name'] = name
                return MyColumn(type, **kw)

            def __init__(self, type, **kw):
                Column.__init__(self, type, **kw)

            def my_goofy_thing(self):
                return "hi"

        @compiles(MyColumn)
        def goofy(element, compiler, **kw):
            s = compiler.visit_column(element, **kw)
            return s + "-"

        id_col = MyColumn(Integer, primary_key=True)
        id_col.name = 'id'
        name_col = MyColumn(String)
        name_col.name = 'name'
        t1 = Table('foo', MetaData(),
                   id_col,
                   name_col
                   )

        # goofy thing
        eq_(t1.c.name.my_goofy_thing(), "hi")

        # create proxy
        s = select([t1.select().alias()])

        # proxy has goofy thing
        eq_(s.c.name.my_goofy_thing(), "hi")

        # compile works
        self.assert_compile(
            select([t1.select().alias()]),
            "SELECT anon_1.id-, anon_1.name- FROM "
            "(SELECT foo.id- AS id, foo.name- AS name "
            "FROM foo) AS anon_1",
        )

    def test_custom_subclass_proxy_typeerror(self):
        """A Column subclass lacking _constructor() fails when proxied."""
        from sqlalchemy.schema import Column
        from sqlalchemy.sql import select

        class MyColumn(Column):

            def __init__(self, type, **kw):
                Column.__init__(self, type, **kw)

        id_col = MyColumn(Integer, primary_key=True)
        id_col.name = 'id'
        name_col = MyColumn(String)
        name_col.name = 'name'
        t1 = Table('foo', MetaData(),
                   id_col,
                   name_col
                   )
        assert_raises_message(
            TypeError,
            "Could not create a copy of this <class "
            "'test.sql.test_metadata..*MyColumn'> "
            "object.  Ensure the class includes a _constructor()",
            getattr, select([t1.select().alias()]), 'c'
        )

    def test_custom_create(self):
        """A @compiles hook on CreateColumn customizes CREATE TABLE output.

        Columns whose ``info`` contains "special" are rendered by the
        hook; every other column falls through to the default
        ``visit_create_column``.
        """
        from sqlalchemy.ext.compiler import compiles, deregister

        @compiles(schema.CreateColumn)
        def compile_create_column(element, compiler, **kw):
            column = element.element

            if "special" not in column.info:
                return compiler.visit_create_column(element, **kw)

            ddl = "%s SPECIAL DIRECTIVE %s" % (
                column.name,
                compiler.type_compiler.process(column.type)
            )
            default = compiler.get_column_default_string(column)
            if default is not None:
                ddl += " DEFAULT " + default

            if not column.nullable:
                ddl += " NOT NULL"

            if column.constraints:
                # separate the constraints from whatever precedes them
                ddl += " " + " ".join(
                    compiler.process(const)
                    for const in column.constraints)
            return ddl

        # The hook is registered on the shared CreateColumn class, so it
        # must be removed even when the assertion below fails; otherwise
        # it would stay in effect for whatever runs afterwards.
        try:
            t = Table(
                'mytable', MetaData(),
                Column('x', Integer, info={
                       "special": True}, primary_key=True),
                Column('y', String(50)),
                Column('z', String(20), info={
                       "special": True}))

            self.assert_compile(
                schema.CreateTable(t),
                "CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER "
                "NOT NULL, y VARCHAR(50), "
                "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))"
            )
        finally:
            deregister(schema.CreateColumn)
2639
class ColumnDefaultsTest(fixtures.TestBase):

    """Test assignment of default fixtures to columns."""

    def _fixture(self, *arg, **kw):
        """Return an Integer column named ``x`` built with the given args."""
        return Column('x', Integer, *arg, **kw)

    def test_server_default_positional(self):
        """A positional DefaultClause becomes ``server_default``."""
        clause = schema.DefaultClause('y')
        col = self._fixture(clause)
        is_(col.server_default, clause)
        is_(clause.column, col)

    def test_onupdate_default_not_server_default_one(self):
        """Keyword clauses without ``for_update`` stay in their own slots."""
        insert_clause = schema.DefaultClause('y')
        update_clause = schema.DefaultClause('z')

        col = self._fixture(
            server_default=insert_clause, server_onupdate=update_clause)
        eq_(col.server_default.arg, 'y')
        eq_(col.server_onupdate.arg, 'z')

    def test_onupdate_default_not_server_default_two(self):
        """Keyword placement holds when both clauses say for_update=True."""
        insert_clause = schema.DefaultClause('y', for_update=True)
        update_clause = schema.DefaultClause('z', for_update=True)

        col = self._fixture(
            server_default=insert_clause, server_onupdate=update_clause)
        eq_(col.server_default.arg, 'y')
        eq_(col.server_onupdate.arg, 'z')

    def test_onupdate_default_not_server_default_three(self):
        """Positional clauses are routed by their ``for_update`` flag."""
        insert_clause = schema.DefaultClause('y', for_update=False)
        update_clause = schema.DefaultClause('z', for_update=True)

        col = self._fixture(insert_clause, update_clause)
        eq_(col.server_default.arg, 'y')
        eq_(col.server_onupdate.arg, 'z')

    def test_onupdate_default_not_server_default_four(self):
        """``server_onupdate=`` alone leaves ``server_default`` unset."""
        clause = schema.DefaultClause('y', for_update=False)

        col = self._fixture(server_onupdate=clause)
        is_(col.server_default, None)
        eq_(col.server_onupdate.arg, 'y')

    def test_server_default_keyword_as_schemaitem(self):
        """``server_default=`` keeps the DefaultClause object it is given."""
        clause = schema.DefaultClause('y')
        col = self._fixture(server_default=clause)
        is_(col.server_default, clause)
        is_(clause.column, col)

    def test_server_default_keyword_as_clause(self):
        """A plain value for ``server_default=`` is wrapped, keeping arg."""
        value = 'y'
        col = self._fixture(server_default=value)
        eq_(col.server_default.arg, value)
        is_(col.server_default.column, col)

    def test_server_default_onupdate_positional(self):
        """A positional for_update clause becomes ``server_onupdate``."""
        clause = schema.DefaultClause('y', for_update=True)
        col = self._fixture(clause)
        is_(col.server_onupdate, clause)
        is_(clause.column, col)

    def test_server_default_onupdate_keyword_as_schemaitem(self):
        """``server_onupdate=`` keeps the DefaultClause object it is given."""
        clause = schema.DefaultClause('y', for_update=True)
        col = self._fixture(server_onupdate=clause)
        is_(col.server_onupdate, clause)
        is_(clause.column, col)

    def test_server_default_onupdate_keyword_as_clause(self):
        """A plain value for ``server_onupdate=`` is wrapped, keeping arg."""
        value = 'y'
        col = self._fixture(server_onupdate=value)
        eq_(col.server_onupdate.arg, value)
        is_(col.server_onupdate.column, col)

    def test_column_default_positional(self):
        """A positional ColumnDefault becomes ``default``."""
        default = schema.ColumnDefault('y')
        col = self._fixture(default)
        is_(col.default, default)
        is_(default.column, col)

    def test_column_default_keyword_as_schemaitem(self):
        """``default=`` keeps the ColumnDefault object it is given."""
        default = schema.ColumnDefault('y')
        col = self._fixture(default=default)
        is_(col.default, default)
        is_(default.column, col)

    def test_column_default_keyword_as_clause(self):
        """A plain value for ``default=`` is wrapped, keeping arg."""
        value = 'y'
        col = self._fixture(default=value)
        eq_(col.default.arg, value)
        is_(col.default.column, col)

    def test_column_default_onupdate_positional(self):
        """A positional for_update ColumnDefault becomes ``onupdate``."""
        default = schema.ColumnDefault('y', for_update=True)
        col = self._fixture(default)
        is_(col.onupdate, default)
        is_(default.column, col)

    def test_column_default_onupdate_keyword_as_schemaitem(self):
        """``onupdate=`` keeps the ColumnDefault object it is given."""
        default = schema.ColumnDefault('y', for_update=True)
        col = self._fixture(onupdate=default)
        is_(col.onupdate, default)
        is_(default.column, col)

    def test_column_default_onupdate_keyword_as_clause(self):
        """A plain value for ``onupdate=`` is wrapped, keeping arg."""
        value = 'y'
        col = self._fixture(onupdate=value)
        eq_(col.onupdate.arg, value)
        is_(col.onupdate.column, col)
2749
2750
class ColumnOptionsTest(fixtures.TestBase):

    """Test Column constructor argument handling and the ``info`` dict."""

    def test_default_generators(self):
        """``default=`` / ``onupdate=`` keep the generator object passed."""
        generators = (Sequence('foo_id_seq'), ColumnDefault('f5'))
        for gen in generators:
            is_(Column(String, default=gen).default, gen)
            is_(Column(String, onupdate=gen).onupdate, gen)

    def _null_type_error(self, col):
        """Assert that compiling CREATE TABLE for *col* hits NullType."""
        table = Table('t', MetaData(), col)
        create = schema.CreateTable(table)
        assert_raises_message(
            exc.CompileError,
            r"\(in table 't', column 'foo'\): Can't generate DDL for NullType",
            create.compile
        )

    def _no_name_error(self, col):
        """Assert that attaching the nameless *col* to a Table is refused."""
        metadata = MetaData()
        assert_raises_message(
            exc.ArgumentError,
            "Column must be constructed with a non-blank name or "
            "assign a non-blank .name",
            Table, 't', metadata, col
        )

    def _no_error(self, col):
        """Assert that a table containing *col* compiles to DDL cleanly."""
        metadata = MetaData()
        # Presumably present so ForeignKey('bar.id') on *col* has a target
        # table in the same MetaData; kept only for that side effect.
        Table('bar', metadata, Column('id', Integer))
        table = Table('t', metadata, col)
        schema.CreateTable(table).compile()

    def test_argument_signatures(self):
        """Run each Column signature through the matching checker."""
        no_name = self._no_name_error
        null_type = self._null_type_error
        no_error = self._no_error

        # (checker, column) pairs, evaluated strictly in this order
        cases = [
            (no_name, Column()),
            (null_type, Column("foo")),
            (no_name, Column(default="foo")),
            (no_name, Column(Sequence("a"))),
            (null_type, Column("foo", default="foo")),
            (null_type, Column("foo", Sequence("a"))),
            (no_name, Column(ForeignKey('bar.id'))),
            (no_error, Column("foo", ForeignKey('bar.id'))),
            (no_name, Column(ForeignKey('bar.id'), default="foo")),
            (no_name, Column(ForeignKey('bar.id'), Sequence("a"))),
            (no_error, Column("foo", ForeignKey('bar.id'), default="foo")),
            (no_error, Column("foo", ForeignKey('bar.id'), Sequence("a"))),
        ]
        for check, col in cases:
            check(col)

    def test_column_info(self):
        """``info`` defaults to an empty dict and accepts new keys."""
        with_info = Column('foo', String, info={'x': 'y'})
        empty_info = Column('bar', String, info={})
        no_info = Column('bat', String)

        eq_(with_info.info, {'x': 'y'})
        eq_(empty_info.info, {})
        eq_(no_info.info, {})

        for col in (with_info, empty_info, no_info):
            col.info['bar'] = 'zip'
            eq_(col.info['bar'], 'zip')
2814
2815
class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase):

    """Test the before/after parent-attach events fired by schema items.

    Each test records one string per event in a ``canary`` list and then
    compares that list against the exact expected sequence, so the order
    in which listeners are registered and tables are built matters.
    """

    def test_all_events(self):
        """Listen on ``SchemaItem`` itself and record every attach event."""
        canary = []

        def before_attach(obj, parent):
            # records "<item class>-><parent class>"
            canary.append("%s->%s" % (obj.__class__.__name__,
                                      parent.__class__.__name__))

        def after_attach(obj, parent):
            # records "<item class>-><parent>", using the parent's own
            # string form rather than its class name
            canary.append("%s->%s" % (obj.__class__.__name__, parent))

        self.event_listen(
            schema.SchemaItem,
            "before_parent_attach",
            before_attach)
        self.event_listen(
            schema.SchemaItem,
            "after_parent_attach",
            after_attach)

        m = MetaData()
        Table('t1', m,
              Column('id', Integer, Sequence('foo_id'), primary_key=True),
              Column('bar', String, ForeignKey('t2.id'))
              )
        Table('t2', m,
              Column('id', Integer, primary_key=True),
              )

        # entries come in before/after pairs, in the order items attached
        eq_(
            canary,
            ['Sequence->Column', 'Sequence->id', 'ForeignKey->Column',
             'ForeignKey->bar', 'Table->MetaData',
             'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1',
             'Column->Table', 'Column->t1', 'Column->Table',
             'Column->t1', 'ForeignKeyConstraint->Table',
             'ForeignKeyConstraint->t1', 'Table->MetaData(bind=None)',
             'Table->MetaData', 'PrimaryKeyConstraint->Table',
             'PrimaryKeyConstraint->t2', 'Column->Table', 'Column->t2',
             'Table->MetaData(bind=None)']
        )

    def test_events_per_constraint(self):
        """Listen on each constraint/index class separately.

        The recorded name is that of the class the listener was registered
        on (``target``), not the class of the object being attached.
        """
        canary = []

        def evt(target):
            # registers a before/after listener pair bound to ``target``
            def before_attach(obj, parent):
                canary.append("%s->%s" % (target.__name__,
                                          parent.__class__.__name__))

            def after_attach(obj, parent):
                assert hasattr(obj, 'name')  # so we can change it
                canary.append("%s->%s" % (target.__name__, parent))
            self.event_listen(target, "before_parent_attach", before_attach)
            self.event_listen(target, "after_parent_attach", after_attach)

        for target in [
            schema.ForeignKeyConstraint, schema.PrimaryKeyConstraint,
            schema.UniqueConstraint,
            schema.CheckConstraint,
            schema.Index
        ]:
            evt(target)

        m = MetaData()
        # t1 gets its index / unique constraint through column flags
        Table('t1', m,
              Column('id', Integer, Sequence('foo_id'), primary_key=True),
              Column('bar', String, ForeignKey('t2.id'), index=True),
              Column('bat', Integer, unique=True),
              )
        # t2 gets explicit table-level constraint and index objects
        Table('t2', m,
              Column('id', Integer, primary_key=True),
              Column('bar', Integer),
              Column('bat', Integer),
              CheckConstraint("bar>5"),
              UniqueConstraint('bar', 'bat'),
              Index(None, 'bar', 'bat')
              )
        eq_(
            canary,
            [
                'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1',
                'Index->Table', 'Index->t1',
                'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1',
                'UniqueConstraint->Table', 'UniqueConstraint->t1',
                'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2',
                'CheckConstraint->Table', 'CheckConstraint->t2',
                'UniqueConstraint->Table', 'UniqueConstraint->t2',
                'Index->Table', 'Index->t2'
            ]
        )
2908
2909
class DialectKWArgTest(fixtures.TestBase):

    """Test ``<dialectname>_<argname>`` keyword arguments on schema items.

    Every test runs inside ``_fixture()``, which swaps the dialect registry
    loader for one that only knows three stub dialects: ``participating``,
    ``participating2`` and ``nonparticipating``.
    """

    @contextmanager
    def _fixture(self):
        """Patch ``sqlalchemy.dialects.registry.load`` with stub dialects.

        The patch is active for the duration of the ``with`` block; any
        dialect name other than the three stubs raises NoSuchModuleError.
        """
        from sqlalchemy.engine.default import DefaultDialect

        class ParticipatingDialect(DefaultDialect):
            # declares arguments (with defaults) for Index and
            # ForeignKeyConstraint only
            construct_arguments = [
                (schema.Index, {
                    "x": 5,
                    "y": False,
                    "z_one": None
                }),
                (schema.ForeignKeyConstraint, {
                    "foobar": False
                })
            ]

        class ParticipatingDialect2(DefaultDialect):
            # declares arguments for Index, plus a "*" entry for Table
            # (exercised by the wildcard tests below)
            construct_arguments = [
                (schema.Index, {
                    "x": 9,
                    "y": True,
                    "pp": "default"
                }),
                (schema.Table, {
                    "*": None
                })
            ]

        class NonParticipatingDialect(DefaultDialect):
            # declares no construct arguments at all
            construct_arguments = None

        def load(dialect_name):
            # stand-in for the registry loader: maps a name to a stub class
            if dialect_name == "participating":
                return ParticipatingDialect
            elif dialect_name == "participating2":
                return ParticipatingDialect2
            elif dialect_name == "nonparticipating":
                return NonParticipatingDialect
            else:
                raise exc.NoSuchModuleError("no dialect %r" % dialect_name)
        with mock.patch("sqlalchemy.dialects.registry.load", load):
            yield

    def teardown(self):
        """Clear ``Index._kw_registry`` after each test."""
        # presumably so that per-dialect argument state built up against
        # the stub dialects does not carry over into other tests
        Index._kw_registry.clear()

    def test_participating(self):
        """A declared argument overrides its default in dialect_options."""
        with self._fixture():
            idx = Index('a', 'b', 'c', participating_y=True)
            eq_(
                idx.dialect_options,
                {"participating": {"x": 5, "y": True, "z_one": None}}
            )
            # dialect_kwargs holds only what was explicitly passed
            eq_(
                idx.dialect_kwargs,
                {
                    'participating_y': True,
                }
            )

    def test_nonparticipating(self):
        """Arbitrary arguments are accepted for the nonparticipating stub."""
        with self._fixture():
            idx = Index(
                'a',
                'b',
                'c',
                nonparticipating_y=True,
                nonparticipating_q=5)
            eq_(
                idx.dialect_kwargs,
                {
                    'nonparticipating_y': True,
                    'nonparticipating_q': 5
                }
            )

    def test_bad_kwarg_raise(self):
        """A keyword with no ``<dialect>_`` prefix raises TypeError."""
        with self._fixture():
            assert_raises_message(
                TypeError,
                "Additional arguments should be named "
                "<dialectname>_<argument>, got 'foobar'",
                Index, 'a', 'b', 'c', foobar=True
            )

    def test_unknown_dialect_warning(self):
        """An unloadable dialect prefix produces an SAWarning."""
        with self._fixture():
            # NOTE(review): expecting the warning via assert_raises_message
            # presumably relies on the test run turning SAWarning into an
            # exception -- confirm against the test runner configuration
            assert_raises_message(
                exc.SAWarning,
                "Can't validate argument 'unknown_y'; can't locate "
                "any SQLAlchemy dialect named 'unknown'",
                Index, 'a', 'b', 'c', unknown_y=True
            )

    def test_participating_bad_kw(self):
        """An argument the participating dialect didn't declare is refused."""
        with self._fixture():
            assert_raises_message(
                exc.ArgumentError,
                "Argument 'participating_q_p_x' is not accepted by dialect "
                "'participating' on behalf of "
                "<class 'sqlalchemy.sql.schema.Index'>",
                Index, 'a', 'b', 'c', participating_q_p_x=8
            )

    def test_participating_unknown_schema_item(self):
        """Arguments for a construct the dialect doesn't list are refused."""
        with self._fixture():
            # the dialect doesn't include UniqueConstraint in
            # its registry at all.
            assert_raises_message(
                exc.ArgumentError,
                "Argument 'participating_q_p_x' is not accepted by dialect "
                "'participating' on behalf of "
                "<class 'sqlalchemy.sql.schema.UniqueConstraint'>",
                UniqueConstraint, 'a', 'b', participating_q_p_x=8
            )

    @testing.emits_warning("Can't validate")
    def test_unknown_dialect_warning_still_populates(self):
        """With the warning tolerated, the unknown kwarg is still stored."""
        with self._fixture():
            idx = Index('a', 'b', 'c', unknown_y=True)
            eq_(idx.dialect_kwargs, {"unknown_y": True})  # still populates

    @testing.emits_warning("Can't validate")
    def test_unknown_dialect_warning_still_populates_multiple(self):
        """Several unknown dialects can be mixed with a participating one."""
        with self._fixture():
            idx = Index('a', 'b', 'c', unknown_y=True, unknown_z=5,
                        otherunknown_foo='bar', participating_y=8)
            # unknown dialects show a '*' entry next to the passed values;
            # the participating dialect shows its declared defaults
            eq_(
                idx.dialect_options,
                {
                    "unknown": {'y': True, 'z': 5, '*': None},
                    "otherunknown": {'foo': 'bar', '*': None},
                    "participating": {'x': 5, 'y': 8, 'z_one': None}
                }
            )
            eq_(idx.dialect_kwargs,
                {'unknown_z': 5, 'participating_y': 8,
                 'unknown_y': True,
                 'otherunknown_foo': 'bar'}
                )  # still populates

    def test_runs_safekwarg(self):
        """Keys listed by dialect_kwargs pass through ``util.safe_kwarg``."""

        # safe_kwarg is replaced by a function that adds a visible prefix,
        # so its use can be detected in the resulting key
        with mock.patch("sqlalchemy.util.safe_kwarg",
                        lambda arg: "goofy_%s" % arg):
            with self._fixture():
                idx = Index('a', 'b')
                idx.kwargs[util.u('participating_x')] = 7

                eq_(
                    list(idx.dialect_kwargs),
                    ['goofy_participating_x']
                )

    def test_combined(self):
        """Participating and nonparticipating arguments on one Index."""
        with self._fixture():
            idx = Index('a', 'b', 'c', participating_x=7,
                        nonparticipating_y=True)

            eq_(
                idx.dialect_options,
                {
                    'participating': {'y': False, 'x': 7, 'z_one': None},
                    'nonparticipating': {'y': True, '*': None}
                }
            )
            eq_(
                idx.dialect_kwargs,
                {
                    'participating_x': 7,
                    'nonparticipating_y': True,
                }
            )

    def test_multiple_participating(self):
        """Each participating dialect keeps its own options and defaults."""
        with self._fixture():
            idx = Index('a', 'b', 'c',
                        participating_x=7,
                        participating2_x=15,
                        participating2_y="lazy"
                        )
            eq_(
                idx.dialect_options,
                {
                    "participating": {'x': 7, 'y': False, 'z_one': None},
                    "participating2": {'x': 15, 'y': 'lazy', 'pp': 'default'},
                }
            )
            eq_(
                idx.dialect_kwargs,
                {
                    'participating_x': 7,
                    'participating2_x': 15,
                    'participating2_y': 'lazy'
                }
            )

    def test_foreign_key_propagate(self):
        """Dialect kwargs on a ForeignKey reach its ForeignKeyConstraint."""
        with self._fixture():
            m = MetaData()
            fk = ForeignKey('t2.id', participating_foobar=True)
            t = Table('t', m, Column('id', Integer, fk))
            # pick the ForeignKeyConstraint out of the table's constraints
            fkc = [
                c for c in t.constraints if isinstance(
                    c,
                    ForeignKeyConstraint)][0]
            eq_(
                fkc.dialect_kwargs,
                {'participating_foobar': True}
            )

    def test_foreign_key_propagate_exceptions_delayed(self):
        """A bad ForeignKey kwarg is only rejected at Table construction."""
        with self._fixture():
            m = MetaData()
            # neither of these two constructions is expected to raise
            fk = ForeignKey('t2.id', participating_fake=True)
            c1 = Column('id', Integer, fk)
            assert_raises_message(
                exc.ArgumentError,
                "Argument 'participating_fake' is not accepted by "
                "dialect 'participating' on behalf of "
                "<class 'sqlalchemy.sql.schema.ForeignKeyConstraint'>",
                Table, 't', m, c1
            )

    def test_wildcard(self):
        """With a ``"*"`` entry, Table accepts undeclared argument names."""
        with self._fixture():
            m = MetaData()
            t = Table('x', m, Column('x', Integer),
                      participating2_xyz='foo',
                      participating2_engine='InnoDB',
                      )
            eq_(
                t.dialect_kwargs,
                {
                    'participating2_xyz': 'foo',
                    'participating2_engine': 'InnoDB'
                }
            )

    def test_uninit_wildcard(self):
        """An untouched wildcard dialect exposes only its ``"*"`` entry."""
        with self._fixture():
            m = MetaData()
            t = Table('x', m, Column('x', Integer))
            eq_(
                t.dialect_options['participating2'], {'*': None}
            )
            eq_(
                t.dialect_kwargs, {}
            )

    def test_not_contains_wildcard(self):
        """The wildcard doesn't make unset names appear as contained."""
        with self._fixture():
            m = MetaData()
            t = Table('x', m, Column('x', Integer))
            assert 'foobar' not in t.dialect_options['participating2']

    def test_contains_wildcard(self):
        """A name passed under the wildcard is reported as contained."""
        with self._fixture():
            m = MetaData()
            t = Table('x', m, Column('x', Integer), participating2_foobar=5)
            assert 'foobar' in t.dialect_options['participating2']

    def test_update(self):
        """``_validate_dialect_kwargs`` updates existing options in place."""
        with self._fixture():
            idx = Index('a', 'b', 'c', participating_x=20)
            eq_(idx.dialect_kwargs, {
                "participating_x": 20,
            })
            # first update: change x, set z_one
            idx._validate_dialect_kwargs({
                "participating_x": 25,
                "participating_z_one": "default"})
            eq_(idx.dialect_options, {
                "participating": {"x": 25, "y": False, "z_one": "default"}
            })
            eq_(idx.dialect_kwargs, {
                "participating_x": 25,
                'participating_z_one': "default"
            })

            # applying the same values again leaves the state unchanged
            idx._validate_dialect_kwargs({
                "participating_x": 25,
                "participating_z_one": "default"})

            eq_(idx.dialect_options, {
                "participating": {"x": 25, "y": False, "z_one": "default"}
            })
            eq_(idx.dialect_kwargs, {
                "participating_x": 25,
                'participating_z_one': "default"
            })

            # third update: add y and bring in a second dialect
            idx._validate_dialect_kwargs({
                "participating_y": True,
                'participating2_y': "p2y"})
            eq_(idx.dialect_options, {
                "participating": {"x": 25, "y": True, "z_one": "default"},
                "participating2": {"y": "p2y", "pp": "default", "x": 9}
            })
            eq_(idx.dialect_kwargs, {
                "participating_x": 25,
                "participating_y": True,
                'participating2_y': "p2y",
                "participating_z_one": "default"})

    def test_key_error_kwargs_no_dialect(self):
        """Reading a kwargs key with an unloadable dialect raises KeyError."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            assert_raises(
                KeyError,
                idx.kwargs.__getitem__, 'foo_bar'
            )

    def test_key_error_kwargs_no_underscore(self):
        """Reading a kwargs key without an underscore raises KeyError."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            assert_raises(
                KeyError,
                idx.kwargs.__getitem__, 'foobar'
            )

    def test_key_error_kwargs_no_argument(self):
        """Reading an unset argument of a known dialect raises KeyError."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            assert_raises(
                KeyError,
                idx.kwargs.__getitem__, 'participating_asdmfq34098'
            )

            assert_raises(
                KeyError,
                idx.kwargs.__getitem__, 'nonparticipating_asdmfq34098'
            )

    def test_key_error_dialect_options(self):
        """Reading an unset name from dialect_options raises KeyError."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            assert_raises(
                KeyError,
                idx.dialect_options['participating'].__getitem__, 'asdfaso890'
            )

            assert_raises(
                KeyError,
                idx.dialect_options['nonparticipating'].__getitem__,
                'asdfaso890')

    def test_ad_hoc_participating_via_opt(self):
        """A value set through dialect_options is visible through kwargs."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            idx.dialect_options['participating']['foobar'] = 5

            eq_(idx.dialect_options['participating']['foobar'], 5)
            eq_(idx.kwargs['participating_foobar'], 5)

    def test_ad_hoc_nonparticipating_via_opt(self):
        """Same as above, for the nonparticipating dialect."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            idx.dialect_options['nonparticipating']['foobar'] = 5

            eq_(idx.dialect_options['nonparticipating']['foobar'], 5)
            eq_(idx.kwargs['nonparticipating_foobar'], 5)

    def test_ad_hoc_participating_via_kwargs(self):
        """A value set through kwargs is visible through dialect_options."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            idx.kwargs['participating_foobar'] = 5

            eq_(idx.dialect_options['participating']['foobar'], 5)
            eq_(idx.kwargs['participating_foobar'], 5)

    def test_ad_hoc_nonparticipating_via_kwargs(self):
        """Same as above, for the nonparticipating dialect."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            idx.kwargs['nonparticipating_foobar'] = 5

            eq_(idx.dialect_options['nonparticipating']['foobar'], 5)
            eq_(idx.kwargs['nonparticipating_foobar'], 5)

    def test_ad_hoc_via_kwargs_invalid_key(self):
        """Setting a kwargs key without an underscore is refused."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            assert_raises_message(
                exc.ArgumentError,
                "Keys must be of the form <dialectname>_<argname>",
                idx.kwargs.__setitem__, "foobar", 5
            )

    def test_ad_hoc_via_kwargs_invalid_dialect(self):
        """Setting a kwargs key for an unloadable dialect is refused."""
        with self._fixture():
            idx = Index('a', 'b', 'c')
            assert_raises_message(
                exc.ArgumentError,
                "no dialect 'nonexistent'",
                idx.kwargs.__setitem__, "nonexistent_foobar", 5
            )

    def test_add_new_arguments_participating(self):
        """``argument_for`` adds an argument, with default, to Index."""
        with self._fixture():
            Index.argument_for("participating", "xyzqpr", False)

            idx = Index('a', 'b', 'c', participating_xyzqpr=True)

            eq_(idx.kwargs['participating_xyzqpr'], True)

            # a fresh Index without the kwarg reports the registered default
            idx = Index('a', 'b', 'c')
            eq_(idx.dialect_options['participating']['xyzqpr'], False)

    def test_add_new_arguments_participating_no_existing(self):
        """``argument_for`` works for a construct the dialect didn't list."""
        with self._fixture():
            PrimaryKeyConstraint.argument_for("participating", "xyzqpr", False)

            pk = PrimaryKeyConstraint('a', 'b', 'c', participating_xyzqpr=True)

            eq_(pk.kwargs['participating_xyzqpr'], True)

            pk = PrimaryKeyConstraint('a', 'b', 'c')
            eq_(pk.dialect_options['participating']['xyzqpr'], False)

    def test_add_new_arguments_nonparticipating(self):
        """``argument_for`` is refused for the nonparticipating dialect."""
        with self._fixture():
            assert_raises_message(
                exc.ArgumentError,
                "Dialect 'nonparticipating' does have keyword-argument "
                "validation and defaults enabled configured",
                Index.argument_for, "nonparticipating", "xyzqpr", False
            )

    def test_add_new_arguments_invalid_dialect(self):
        """``argument_for`` is refused for an unloadable dialect name."""
        with self._fixture():
            assert_raises_message(
                exc.ArgumentError,
                "no dialect 'nonexistent'",
                Index.argument_for, "nonexistent", "foobar", 5
            )
3346
3347
3348class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
3349    __dialect__ = 'default'
3350
3351    def _fixture(self, naming_convention, table_schema=None):
3352        m1 = MetaData(naming_convention=naming_convention)
3353
3354        u1 = Table('user', m1,
3355                   Column('id', Integer, primary_key=True),
3356                   Column('version', Integer, primary_key=True),
3357                   Column('data', String(30)),
3358                   schema=table_schema
3359                   )
3360
3361        return u1
3362
3363    def test_uq_name(self):
3364        u1 = self._fixture(naming_convention={
3365            "uq": "uq_%(table_name)s_%(column_0_name)s"
3366        })
3367        uq = UniqueConstraint(u1.c.data)
3368        eq_(uq.name, "uq_user_data")
3369
3370    def test_ck_name_required(self):
3371        u1 = self._fixture(naming_convention={
3372            "ck": "ck_%(table_name)s_%(constraint_name)s"
3373        })
3374        ck = CheckConstraint(u1.c.data == 'x', name='mycheck')
3375        eq_(ck.name, "ck_user_mycheck")
3376
3377        assert_raises_message(
3378            exc.InvalidRequestError,
3379            r"Naming convention including %\(constraint_name\)s token "
3380            "requires that constraint is explicitly named.",
3381            CheckConstraint, u1.c.data == 'x'
3382        )
3383
3384    def test_ck_name_deferred_required(self):
3385        u1 = self._fixture(naming_convention={
3386            "ck": "ck_%(table_name)s_%(constraint_name)s"
3387        })
3388        ck = CheckConstraint(u1.c.data == 'x', name=elements._defer_name(None))
3389
3390        assert_raises_message(
3391            exc.InvalidRequestError,
3392            r"Naming convention including %\(constraint_name\)s token "
3393            "requires that constraint is explicitly named.",
3394            schema.AddConstraint(ck).compile
3395        )
3396
3397    def test_column_attached_ck_name(self):
3398        m = MetaData(naming_convention={
3399            "ck": "ck_%(table_name)s_%(constraint_name)s"
3400        })
3401        ck = CheckConstraint('x > 5', name='x1')
3402        Table('t', m, Column('x', ck))
3403        eq_(ck.name, "ck_t_x1")
3404
3405    def test_table_attached_ck_name(self):
3406        m = MetaData(naming_convention={
3407            "ck": "ck_%(table_name)s_%(constraint_name)s"
3408        })
3409        ck = CheckConstraint('x > 5', name='x1')
3410        Table('t', m, Column('x', Integer), ck)
3411        eq_(ck.name, "ck_t_x1")
3412
3413    def test_uq_name_already_conv(self):
3414        m = MetaData(naming_convention={
3415            "uq": "uq_%(constraint_name)s_%(column_0_name)s"
3416        })
3417
3418        t = Table('mytable', m)
3419        uq = UniqueConstraint(name=naming.conv('my_special_key'))
3420
3421        t.append_constraint(uq)
3422        eq_(uq.name, "my_special_key")
3423
3424    def test_fk_name_schema(self):
3425        u1 = self._fixture(naming_convention={
3426            "fk": "fk_%(table_name)s_%(column_0_name)s_"
3427            "%(referred_table_name)s_%(referred_column_0_name)s"
3428        }, table_schema="foo")
3429        m1 = u1.metadata
3430        a1 = Table('address', m1,
3431                   Column('id', Integer, primary_key=True),
3432                   Column('user_id', Integer),
3433                   Column('user_version_id', Integer)
3434                   )
3435        fk = ForeignKeyConstraint(['user_id', 'user_version_id'],
3436                                  ['foo.user.id', 'foo.user.version'])
3437        a1.append_constraint(fk)
3438        eq_(fk.name, "fk_address_user_id_user_id")
3439
3440    def test_fk_attrs(self):
3441        u1 = self._fixture(naming_convention={
3442            "fk": "fk_%(table_name)s_%(column_0_name)s_"
3443            "%(referred_table_name)s_%(referred_column_0_name)s"
3444        })
3445        m1 = u1.metadata
3446        a1 = Table('address', m1,
3447                   Column('id', Integer, primary_key=True),
3448                   Column('user_id', Integer),
3449                   Column('user_version_id', Integer)
3450                   )
3451        fk = ForeignKeyConstraint(['user_id', 'user_version_id'],
3452                                  ['user.id', 'user.version'])
3453        a1.append_constraint(fk)
3454        eq_(fk.name, "fk_address_user_id_user_id")
3455
3456    def test_custom(self):
3457        def key_hash(const, table):
3458            return "HASH_%s" % table.name
3459
3460        u1 = self._fixture(naming_convention={
3461            "fk": "fk_%(table_name)s_%(key_hash)s",
3462            "key_hash": key_hash
3463        })
3464        m1 = u1.metadata
3465        a1 = Table('address', m1,
3466                   Column('id', Integer, primary_key=True),
3467                   Column('user_id', Integer),
3468                   Column('user_version_id', Integer)
3469                   )
3470        fk = ForeignKeyConstraint(['user_id', 'user_version_id'],
3471                                  ['user.id', 'user.version'])
3472        a1.append_constraint(fk)
3473        eq_(fk.name, "fk_address_HASH_address")
3474
3475    def test_schematype_ck_name_boolean(self):
3476        m1 = MetaData(naming_convention={
3477            "ck": "ck_%(table_name)s_%(constraint_name)s"})
3478
3479        u1 = Table('user', m1,
3480                   Column('x', Boolean(name='foo'))
3481                   )
3482        # constraint is not hit
3483        eq_(
3484            [c for c in u1.constraints
3485                if isinstance(c, CheckConstraint)][0].name, "foo"
3486        )
3487        # but is hit at compile time
3488        self.assert_compile(
3489            schema.CreateTable(u1),
3490            'CREATE TABLE "user" ('
3491            "x BOOLEAN, "
3492            "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))"
3493            ")"
3494        )
3495
3496    def test_schematype_ck_name_boolean_not_on_name(self):
3497        m1 = MetaData(naming_convention={
3498            "ck": "ck_%(table_name)s_%(column_0_name)s"})
3499
3500        u1 = Table('user', m1,
3501                   Column('x', Boolean())
3502                   )
3503        # constraint is not hit
3504        eq_(
3505            [c for c in u1.constraints
3506                if isinstance(c, CheckConstraint)][0].name, "_unnamed_"
3507        )
3508        # but is hit at compile time
3509        self.assert_compile(
3510            schema.CreateTable(u1),
3511            'CREATE TABLE "user" ('
3512            "x BOOLEAN, "
3513            "CONSTRAINT ck_user_x CHECK (x IN (0, 1))"
3514            ")"
3515        )
3516
3517    def test_schematype_ck_name_enum(self):
3518        m1 = MetaData(naming_convention={
3519            "ck": "ck_%(table_name)s_%(constraint_name)s"})
3520
3521        u1 = Table('user', m1,
3522                   Column('x', Enum('a', 'b', name='foo'))
3523                   )
3524        eq_(
3525            [c for c in u1.constraints
3526                if isinstance(c, CheckConstraint)][0].name, "foo"
3527        )
3528        # but is hit at compile time
3529        self.assert_compile(
3530            schema.CreateTable(u1),
3531            'CREATE TABLE "user" ('
3532            "x VARCHAR(1), "
3533            "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))"
3534            ")"
3535        )
3536
3537    def test_schematype_ck_name_propagate_conv(self):
3538        m1 = MetaData(naming_convention={
3539            "ck": "ck_%(table_name)s_%(constraint_name)s"})
3540
3541        u1 = Table('user', m1,
3542                   Column('x', Enum('a', 'b', name=naming.conv('foo')))
3543                   )
3544        eq_(
3545            [c for c in u1.constraints
3546                if isinstance(c, CheckConstraint)][0].name, "foo"
3547        )
3548        # but is hit at compile time
3549        self.assert_compile(
3550            schema.CreateTable(u1),
3551            'CREATE TABLE "user" ('
3552            "x VARCHAR(1), "
3553            "CONSTRAINT foo CHECK (x IN ('a', 'b'))"
3554            ")"
3555        )
3556
3557    def test_schematype_ck_name_boolean_no_name(self):
3558        m1 = MetaData(naming_convention={
3559            "ck": "ck_%(table_name)s_%(constraint_name)s"
3560        })
3561
3562        u1 = Table(
3563            'user', m1,
3564            Column('x', Boolean())
3565        )
3566        # constraint gets special _defer_none_name
3567        eq_(
3568            [c for c in u1.constraints
3569                if isinstance(c, CheckConstraint)][0].name, "_unnamed_"
3570        )
3571        # no issue with native boolean
3572        self.assert_compile(
3573            schema.CreateTable(u1),
3574            'CREATE TABLE "user" ('
3575            "x BOOLEAN"
3576            ")",
3577            dialect='postgresql'
3578        )
3579
3580        assert_raises_message(
3581            exc.InvalidRequestError,
3582            "Naming convention including \%\(constraint_name\)s token "
3583            "requires that constraint is explicitly named.",
3584            schema.CreateTable(u1).compile
3585        )
3586
3587    def test_schematype_no_ck_name_boolean_no_name(self):
3588        m1 = MetaData()  # no naming convention
3589
3590        u1 = Table(
3591            'user', m1,
3592            Column('x', Boolean())
3593        )
3594        # constraint gets special _defer_none_name
3595        eq_(
3596            [c for c in u1.constraints
3597                if isinstance(c, CheckConstraint)][0].name, "_unnamed_"
3598        )
3599
3600        self.assert_compile(
3601            schema.CreateTable(u1),
3602            'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))'
3603        )
3604
3605    def test_ck_constraint_redundant_event(self):
3606        u1 = self._fixture(naming_convention={
3607            "ck": "ck_%(table_name)s_%(constraint_name)s"})
3608
3609        ck1 = CheckConstraint(u1.c.version > 3, name='foo')
3610        u1.append_constraint(ck1)
3611        u1.append_constraint(ck1)
3612        u1.append_constraint(ck1)
3613
3614        eq_(ck1.name, "ck_user_foo")
3615
3616    def test_pickle_metadata(self):
3617        m = MetaData(naming_convention={"pk": "%(table_name)s_pk"})
3618
3619        m2 = pickle.loads(pickle.dumps(m))
3620
3621        eq_(m2.naming_convention, {"pk": "%(table_name)s_pk"})
3622
3623        t2a = Table('t2', m, Column('id', Integer, primary_key=True))
3624        t2b = Table('t2', m2, Column('id', Integer, primary_key=True))
3625
3626        eq_(t2a.primary_key.name, t2b.primary_key.name)
3627        eq_(t2b.primary_key.name, "t2_pk")
3628