1from sqlalchemy.testing import assert_raises
2from sqlalchemy.testing import assert_raises_message
3from sqlalchemy.testing import emits_warning
4import pickle
5from sqlalchemy import Integer, String, UniqueConstraint, \
6    CheckConstraint, ForeignKey, MetaData, Sequence, \
7    ForeignKeyConstraint, PrimaryKeyConstraint, ColumnDefault, Index, event,\
8    events, Unicode, types as sqltypes, bindparam, \
9    Table, Column, Boolean, Enum, func, text, TypeDecorator, \
10    BLANK_SCHEMA
11from sqlalchemy import schema, exc
12from sqlalchemy.engine import default
13from sqlalchemy.sql import elements, naming
14import sqlalchemy as tsa
15from sqlalchemy.testing import fixtures
16from sqlalchemy import testing
17from sqlalchemy.testing import ComparesTables, AssertsCompiledSQL
18from sqlalchemy.testing import eq_, is_, mock, is_true
19from contextlib import contextmanager
20from sqlalchemy import util
21
22
23class MetaDataTest(fixtures.TestBase, ComparesTables):
24
25    def test_metadata_contains(self):
26        metadata = MetaData()
27        t1 = Table('t1', metadata, Column('x', Integer))
28        t2 = Table('t2', metadata, Column('x', Integer), schema='foo')
29        t3 = Table('t2', MetaData(), Column('x', Integer))
30        t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo')
31
32        assert "t1" in metadata
33        assert "foo.t2" in metadata
34        assert "t2" not in metadata
35        assert "foo.t1" not in metadata
36        assert t1 in metadata
37        assert t2 in metadata
38        assert t3 not in metadata
39        assert t4 not in metadata
40
41    def test_uninitialized_column_copy(self):
42        for col in [
43            Column('foo', String(), nullable=False),
44            Column('baz', String(), unique=True),
45            Column(Integer(), primary_key=True),
46            Column('bar', Integer(), Sequence('foo_seq'), primary_key=True,
47                   key='bar'),
48            Column(Integer(), ForeignKey('bat.blah'), doc="this is a col"),
49            Column('bar', Integer(), ForeignKey('bat.blah'), primary_key=True,
50                   key='bar'),
51            Column('bar', Integer(), info={'foo': 'bar'}),
52        ]:
53            c2 = col.copy()
54            for attr in ('name', 'type', 'nullable',
55                         'primary_key', 'key', 'unique', 'info',
56                         'doc'):
57                eq_(getattr(col, attr), getattr(c2, attr))
58            eq_(len(col.foreign_keys), len(c2.foreign_keys))
59            if col.default:
60                eq_(c2.default.name, 'foo_seq')
61            for a1, a2 in zip(col.foreign_keys, c2.foreign_keys):
62                assert a1 is not a2
63                eq_(a2._colspec, 'bat.blah')
64
65    def test_col_subclass_copy(self):
66        class MyColumn(schema.Column):
67
68            def __init__(self, *args, **kw):
69                self.widget = kw.pop('widget', None)
70                super(MyColumn, self).__init__(*args, **kw)
71
72            def copy(self, *arg, **kw):
73                c = super(MyColumn, self).copy(*arg, **kw)
74                c.widget = self.widget
75                return c
76        c1 = MyColumn('foo', Integer, widget='x')
77        c2 = c1.copy()
78        assert isinstance(c2, MyColumn)
79        eq_(c2.widget, 'x')
80
81    def test_uninitialized_column_copy_events(self):
82        msgs = []
83
84        def write(c, t):
85            msgs.append("attach %s.%s" % (t.name, c.name))
86        c1 = Column('foo', String())
87        m = MetaData()
88        for i in range(3):
89            cx = c1.copy()
90            # as of 0.7, these events no longer copy.  its expected
91            # that listeners will be re-established from the
92            # natural construction of things.
93            cx._on_table_attach(write)
94            Table('foo%d' % i, m, cx)
95        eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo'])
96
97    def test_schema_collection_add(self):
98        metadata = MetaData()
99
100        Table('t1', metadata, Column('x', Integer), schema='foo')
101        Table('t2', metadata, Column('x', Integer), schema='bar')
102        Table('t3', metadata, Column('x', Integer))
103
104        eq_(metadata._schemas, set(['foo', 'bar']))
105        eq_(len(metadata.tables), 3)
106
107    def test_schema_collection_remove(self):
108        metadata = MetaData()
109
110        t1 = Table('t1', metadata, Column('x', Integer), schema='foo')
111        Table('t2', metadata, Column('x', Integer), schema='bar')
112        t3 = Table('t3', metadata, Column('x', Integer), schema='bar')
113
114        metadata.remove(t3)
115        eq_(metadata._schemas, set(['foo', 'bar']))
116        eq_(len(metadata.tables), 2)
117
118        metadata.remove(t1)
119        eq_(metadata._schemas, set(['bar']))
120        eq_(len(metadata.tables), 1)
121
122    def test_schema_collection_remove_all(self):
123        metadata = MetaData()
124
125        Table('t1', metadata, Column('x', Integer), schema='foo')
126        Table('t2', metadata, Column('x', Integer), schema='bar')
127
128        metadata.clear()
129        eq_(metadata._schemas, set())
130        eq_(len(metadata.tables), 0)
131
132    def test_metadata_tables_immutable(self):
133        metadata = MetaData()
134
135        Table('t1', metadata, Column('x', Integer))
136        assert 't1' in metadata.tables
137
138        assert_raises(
139            TypeError,
140            lambda: metadata.tables.pop('t1')
141        )
142
143    @testing.provide_metadata
144    def test_dupe_tables(self):
145        metadata = self.metadata
146        Table('table1', metadata,
147              Column('col1', Integer, primary_key=True),
148              Column('col2', String(20)))
149
150        metadata.create_all()
151        Table('table1', metadata, autoload=True)
152
153        def go():
154            Table('table1', metadata,
155                  Column('col1', Integer, primary_key=True),
156                  Column('col2', String(20)))
157        assert_raises_message(
158            tsa.exc.InvalidRequestError,
159            "Table 'table1' is already defined for this "
160            "MetaData instance.  Specify 'extend_existing=True' "
161            "to redefine options and columns on an existing "
162            "Table object.",
163            go
164        )
165
166    def test_fk_copy(self):
167        c1 = Column('foo', Integer)
168        c2 = Column('bar', Integer)
169        m = MetaData()
170        t1 = Table('t', m, c1, c2)
171
172        kw = dict(onupdate="X",
173                  ondelete="Y", use_alter=True, name='f1',
174                  deferrable="Z", initially="Q", link_to_name=True)
175
176        fk1 = ForeignKey(c1, **kw)
177        fk2 = ForeignKeyConstraint((c1,), (c2,), **kw)
178
179        t1.append_constraint(fk2)
180        fk1c = fk1.copy()
181        fk2c = fk2.copy()
182
183        for k in kw:
184            eq_(getattr(fk1c, k), kw[k])
185            eq_(getattr(fk2c, k), kw[k])
186
187    def test_check_constraint_copy(self):
188        def r(x): return x
189        c = CheckConstraint("foo bar",
190                            name='name',
191                            initially=True,
192                            deferrable=True,
193                            _create_rule=r)
194        c2 = c.copy()
195        eq_(c2.name, 'name')
196        eq_(str(c2.sqltext), "foo bar")
197        eq_(c2.initially, True)
198        eq_(c2.deferrable, True)
199        assert c2._create_rule is r
200
201    def test_col_replace_w_constraint(self):
202        m = MetaData()
203        a = Table('a', m, Column('id', Integer, primary_key=True))
204
205        aid = Column('a_id', ForeignKey('a.id'))
206        b = Table('b', m, aid)
207        b.append_column(aid)
208
209        assert b.c.a_id.references(a.c.id)
210        eq_(len(b.constraints), 2)
211
212    def test_fk_construct(self):
213        c1 = Column('foo', Integer)
214        c2 = Column('bar', Integer)
215        m = MetaData()
216        t1 = Table('t', m, c1, c2)
217        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
218        assert fk1 in t1.constraints
219
220    def test_fk_constraint_col_collection_w_table(self):
221        c1 = Column('foo', Integer)
222        c2 = Column('bar', Integer)
223        m = MetaData()
224        t1 = Table('t', m, c1, c2)
225        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1)
226        eq_(dict(fk1.columns), {"foo": c1})
227
228    def test_fk_constraint_col_collection_no_table(self):
229        fk1 = ForeignKeyConstraint(('foo', 'bat'), ('bar', 'hoho'))
230        eq_(dict(fk1.columns), {})
231        eq_(fk1.column_keys, ['foo', 'bat'])
232        eq_(fk1._col_description, 'foo, bat')
233        eq_(fk1._elements, {"foo": fk1.elements[0], "bat": fk1.elements[1]})
234
235    def test_fk_constraint_col_collection_no_table_real_cols(self):
236        c1 = Column('foo', Integer)
237        c2 = Column('bar', Integer)
238        fk1 = ForeignKeyConstraint((c1, ), (c2, ))
239        eq_(dict(fk1.columns), {})
240        eq_(fk1.column_keys, ['foo'])
241        eq_(fk1._col_description, 'foo')
242        eq_(fk1._elements, {"foo": fk1.elements[0]})
243
244    def test_fk_constraint_col_collection_added_to_table(self):
245        c1 = Column('foo', Integer)
246        m = MetaData()
247        fk1 = ForeignKeyConstraint(('foo', ), ('bar', ))
248        Table('t', m, c1, fk1)
249        eq_(dict(fk1.columns), {"foo": c1})
250        eq_(fk1._elements, {"foo": fk1.elements[0]})
251
252    def test_fk_constraint_col_collection_via_fk(self):
253        fk = ForeignKey('bar')
254        c1 = Column('foo', Integer, fk)
255        m = MetaData()
256        t1 = Table('t', m, c1)
257        fk1 = fk.constraint
258        eq_(fk1.column_keys, ['foo'])
259        assert fk1 in t1.constraints
260        eq_(fk1.column_keys, ['foo'])
261        eq_(dict(fk1.columns), {"foo": c1})
262        eq_(fk1._elements, {"foo": fk})
263
264    def test_fk_no_such_parent_col_error(self):
265        meta = MetaData()
266        a = Table('a', meta, Column('a', Integer))
267        Table('b', meta, Column('b', Integer))
268
269        def go():
270            a.append_constraint(
271                ForeignKeyConstraint(['x'], ['b.b'])
272            )
273        assert_raises_message(
274            exc.ArgumentError,
275            "Can't create ForeignKeyConstraint on "
276            "table 'a': no column named 'x' is present.",
277            go
278        )
279
280    def test_fk_given_non_col(self):
281        not_a_col = bindparam('x')
282        assert_raises_message(
283            exc.ArgumentError,
284            "String, Column, or Column-bound argument expected, got Bind",
285            ForeignKey, not_a_col
286        )
287
288    def test_fk_given_non_col_clauseelem(self):
289        class Foo(object):
290
291            def __clause_element__(self):
292                return bindparam('x')
293        assert_raises_message(
294            exc.ArgumentError,
295            "String, Column, or Column-bound argument expected, got Bind",
296            ForeignKey, Foo()
297        )
298
299    def test_fk_given_col_non_table(self):
300        t = Table('t', MetaData(), Column('x', Integer))
301        xa = t.alias().c.x
302        assert_raises_message(
303            exc.ArgumentError,
304            "ForeignKey received Column not bound to a Table, got: .*Alias",
305            ForeignKey, xa
306        )
307
308    def test_fk_given_col_non_table_clauseelem(self):
309        t = Table('t', MetaData(), Column('x', Integer))
310
311        class Foo(object):
312
313            def __clause_element__(self):
314                return t.alias().c.x
315
316        assert_raises_message(
317            exc.ArgumentError,
318            "ForeignKey received Column not bound to a Table, got: .*Alias",
319            ForeignKey, Foo()
320        )
321
322    def test_fk_no_such_target_col_error_upfront(self):
323        meta = MetaData()
324        a = Table('a', meta, Column('a', Integer))
325        Table('b', meta, Column('b', Integer))
326
327        a.append_constraint(ForeignKeyConstraint(['a'], ['b.x']))
328
329        assert_raises_message(
330            exc.NoReferencedColumnError,
331            "Could not initialize target column for ForeignKey 'b.x' on "
332            "table 'a': table 'b' has no column named 'x'",
333            getattr, list(a.foreign_keys)[0], "column"
334        )
335
336    def test_fk_no_such_target_col_error_delayed(self):
337        meta = MetaData()
338        a = Table('a', meta, Column('a', Integer))
339        a.append_constraint(
340            ForeignKeyConstraint(['a'], ['b.x']))
341
342        b = Table('b', meta, Column('b', Integer))
343
344        assert_raises_message(
345            exc.NoReferencedColumnError,
346            "Could not initialize target column for ForeignKey 'b.x' on "
347            "table 'a': table 'b' has no column named 'x'",
348            getattr, list(a.foreign_keys)[0], "column"
349        )
350
351    def test_fk_mismatched_local_remote_cols(self):
352
353        assert_raises_message(
354            exc.ArgumentError,
355            "ForeignKeyConstraint number of constrained columns must "
356            "match the number of referenced columns.",
357            ForeignKeyConstraint, ['a'], ['b.a', 'b.b']
358        )
359
360        assert_raises_message(
361            exc.ArgumentError,
362            "ForeignKeyConstraint number of constrained columns "
363            "must match the number of referenced columns.",
364            ForeignKeyConstraint, ['a', 'b'], ['b.a']
365        )
366
367        assert_raises_message(
368            exc.ArgumentError,
369            "ForeignKeyConstraint with duplicate source column "
370            "references are not supported.",
371            ForeignKeyConstraint, ['a', 'a'], ['b.a', 'b.b']
372        )
373
374    def test_pickle_metadata_sequence_restated(self):
375        m1 = MetaData()
376        Table('a', m1,
377              Column('id', Integer, primary_key=True),
378              Column('x', Integer, Sequence("x_seq")))
379
380        m2 = pickle.loads(pickle.dumps(m1))
381
382        s2 = Sequence("x_seq")
383        t2 = Table('a', m2,
384                   Column('id', Integer, primary_key=True),
385                   Column('x', Integer, s2),
386                   extend_existing=True)
387
388        assert m2._sequences['x_seq'] is t2.c.x.default
389        assert m2._sequences['x_seq'] is s2
390
391    def test_sequence_restated_replaced(self):
392        """Test restatement of Sequence replaces."""
393
394        m1 = MetaData()
395        s1 = Sequence("x_seq")
396        t = Table('a', m1,
397                  Column('x', Integer, s1)
398                  )
399        assert m1._sequences['x_seq'] is s1
400
401        s2 = Sequence('x_seq')
402        Table('a', m1,
403              Column('x', Integer, s2),
404              extend_existing=True
405              )
406        assert t.c.x.default is s2
407        assert m1._sequences['x_seq'] is s2
408
409    def test_sequence_attach_to_table(self):
410        m1 = MetaData()
411        s1 = Sequence("s")
412        t = Table('a', m1, Column('x', Integer, s1))
413        assert s1.metadata is m1
414
415    def test_sequence_attach_to_existing_table(self):
416        m1 = MetaData()
417        s1 = Sequence("s")
418        t = Table('a', m1, Column('x', Integer))
419        t.c.x._init_items(s1)
420        assert s1.metadata is m1
421
422    def test_pickle_metadata_sequence_implicit(self):
423        m1 = MetaData()
424        Table('a', m1,
425              Column('id', Integer, primary_key=True),
426              Column('x', Integer, Sequence("x_seq")))
427
428        m2 = pickle.loads(pickle.dumps(m1))
429
430        t2 = Table('a', m2, extend_existing=True)
431
432        eq_(m2._sequences, {'x_seq': t2.c.x.default})
433
434    def test_pickle_metadata_schema(self):
435        m1 = MetaData()
436        Table('a', m1,
437              Column('id', Integer, primary_key=True),
438              Column('x', Integer, Sequence("x_seq")),
439              schema='y')
440
441        m2 = pickle.loads(pickle.dumps(m1))
442
443        Table('a', m2, schema='y',
444              extend_existing=True)
445
446        eq_(m2._schemas, m1._schemas)
447
448    def test_metadata_schema_arg(self):
449        m1 = MetaData(schema='sch1')
450        m2 = MetaData(schema='sch1', quote_schema=True)
451        m3 = MetaData(schema='sch1', quote_schema=False)
452        m4 = MetaData()
453
454        for i, (name, metadata, schema, quote_schema,
455                exp_schema, exp_quote_schema) in enumerate([
456                    ('t1', m1, None, None, 'sch1', None),
457                    ('t2', m1, 'sch2', None, 'sch2', None),
458                    ('t3', m1, 'sch2', True, 'sch2', True),
459                    ('t4', m1, 'sch1', None, 'sch1', None),
460                    ('t5', m1, BLANK_SCHEMA, None, None, None),
461                    ('t1', m2, None, None, 'sch1', True),
462                    ('t2', m2, 'sch2', None, 'sch2', None),
463                    ('t3', m2, 'sch2', True, 'sch2', True),
464                    ('t4', m2, 'sch1', None, 'sch1', None),
465                    ('t1', m3, None, None, 'sch1', False),
466                    ('t2', m3, 'sch2', None, 'sch2', None),
467                    ('t3', m3, 'sch2', True, 'sch2', True),
468                    ('t4', m3, 'sch1', None, 'sch1', None),
469                    ('t1', m4, None, None, None, None),
470                    ('t2', m4, 'sch2', None, 'sch2', None),
471                    ('t3', m4, 'sch2', True, 'sch2', True),
472                    ('t4', m4, 'sch1', None, 'sch1', None),
473                    ('t5', m4, BLANK_SCHEMA, None, None, None),
474                ]):
475            kw = {}
476            if schema is not None:
477                kw['schema'] = schema
478            if quote_schema is not None:
479                kw['quote_schema'] = quote_schema
480            t = Table(name, metadata, **kw)
481            eq_(t.schema, exp_schema, "test %d, table schema" % i)
482            eq_(t.schema.quote if t.schema is not None else None,
483                exp_quote_schema,
484                "test %d, table quote_schema" % i)
485            seq = Sequence(name, metadata=metadata, **kw)
486            eq_(seq.schema, exp_schema, "test %d, seq schema" % i)
487            eq_(seq.schema.quote if seq.schema is not None else None,
488                exp_quote_schema,
489                "test %d, seq quote_schema" % i)
490
491    def test_manual_dependencies(self):
492        meta = MetaData()
493        a = Table('a', meta, Column('foo', Integer))
494        b = Table('b', meta, Column('foo', Integer))
495        c = Table('c', meta, Column('foo', Integer))
496        d = Table('d', meta, Column('foo', Integer))
497        e = Table('e', meta, Column('foo', Integer))
498
499        e.add_is_dependent_on(c)
500        a.add_is_dependent_on(b)
501        b.add_is_dependent_on(d)
502        e.add_is_dependent_on(b)
503        c.add_is_dependent_on(a)
504        eq_(
505            meta.sorted_tables,
506            [d, b, a, c, e]
507        )
508
509    def test_deterministic_order(self):
510        meta = MetaData()
511        a = Table('a', meta, Column('foo', Integer))
512        b = Table('b', meta, Column('foo', Integer))
513        c = Table('c', meta, Column('foo', Integer))
514        d = Table('d', meta, Column('foo', Integer))
515        e = Table('e', meta, Column('foo', Integer))
516
517        e.add_is_dependent_on(c)
518        a.add_is_dependent_on(b)
519        eq_(
520            meta.sorted_tables,
521            [b, c, d, a, e]
522        )
523
524    def test_nonexistent(self):
525        assert_raises(tsa.exc.NoSuchTableError, Table,
526                      'fake_table',
527                      MetaData(testing.db), autoload=True)
528
529    def test_assorted_repr(self):
530        t1 = Table("foo", MetaData(), Column("x", Integer))
531        i1 = Index("bar", t1.c.x)
532        ck = schema.CheckConstraint("x > y", name="someconstraint")
533
534        for const, exp in (
535            (Sequence("my_seq"),
536                "Sequence('my_seq')"),
537            (Sequence("my_seq", start=5),
538                "Sequence('my_seq', start=5)"),
539            (Column("foo", Integer),
540                "Column('foo', Integer(), table=None)"),
541            (Table("bar", MetaData(), Column("x", String)),
542                "Table('bar', MetaData(bind=None), "
543                "Column('x', String(), table=<bar>), schema=None)"),
544            (schema.DefaultGenerator(for_update=True),
545                "DefaultGenerator(for_update=True)"),
546            (schema.Index("bar", "c"), "Index('bar', 'c')"),
547            (i1, "Index('bar', Column('x', Integer(), table=<foo>))"),
548            (schema.FetchedValue(), "FetchedValue()"),
549            (ck,
550             "CheckConstraint("
551             "%s"
552             ", name='someconstraint')" % repr(ck.sqltext)),
553            (ColumnDefault(('foo', 'bar')), "ColumnDefault(('foo', 'bar'))")
554        ):
555            eq_(
556                repr(const),
557                exp
558            )
559
560
561class ToMetaDataTest(fixtures.TestBase, ComparesTables):
562
    @testing.requires.check_constraints
    def test_copy(self):
        """Compare two related tables against copies made three ways.

        The copies come from ``Table.tometadata()``, from pickling the
        MetaData, and from pickling a MetaData whose tables were
        reflected from the database.  Each pair of copies is compared
        with the originals; defaults and constraints are only checked
        for the strategies flagged accordingly in the loop below.
        """
        # shadows the module-level Table inside this test only
        # NOTE(review): presumably the testing Table is what accepts
        # ``test_needs_fk`` -- confirm
        from sqlalchemy.testing.schema import Table
        meta = MetaData()

        table = Table(
            'mytable',
            meta,
            Column(
                'myid',
                Integer,
                Sequence('foo_id_seq'),
                primary_key=True),
            Column(
                'name',
                String(40),
                nullable=True),
            Column(
                'foo',
                String(40),
                nullable=False,
                server_default='x',
                server_onupdate='q'),
            Column(
                'bar',
                String(40),
                nullable=False,
                default='y',
                onupdate='z'),
            Column(
                'description',
                String(30),
                CheckConstraint("description='hi'")),
            UniqueConstraint('name'),
            test_needs_fk=True)

        table2 = Table(
            'othertable',
            meta,
            Column(
                'id',
                Integer,
                Sequence('foo_seq'),
                primary_key=True),
            Column(
                'myid',
                Integer,
                ForeignKey('mytable.myid'),
            ),
            test_needs_fk=True)

        # Each helper below returns a (mytable copy, othertable copy) pair.

        def test_to_metadata():
            # copy both tables into a brand-new MetaData
            meta2 = MetaData()
            table_c = table.tometadata(meta2)
            table2_c = table2.tometadata(meta2)
            return (table_c, table2_c)

        def test_pickle():
            # bind the original MetaData, then check that the unpickled
            # copy has no bind and can itself be pickled again
            meta.bind = testing.db
            meta2 = pickle.loads(pickle.dumps(meta))
            assert meta2.bind is None
            pickle.loads(pickle.dumps(meta2))
            return (meta2.tables['mytable'], meta2.tables['othertable'])

        def test_pickle_via_reflect():
            # this is the most common use case, pickling the results of a
            # database reflection
            meta2 = MetaData(bind=testing.db)
            t1 = Table('mytable', meta2, autoload=True)
            Table('othertable', meta2, autoload=True)
            meta3 = pickle.loads(pickle.dumps(meta2))
            assert meta3.bind is None
            assert meta3.tables['mytable'] is not t1

            return (meta3.tables['mytable'], meta3.tables['othertable'])

        # the tables must exist in the database for the reflection helper
        meta.create_all(testing.db)
        try:
            # (helper, check constraints?, copies came from reflection?)
            for test, has_constraints, reflect in \
                    (test_to_metadata, True, False), \
                    (test_pickle, True, False), \
                    (test_pickle_via_reflect, False, True):
                table_c, table2_c = test()
                self.assert_tables_equal(table, table_c)
                self.assert_tables_equal(table2, table2_c)
                assert table is not table_c
                assert table.primary_key is not table_c.primary_key
                # the copied FK must point at the copied table's column,
                # not back at the original table
                assert list(table2_c.c.myid.foreign_keys)[0].column \
                    is table_c.c.myid
                assert list(table2_c.c.myid.foreign_keys)[0].column \
                    is not table.c.myid
                assert 'x' in str(table_c.c.foo.server_default.arg)
                # these defaults are only asserted for non-reflected copies
                if not reflect:
                    assert isinstance(table_c.c.myid.default, Sequence)
                    assert str(table_c.c.foo.server_onupdate.arg) == 'q'
                    assert str(table_c.c.bar.default.arg) == 'y'
                    # accept onupdate.arg either as the value itself or as
                    # an object exposing the value on its own ``arg``
                    assert getattr(table_c.c.bar.onupdate.arg, 'arg',
                                   table_c.c.bar.onupdate.arg) == 'z'
                    assert isinstance(table2_c.c.id.default, Sequence)

                # constraints don't get reflected for any dialect right
                # now

                if has_constraints:
                    # for/else: fail unless a CheckConstraint is found;
                    # ``c`` is left bound to the match after ``break``
                    for c in table_c.c.description.constraints:
                        if isinstance(c, CheckConstraint):
                            break
                    else:
                        assert False
                    assert str(c.sqltext) == "description='hi'"
                    # same scan for the table-level UniqueConstraint
                    for c in table_c.constraints:
                        if isinstance(c, UniqueConstraint):
                            break
                    else:
                        assert False
                    assert c.columns.contains_column(table_c.c.name)
                    assert not c.columns.contains_column(table.c.name)

        finally:
            # always drop the tables created above
            meta.drop_all(testing.db)
683
684    def test_col_key_fk_parent(self):
685        # test #2643
686        m1 = MetaData()
687        a = Table('a', m1, Column('x', Integer))
688        b = Table('b', m1, Column('x', Integer, ForeignKey('a.x'), key='y'))
689        assert b.c.y.references(a.c.x)
690
691        m2 = MetaData()
692        b2 = b.tometadata(m2)
693        a2 = a.tometadata(m2)
694        assert b2.c.y.references(a2.c.x)
695
696    def test_change_schema(self):
697        meta = MetaData()
698
699        table = Table('mytable', meta,
700                      Column('myid', Integer, primary_key=True),
701                      Column('name', String(40), nullable=True),
702                      Column('description', String(30),
703                             CheckConstraint("description='hi'")),
704                      UniqueConstraint('name'),
705                      )
706
707        table2 = Table('othertable', meta,
708                       Column('id', Integer, primary_key=True),
709                       Column('myid', Integer, ForeignKey('mytable.myid')),
710                       )
711
712        meta2 = MetaData()
713        table_c = table.tometadata(meta2, schema='someschema')
714        table2_c = table2.tometadata(meta2, schema='someschema')
715
716        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
717                                                      == table2_c.c.myid))
718        eq_(str(table_c.join(table2_c).onclause),
719            'someschema.mytable.myid = someschema.othertable.myid')
720
721    def test_retain_table_schema(self):
722        meta = MetaData()
723
724        table = Table('mytable', meta,
725                      Column('myid', Integer, primary_key=True),
726                      Column('name', String(40), nullable=True),
727                      Column('description', String(30),
728                             CheckConstraint("description='hi'")),
729                      UniqueConstraint('name'),
730                      schema='myschema',
731                      )
732
733        table2 = Table(
734            'othertable',
735            meta,
736            Column(
737                'id',
738                Integer,
739                primary_key=True),
740            Column(
741                'myid',
742                Integer,
743                ForeignKey('myschema.mytable.myid')),
744            schema='myschema',
745        )
746
747        meta2 = MetaData()
748        table_c = table.tometadata(meta2)
749        table2_c = table2.tometadata(meta2)
750
751        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
752                                                      == table2_c.c.myid))
753        eq_(str(table_c.join(table2_c).onclause),
754            'myschema.mytable.myid = myschema.othertable.myid')
755
756    def test_change_name_retain_metadata(self):
757        meta = MetaData()
758
759        table = Table('mytable', meta,
760                      Column('myid', Integer, primary_key=True),
761                      Column('name', String(40), nullable=True),
762                      Column('description', String(30),
763                             CheckConstraint("description='hi'")),
764                      UniqueConstraint('name'),
765                      schema='myschema',
766                      )
767
768        table2 = table.tometadata(table.metadata, name='newtable')
769        table3 = table.tometadata(table.metadata, schema='newschema',
770                                  name='newtable')
771
772        assert table.metadata is table2.metadata
773        assert table.metadata is table3.metadata
774        eq_((table.name, table2.name, table3.name),
775            ('mytable', 'newtable', 'newtable'))
776        eq_((table.key, table2.key, table3.key),
777            ('myschema.mytable', 'myschema.newtable', 'newschema.newtable'))
778
779    def test_change_name_change_metadata(self):
780        meta = MetaData()
781        meta2 = MetaData()
782
783        table = Table('mytable', meta,
784                      Column('myid', Integer, primary_key=True),
785                      Column('name', String(40), nullable=True),
786                      Column('description', String(30),
787                             CheckConstraint("description='hi'")),
788                      UniqueConstraint('name'),
789                      schema='myschema',
790                      )
791
792        table2 = table.tometadata(meta2, name='newtable')
793
794        assert table.metadata is not table2.metadata
795        eq_((table.name, table2.name),
796            ('mytable', 'newtable'))
797        eq_((table.key, table2.key),
798            ('myschema.mytable', 'myschema.newtable'))
799
800    def test_change_name_selfref_fk_moves(self):
801        meta = MetaData()
802
803        referenced = Table('ref', meta,
804                           Column('id', Integer, primary_key=True),
805                           )
806        table = Table('mytable', meta,
807                      Column('id', Integer, primary_key=True),
808                      Column('parent_id', ForeignKey('mytable.id')),
809                      Column('ref_id', ForeignKey('ref.id'))
810                      )
811
812        table2 = table.tometadata(table.metadata, name='newtable')
813        assert table.metadata is table2.metadata
814        assert table2.c.ref_id.references(referenced.c.id)
815        assert table2.c.parent_id.references(table2.c.id)
816
817    def test_change_name_selfref_fk_moves_w_schema(self):
818        meta = MetaData()
819
820        referenced = Table('ref', meta,
821                           Column('id', Integer, primary_key=True),
822                           )
823        table = Table('mytable', meta,
824                      Column('id', Integer, primary_key=True),
825                      Column('parent_id', ForeignKey('mytable.id')),
826                      Column('ref_id', ForeignKey('ref.id'))
827                      )
828
829        table2 = table.tometadata(
830            table.metadata, name='newtable', schema='newschema')
831        ref2 = referenced.tometadata(table.metadata, schema='newschema')
832        assert table.metadata is table2.metadata
833        assert table2.c.ref_id.references(ref2.c.id)
834        assert table2.c.parent_id.references(table2.c.id)
835
836    def _assert_fk(self, t2, schema, expected, referred_schema_fn=None):
837        m2 = MetaData()
838        existing_schema = t2.schema
839        if schema:
840            t2c = t2.tometadata(m2, schema=schema,
841                                referred_schema_fn=referred_schema_fn)
842            eq_(t2c.schema, schema)
843        else:
844            t2c = t2.tometadata(m2, referred_schema_fn=referred_schema_fn)
845            eq_(t2c.schema, existing_schema)
846        eq_(list(t2c.c.y.foreign_keys)[0]._get_colspec(), expected)
847
848    def test_fk_has_schema_string_retain_schema(self):
849        m = MetaData()
850
851        t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
852        self._assert_fk(t2, None, "q.t1.x")
853
854        Table('t1', m, Column('x', Integer), schema='q')
855        self._assert_fk(t2, None, "q.t1.x")
856
857    def test_fk_has_schema_string_new_schema(self):
858        m = MetaData()
859
860        t2 = Table('t2', m, Column('y', Integer, ForeignKey('q.t1.x')))
861        self._assert_fk(t2, "z", "q.t1.x")
862
863        Table('t1', m, Column('x', Integer), schema='q')
864        self._assert_fk(t2, "z", "q.t1.x")
865
866    def test_fk_has_schema_col_retain_schema(self):
867        m = MetaData()
868
869        t1 = Table('t1', m, Column('x', Integer), schema='q')
870        t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))
871
872        self._assert_fk(t2, "z", "q.t1.x")
873
874    def test_fk_has_schema_col_new_schema(self):
875        m = MetaData()
876
877        t1 = Table('t1', m, Column('x', Integer), schema='q')
878        t2 = Table('t2', m, Column('y', Integer, ForeignKey(t1.c.x)))
879
880        self._assert_fk(t2, "z", "q.t1.x")
881
882    def test_fk_and_referent_has_same_schema_string_retain_schema(self):
883        m = MetaData()
884
885        t2 = Table('t2', m, Column('y', Integer,
886                                   ForeignKey('q.t1.x')), schema="q")
887
888        self._assert_fk(t2, None, "q.t1.x")
889
890        Table('t1', m, Column('x', Integer), schema='q')
891        self._assert_fk(t2, None, "q.t1.x")
892
893    def test_fk_and_referent_has_same_schema_string_new_schema(self):
894        m = MetaData()
895
896        t2 = Table('t2', m, Column('y', Integer,
897                                   ForeignKey('q.t1.x')), schema="q")
898
899        self._assert_fk(t2, "z", "z.t1.x")
900
901        Table('t1', m, Column('x', Integer), schema='q')
902        self._assert_fk(t2, "z", "z.t1.x")
903
904    def test_fk_and_referent_has_same_schema_col_retain_schema(self):
905        m = MetaData()
906
907        t1 = Table('t1', m, Column('x', Integer), schema='q')
908        t2 = Table('t2', m, Column('y', Integer,
909                                   ForeignKey(t1.c.x)), schema='q')
910        self._assert_fk(t2, None, "q.t1.x")
911
912    def test_fk_and_referent_has_same_schema_col_new_schema(self):
913        m = MetaData()
914
915        t1 = Table('t1', m, Column('x', Integer), schema='q')
916        t2 = Table('t2', m, Column('y', Integer,
917                                   ForeignKey(t1.c.x)), schema='q')
918        self._assert_fk(t2, 'z', "z.t1.x")
919
920    def test_fk_and_referent_has_diff_schema_string_retain_schema(self):
921        m = MetaData()
922
923        t2 = Table('t2', m, Column('y', Integer,
924                                   ForeignKey('p.t1.x')), schema="q")
925
926        self._assert_fk(t2, None, "p.t1.x")
927
928        Table('t1', m, Column('x', Integer), schema='p')
929        self._assert_fk(t2, None, "p.t1.x")
930
931    def test_fk_and_referent_has_diff_schema_string_new_schema(self):
932        m = MetaData()
933
934        t2 = Table('t2', m, Column('y', Integer,
935                                   ForeignKey('p.t1.x')), schema="q")
936
937        self._assert_fk(t2, "z", "p.t1.x")
938
939        Table('t1', m, Column('x', Integer), schema='p')
940        self._assert_fk(t2, "z", "p.t1.x")
941
942    def test_fk_and_referent_has_diff_schema_col_retain_schema(self):
943        m = MetaData()
944
945        t1 = Table('t1', m, Column('x', Integer), schema='p')
946        t2 = Table('t2', m, Column('y', Integer,
947                                   ForeignKey(t1.c.x)), schema='q')
948        self._assert_fk(t2, None, "p.t1.x")
949
950    def test_fk_and_referent_has_diff_schema_col_new_schema(self):
951        m = MetaData()
952
953        t1 = Table('t1', m, Column('x', Integer), schema='p')
954        t2 = Table('t2', m, Column('y', Integer,
955                                   ForeignKey(t1.c.x)), schema='q')
956        self._assert_fk(t2, 'z', "p.t1.x")
957
958    def test_fk_custom_system(self):
959        m = MetaData()
960        t2 = Table('t2', m, Column('y', Integer,
961                                   ForeignKey('p.t1.x')), schema='q')
962
963        def ref_fn(table, to_schema, constraint, referred_schema):
964            assert table is t2
965            eq_(to_schema, "z")
966            eq_(referred_schema, "p")
967            return "h"
968        self._assert_fk(t2, 'z', "h.t1.x", referred_schema_fn=ref_fn)
969
970    def test_copy_info(self):
971        m = MetaData()
972        fk = ForeignKey('t2.id')
973        c = Column('c', Integer, fk)
974        ck = CheckConstraint('c > 5')
975        t = Table('t', m, c, ck)
976
977        m.info['minfo'] = True
978        fk.info['fkinfo'] = True
979        c.info['cinfo'] = True
980        ck.info['ckinfo'] = True
981        t.info['tinfo'] = True
982        t.primary_key.info['pkinfo'] = True
983        fkc = [const for const in t.constraints if
984               isinstance(const, ForeignKeyConstraint)][0]
985        fkc.info['fkcinfo'] = True
986
987        m2 = MetaData()
988        t2 = t.tometadata(m2)
989
990        m.info['minfo'] = False
991        fk.info['fkinfo'] = False
992        c.info['cinfo'] = False
993        ck.info['ckinfo'] = False
994        t.primary_key.info['pkinfo'] = False
995        fkc.info['fkcinfo'] = False
996
997        eq_(m2.info, {})
998        eq_(t2.info, {"tinfo": True})
999        eq_(t2.c.c.info, {"cinfo": True})
1000        eq_(list(t2.c.c.foreign_keys)[0].info, {"fkinfo": True})
1001        eq_(t2.primary_key.info, {"pkinfo": True})
1002
1003        fkc2 = [const for const in t2.constraints
1004                if isinstance(const, ForeignKeyConstraint)][0]
1005        eq_(fkc2.info, {"fkcinfo": True})
1006
1007        ck2 = [const for const in
1008               t2.constraints if isinstance(const, CheckConstraint)][0]
1009        eq_(ck2.info, {"ckinfo": True})
1010
1011    def test_dialect_kwargs(self):
1012        meta = MetaData()
1013
1014        table = Table('mytable', meta,
1015                      Column('myid', Integer, primary_key=True),
1016                      mysql_engine='InnoDB',
1017                      )
1018
1019        meta2 = MetaData()
1020        table_c = table.tometadata(meta2)
1021
1022        eq_(table.kwargs, {"mysql_engine": "InnoDB"})
1023
1024        eq_(table.kwargs, table_c.kwargs)
1025
1026    def test_indexes(self):
1027        meta = MetaData()
1028
1029        table = Table('mytable', meta,
1030                      Column('id', Integer, primary_key=True),
1031                      Column('data1', Integer, index=True),
1032                      Column('data2', Integer),
1033                      )
1034        Index('multi', table.c.data1, table.c.data2),
1035
1036        meta2 = MetaData()
1037        table_c = table.tometadata(meta2)
1038
1039        def _get_key(i):
1040            return [i.name, i.unique] + \
1041                sorted(i.kwargs.items()) + \
1042                list(i.columns.keys())
1043
1044        eq_(
1045            sorted([_get_key(i) for i in table.indexes]),
1046            sorted([_get_key(i) for i in table_c.indexes])
1047        )
1048
1049    @emits_warning("Table '.+' already exists within the given MetaData")
1050    def test_already_exists(self):
1051
1052        meta1 = MetaData()
1053        table1 = Table('mytable', meta1,
1054                       Column('myid', Integer, primary_key=True),
1055                       )
1056        meta2 = MetaData()
1057        table2 = Table('mytable', meta2,
1058                       Column('yourid', Integer, primary_key=True),
1059                       )
1060
1061        table_c = table1.tometadata(meta2)
1062        table_d = table2.tometadata(meta2)
1063
1064        # d'oh!
1065        assert table_c is table_d
1066
1067    def test_default_schema_metadata(self):
1068        meta = MetaData(schema='myschema')
1069
1070        table = Table(
1071            'mytable',
1072            meta,
1073            Column(
1074                'myid',
1075                Integer,
1076                primary_key=True),
1077            Column(
1078                'name',
1079                String(40),
1080                nullable=True),
1081            Column(
1082                'description',
1083                String(30),
1084                CheckConstraint("description='hi'")),
1085            UniqueConstraint('name'),
1086        )
1087
1088        table2 = Table(
1089            'othertable', meta, Column(
1090                'id', Integer, primary_key=True), Column(
1091                'myid', Integer, ForeignKey('myschema.mytable.myid')), )
1092
1093        meta2 = MetaData(schema='someschema')
1094        table_c = table.tometadata(meta2, schema=None)
1095        table2_c = table2.tometadata(meta2, schema=None)
1096
1097        eq_(str(table_c.join(table2_c).onclause),
1098            str(table_c.c.myid == table2_c.c.myid))
1099        eq_(str(table_c.join(table2_c).onclause),
1100            "someschema.mytable.myid = someschema.othertable.myid")
1101
1102    def test_strip_schema(self):
1103        meta = MetaData()
1104
1105        table = Table('mytable', meta,
1106                      Column('myid', Integer, primary_key=True),
1107                      Column('name', String(40), nullable=True),
1108                      Column('description', String(30),
1109                             CheckConstraint("description='hi'")),
1110                      UniqueConstraint('name'),
1111                      )
1112
1113        table2 = Table('othertable', meta,
1114                       Column('id', Integer, primary_key=True),
1115                       Column('myid', Integer, ForeignKey('mytable.myid')),
1116                       )
1117
1118        meta2 = MetaData()
1119        table_c = table.tometadata(meta2, schema=None)
1120        table2_c = table2.tometadata(meta2, schema=None)
1121
1122        eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid
1123                                                      == table2_c.c.myid))
1124        eq_(str(table_c.join(table2_c).onclause),
1125            'mytable.myid = othertable.myid')
1126
1127    def test_unique_true_flag(self):
1128        meta = MetaData()
1129
1130        table = Table('mytable', meta, Column('x', Integer, unique=True))
1131
1132        m2 = MetaData()
1133
1134        t2 = table.tometadata(m2)
1135
1136        eq_(
1137            len([
1138                const for const
1139                in t2.constraints
1140                if isinstance(const, UniqueConstraint)]),
1141            1
1142        )
1143
1144    def test_index_true_flag(self):
1145        meta = MetaData()
1146
1147        table = Table('mytable', meta, Column('x', Integer, index=True))
1148
1149        m2 = MetaData()
1150
1151        t2 = table.tometadata(m2)
1152
1153        eq_(len(t2.indexes), 1)
1154
1155
class InfoTest(fixtures.TestBase):
    # For each schema construct: ``.info`` is an empty dict when nothing
    # was passed, and equals the ``info=`` argument when one was given.

    def test_metadata_info(self):
        plain = MetaData()
        eq_(plain.info, {})

        with_info = MetaData(info={"foo": "bar"})
        eq_(with_info.info, {"foo": "bar"})

    def test_foreignkey_constraint_info(self):
        plain = ForeignKeyConstraint(['a'], ['b'], name='bar')
        eq_(plain.info, {})

        with_info = ForeignKeyConstraint(
            ['a'], ['b'], name='bar', info={"foo": "bar"})
        eq_(with_info.info, {"foo": "bar"})

    def test_foreignkey_info(self):
        plain = ForeignKey('a')
        eq_(plain.info, {})

        with_info = ForeignKey('a', info={"foo": "bar"})
        eq_(with_info.info, {"foo": "bar"})

    def test_primarykey_constraint_info(self):
        plain = PrimaryKeyConstraint('a', name='x')
        eq_(plain.info, {})

        with_info = PrimaryKeyConstraint('a', name='x', info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_unique_constraint_info(self):
        plain = UniqueConstraint('a', name='x')
        eq_(plain.info, {})

        with_info = UniqueConstraint('a', name='x', info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_check_constraint_info(self):
        plain = CheckConstraint('foo=bar', name='x')
        eq_(plain.info, {})

        with_info = CheckConstraint('foo=bar', name='x', info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_index_info(self):
        plain = Index('x', 'a')
        eq_(plain.info, {})

        with_info = Index('x', 'a', info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_column_info(self):
        plain = Column('x', Integer)
        eq_(plain.info, {})

        with_info = Column('x', Integer, info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})

    def test_table_info(self):
        plain = Table('x', MetaData())
        eq_(plain.info, {})

        with_info = Table('x', MetaData(), info={'foo': 'bar'})
        eq_(with_info.info, {'foo': 'bar'})
1220
1221
class TableTest(fixtures.TestBase, AssertsCompiledSQL):
    # Tests of Table construction: CREATE prefixes, the info dict, the
    # read-only ``.c`` collection, and primary key configuration.

    @testing.requires.temporary_tables
    @testing.skip_if('mssql', 'different col format')
    def test_prefixes(self):
        # entries in ``prefixes`` are rendered between CREATE and TABLE
        table1 = Table("temporary_table_1", MetaData(),
                       Column("col1", Integer),
                       prefixes=["TEMPORARY"])

        self.assert_compile(
            schema.CreateTable(table1),
            "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)"
        )

        table2 = Table("temporary_table_2", MetaData(),
                       Column("col1", Integer),
                       prefixes=["VIRTUAL"])
        self.assert_compile(
            schema.CreateTable(table2),
            "CREATE VIRTUAL TABLE temporary_table_2 (col1 INTEGER)"
        )

    def test_table_info(self):
        # info is whatever was passed, or an empty dict, and is writable
        # in every case
        metadata = MetaData()
        t1 = Table('foo', metadata, info={'x': 'y'})
        t2 = Table('bar', metadata, info={})
        t3 = Table('bat', metadata)
        assert t1.info == {'x': 'y'}
        assert t2.info == {}
        assert t3.info == {}
        for t in (t1, t2, t3):
            t.info['bar'] = 'zip'
            assert t.info['bar'] == 'zip'

    def test_reset_exported_passes(self):
        # the column collection is the same before and after
        # _reset_exported()

        m = MetaData()

        t = Table('t', m, Column('foo', Integer))
        eq_(
            list(t.c), [t.c.foo]
        )

        t._reset_exported()

        eq_(
            list(t.c), [t.c.foo]
        )

    def test_foreign_key_constraints_collection(self):
        # foreign_key_constraints grows as FK-bearing columns and
        # standalone ForeignKeyConstraints are appended to the table
        metadata = MetaData()
        t1 = Table('foo', metadata, Column('a', Integer))
        eq_(t1.foreign_key_constraints, set())

        fk1 = ForeignKey('q.id')
        fk2 = ForeignKey('j.id')
        fk3 = ForeignKeyConstraint(['b', 'c'], ['r.x', 'r.y'])

        t1.append_column(Column('b', Integer, fk1))
        eq_(
            t1.foreign_key_constraints,
            set([fk1.constraint]))

        t1.append_column(Column('c', Integer, fk2))
        eq_(
            t1.foreign_key_constraints,
            set([fk1.constraint, fk2.constraint]))

        t1.append_constraint(fk3)
        eq_(
            t1.foreign_key_constraints,
            set([fk1.constraint, fk2.constraint, fk3]))

    def test_c_immutable(self):
        # extend(), item assignment and attribute assignment on ``.c``
        # all raise TypeError
        m = MetaData()
        t1 = Table('t', m, Column('x', Integer), Column('y', Integer))
        assert_raises(
            TypeError,
            t1.c.extend, [Column('z', Integer)]
        )

        def assign():
            t1.c['z'] = Column('z', Integer)
        assert_raises(
            TypeError,
            assign
        )

        def assign2():
            t1.c.z = Column('z', Integer)
        assert_raises(
            TypeError,
            assign2
        )

    def test_c_mutate_after_unpickle(self):
        # an unpickled Table accepts new columns, and contains_column()
        # on it recognizes its own column objects only
        m = MetaData()

        y = Column('y', Integer)
        t1 = Table('t', m, Column('x', Integer), y)

        # round trip of a locally built object; no external data involved
        t2 = pickle.loads(pickle.dumps(t1))
        z = Column('z', Integer)
        g = Column('g', Integer)
        t2.append_column(z)

        is_(t1.c.contains_column(y), True)
        # the original ``y`` object is not part of the unpickled table
        is_(t2.c.contains_column(y), False)
        y2 = t2.c.y
        is_(t2.c.contains_column(y2), True)

        is_(t2.c.contains_column(z), True)
        is_(t2.c.contains_column(g), False)

    def test_autoincrement_replace(self):
        # redefining the table with extend_existing=True leaves
        # _autoincrement_column pointing at the current ``id`` column
        m = MetaData()

        t = Table('t', m,
                  Column('id', Integer, primary_key=True)
                  )

        is_(t._autoincrement_column, t.c.id)

        t = Table('t', m,
                  Column('id', Integer, primary_key=True),
                  extend_existing=True
                  )
        is_(t._autoincrement_column, t.c.id)

    def test_pk_args_standalone(self):
        # a column-less PrimaryKeyConstraint contributes its dialect
        # kwargs while the columns come from primary_key=True
        m = MetaData()
        t = Table('t', m,
                  Column('x', Integer, primary_key=True),
                  PrimaryKeyConstraint(mssql_clustered=True)
                  )
        eq_(
            list(t.primary_key), [t.c.x]
        )
        eq_(
            t.primary_key.dialect_kwargs, {"mssql_clustered": True}
        )

    def test_pk_cols_sets_flags(self):
        # columns named in PrimaryKeyConstraint get primary_key=True
        m = MetaData()
        t = Table('t', m,
                  Column('x', Integer),
                  Column('y', Integer),
                  Column('z', Integer),
                  PrimaryKeyConstraint('x', 'y')
                  )
        eq_(t.c.x.primary_key, True)
        eq_(t.c.y.primary_key, True)
        eq_(t.c.z.primary_key, False)

    def test_pk_col_mismatch_one(self):
        # primary_key=True columns that disagree with an explicit
        # PrimaryKeyConstraint produce a warning naming both sets
        m = MetaData()
        assert_raises_message(
            exc.SAWarning,
            "Table 't' specifies columns 'x' as primary_key=True, "
            "not matching locally specified columns 'q'",
            Table, 't', m,
            Column('x', Integer, primary_key=True),
            Column('q', Integer),
            PrimaryKeyConstraint('q')
        )

    def test_pk_col_mismatch_two(self):
        # as above, with several columns on each side
        m = MetaData()
        assert_raises_message(
            exc.SAWarning,
            "Table 't' specifies columns 'a', 'b', 'c' as primary_key=True, "
            "not matching locally specified columns 'b', 'c'",
            Table, 't', m,
            Column('a', Integer, primary_key=True),
            Column('b', Integer, primary_key=True),
            Column('c', Integer, primary_key=True),
            PrimaryKeyConstraint('b', 'c')
        )

    @testing.emits_warning("Table 't'")
    def test_pk_col_mismatch_three(self):
        # on mismatch, the explicit PrimaryKeyConstraint columns win
        m = MetaData()
        t = Table('t', m,
                  Column('x', Integer, primary_key=True),
                  Column('q', Integer),
                  PrimaryKeyConstraint('q')
                  )
        eq_(list(t.primary_key), [t.c.q])

    @testing.emits_warning("Table 't'")
    def test_pk_col_mismatch_four(self):
        # as above, with several columns on each side
        m = MetaData()
        t = Table('t', m,
                  Column('a', Integer, primary_key=True),
                  Column('b', Integer, primary_key=True),
                  Column('c', Integer, primary_key=True),
                  PrimaryKeyConstraint('b', 'c')
                  )
        eq_(list(t.primary_key), [t.c.b, t.c.c])

    def test_pk_always_flips_nullable(self):
        # primary key columns end up non-nullable whether declared via
        # PrimaryKeyConstraint or via primary_key=True
        m = MetaData()

        t1 = Table('t1', m, Column('x', Integer), PrimaryKeyConstraint('x'))

        t2 = Table('t2', m, Column('x', Integer, primary_key=True))

        eq_(list(t1.primary_key), [t1.c.x])

        eq_(list(t2.primary_key), [t2.c.x])

        assert t1.c.x.primary_key
        assert t2.c.x.primary_key

        assert not t2.c.x.nullable
        assert not t1.c.x.nullable
1439
1440
class PKAutoIncrementTest(fixtures.TestBase):
    # Checks which column, if any, PrimaryKeyConstraint reports through
    # ``_autoincrement_column`` for various column configurations.

    def test_multi_integer_no_autoinc(self):
        # two plain integer PK columns: no autoincrement column
        constraint = PrimaryKeyConstraint(
            Column('a', Integer),
            Column('b', Integer))
        tbl = Table('t', MetaData())
        tbl.append_constraint(constraint)

        is_(constraint._autoincrement_column, None)

    def test_multi_integer_multi_autoinc(self):
        # two columns both flagged autoincrement=True is an error
        constraint = PrimaryKeyConstraint(
            Column('a', Integer, autoincrement=True),
            Column('b', Integer, autoincrement=True))
        tbl = Table('t', MetaData())
        tbl.append_constraint(constraint)

        assert_raises_message(
            exc.ArgumentError,
            "Only one Column may be marked",
            getattr, constraint, '_autoincrement_column'
        )

    def test_single_integer_no_autoinc(self):
        # a lone integer PK column is picked implicitly
        constraint = PrimaryKeyConstraint(Column('a', Integer))
        tbl = Table('t', MetaData())
        tbl.append_constraint(constraint)

        is_(constraint._autoincrement_column, constraint.columns['a'])

    def test_single_string_no_autoinc(self):
        # a lone string PK column is not picked
        constraint = PrimaryKeyConstraint(Column('a', String))
        tbl = Table('t', MetaData())
        tbl.append_constraint(constraint)

        is_(constraint._autoincrement_column, None)

    def test_single_string_illegal_autoinc(self):
        # autoincrement=True on a string column is rejected
        tbl = Table('t', MetaData(), Column('a', String, autoincrement=True))
        constraint = PrimaryKeyConstraint(tbl.c.a)
        tbl.append_constraint(constraint)

        assert_raises_message(
            exc.ArgumentError,
            "Column type VARCHAR on column 't.a'",
            getattr, constraint, '_autoincrement_column'
        )

    def test_single_integer_default(self):
        # explicit autoincrement=True is honored despite a Python default
        tbl = Table(
            't', MetaData(),
            Column('a', Integer, autoincrement=True, default=lambda: 1))
        constraint = PrimaryKeyConstraint(tbl.c.a)
        tbl.append_constraint(constraint)

        is_(constraint._autoincrement_column, tbl.c.a)

    def test_single_integer_server_default(self):
        # new as of 1.1; now that we have three states for autoincrement,
        # if the user puts autoincrement=True with a server_default, trust
        # them on it
        tbl = Table(
            't', MetaData(),
            Column('a', Integer,
                   autoincrement=True, server_default=func.magic()))
        constraint = PrimaryKeyConstraint(tbl.c.a)
        tbl.append_constraint(constraint)

        is_(constraint._autoincrement_column, tbl.c.a)

    def test_implicit_autoinc_but_fks(self):
        # an integer PK column that is also a foreign key is not picked
        # implicitly
        parent_meta = MetaData()
        Table('t1', parent_meta, Column('id', Integer, primary_key=True))
        child = Table(
            't2', MetaData(),
            Column('a', Integer, ForeignKey('t1.id')))
        constraint = PrimaryKeyConstraint(child.c.a)
        child.append_constraint(constraint)

        is_(constraint._autoincrement_column, None)

    def test_explicit_autoinc_but_fks(self):
        # ...but it is picked when autoincrement is True or 'ignore_fk'
        parent_meta = MetaData()
        Table('t1', parent_meta, Column('id', Integer, primary_key=True))

        child = Table(
            't2', MetaData(),
            Column('a', Integer, ForeignKey('t1.id'), autoincrement=True))
        constraint = PrimaryKeyConstraint(child.c.a)
        child.append_constraint(constraint)
        is_(constraint._autoincrement_column, child.c.a)

        other_child = Table(
            't3', MetaData(),
            Column('a', Integer,
                   ForeignKey('t1.id'), autoincrement='ignore_fk'))
        constraint = PrimaryKeyConstraint(other_child.c.a)
        other_child.append_constraint(constraint)
        is_(constraint._autoincrement_column, other_child.c.a)
1556
1557
1558class SchemaTypeTest(fixtures.TestBase):
1559    __backend__ = True
1560
    class TrackEvents(object):
        # Mixin that records what the schema-type hooks were called with.
        # Every hook first delegates to the next class in the MRO via
        # super(), then stores what it received.

        # last column/table handed to _set_table(); None until then
        column = None
        table = None
        # targets passed to the create hooks, in call order
        evt_targets = ()

        def _set_table(self, column, table):
            super(SchemaTypeTest.TrackEvents, self)._set_table(column, table)
            self.column = column
            self.table = table

        def _on_table_create(self, target, bind, **kw):
            super(SchemaTypeTest.TrackEvents, self)._on_table_create(
                target, bind, **kw)
            # rebinding creates an instance-level tuple, so the class-level
            # default above is never mutated
            self.evt_targets += (target,)

        def _on_metadata_create(self, target, bind, **kw):
            super(SchemaTypeTest.TrackEvents, self)._on_metadata_create(
                target, bind, **kw)
            self.evt_targets += (target,)
1580
1581    # TODO: Enum and Boolean put TypeEngine first.  Changing that here
1582    # causes collection-mutate-while-iterated errors in the event system
1583    # since the hooks here call upon the adapted type.  Need to figure out
1584    # why Enum and Boolean don't have this problem.
    class MyType(TrackEvents, sqltypes.SchemaType, sqltypes.TypeEngine):
        # Minimal schema type with TrackEvents mixed in; TrackEvents is
        # listed ahead of SchemaType and TypeEngine in the bases.
        pass
1587
    class WrapEnum(TrackEvents, Enum):
        # Enum with the TrackEvents recording mixin in front of it.
        pass
1590
    class WrapBoolean(TrackEvents, Boolean):
        # Boolean with the TrackEvents recording mixin in front of it.
        pass
1593
    class MyTypeWImpl(MyType):
        # MyType variant whose dialect-level implementation is produced by
        # adapting itself to MyTypeImpl.

        def _gen_dialect_impl(self, dialect):
            # ``dialect`` is not consulted; the result is always an
            # adaptation to SchemaTypeTest.MyTypeImpl
            return self.adapt(SchemaTypeTest.MyTypeImpl)
1598
    class MyTypeImpl(MyTypeWImpl):
        # Target class that MyTypeWImpl._gen_dialect_impl() adapts to.
        pass
1601
1602    class MyTypeDecAndSchema(TypeDecorator, sqltypes.SchemaType):
1603        impl = String()
1604
1605        evt_targets = ()
1606
1607        def __init__(self):
1608            TypeDecorator.__init__(self)
1609            sqltypes.SchemaType.__init__(self)
1610
1611        def _on_table_create(self, target, bind, **kw):
1612            self.evt_targets += (target,)
1613
1614        def _on_metadata_create(self, target, bind, **kw):
1615            self.evt_targets += (target,)
1616
1617    def test_before_parent_attach_plain(self):
1618        typ = self.MyType()
1619        self._test_before_parent_attach(typ)
1620
1621    def test_before_parent_attach_typedec_enclosing_schematype(self):
1622        # additional test for [ticket:2919] as part of test for
1623        # [ticket:3832]
1624
1625        class MySchemaType(sqltypes.TypeEngine, sqltypes.SchemaType):
1626            pass
1627
1628        target_typ = MySchemaType()
1629
1630        class MyType(TypeDecorator):
1631            impl = target_typ
1632
1633        typ = MyType()
1634        self._test_before_parent_attach(typ, target_typ, double=True)
1635
1636    def test_before_parent_attach_typedec_of_schematype(self):
1637        class MyType(TypeDecorator, sqltypes.SchemaType):
1638            impl = String
1639
1640        typ = MyType()
1641        self._test_before_parent_attach(typ)
1642
1643    def test_before_parent_attach_schematype_of_typedec(self):
1644        class MyType(sqltypes.SchemaType, TypeDecorator):
1645            impl = String
1646
1647        typ = MyType()
1648        self._test_before_parent_attach(typ)
1649
    def _test_before_parent_attach(self, typ, evt_target=None, double=False):
        # Attach ``typ`` to a new Column and assert the order in which the
        # before_parent_attach listener, _set_parent() and
        # _set_parent_with_dispatch() are invoked on ``evt_target``.
        #
        # typ: the type handed to Column().
        # evt_target: object that is listened on and patched; defaults to
        #     ``typ`` itself.
        # double: expect the sequence in which _set_parent() is recorded
        #     twice, once before and once after the listener.
        canary = mock.Mock()

        if evt_target is None:
            evt_target = typ

        # keep the real bound methods so the wrappers can still call them
        orig_set_parent = evt_target._set_parent
        orig_set_parent_w_dispatch = evt_target._set_parent_with_dispatch

        def _set_parent(parent):
            # run the real method first, then record the call
            orig_set_parent(parent)
            canary._set_parent(parent)

        def _set_parent_w_dispatch(parent):
            # run the real method first, then record the call
            orig_set_parent_w_dispatch(parent)
            canary._set_parent_with_dispatch(parent)

        with mock.patch.object(evt_target, '_set_parent', _set_parent):
            with mock.patch.object(
                    evt_target, '_set_parent_with_dispatch',
                    _set_parent_w_dispatch):
                # the listener records on the same canary, so the relative
                # order of all three kinds of call ends up in mock_calls
                event.listen(evt_target, "before_parent_attach", canary.go)

                c = Column('q', typ)

        if double:
            # no clean way yet to fix this, inner schema type is called
            # twice, but this is a very unusual use case.
            eq_(
                canary.mock_calls,
                [
                    mock.call._set_parent(c),
                    mock.call.go(evt_target, c),
                    mock.call._set_parent(c),
                    mock.call._set_parent_with_dispatch(c)
                ]
            )
        else:
            eq_(
                canary.mock_calls,
                [
                    mock.call.go(evt_target, c),
                    mock.call._set_parent(c),
                    mock.call._set_parent_with_dispatch(c)
                ]
            )
1696
1697    def test_independent_schema(self):
1698        m = MetaData()
1699        type_ = self.MyType(schema="q")
1700        t1 = Table('x', m, Column("y", type_), schema="z")
1701        eq_(t1.c.y.type.schema, "q")
1702
1703    def test_inherit_schema(self):
1704        m = MetaData()
1705        type_ = self.MyType(schema="q", inherit_schema=True)
1706        t1 = Table('x', m, Column("y", type_), schema="z")
1707        eq_(t1.c.y.type.schema, "z")
1708
1709    def test_independent_schema_enum(self):
1710        m = MetaData()
1711        type_ = sqltypes.Enum("a", schema="q")
1712        t1 = Table('x', m, Column("y", type_), schema="z")
1713        eq_(t1.c.y.type.schema, "q")
1714
1715    def test_inherit_schema_enum(self):
1716        m = MetaData()
1717        type_ = sqltypes.Enum("a", "b", "c", schema="q", inherit_schema=True)
1718        t1 = Table('x', m, Column("y", type_), schema="z")
1719        eq_(t1.c.y.type.schema, "z")
1720
1721    def test_tometadata_copy_type(self):
1722        m1 = MetaData()
1723
1724        type_ = self.MyType()
1725        t1 = Table('x', m1, Column("y", type_))
1726
1727        m2 = MetaData()
1728        t2 = t1.tometadata(m2)
1729
1730        # metadata isn't set
1731        is_(t2.c.y.type.metadata, None)
1732
1733        # our test type sets table, though
1734        is_(t2.c.y.type.table, t2)
1735
1736    def test_tometadata_copy_decorated(self):
1737
1738        class MyDecorated(TypeDecorator):
1739            impl = self.MyType
1740
1741        m1 = MetaData()
1742
1743        type_ = MyDecorated(schema="z")
1744        t1 = Table('x', m1, Column("y", type_))
1745
1746        m2 = MetaData()
1747        t2 = t1.tometadata(m2)
1748        eq_(t2.c.y.type.schema, "z")
1749
1750    def test_tometadata_independent_schema(self):
1751        m1 = MetaData()
1752
1753        type_ = self.MyType()
1754        t1 = Table('x', m1, Column("y", type_))
1755
1756        m2 = MetaData()
1757        t2 = t1.tometadata(m2, schema="bar")
1758
1759        eq_(t2.c.y.type.schema, None)
1760
1761    def test_tometadata_inherit_schema(self):
1762        m1 = MetaData()
1763
1764        type_ = self.MyType(inherit_schema=True)
1765        t1 = Table('x', m1, Column("y", type_))
1766
1767        m2 = MetaData()
1768        t2 = t1.tometadata(m2, schema="bar")
1769
1770        eq_(t1.c.y.type.schema, None)
1771        eq_(t2.c.y.type.schema, "bar")
1772
1773    def test_tometadata_independent_events(self):
1774        m1 = MetaData()
1775
1776        type_ = self.MyType()
1777        t1 = Table('x', m1, Column("y", type_))
1778
1779        m2 = MetaData()
1780        t2 = t1.tometadata(m2)
1781
1782        t1.dispatch.before_create(t1, testing.db)
1783        eq_(t1.c.y.type.evt_targets, (t1,))
1784        eq_(t2.c.y.type.evt_targets, ())
1785
1786        t2.dispatch.before_create(t2, testing.db)
1787        t2.dispatch.before_create(t2, testing.db)
1788        eq_(t1.c.y.type.evt_targets, (t1,))
1789        eq_(t2.c.y.type.evt_targets, (t2, t2))
1790
1791    def test_enum_column_copy_transfers_events(self):
1792        m = MetaData()
1793
1794        type_ = self.WrapEnum('a', 'b', 'c', name='foo')
1795        y = Column('y', type_)
1796        y_copy = y.copy()
1797        t1 = Table('x', m, y_copy)
1798
1799        is_true(y_copy.type._create_events)
1800
1801        m.dispatch.before_create(t1, testing.db)
1802        eq_(t1.c.y.type.evt_targets, (t1, ))
1803
1804    def test_boolean_column_copy_transfers_events(self):
1805        m = MetaData()
1806
1807        type_ = self.WrapBoolean()
1808        y = Column('y', type_)
1809        y_copy = y.copy()
1810        t1 = Table('x', m, y_copy)
1811
1812        is_true(y_copy.type._create_events)
1813
1814    def test_metadata_dispatch_no_new_impl(self):
1815        m1 = MetaData()
1816        typ = self.MyType(metadata=m1)
1817        m1.dispatch.before_create(m1, testing.db)
1818        eq_(typ.evt_targets, (m1, ))
1819
1820        dialect_impl = typ.dialect_impl(testing.db.dialect)
1821        eq_(dialect_impl.evt_targets, ())
1822
1823    def test_metadata_dispatch_new_impl(self):
1824        m1 = MetaData()
1825        typ = self.MyTypeWImpl(metadata=m1)
1826        m1.dispatch.before_create(m1, testing.db)
1827        eq_(typ.evt_targets, (m1, ))
1828
1829        dialect_impl = typ.dialect_impl(testing.db.dialect)
1830        eq_(dialect_impl.evt_targets, (m1, ))
1831
1832    def test_table_dispatch_decorator_schematype(self):
1833        m1 = MetaData()
1834        typ = self.MyTypeDecAndSchema()
1835        t1 = Table('t1', m1, Column('x', typ))
1836        m1.dispatch.before_create(t1, testing.db)
1837        eq_(typ.evt_targets, (t1, ))
1838
1839    def test_table_dispatch_no_new_impl(self):
1840        m1 = MetaData()
1841        typ = self.MyType()
1842        t1 = Table('t1', m1, Column('x', typ))
1843        m1.dispatch.before_create(t1, testing.db)
1844        eq_(typ.evt_targets, (t1, ))
1845
1846        dialect_impl = typ.dialect_impl(testing.db.dialect)
1847        eq_(dialect_impl.evt_targets, ())
1848
1849    def test_table_dispatch_new_impl(self):
1850        m1 = MetaData()
1851        typ = self.MyTypeWImpl()
1852        t1 = Table('t1', m1, Column('x', typ))
1853        m1.dispatch.before_create(t1, testing.db)
1854        eq_(typ.evt_targets, (t1, ))
1855
1856        dialect_impl = typ.dialect_impl(testing.db.dialect)
1857        eq_(dialect_impl.evt_targets, (t1, ))
1858
1859    def test_create_metadata_bound_no_crash(self):
1860        m1 = MetaData()
1861        self.MyType(metadata=m1)
1862
1863        m1.create_all(testing.db)
1864
1865    def test_boolean_constraint_type_doesnt_double(self):
1866        m1 = MetaData()
1867
1868        t1 = Table('x', m1, Column("flag", Boolean()))
1869        eq_(
1870            len([
1871                c for c in t1.constraints
1872                if isinstance(c, CheckConstraint)]),
1873            1
1874        )
1875        m2 = MetaData()
1876        t2 = t1.tometadata(m2)
1877
1878        eq_(
1879            len([
1880                c for c in t2.constraints
1881                if isinstance(c, CheckConstraint)]),
1882            1
1883        )
1884
1885    def test_enum_constraint_type_doesnt_double(self):
1886        m1 = MetaData()
1887
1888        t1 = Table('x', m1, Column("flag", Enum('a', 'b', 'c')))
1889        eq_(
1890            len([
1891                c for c in t1.constraints
1892                if isinstance(c, CheckConstraint)]),
1893            1
1894        )
1895        m2 = MetaData()
1896        t2 = t1.tometadata(m2)
1897
1898        eq_(
1899            len([
1900                c for c in t2.constraints
1901                if isinstance(c, CheckConstraint)]),
1902            1
1903        )
1904
1905
class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests for schema names on ``MetaData`` and ``Table``: how string
    foreign keys resolve against them, and how CREATE/DROP SCHEMA and
    CREATE TABLE statements are rendered."""

    def test_default_schema_metadata_fk(self):
        """With a MetaData-level schema, ``ForeignKey('t1.x')`` resolves to
        the ``t1`` defined on that same MetaData."""
        m = MetaData(schema="foo")
        t1 = Table('t1', m, Column('x', Integer))
        t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')))
        assert t2.c.x.references(t1.c.x)

    def test_ad_hoc_schema_equiv_fk(self):
        """When the schema is set per table instead of on the MetaData,
        the unqualified ``ForeignKey('t1.x')`` raises
        NoReferencedTableError."""
        m = MetaData()
        t1 = Table('t1', m, Column('x', Integer), schema="foo")
        t2 = Table(
            't2',
            m,
            Column(
                'x',
                Integer,
                ForeignKey('t1.x')),
            schema="foo")
        assert_raises(
            exc.NoReferencedTableError,
            lambda: t2.c.x.references(t1.c.x)
        )

    def test_default_schema_metadata_fk_alt_remote(self):
        """A table placed in schema "bar" can still reference ``t1`` via
        ``'t1.x'`` when ``t1`` uses the MetaData's schema."""
        m = MetaData(schema="foo")
        t1 = Table('t1', m, Column('x', Integer))
        t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')),
                   schema="bar")
        assert t2.c.x.references(t1.c.x)

    def test_default_schema_metadata_fk_alt_local_raises(self):
        """If ``t1`` is in schema "bar" rather than the MetaData's schema,
        the unqualified ``'t1.x'`` raises NoReferencedTableError."""
        m = MetaData(schema="foo")
        t1 = Table('t1', m, Column('x', Integer), schema="bar")
        t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')))
        assert_raises(
            exc.NoReferencedTableError,
            lambda: t2.c.x.references(t1.c.x)
        )

    def test_default_schema_metadata_fk_alt_local(self):
        """The schema-qualified ``'bar.t1.x'`` resolves to ``t1`` in
        schema "bar"."""
        m = MetaData(schema="foo")
        t1 = Table('t1', m, Column('x', Integer), schema="bar")
        t2 = Table('t2', m, Column('x', Integer, ForeignKey('bar.t1.x')))
        assert t2.c.x.references(t1.c.x)

    def test_create_drop_schema(self):
        """``CreateSchema`` / ``DropSchema`` compile to the expected DDL,
        including the CASCADE form."""

        self.assert_compile(
            schema.CreateSchema("sa_schema"),
            "CREATE SCHEMA sa_schema"
        )
        self.assert_compile(
            schema.DropSchema("sa_schema"),
            "DROP SCHEMA sa_schema"
        )
        self.assert_compile(
            schema.DropSchema("sa_schema", cascade=True),
            "DROP SCHEMA sa_schema CASCADE"
        )

    def test_iteration(self):
        """CREATE TABLE for tables in schema "someschema" names the table
        with or without the schema prefix, depending on the dialect
        preparer's ``omit_schema`` flag."""
        metadata = MetaData()
        table1 = Table(
            'table1',
            metadata,
            Column(
                'col1',
                Integer,
                primary_key=True),
            schema='someschema')
        table2 = Table(
            'table2',
            metadata,
            Column(
                'col1',
                Integer,
                primary_key=True),
            Column(
                'col2',
                Integer,
                ForeignKey('someschema.table1.col1')),
            schema='someschema')

        t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
        t2 = str(schema.CreateTable(table2).compile(bind=testing.db))

        # Use ``in`` for the substring check: str.index() raises
        # ValueError on a miss and never returns -1, so comparing its
        # result against -1 could not fail as an assertion.
        if testing.db.dialect.preparer(testing.db.dialect).omit_schema:
            assert "CREATE TABLE table1" in t1
            assert "CREATE TABLE table2" in t2
        else:
            assert "CREATE TABLE someschema.table1" in t1
            assert "CREATE TABLE someschema.table2" in t2
1998
1999
class UseExistingTest(fixtures.TablesTest):
    """Tests for redefining the ``users`` table on a MetaData with the
    ``keep_existing`` and ``extend_existing`` flags, with and without
    ``autoload=True``."""

    @classmethod
    def define_tables(cls, metadata):
        """Define the ``users(id, name)`` table."""
        id_col = Column('id', Integer, primary_key=True)
        name_col = Column('name', String(30))
        Table('users', metadata, id_col, name_col)

    def _useexisting_fixture(self):
        """Return a MetaData bound to ``testing.db`` on which ``users``
        has already been autoloaded."""
        bound_meta = MetaData(testing.db)
        Table('users', bound_meta, autoload=True)
        return bound_meta

    def _notexisting_fixture(self):
        """Return a MetaData bound to ``testing.db`` with no tables
        defined on it."""
        return MetaData(testing.db)

    def test_exception_no_flags(self):
        """Redefining ``users`` with neither flag raises
        InvalidRequestError."""
        existing_meta = self._useexisting_fixture()

        assert_raises_message(
            exc.InvalidRequestError,
            "Table 'users' is already defined for this "
            "MetaData instance.",
            Table, 'users', existing_meta, Column('name', Unicode),
            autoload=True
        )

    def test_keep_plus_existing_raises(self):
        """Passing both ``keep_existing`` and ``extend_existing`` raises
        ArgumentError."""
        existing_meta = self._useexisting_fixture()
        assert_raises(
            exc.ArgumentError,
            Table, 'users', existing_meta,
            keep_existing=True, extend_existing=True
        )

    @testing.uses_deprecated()
    def test_existing_plus_useexisting_raises(self):
        """Passing both ``useexisting`` and ``extend_existing`` raises
        ArgumentError."""
        existing_meta = self._useexisting_fixture()
        assert_raises(
            exc.ArgumentError,
            Table, 'users', existing_meta,
            useexisting=True, extend_existing=True
        )

    def test_keep_existing_no_dupe_constraints(self):
        """Defining the same table twice with ``keep_existing`` leaves
        the constraint count at two."""
        empty_meta = self._notexisting_fixture()

        def define_users():
            return Table(
                'users', empty_meta,
                Column('id', Integer),
                Column('name', Unicode),
                UniqueConstraint('name'),
                keep_existing=True)

        first = define_users()
        assert 'name' in first.c
        assert 'id' in first.c
        eq_(len(first.constraints), 2)

        second = define_users()
        eq_(len(second.constraints), 2)

    def test_extend_existing_dupes_constraints(self):
        """Defining the same table twice with ``extend_existing`` adds
        the unique constraint a second time."""
        empty_meta = self._notexisting_fixture()

        def define_users():
            return Table(
                'users', empty_meta,
                Column('id', Integer),
                Column('name', Unicode),
                UniqueConstraint('name'),
                extend_existing=True)

        first = define_users()
        assert 'name' in first.c
        assert 'id' in first.c
        eq_(len(first.constraints), 2)

        second = define_users()
        # constraint got duped
        eq_(len(second.constraints), 3)

    def test_keep_existing_coltype(self):
        """``keep_existing`` with autoload: the existing ``name`` column
        type is not replaced by ``Unicode``."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            Column('name', Unicode),
            autoload=True,
            keep_existing=True)
        assert not isinstance(users.c.name.type, Unicode)

    def test_keep_existing_quote(self):
        """``keep_existing`` with autoload: ``quote=True`` is not applied
        to the existing table name."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            quote=True,
            autoload=True,
            keep_existing=True)
        assert not users.name.quote

    def test_keep_existing_add_column(self):
        """``keep_existing`` with autoload: a new column is not added to
        the existing table."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            Column('foo', Integer),
            autoload=True,
            keep_existing=True)
        assert "foo" not in users.c

    def test_keep_existing_coltype_no_orig(self):
        """``keep_existing`` with autoload on an empty MetaData: the
        given ``Unicode`` column type is used."""
        empty_meta = self._notexisting_fixture()
        users = Table(
            'users', empty_meta,
            Column('name', Unicode),
            autoload=True,
            keep_existing=True)
        assert isinstance(users.c.name.type, Unicode)

    @testing.skip_if(
        lambda: testing.db.dialect.requires_name_normalize,
        "test depends on lowercase as case insensitive")
    def test_keep_existing_quote_no_orig(self):
        """``keep_existing`` with autoload on an empty MetaData:
        ``quote=True`` is applied to the table name."""
        empty_meta = self._notexisting_fixture()
        users = Table(
            'users', empty_meta,
            quote=True,
            autoload=True,
            keep_existing=True)
        assert users.name.quote

    def test_keep_existing_add_column_no_orig(self):
        """``keep_existing`` with autoload on an empty MetaData: the
        given column is present on the table."""
        empty_meta = self._notexisting_fixture()
        users = Table(
            'users', empty_meta,
            Column('foo', Integer),
            autoload=True,
            keep_existing=True)
        assert "foo" in users.c

    def test_keep_existing_coltype_no_reflection(self):
        """``keep_existing`` without autoload: the existing ``name``
        column type is not replaced by ``Unicode``."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            Column('name', Unicode),
            keep_existing=True)
        assert not isinstance(users.c.name.type, Unicode)

    def test_keep_existing_quote_no_reflection(self):
        """``keep_existing`` without autoload: ``quote=True`` is not
        applied to the existing table name."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            quote=True,
            keep_existing=True)
        assert not users.name.quote

    def test_keep_existing_add_column_no_reflection(self):
        """``keep_existing`` without autoload: a new column is not added
        to the existing table."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            Column('foo', Integer),
            keep_existing=True)
        assert "foo" not in users.c

    def test_extend_existing_coltype(self):
        """``extend_existing`` with autoload: the ``name`` column ends up
        with the given ``Unicode`` type."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            Column('name', Unicode),
            autoload=True,
            extend_existing=True)
        assert isinstance(users.c.name.type, Unicode)

    def test_extend_existing_quote(self):
        """``extend_existing`` with autoload: passing ``quote`` for an
        existing table raises ArgumentError."""
        existing_meta = self._useexisting_fixture()
        assert_raises_message(
            tsa.exc.ArgumentError,
            "Can't redefine 'quote' or 'quote_schema' arguments",
            Table, 'users', existing_meta,
            quote=True, autoload=True, extend_existing=True
        )

    def test_extend_existing_add_column(self):
        """``extend_existing`` with autoload: the new column is added to
        the existing table."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            Column('foo', Integer),
            autoload=True,
            extend_existing=True)
        assert "foo" in users.c

    def test_extend_existing_coltype_no_orig(self):
        """``extend_existing`` with autoload on an empty MetaData: the
        given ``Unicode`` column type is used."""
        empty_meta = self._notexisting_fixture()
        users = Table(
            'users', empty_meta,
            Column('name', Unicode),
            autoload=True,
            extend_existing=True)
        assert isinstance(users.c.name.type, Unicode)

    @testing.skip_if(
        lambda: testing.db.dialect.requires_name_normalize,
        "test depends on lowercase as case insensitive")
    def test_extend_existing_quote_no_orig(self):
        """``extend_existing`` with autoload on an empty MetaData:
        ``quote=True`` is applied to the table name."""
        empty_meta = self._notexisting_fixture()
        users = Table(
            'users', empty_meta,
            quote=True,
            autoload=True,
            extend_existing=True)
        assert users.name.quote

    def test_extend_existing_add_column_no_orig(self):
        """``extend_existing`` with autoload on an empty MetaData: the
        given column is present on the table."""
        empty_meta = self._notexisting_fixture()
        users = Table(
            'users', empty_meta,
            Column('foo', Integer),
            autoload=True,
            extend_existing=True)
        assert "foo" in users.c

    def test_extend_existing_coltype_no_reflection(self):
        """``extend_existing`` without autoload: the ``name`` column ends
        up with the given ``Unicode`` type."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            Column('name', Unicode),
            extend_existing=True)
        assert isinstance(users.c.name.type, Unicode)

    def test_extend_existing_quote_no_reflection(self):
        """``extend_existing`` without autoload: passing ``quote`` for an
        existing table raises ArgumentError."""
        existing_meta = self._useexisting_fixture()
        assert_raises_message(
            tsa.exc.ArgumentError,
            "Can't redefine 'quote' or 'quote_schema' arguments",
            Table, 'users', existing_meta,
            quote=True, extend_existing=True
        )

    def test_extend_existing_add_column_no_reflection(self):
        """``extend_existing`` without autoload: the new column is added
        to the existing table."""
        existing_meta = self._useexisting_fixture()
        users = Table(
            'users', existing_meta,
            Column('foo', Integer),
            extend_existing=True)
        assert "foo" in users.c
2218
2219
2220class ConstraintTest(fixtures.TestBase):
2221
2222    def _single_fixture(self):
2223        m = MetaData()
2224
2225        t1 = Table('t1', m,
2226                   Column('a', Integer),
2227                   Column('b', Integer)
2228                   )
2229
2230        t2 = Table('t2', m,
2231                   Column('a', Integer, ForeignKey('t1.a'))
2232                   )
2233
2234        t3 = Table('t3', m,
2235                   Column('a', Integer)
2236                   )
2237        return t1, t2, t3
2238
2239    def _assert_index_col_x(self, t, i, columns=True):
2240        eq_(t.indexes, set([i]))
2241        if columns:
2242            eq_(list(i.columns), [t.c.x])
2243        else:
2244            eq_(list(i.columns), [])
2245        assert i.table is t
2246
2247    def test_separate_decl_columns(self):
2248        m = MetaData()
2249        t = Table('t', m, Column('x', Integer))
2250        i = Index('i', t.c.x)
2251        self._assert_index_col_x(t, i)
2252
2253    def test_separate_decl_columns_functional(self):
2254        m = MetaData()
2255        t = Table('t', m, Column('x', Integer))
2256        i = Index('i', func.foo(t.c.x))
2257        self._assert_index_col_x(t, i)
2258
2259    def test_inline_decl_columns(self):
2260        m = MetaData()
2261        c = Column('x', Integer)
2262        i = Index('i', c)
2263        t = Table('t', m, c, i)
2264        self._assert_index_col_x(t, i)
2265
2266    def test_inline_decl_columns_functional(self):
2267        m = MetaData()
2268        c = Column('x', Integer)
2269        i = Index('i', func.foo(c))
2270        t = Table('t', m, c, i)
2271        self._assert_index_col_x(t, i)
2272
2273    def test_inline_decl_string(self):
2274        m = MetaData()
2275        i = Index('i', "x")
2276        t = Table('t', m, Column('x', Integer), i)
2277        self._assert_index_col_x(t, i)
2278
2279    def test_inline_decl_textonly(self):
2280        m = MetaData()
2281        i = Index('i', text("foobar(x)"))
2282        t = Table('t', m, Column('x', Integer), i)
2283        self._assert_index_col_x(t, i, columns=False)
2284
2285    def test_separate_decl_textonly(self):
2286        m = MetaData()
2287        i = Index('i', text("foobar(x)"))
2288        t = Table('t', m, Column('x', Integer))
2289        t.append_constraint(i)
2290        self._assert_index_col_x(t, i, columns=False)
2291
2292    def test_unnamed_column_exception(self):
2293        # this can occur in some declarative situations
2294        c = Column(Integer)
2295        idx = Index('q', c)
2296        m = MetaData()
2297        t = Table('t', m, Column('q'))
2298        assert_raises_message(
2299            exc.ArgumentError,
2300            "Can't add unnamed column to column collection",
2301            t.append_constraint, idx
2302        )
2303
2304    def test_column_associated_w_lowercase_table(self):
2305        from sqlalchemy import table
2306        c = Column('x', Integer)
2307        table('foo', c)
2308        idx = Index('q', c)
2309        is_(idx.table, None)  # lower-case-T table doesn't have indexes
2310
2311    def test_clauseelement_extraction_one(self):
2312        t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer))
2313
2314        class MyThing(object):
2315            def __clause_element__(self):
2316                return t.c.x + 5
2317
2318        idx = Index('foo', MyThing())
2319        self._assert_index_col_x(t, idx)
2320
2321    def test_clauseelement_extraction_two(self):
2322        t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer))
2323
2324        class MyThing(object):
2325            def __clause_element__(self):
2326                return t.c.x + 5
2327
2328        idx = Index('bar', MyThing(), t.c.y)
2329
2330        eq_(set(t.indexes), set([idx]))
2331
2332    def test_clauseelement_extraction_three(self):
2333        t = Table('t', MetaData(), Column('x', Integer), Column('y', Integer))
2334
2335        expr1 = t.c.x + 5
2336
2337        class MyThing(object):
2338            def __clause_element__(self):
2339                return expr1
2340
2341        idx = Index('bar', MyThing(), t.c.y)
2342
2343        is_(idx.expressions[0], expr1)
2344        is_(idx.expressions[1], t.c.y)
2345
2346    def test_table_references(self):
2347        t1, t2, t3 = self._single_fixture()
2348        assert list(t2.c.a.foreign_keys)[0].references(t1)
2349        assert not list(t2.c.a.foreign_keys)[0].references(t3)
2350
2351    def test_column_references(self):
2352        t1, t2, t3 = self._single_fixture()
2353        assert t2.c.a.references(t1.c.a)
2354        assert not t2.c.a.references(t3.c.a)
2355        assert not t2.c.a.references(t1.c.b)
2356
2357    def test_column_references_derived(self):
2358        t1, t2, t3 = self._single_fixture()
2359        s1 = tsa.select([tsa.select([t1]).alias()])
2360        assert t2.c.a.references(s1.c.a)
2361        assert not t2.c.a.references(s1.c.b)
2362
2363    def test_copy_doesnt_reference(self):
2364        t1, t2, t3 = self._single_fixture()
2365        a2 = t2.c.a.copy()
2366        assert not a2.references(t1.c.a)
2367        assert not a2.references(t1.c.b)
2368
2369    def test_derived_column_references(self):
2370        t1, t2, t3 = self._single_fixture()
2371        s1 = tsa.select([tsa.select([t2]).alias()])
2372        assert s1.c.a.references(t1.c.a)
2373        assert not s1.c.a.references(t1.c.b)
2374
2375    def test_referred_table_accessor(self):
2376        t1, t2, t3 = self._single_fixture()
2377        fkc = list(t2.foreign_key_constraints)[0]
2378        is_(fkc.referred_table, t1)
2379
2380    def test_referred_table_accessor_not_available(self):
2381        t1 = Table('t', MetaData(), Column('x', ForeignKey('q.id')))
2382        fkc = list(t1.foreign_key_constraints)[0]
2383        assert_raises_message(
2384            exc.InvalidRequestError,
2385            "Foreign key associated with column 't.x' could not find "
2386            "table 'q' with which to generate a foreign key to target "
2387            "column 'id'",
2388            getattr, fkc, "referred_table"
2389        )
2390
2391    def test_related_column_not_present_atfirst_ok(self):
2392        m = MetaData()
2393        base_table = Table("base", m,
2394                           Column("id", Integer, primary_key=True)
2395                           )
2396        fk = ForeignKey('base.q')
2397        derived_table = Table("derived", m,
2398                              Column("id", None, fk,
2399                                     primary_key=True),
2400                              )
2401
2402        base_table.append_column(Column('q', Integer))
2403        assert fk.column is base_table.c.q
2404        assert isinstance(derived_table.c.id.type, Integer)
2405
2406    def test_related_column_not_present_atfirst_ok_onname(self):
2407        m = MetaData()
2408        base_table = Table("base", m,
2409                           Column("id", Integer, primary_key=True)
2410                           )
2411        fk = ForeignKey('base.q', link_to_name=True)
2412        derived_table = Table("derived", m,
2413                              Column("id", None, fk,
2414                                     primary_key=True),
2415                              )
2416
2417        base_table.append_column(Column('q', Integer, key='zz'))
2418        assert fk.column is base_table.c.zz
2419        assert isinstance(derived_table.c.id.type, Integer)
2420
2421    def test_related_column_not_present_atfirst_ok_linktoname_conflict(self):
2422        m = MetaData()
2423        base_table = Table("base", m,
2424                           Column("id", Integer, primary_key=True)
2425                           )
2426        fk = ForeignKey('base.q', link_to_name=True)
2427        derived_table = Table("derived", m,
2428                              Column("id", None, fk,
2429                                     primary_key=True),
2430                              )
2431
2432        base_table.append_column(Column('zz', Integer, key='q'))
2433        base_table.append_column(Column('q', Integer, key='zz'))
2434        assert fk.column is base_table.c.zz
2435        assert isinstance(derived_table.c.id.type, Integer)
2436
2437    def test_invalid_composite_fk_check_strings(self):
2438        m = MetaData()
2439
2440        assert_raises_message(
2441            exc.ArgumentError,
2442            r"ForeignKeyConstraint on t1\(x, y\) refers to "
2443            "multiple remote tables: t2 and t3",
2444            Table,
2445            't1', m, Column('x', Integer), Column('y', Integer),
2446            ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y'])
2447        )
2448
2449    def test_invalid_composite_fk_check_columns(self):
2450        m = MetaData()
2451
2452        t2 = Table('t2', m, Column('x', Integer))
2453        t3 = Table('t3', m, Column('y', Integer))
2454
2455        assert_raises_message(
2456            exc.ArgumentError,
2457            r"ForeignKeyConstraint on t1\(x, y\) refers to "
2458            "multiple remote tables: t2 and t3",
2459            Table,
2460            't1', m, Column('x', Integer), Column('y', Integer),
2461            ForeignKeyConstraint(['x', 'y'], [t2.c.x, t3.c.y])
2462        )
2463
2464    def test_invalid_composite_fk_check_columns_notattached(self):
2465        m = MetaData()
2466        x = Column('x', Integer)
2467        y = Column('y', Integer)
2468
2469        # no error is raised for this one right now.
2470        # which is a minor bug.
2471        Table('t1', m, Column('x', Integer), Column('y', Integer),
2472              ForeignKeyConstraint(['x', 'y'], [x, y])
2473              )
2474
2475        Table('t2', m, x)
2476        Table('t3', m, y)
2477
2478    def test_constraint_copied_to_proxy_ok(self):
2479        m = MetaData()
2480        Table('t1', m, Column('id', Integer, primary_key=True))
2481        t2 = Table('t2', m, Column('id', Integer, ForeignKey('t1.id'),
2482                                   primary_key=True))
2483
2484        s = tsa.select([t2])
2485        t2fk = list(t2.c.id.foreign_keys)[0]
2486        sfk = list(s.c.id.foreign_keys)[0]
2487
2488        # the two FKs share the ForeignKeyConstraint
2489        is_(
2490            t2fk.constraint,
2491            sfk.constraint
2492        )
2493
2494        # but the ForeignKeyConstraint isn't
2495        # aware of the select's FK
2496        eq_(
2497            t2fk.constraint.elements,
2498            [t2fk]
2499        )
2500
2501    def test_type_propagate_composite_fk_string(self):
2502        metadata = MetaData()
2503        Table(
2504            'a', metadata,
2505            Column('key1', Integer, primary_key=True),
2506            Column('key2', String(40), primary_key=True))
2507
2508        b = Table('b', metadata,
2509                  Column('a_key1', None),
2510                  Column('a_key2', None),
2511                  Column('id', Integer, primary_key=True),
2512                  ForeignKeyConstraint(['a_key1', 'a_key2'],
2513                                       ['a.key1', 'a.key2'])
2514                  )
2515
2516        assert isinstance(b.c.a_key1.type, Integer)
2517        assert isinstance(b.c.a_key2.type, String)
2518
2519    def test_type_propagate_composite_fk_col(self):
2520        metadata = MetaData()
2521        a = Table('a', metadata,
2522                  Column('key1', Integer, primary_key=True),
2523                  Column('key2', String(40), primary_key=True))
2524
2525        b = Table('b', metadata,
2526                  Column('a_key1', None),
2527                  Column('a_key2', None),
2528                  Column('id', Integer, primary_key=True),
2529                  ForeignKeyConstraint(['a_key1', 'a_key2'],
2530                                       [a.c.key1, a.c.key2])
2531                  )
2532
2533        assert isinstance(b.c.a_key1.type, Integer)
2534        assert isinstance(b.c.a_key2.type, String)
2535
2536    def test_type_propagate_standalone_fk_string(self):
2537        metadata = MetaData()
2538        Table(
2539            'a', metadata,
2540            Column('key1', Integer, primary_key=True))
2541
2542        b = Table('b', metadata,
2543                  Column('a_key1', None, ForeignKey("a.key1")),
2544                  )
2545
2546        assert isinstance(b.c.a_key1.type, Integer)
2547
2548    def test_type_propagate_standalone_fk_col(self):
2549        metadata = MetaData()
2550        a = Table('a', metadata,
2551                  Column('key1', Integer, primary_key=True))
2552
2553        b = Table('b', metadata,
2554                  Column('a_key1', None, ForeignKey(a.c.key1)),
2555                  )
2556
2557        assert isinstance(b.c.a_key1.type, Integer)
2558
2559    def test_type_propagate_chained_string_source_first(self):
2560        metadata = MetaData()
2561        Table(
2562            'a', metadata,
2563            Column('key1', Integer, primary_key=True)
2564        )
2565
2566        b = Table('b', metadata,
2567                  Column('a_key1', None, ForeignKey("a.key1")),
2568                  )
2569
2570        c = Table('c', metadata,
2571                  Column('b_key1', None, ForeignKey("b.a_key1")),
2572                  )
2573
2574        assert isinstance(b.c.a_key1.type, Integer)
2575        assert isinstance(c.c.b_key1.type, Integer)
2576
2577    def test_type_propagate_chained_string_source_last(self):
2578        metadata = MetaData()
2579
2580        b = Table('b', metadata,
2581                  Column('a_key1', None, ForeignKey("a.key1")),
2582                  )
2583
2584        c = Table('c', metadata,
2585                  Column('b_key1', None, ForeignKey("b.a_key1")),
2586                  )
2587
2588        Table(
2589            'a', metadata,
2590            Column('key1', Integer, primary_key=True))
2591
2592        assert isinstance(b.c.a_key1.type, Integer)
2593        assert isinstance(c.c.b_key1.type, Integer)
2594
2595    def test_type_propagate_chained_string_source_last_onname(self):
2596        metadata = MetaData()
2597
2598        b = Table('b', metadata,
2599                  Column(
2600                      'a_key1', None,
2601                      ForeignKey("a.key1", link_to_name=True), key="ak1"),
2602                  )
2603
2604        c = Table('c', metadata,
2605                  Column(
2606                      'b_key1', None,
2607                      ForeignKey("b.a_key1", link_to_name=True), key="bk1"),
2608                  )
2609
2610        Table(
2611            'a', metadata,
2612            Column('key1', Integer, primary_key=True, key='ak1'))
2613
2614        assert isinstance(b.c.ak1.type, Integer)
2615        assert isinstance(c.c.bk1.type, Integer)
2616
    def test_type_propagate_chained_string_source_last_onname_conflict(self):
        """Type propagation when column names and keys deliberately collide.

        Table 'a' has a column *named* 'ak1' with key 'key1' (String) and a
        column *named* 'key1' with key 'ak1' (Integer).  The same FK spec
        string therefore resolves to a different target depending on
        ``link_to_name``; the asserts at the end pin which type each
        referring column ends up with.
        """
        metadata = MetaData()

        b = Table('b', metadata,
                  # b.c.key1 -> a.c.key1 -> String
                  Column(
                      'ak1', None,
                      ForeignKey("a.key1", link_to_name=False), key="key1"),
                  # b.c.ak1 -> a.c.ak1 -> Integer
                  Column(
                      'a_key1', None,
                      ForeignKey("a.key1", link_to_name=True), key="ak1"),
                  )

        c = Table('c', metadata,
                  # c.c.b_key1 -> b.c.ak1 -> Integer
                  Column(
                      'b_key1', None,
                      ForeignKey("b.ak1", link_to_name=False)),
                  # c.c.b_ak1 -> b column *named* 'ak1', i.e. b.c.key1
                  # -> String
                  Column(
                      'b_ak1', None,
                      ForeignKey("b.ak1", link_to_name=True)),
                  )

        # the source table is declared last, after both referring tables
        Table(
            'a', metadata,
            # a.c.key1
            Column('ak1', String, key="key1"),
            # a.c.ak1
            Column('key1', Integer, primary_key=True, key='ak1'),
        )

        assert isinstance(b.c.key1.type, String)
        assert isinstance(b.c.ak1.type, Integer)

        assert isinstance(c.c.b_ak1.type, String)
        assert isinstance(c.c.b_key1.type, Integer)
2655
2656    def test_type_propagate_chained_col_orig_first(self):
2657        metadata = MetaData()
2658        a = Table('a', metadata,
2659                  Column('key1', Integer, primary_key=True))
2660
2661        b = Table('b', metadata,
2662                  Column('a_key1', None, ForeignKey(a.c.key1)),
2663                  )
2664
2665        c = Table('c', metadata,
2666                  Column('b_key1', None, ForeignKey(b.c.a_key1)),
2667                  )
2668
2669        assert isinstance(b.c.a_key1.type, Integer)
2670        assert isinstance(c.c.b_key1.type, Integer)
2671
2672    def test_column_accessor_col(self):
2673        c1 = Column('x', Integer)
2674        fk = ForeignKey(c1)
2675        is_(fk.column, c1)
2676
2677    def test_column_accessor_clause_element(self):
2678        c1 = Column('x', Integer)
2679
2680        class CThing(object):
2681
2682            def __init__(self, c):
2683                self.c = c
2684
2685            def __clause_element__(self):
2686                return self.c
2687
2688        fk = ForeignKey(CThing(c1))
2689        is_(fk.column, c1)
2690
2691    def test_column_accessor_string_no_parent(self):
2692        fk = ForeignKey("sometable.somecol")
2693        assert_raises_message(
2694            exc.InvalidRequestError,
2695            "this ForeignKey object does not yet have a parent "
2696            "Column associated with it.",
2697            getattr, fk, "column"
2698        )
2699
2700    def test_column_accessor_string_no_parent_table(self):
2701        fk = ForeignKey("sometable.somecol")
2702        Column('x', fk)
2703        assert_raises_message(
2704            exc.InvalidRequestError,
2705            "this ForeignKey's parent column is not yet "
2706            "associated with a Table.",
2707            getattr, fk, "column"
2708        )
2709
2710    def test_column_accessor_string_no_target_table(self):
2711        fk = ForeignKey("sometable.somecol")
2712        c1 = Column('x', fk)
2713        Table('t', MetaData(), c1)
2714        assert_raises_message(
2715            exc.NoReferencedTableError,
2716            "Foreign key associated with column 't.x' could not find "
2717            "table 'sometable' with which to generate a "
2718            "foreign key to target column 'somecol'",
2719            getattr, fk, "column"
2720        )
2721
2722    def test_column_accessor_string_no_target_column(self):
2723        fk = ForeignKey("sometable.somecol")
2724        c1 = Column('x', fk)
2725        m = MetaData()
2726        Table('t', m, c1)
2727        Table("sometable", m, Column('notsomecol', Integer))
2728        assert_raises_message(
2729            exc.NoReferencedColumnError,
2730            "Could not initialize target column for ForeignKey "
2731            "'sometable.somecol' on table 't': "
2732            "table 'sometable' has no column named 'somecol'",
2733            getattr, fk, "column"
2734        )
2735
2736    def test_remove_table_fk_bookkeeping(self):
2737        metadata = MetaData()
2738        fk = ForeignKey('t1.x')
2739        t2 = Table('t2', metadata, Column('y', Integer, fk))
2740        t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x')))
2741
2742        assert t2.key in metadata.tables
2743        assert ("t1", "x") in metadata._fk_memos
2744
2745        metadata.remove(t2)
2746
2747        # key is removed
2748        assert t2.key not in metadata.tables
2749
2750        # the memo for the FK is still there
2751        assert ("t1", "x") in metadata._fk_memos
2752
2753        # fk is not in the collection
2754        assert fk not in metadata._fk_memos[("t1", "x")]
2755
2756        # make the referenced table
2757        t1 = Table('t1', metadata, Column('x', Integer))
2758
2759        # t2 tells us exactly what's wrong
2760        assert_raises_message(
2761            exc.InvalidRequestError,
2762            "Table t2 is no longer associated with its parent MetaData",
2763            getattr, fk, "column"
2764        )
2765
2766        # t3 is unaffected
2767        assert t3.c.y.references(t1.c.x)
2768
2769        # remove twice OK
2770        metadata.remove(t2)
2771
2772    def test_double_fk_usage_raises(self):
2773        f = ForeignKey('b.id')
2774
2775        Column('x', Integer, f)
2776        assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
2777
2778    def test_auto_append_constraint(self):
2779        m = MetaData()
2780
2781        t = Table('tbl', m,
2782                  Column('a', Integer),
2783                  Column('b', Integer)
2784                  )
2785
2786        t2 = Table('t2', m,
2787                   Column('a', Integer),
2788                   Column('b', Integer)
2789                   )
2790
2791        for c in (
2792            UniqueConstraint(t.c.a),
2793            CheckConstraint(t.c.a > 5),
2794            ForeignKeyConstraint([t.c.a], [t2.c.a]),
2795            PrimaryKeyConstraint(t.c.a)
2796        ):
2797            assert c in t.constraints
2798            t.append_constraint(c)
2799            assert c in t.constraints
2800
2801        c = Index('foo', t.c.a)
2802        assert c in t.indexes
2803
    def test_auto_append_lowercase_table(self):
        """Build each constraint type and an Index against table() objects.

        NOTE(review): the loop body is ``assert True``, so nothing about
        attachment is actually verified; the test passes as long as
        constructing every object in the tuple does not raise.
        """
        from sqlalchemy import table, column

        t = table('t', column('a'))
        t2 = table('t2', column('a'))
        # all five objects are constructed while the tuple is being built,
        # before the first loop iteration runs
        for c in (
            UniqueConstraint(t.c.a),
            CheckConstraint(t.c.a > 5),
            ForeignKeyConstraint([t.c.a], [t2.c.a]),
            PrimaryKeyConstraint(t.c.a),
            Index('foo', t.c.a)
        ):
            assert True
2817
2818    def test_tometadata_ok(self):
2819        m = MetaData()
2820
2821        t = Table('tbl', m,
2822                  Column('a', Integer),
2823                  Column('b', Integer)
2824                  )
2825
2826        t2 = Table('t2', m,
2827                   Column('a', Integer),
2828                   Column('b', Integer)
2829                   )
2830
2831        UniqueConstraint(t.c.a)
2832        CheckConstraint(t.c.a > 5)
2833        ForeignKeyConstraint([t.c.a], [t2.c.a])
2834        PrimaryKeyConstraint(t.c.a)
2835
2836        m2 = MetaData()
2837
2838        t3 = t.tometadata(m2)
2839
2840        eq_(len(t3.constraints), 4)
2841
2842        for c in t3.constraints:
2843            assert c.table is t3
2844
2845    def test_check_constraint_copy(self):
2846        m = MetaData()
2847        t = Table('tbl', m,
2848                  Column('a', Integer),
2849                  Column('b', Integer)
2850                  )
2851        ck = CheckConstraint(t.c.a > 5)
2852        ck2 = ck.copy()
2853        assert ck in t.constraints
2854        assert ck2 not in t.constraints
2855
2856    def test_ambig_check_constraint_auto_append(self):
2857        m = MetaData()
2858
2859        t = Table('tbl', m,
2860                  Column('a', Integer),
2861                  Column('b', Integer)
2862                  )
2863
2864        t2 = Table('t2', m,
2865                   Column('a', Integer),
2866                   Column('b', Integer)
2867                   )
2868        c = CheckConstraint(t.c.a > t2.c.b)
2869        assert c not in t.constraints
2870        assert c not in t2.constraints
2871
2872    def test_auto_append_ck_on_col_attach_one(self):
2873        m = MetaData()
2874
2875        a = Column('a', Integer)
2876        b = Column('b', Integer)
2877        ck = CheckConstraint(a > b)
2878
2879        t = Table('tbl', m, a, b)
2880        assert ck in t.constraints
2881
2882    def test_auto_append_ck_on_col_attach_two(self):
2883        m = MetaData()
2884
2885        a = Column('a', Integer)
2886        b = Column('b', Integer)
2887        c = Column('c', Integer)
2888        ck = CheckConstraint(a > b + c)
2889
2890        t = Table('tbl', m, a)
2891        assert ck not in t.constraints
2892
2893        t.append_column(b)
2894        assert ck not in t.constraints
2895
2896        t.append_column(c)
2897        assert ck in t.constraints
2898
2899    def test_auto_append_ck_on_col_attach_three(self):
2900        m = MetaData()
2901
2902        a = Column('a', Integer)
2903        b = Column('b', Integer)
2904        c = Column('c', Integer)
2905        ck = CheckConstraint(a > b + c)
2906
2907        t = Table('tbl', m, a)
2908        assert ck not in t.constraints
2909
2910        t.append_column(b)
2911        assert ck not in t.constraints
2912
2913        t2 = Table('t2', m)
2914        t2.append_column(c)
2915
2916        # two different tables, so CheckConstraint does nothing.
2917        assert ck not in t.constraints
2918
2919    def test_auto_append_uq_on_col_attach_one(self):
2920        m = MetaData()
2921
2922        a = Column('a', Integer)
2923        b = Column('b', Integer)
2924        uq = UniqueConstraint(a, b)
2925
2926        t = Table('tbl', m, a, b)
2927        assert uq in t.constraints
2928
2929    def test_auto_append_uq_on_col_attach_two(self):
2930        m = MetaData()
2931
2932        a = Column('a', Integer)
2933        b = Column('b', Integer)
2934        c = Column('c', Integer)
2935        uq = UniqueConstraint(a, b, c)
2936
2937        t = Table('tbl', m, a)
2938        assert uq not in t.constraints
2939
2940        t.append_column(b)
2941        assert uq not in t.constraints
2942
2943        t.append_column(c)
2944        assert uq in t.constraints
2945
2946    def test_auto_append_uq_on_col_attach_three(self):
2947        m = MetaData()
2948
2949        a = Column('a', Integer)
2950        b = Column('b', Integer)
2951        c = Column('c', Integer)
2952        uq = UniqueConstraint(a, b, c)
2953
2954        t = Table('tbl', m, a)
2955        assert uq not in t.constraints
2956
2957        t.append_column(b)
2958        assert uq not in t.constraints
2959
2960        t2 = Table('t2', m)
2961
2962        # two different tables, so UniqueConstraint raises
2963        assert_raises_message(
2964            exc.ArgumentError,
2965            r"Column\(s\) 't2\.c' are not part of table 'tbl'\.",
2966            t2.append_column, c
2967        )
2968
2969    def test_auto_append_uq_on_col_attach_four(self):
2970        """Test that a uniqueconstraint that names Column and string names
2971        won't autoattach using deferred column attachment.
2972
2973        """
2974        m = MetaData()
2975
2976        a = Column('a', Integer)
2977        b = Column('b', Integer)
2978        c = Column('c', Integer)
2979        uq = UniqueConstraint(a, 'b', 'c')
2980
2981        t = Table('tbl', m, a)
2982        assert uq not in t.constraints
2983
2984        t.append_column(b)
2985        assert uq not in t.constraints
2986
2987        t.append_column(c)
2988
2989        # we don't track events for previously unknown columns
2990        # named 'c' to be attached
2991        assert uq not in t.constraints
2992
2993        t.append_constraint(uq)
2994
2995        assert uq in t.constraints
2996
2997        eq_(
2998            [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)],
2999            [uq]
3000        )
3001
3002    def test_auto_append_uq_on_col_attach_five(self):
3003        """Test that a uniqueconstraint that names Column and string names
3004        *will* autoattach if the table has all those names up front.
3005
3006        """
3007        m = MetaData()
3008
3009        a = Column('a', Integer)
3010        b = Column('b', Integer)
3011        c = Column('c', Integer)
3012
3013        t = Table('tbl', m, a, c, b)
3014
3015        uq = UniqueConstraint(a, 'b', 'c')
3016
3017        assert uq in t.constraints
3018
3019        t.append_constraint(uq)
3020
3021        assert uq in t.constraints
3022
3023        eq_(
3024            [cn for cn in t.constraints if isinstance(cn, UniqueConstraint)],
3025            [uq]
3026        )
3027
3028    def test_index_asserts_cols_standalone(self):
3029        metadata = MetaData()
3030
3031        t1 = Table('t1', metadata,
3032                   Column('x', Integer)
3033                   )
3034        t2 = Table('t2', metadata,
3035                   Column('y', Integer)
3036                   )
3037        assert_raises_message(
3038            exc.ArgumentError,
3039            r"Column\(s\) 't2.y' are not part of table 't1'.",
3040            Index,
3041            "bar", t1.c.x, t2.c.y
3042        )
3043
3044    def test_index_asserts_cols_inline(self):
3045        metadata = MetaData()
3046
3047        t1 = Table('t1', metadata,
3048                   Column('x', Integer)
3049                   )
3050        assert_raises_message(
3051            exc.ArgumentError,
3052            "Index 'bar' is against table 't1', and "
3053            "cannot be associated with table 't2'.",
3054            Table, 't2', metadata,
3055            Column('y', Integer),
3056            Index('bar', t1.c.x)
3057        )
3058
3059    def test_raise_index_nonexistent_name(self):
3060        m = MetaData()
3061        # the KeyError isn't ideal here, a nicer message
3062        # perhaps
3063        assert_raises(
3064            KeyError,
3065            Table, 't', m, Column('x', Integer), Index("foo", "q")
3066        )
3067
3068    def test_raise_not_a_column(self):
3069        assert_raises(
3070            exc.ArgumentError,
3071            Index, "foo", 5
3072        )
3073
3074    def test_raise_expr_no_column(self):
3075        idx = Index('foo', func.lower(5))
3076
3077        assert_raises_message(
3078            exc.CompileError,
3079            "Index 'foo' is not associated with any table.",
3080            schema.CreateIndex(idx).compile, dialect=testing.db.dialect
3081        )
3082        assert_raises_message(
3083            exc.CompileError,
3084            "Index 'foo' is not associated with any table.",
3085            schema.CreateIndex(idx).compile
3086        )
3087
3088    def test_no_warning_w_no_columns(self):
3089        idx = Index(name="foo")
3090
3091        assert_raises_message(
3092            exc.CompileError,
3093            "Index 'foo' is not associated with any table.",
3094            schema.CreateIndex(idx).compile, dialect=testing.db.dialect
3095        )
3096        assert_raises_message(
3097            exc.CompileError,
3098            "Index 'foo' is not associated with any table.",
3099            schema.CreateIndex(idx).compile
3100        )
3101
3102    def test_raise_clauseelement_not_a_column(self):
3103        m = MetaData()
3104        t2 = Table('t2', m, Column('x', Integer))
3105
3106        class SomeClass(object):
3107
3108            def __clause_element__(self):
3109                return t2
3110        assert_raises_message(
3111            exc.ArgumentError,
3112            r"Element Table\('t2', .* is not a string name or column element",
3113            Index, "foo", SomeClass()
3114        )
3115
3116
class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):

    """Test Column() construction."""

    __dialect__ = 'default'

    def columns(self):
        """Return nine Integer columns built with varied call signatures.

        The columns at positions 0, 2, 5, 6 and 8 are created without a
        name; the others are named 'b', 'd', 'e' and 'h'.
        """
        return [Column(Integer),
                Column('b', Integer),
                Column(Integer),
                Column('d', Integer),
                Column(Integer, name='e'),
                Column(type_=Integer),
                Column(Integer()),
                Column('h', Integer()),
                Column(type_=Integer())]

    def test_basic(self):
        """Names assigned after construction show up on the Table."""
        c = self.columns()

        # fill in name/key for the columns that were created unnamed
        for i, v in ((0, 'a'), (2, 'c'), (5, 'f'), (6, 'g'), (8, 'i')):
            c[i].name = v
            c[i].key = v
        del i, v

        tbl = Table('table', MetaData(), *c)

        for i, col in enumerate(tbl.c):
            assert col.name == c[i].name

    def test_name_none(self):
        """A Column without a name can't be placed in a Table."""

        c = Column(Integer)
        assert_raises_message(
            exc.ArgumentError,
            "Column must be constructed with a non-blank name or assign a "
            "non-blank .name ",
            Table, 't', MetaData(), c)

    def test_name_blank(self):
        """A Column with an empty-string name can't be placed in a Table."""

        c = Column('', Integer)
        assert_raises_message(
            exc.ArgumentError,
            "Column must be constructed with a non-blank name or assign a "
            "non-blank .name ",
            Table, 't', MetaData(), c)

    def test_dupe_column(self):
        """The same Column object can't be assigned to a second Table."""
        c = Column('x', Integer)
        Table('t', MetaData(), c)

        assert_raises_message(
            exc.ArgumentError,
            "Column object 'x' already assigned to Table 't'",
            Table, 'q', MetaData(), c)

    def test_incomplete_key(self):
        """A late-assigned name also becomes the key on table attach."""
        c = Column(Integer)
        assert c.name is None
        assert c.key is None

        c.name = 'named'
        Table('t', MetaData(), c)

        assert c.name == 'named'
        assert c.name == c.key

    def test_unique_index_flags_default_to_none(self):
        """.unique / .index are None unless given; Index.unique follows."""
        c = Column(Integer)
        eq_(c.unique, None)
        eq_(c.index, None)

        c = Column('c', Integer, index=True)
        eq_(c.unique, None)
        eq_(c.index, True)

        t = Table('t', MetaData(), c)
        eq_(list(t.indexes)[0].unique, False)

        c = Column(Integer, unique=True)
        eq_(c.unique, True)
        eq_(c.index, None)

        c = Column('c', Integer, index=True, unique=True)
        eq_(c.unique, True)
        eq_(c.index, True)

        t = Table('t', MetaData(), c)
        eq_(list(t.indexes)[0].unique, True)

    def test_bogus(self):
        """Giving the name or the type twice raises ArgumentError."""
        assert_raises(exc.ArgumentError, Column, 'foo', name='bar')
        assert_raises(exc.ArgumentError, Column, 'foo', Integer,
                      type_=Integer())

    def test_custom_subclass_proxy(self):
        """test proxy generation of a Column subclass, can be compiled."""

        from sqlalchemy.schema import Column
        from sqlalchemy.ext.compiler import compiles
        from sqlalchemy.sql import select

        class MyColumn(Column):

            def _constructor(self, name, type, **kw):
                kw['name'] = name
                return MyColumn(type, **kw)

            def __init__(self, type, **kw):
                Column.__init__(self, type, **kw)

            def my_goofy_thing(self):
                return "hi"

        # the hook is keyed on the test-local MyColumn class only
        @compiles(MyColumn)
        def goofy(element, compiler, **kw):
            s = compiler.visit_column(element, **kw)
            return s + "-"

        id = MyColumn(Integer, primary_key=True)
        id.name = 'id'
        name = MyColumn(String)
        name.name = 'name'
        t1 = Table('foo', MetaData(),
                   id,
                   name
                   )

        # goofy thing
        eq_(t1.c.name.my_goofy_thing(), "hi")

        # create proxy
        s = select([t1.select().alias()])

        # proxy has goofy thing
        eq_(s.c.name.my_goofy_thing(), "hi")

        # compile works
        self.assert_compile(
            select([t1.select().alias()]),
            "SELECT anon_1.id-, anon_1.name- FROM "
            "(SELECT foo.id- AS id, foo.name- AS name "
            "FROM foo) AS anon_1",
        )

    def test_custom_subclass_proxy_typeerror(self):
        """A Column subclass lacking _constructor() fails when proxied."""
        from sqlalchemy.schema import Column
        from sqlalchemy.sql import select

        class MyColumn(Column):

            def __init__(self, type, **kw):
                Column.__init__(self, type, **kw)

        id = MyColumn(Integer, primary_key=True)
        id.name = 'id'
        name = MyColumn(String)
        name.name = 'name'
        t1 = Table('foo', MetaData(),
                   id,
                   name
                   )
        assert_raises_message(
            TypeError,
            "Could not create a copy of this <class "
            "'test.sql.test_metadata..*MyColumn'> "
            "object.  Ensure the class includes a _constructor()",
            getattr, select([t1.select().alias()]), 'c'
        )

    def test_custom_create(self):
        """A @compiles hook on CreateColumn can customize column DDL.

        The hook is registered on the shared ``schema.CreateColumn`` class,
        so it is deregistered in a ``finally`` block; otherwise a failing
        assertion would leave it installed for every test that runs later.
        """
        from sqlalchemy.ext.compiler import compiles, deregister

        @compiles(schema.CreateColumn)
        def compile(element, compiler, **kw):
            column = element.element

            # columns without the marker use the stock rendering
            if "special" not in column.info:
                return compiler.visit_create_column(element, **kw)

            text = "%s SPECIAL DIRECTIVE %s" % (
                column.name,
                compiler.type_compiler.process(column.type)
            )
            default = compiler.get_column_default_string(column)
            if default is not None:
                text += " DEFAULT " + default

            if not column.nullable:
                text += " NOT NULL"

            if column.constraints:
                text += " ".join(
                    compiler.process(const)
                    for const in column.constraints)
            return text

        try:
            t = Table(
                'mytable', MetaData(),
                Column('x', Integer, info={
                       "special": True}, primary_key=True),
                Column('y', String(50)),
                Column('z', String(20), info={
                       "special": True}))

            self.assert_compile(
                schema.CreateTable(t),
                "CREATE TABLE mytable (x SPECIAL DIRECTIVE INTEGER "
                "NOT NULL, y VARCHAR(50), "
                "z SPECIAL DIRECTIVE VARCHAR(20), PRIMARY KEY (x))"
            )
        finally:
            # always undo the global registration, pass or fail
            deregister(schema.CreateColumn)
3331
3332
class ColumnDefaultsTest(fixtures.TestBase):

    """Test assignment of default fixtures to columns."""

    def _fixture(self, *arg, **kw):
        """Build an Integer column named 'x' from the extra arguments."""
        return Column('x', Integer, *arg, **kw)

    def test_server_default_positional(self):
        clause = schema.DefaultClause('y')
        col = self._fixture(clause)
        is_(col.server_default, clause)
        is_(clause.column, col)

    def test_onupdate_default_not_server_default_one(self):
        on_insert = schema.DefaultClause('y')
        on_update = schema.DefaultClause('z')

        col = self._fixture(
            server_default=on_insert, server_onupdate=on_update)
        eq_(col.server_default.arg, 'y')
        eq_(col.server_onupdate.arg, 'z')

    def test_onupdate_default_not_server_default_two(self):
        # both clauses are flagged for_update; the keyword decides the slot
        on_insert = schema.DefaultClause('y', for_update=True)
        on_update = schema.DefaultClause('z', for_update=True)

        col = self._fixture(
            server_default=on_insert, server_onupdate=on_update)
        eq_(col.server_default.arg, 'y')
        eq_(col.server_onupdate.arg, 'z')

    def test_onupdate_default_not_server_default_three(self):
        # passed positionally; for_update decides the slot
        on_insert = schema.DefaultClause('y', for_update=False)
        on_update = schema.DefaultClause('z', for_update=True)

        col = self._fixture(on_insert, on_update)
        eq_(col.server_default.arg, 'y')
        eq_(col.server_onupdate.arg, 'z')

    def test_onupdate_default_not_server_default_four(self):
        clause = schema.DefaultClause('y', for_update=False)

        col = self._fixture(server_onupdate=clause)
        is_(col.server_default, None)
        eq_(col.server_onupdate.arg, 'y')

    def test_server_default_keyword_as_schemaitem(self):
        clause = schema.DefaultClause('y')
        col = self._fixture(server_default=clause)
        is_(col.server_default, clause)
        is_(clause.column, col)

    def test_server_default_keyword_as_clause(self):
        raw_value = 'y'
        col = self._fixture(server_default=raw_value)
        eq_(col.server_default.arg, raw_value)
        is_(col.server_default.column, col)

    def test_server_default_onupdate_positional(self):
        clause = schema.DefaultClause('y', for_update=True)
        col = self._fixture(clause)
        is_(col.server_onupdate, clause)
        is_(clause.column, col)

    def test_server_default_onupdate_keyword_as_schemaitem(self):
        clause = schema.DefaultClause('y', for_update=True)
        col = self._fixture(server_onupdate=clause)
        is_(col.server_onupdate, clause)
        is_(clause.column, col)

    def test_server_default_onupdate_keyword_as_clause(self):
        raw_value = 'y'
        col = self._fixture(server_onupdate=raw_value)
        eq_(col.server_onupdate.arg, raw_value)
        is_(col.server_onupdate.column, col)

    def test_column_default_positional(self):
        generator = schema.ColumnDefault('y')
        col = self._fixture(generator)
        is_(col.default, generator)
        is_(generator.column, col)

    def test_column_default_keyword_as_schemaitem(self):
        generator = schema.ColumnDefault('y')
        col = self._fixture(default=generator)
        is_(col.default, generator)
        is_(generator.column, col)

    def test_column_default_keyword_as_clause(self):
        raw_value = 'y'
        col = self._fixture(default=raw_value)
        eq_(col.default.arg, raw_value)
        is_(col.default.column, col)

    def test_column_default_onupdate_positional(self):
        generator = schema.ColumnDefault('y', for_update=True)
        col = self._fixture(generator)
        is_(col.onupdate, generator)
        is_(generator.column, col)

    def test_column_default_onupdate_keyword_as_schemaitem(self):
        generator = schema.ColumnDefault('y', for_update=True)
        col = self._fixture(onupdate=generator)
        is_(col.onupdate, generator)
        is_(generator.column, col)

    def test_column_default_onupdate_keyword_as_clause(self):
        raw_value = 'y'
        col = self._fixture(onupdate=raw_value)
        eq_(col.onupdate.arg, raw_value)
        is_(col.onupdate.column, col)
3442
3443
class ColumnOptionsTest(fixtures.TestBase):

    """Test Column() default generators, call signatures and .info."""

    def test_default_generators(self):
        """Generators passed as default/onupdate are stored unchanged."""
        seq_gen = Sequence('foo_id_seq')
        plain_gen = ColumnDefault('f5')
        for gen in (seq_gen, plain_gen):
            assert Column(String, default=gen).default is gen
            assert Column(String, onupdate=gen).onupdate is gen

    def _null_type_error(self, col):
        """Assert that CREATE TABLE for ``col`` fails on its NullType."""
        tbl = Table('t', MetaData(), col)
        expected = (
            r"\(in table 't', column 'foo'\): Can't generate DDL for NullType"
        )
        assert_raises_message(
            exc.CompileError, expected,
            schema.CreateTable(tbl).compile
        )

    def _no_name_error(self, col):
        """Assert that ``col`` is rejected by Table() for lacking a name."""
        expected = (
            "Column must be constructed with a non-blank name or "
            "assign a non-blank .name"
        )
        assert_raises_message(
            exc.ArgumentError, expected,
            Table, 't', MetaData(), col
        )

    def _no_error(self, col):
        """Assert that ``col`` can be put in a table and compiled."""
        meta = MetaData()
        # a 'bar' table with an 'id' column lives in the same MetaData
        Table('bar', meta, Column('id', Integer))
        tbl = Table('t', meta, col)
        schema.CreateTable(tbl).compile()

    def test_argument_signatures(self):
        """Run assorted Column() signatures through the three checkers."""
        nameless = self._no_name_error
        typeless = self._null_type_error
        accepted = self._no_error

        nameless(Column())
        typeless(Column("foo"))
        nameless(Column(default="foo"))

        nameless(Column(Sequence("a")))
        typeless(Column("foo", default="foo"))

        typeless(Column("foo", Sequence("a")))

        nameless(Column(ForeignKey('bar.id')))

        accepted(Column("foo", ForeignKey('bar.id')))

        nameless(Column(ForeignKey('bar.id'), default="foo"))

        nameless(Column(ForeignKey('bar.id'), Sequence("a")))
        accepted(Column("foo", ForeignKey('bar.id'), default="foo"))
        accepted(Column("foo", ForeignKey('bar.id'), Sequence("a")))

    def test_column_info(self):
        """.info holds the given dict, is empty otherwise, and is mutable."""
        with_info = Column('foo', String, info={'x': 'y'})
        empty_info = Column('bar', String, info={})
        no_info = Column('bat', String)

        eq_(with_info.info, {'x': 'y'})
        eq_(empty_info.info, {})
        eq_(no_info.info, {})

        for col in (with_info, empty_info, no_info):
            col.info['bar'] = 'zip'
            eq_(col.info['bar'], 'zip')
3507
3508
class CatchAllEventsTest(fixtures.RemovesEvents, fixtures.TestBase):
    """Tests for the ``before_parent_attach`` / ``after_parent_attach``
    events, recording which construct is attached to which parent and
    in what order.

    Listeners are registered through ``self.event_listen``.
    """

    def test_all_events(self):
        """Listeners on ``SchemaItem`` see the attach events of every
        construct involved in building two tables."""
        log = []

        def record_before(item, parent):
            # the parent is recorded by its class name
            log.append("%s->%s" % (
                item.__class__.__name__, parent.__class__.__name__))

        def record_after(item, parent):
            # the parent is recorded by its string form
            log.append("%s->%s" % (item.__class__.__name__, parent))

        for event_name, listener in [
            ("before_parent_attach", record_before),
            ("after_parent_attach", record_after),
        ]:
            self.event_listen(schema.SchemaItem, event_name, listener)

        metadata = MetaData()
        Table(
            't1', metadata,
            Column('id', Integer, Sequence('foo_id'), primary_key=True),
            Column('bar', String, ForeignKey('t2.id'))
        )
        Table(
            't2', metadata,
            Column('id', Integer, primary_key=True),
        )

        expected = [
            # constructs handed to the columns of t1
            'Sequence->Column', 'Sequence->id',
            'ForeignKey->Column', 'ForeignKey->bar',
            # t1 itself
            'Table->MetaData',
            'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1',
            'Column->Table', 'Column->t1',
            'Column->Table', 'Column->t1',
            'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1',
            'Table->MetaData(bind=None)',
            # t2
            'Table->MetaData',
            'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2',
            'Column->Table', 'Column->t2',
            'Table->MetaData(bind=None)',
        ]
        eq_(log, expected)

    def test_events_per_constraint(self):
        """Listeners registered on individual constraint / index classes
        are invoked for constructs of that class."""
        log = []

        def listen_for(cls):
            # ``cls`` is bound per call, so each pair of listeners
            # records the name of the class it was registered on
            def record_before(item, parent):
                log.append(
                    "%s->%s" % (cls.__name__, parent.__class__.__name__))

            def record_after(item, parent):
                assert hasattr(item, 'name')  # so we can change it
                log.append("%s->%s" % (cls.__name__, parent))

            self.event_listen(cls, "before_parent_attach", record_before)
            self.event_listen(cls, "after_parent_attach", record_after)

        listened_classes = [
            schema.ForeignKeyConstraint,
            schema.PrimaryKeyConstraint,
            schema.UniqueConstraint,
            schema.CheckConstraint,
            schema.Index,
        ]
        for cls in listened_classes:
            listen_for(cls)

        metadata = MetaData()
        Table(
            't1', metadata,
            Column('id', Integer, Sequence('foo_id'), primary_key=True),
            Column('bar', String, ForeignKey('t2.id'), index=True),
            Column('bat', Integer, unique=True),
        )
        Table(
            't2', metadata,
            Column('id', Integer, primary_key=True),
            Column('bar', Integer),
            Column('bat', Integer),
            CheckConstraint("bar>5"),
            UniqueConstraint('bar', 'bat'),
            Index(None, 'bar', 'bat')
        )

        expected = [
            # t1
            'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t1',
            'Index->Table', 'Index->t1',
            'ForeignKeyConstraint->Table', 'ForeignKeyConstraint->t1',
            'UniqueConstraint->Table', 'UniqueConstraint->t1',
            # t2
            'PrimaryKeyConstraint->Table', 'PrimaryKeyConstraint->t2',
            'CheckConstraint->Table', 'CheckConstraint->t2',
            'UniqueConstraint->Table', 'UniqueConstraint->t2',
            'Index->Table', 'Index->t2',
        ]
        eq_(log, expected)
3601
3602
class DialectKWArgTest(fixtures.TestBase):
    """Tests for ``<dialectname>_<argname>`` keyword arguments on schema
    constructs, as seen through ``.dialect_kwargs`` / ``.kwargs`` and
    ``.dialect_options``.

    All tests run against the stand-in dialects that ``_fixture()``
    installs.
    """

    @contextmanager
    def _fixture(self):
        """Patch dialect loading so that exactly three dialects resolve.

        ``participating`` and ``participating2`` define
        ``construct_arguments``; ``nonparticipating`` sets it to None.
        Any other dialect name raises ``NoSuchModuleError``.
        """
        from sqlalchemy.engine.default import DefaultDialect

        class ParticipatingDialect(DefaultDialect):
            construct_arguments = [
                (schema.Index, {"x": 5, "y": False, "z_one": None}),
                (schema.ForeignKeyConstraint, {"foobar": False}),
            ]

        class ParticipatingDialect2(DefaultDialect):
            construct_arguments = [
                (schema.Index, {"x": 9, "y": True, "pp": "default"}),
                (schema.Table, {"*": None}),
            ]

        class NonParticipatingDialect(DefaultDialect):
            construct_arguments = None

        known_dialects = {
            "participating": ParticipatingDialect,
            "participating2": ParticipatingDialect2,
            "nonparticipating": NonParticipatingDialect,
        }

        def load(dialect_name):
            if dialect_name in known_dialects:
                return known_dialects[dialect_name]
            raise exc.NoSuchModuleError("no dialect %r" % dialect_name)

        with mock.patch("sqlalchemy.dialects.registry.load", load):
            yield

    def teardown(self):
        """Clear the ``_kw_registry`` reached through ``Index``;
        presumably so that state built up against the stand-in dialects
        does not carry over into later tests."""
        Index._kw_registry.clear()

    def test_participating(self):
        """A value passed for a participating dialect overrides that
        dialect's default while the other defaults stay in place."""
        with self._fixture():
            index = Index('a', 'b', 'c', participating_y=True)

            eq_(
                index.dialect_options,
                {"participating": {"x": 5, "y": True, "z_one": None}}
            )
            # only the explicitly passed argument is reported as a kwarg
            eq_(index.dialect_kwargs, {'participating_y': True})

    def test_nonparticipating(self):
        """Arguments for a non-participating dialect are accepted and
        reported in ``dialect_kwargs``."""
        with self._fixture():
            index = Index(
                'a', 'b', 'c',
                nonparticipating_y=True, nonparticipating_q=5)

            expected_kwargs = {
                'nonparticipating_y': True,
                'nonparticipating_q': 5,
            }
            eq_(index.dialect_kwargs, expected_kwargs)

    def test_bad_kwarg_raise(self):
        """A keyword without the ``<dialectname>_`` prefix raises
        TypeError."""
        expected_message = (
            "Additional arguments should be named "
            "<dialectname>_<argument>, got 'foobar'"
        )
        with self._fixture():
            assert_raises_message(
                TypeError, expected_message,
                Index, 'a', 'b', 'c', foobar=True)

    def test_unknown_dialect_warning(self):
        """An argument for a dialect that can't be located produces an
        SAWarning, caught here as a raised exception."""
        expected_message = (
            "Can't validate argument 'unknown_y'; can't locate "
            "any SQLAlchemy dialect named 'unknown'"
        )
        with self._fixture():
            assert_raises_message(
                exc.SAWarning, expected_message,
                Index, 'a', 'b', 'c', unknown_y=True)

    def test_participating_bad_kw(self):
        """A participating dialect rejects an argument name it does not
        declare for Index."""
        expected_message = (
            "Argument 'participating_q_p_x' is not accepted by dialect "
            "'participating' on behalf of "
            "<class 'sqlalchemy.sql.schema.Index'>"
        )
        with self._fixture():
            assert_raises_message(
                exc.ArgumentError, expected_message,
                Index, 'a', 'b', 'c', participating_q_p_x=8)

    def test_participating_unknown_schema_item(self):
        """A participating dialect rejects arguments for a construct
        class it has no entry for."""
        expected_message = (
            "Argument 'participating_q_p_x' is not accepted by dialect "
            "'participating' on behalf of "
            "<class 'sqlalchemy.sql.schema.UniqueConstraint'>"
        )
        with self._fixture():
            # the dialect doesn't include UniqueConstraint in
            # its registry at all.
            assert_raises_message(
                exc.ArgumentError, expected_message,
                UniqueConstraint, 'a', 'b', participating_q_p_x=8)

    @testing.emits_warning("Can't validate")
    def test_unknown_dialect_warning_still_populates(self):
        """With the warning tolerated, the unknown-dialect argument is
        still recorded in ``dialect_kwargs``."""
        with self._fixture():
            index = Index('a', 'b', 'c', unknown_y=True)
            # still populates
            eq_(index.dialect_kwargs, {"unknown_y": True})

    @testing.emits_warning("Can't validate")
    def test_unknown_dialect_warning_still_populates_multiple(self):
        """Arguments for several unknown dialects and a participating
        one are all recorded side by side."""
        with self._fixture():
            index = Index(
                'a', 'b', 'c',
                unknown_y=True, unknown_z=5,
                otherunknown_foo='bar', participating_y=8)

            expected_options = {
                "unknown": {'y': True, 'z': 5, '*': None},
                "otherunknown": {'foo': 'bar', '*': None},
                "participating": {'x': 5, 'y': 8, 'z_one': None},
            }
            eq_(index.dialect_options, expected_options)

            # still populates
            expected_kwargs = {
                'unknown_z': 5,
                'participating_y': 8,
                'unknown_y': True,
                'otherunknown_foo': 'bar',
            }
            eq_(index.dialect_kwargs, expected_kwargs)

    def test_runs_safekwarg(self):
        """Keys listed by ``dialect_kwargs`` have been passed through
        ``util.safe_kwarg`` (patched here to add a ``goofy_`` prefix)."""
        goofy_safe_kwarg = mock.patch(
            "sqlalchemy.util.safe_kwarg", lambda arg: "goofy_%s" % arg)

        with goofy_safe_kwarg, self._fixture():
            index = Index('a', 'b')
            index.kwargs[util.u('participating_x')] = 7

            eq_(list(index.dialect_kwargs), ['goofy_participating_x'])

    def test_combined(self):
        """Participating and non-participating dialect arguments can be
        given to the same construct."""
        with self._fixture():
            index = Index(
                'a', 'b', 'c',
                participating_x=7, nonparticipating_y=True)

            expected_options = {
                'participating': {'y': False, 'x': 7, 'z_one': None},
                'nonparticipating': {'y': True, '*': None},
            }
            eq_(index.dialect_options, expected_options)

            expected_kwargs = {
                'participating_x': 7,
                'nonparticipating_y': True,
            }
            eq_(index.dialect_kwargs, expected_kwargs)

    def test_multiple_participating(self):
        """Two participating dialects keep separate options and
        defaults on the same construct."""
        with self._fixture():
            index = Index(
                'a', 'b', 'c',
                participating_x=7,
                participating2_x=15,
                participating2_y="lazy")

            expected_options = {
                "participating": {'x': 7, 'y': False, 'z_one': None},
                "participating2": {'x': 15, 'y': 'lazy', 'pp': 'default'},
            }
            eq_(index.dialect_options, expected_options)

            expected_kwargs = {
                'participating_x': 7,
                'participating2_x': 15,
                'participating2_y': 'lazy',
            }
            eq_(index.dialect_kwargs, expected_kwargs)

    def test_foreign_key_propagate(self):
        """Dialect arguments given to a ForeignKey show up on the
        ForeignKeyConstraint of the table."""
        with self._fixture():
            metadata = MetaData()
            foreign_key = ForeignKey('t2.id', participating_foobar=True)
            table = Table('t', metadata, Column('id', Integer, foreign_key))

            fk_constraints = [
                const for const in table.constraints
                if isinstance(const, ForeignKeyConstraint)
            ]
            eq_(
                fk_constraints[0].dialect_kwargs,
                {'participating_foobar': True}
            )

    def test_foreign_key_propagate_exceptions_delayed(self):
        """An invalid dialect argument on a ForeignKey is accepted by
        ForeignKey and Column; the error is raised by Table()."""
        expected_message = (
            "Argument 'participating_fake' is not accepted by "
            "dialect 'participating' on behalf of "
            "<class 'sqlalchemy.sql.schema.ForeignKeyConstraint'>"
        )
        with self._fixture():
            metadata = MetaData()
            foreign_key = ForeignKey('t2.id', participating_fake=True)
            column = Column('id', Integer, foreign_key)

            assert_raises_message(
                exc.ArgumentError, expected_message,
                Table, 't', metadata, column)

    def test_wildcard(self):
        """A ``"*"`` entry lets a dialect accept arbitrary argument
        names for that construct."""
        with self._fixture():
            metadata = MetaData()
            table = Table(
                'x', metadata, Column('x', Integer),
                participating2_xyz='foo',
                participating2_engine='InnoDB',
            )

            expected_kwargs = {
                'participating2_xyz': 'foo',
                'participating2_engine': 'InnoDB',
            }
            eq_(table.dialect_kwargs, expected_kwargs)

    def test_uninit_wildcard(self):
        """With nothing passed, the wildcard dialect's options hold only
        the ``"*"`` entry and ``dialect_kwargs`` is empty."""
        with self._fixture():
            metadata = MetaData()
            table = Table('x', metadata, Column('x', Integer))

            eq_(table.dialect_options['participating2'], {'*': None})
            eq_(table.dialect_kwargs, {})

    def test_not_contains_wildcard(self):
        """The wildcard does not make unset names appear present."""
        with self._fixture():
            metadata = MetaData()
            table = Table('x', metadata, Column('x', Integer))

            options = table.dialect_options['participating2']
            assert 'foobar' not in options

    def test_contains_wildcard(self):
        """A name accepted through the wildcard is present once it has
        been set."""
        with self._fixture():
            metadata = MetaData()
            table = Table(
                'x', metadata, Column('x', Integer),
                participating2_foobar=5)

            options = table.dialect_options['participating2']
            assert 'foobar' in options

    def test_update(self):
        """``_validate_dialect_kwargs()`` can be applied to an existing
        construct to change and add dialect arguments."""
        with self._fixture():
            index = Index('a', 'b', 'c', participating_x=20)
            eq_(index.dialect_kwargs, {"participating_x": 20})

            # the same update is applied twice; the second application
            # is expected to leave the result unchanged
            for _ in range(2):
                index._validate_dialect_kwargs({
                    "participating_x": 25,
                    "participating_z_one": "default",
                })
                eq_(
                    index.dialect_options,
                    {"participating": {
                        "x": 25, "y": False, "z_one": "default"}}
                )
                eq_(
                    index.dialect_kwargs,
                    {"participating_x": 25,
                     'participating_z_one': "default"}
                )

            # bring in a second dialect alongside the first
            index._validate_dialect_kwargs({
                "participating_y": True,
                'participating2_y': "p2y",
            })
            expected_options = {
                "participating": {"x": 25, "y": True, "z_one": "default"},
                "participating2": {"y": "p2y", "pp": "default", "x": 9},
            }
            eq_(index.dialect_options, expected_options)

            expected_kwargs = {
                "participating_x": 25,
                "participating_y": True,
                'participating2_y': "p2y",
                "participating_z_one": "default",
            }
            eq_(index.dialect_kwargs, expected_kwargs)

    def test_key_error_kwargs_no_dialect(self):
        """Reading a ``kwargs`` key whose prefix is not a known dialect
        raises KeyError."""
        with self._fixture():
            index = Index('a', 'b', 'c')
            assert_raises(KeyError, index.kwargs.__getitem__, 'foo_bar')

    def test_key_error_kwargs_no_underscore(self):
        """Reading a ``kwargs`` key that has no underscore raises
        KeyError."""
        with self._fixture():
            index = Index('a', 'b', 'c')
            assert_raises(KeyError, index.kwargs.__getitem__, 'foobar')

    def test_key_error_kwargs_no_argument(self):
        """Reading a ``kwargs`` key for a known dialect but an unset
        argument raises KeyError."""
        with self._fixture():
            index = Index('a', 'b', 'c')

            for key in (
                'participating_asdmfq34098',
                'nonparticipating_asdmfq34098',
            ):
                assert_raises(KeyError, index.kwargs.__getitem__, key)

    def test_key_error_dialect_options(self):
        """Reading an unset argument from a dialect's options raises
        KeyError."""
        with self._fixture():
            index = Index('a', 'b', 'c')

            for dialect_name in ('participating', 'nonparticipating'):
                assert_raises(
                    KeyError,
                    index.dialect_options[dialect_name].__getitem__,
                    'asdfaso890')

    def test_ad_hoc_participating_via_opt(self):
        """A value set through ``dialect_options`` for a participating
        dialect is also readable through ``kwargs``."""
        with self._fixture():
            index = Index('a', 'b', 'c')
            index.dialect_options['participating']['foobar'] = 5

            eq_(index.dialect_options['participating']['foobar'], 5)
            eq_(index.kwargs['participating_foobar'], 5)

    def test_ad_hoc_nonparticipating_via_opt(self):
        """A value set through ``dialect_options`` for a
        non-participating dialect is also readable through ``kwargs``."""
        with self._fixture():
            index = Index('a', 'b', 'c')
            index.dialect_options['nonparticipating']['foobar'] = 5

            eq_(index.dialect_options['nonparticipating']['foobar'], 5)
            eq_(index.kwargs['nonparticipating_foobar'], 5)

    def test_ad_hoc_participating_via_kwargs(self):
        """A value set through ``kwargs`` for a participating dialect
        is also readable through ``dialect_options``."""
        with self._fixture():
            index = Index('a', 'b', 'c')
            index.kwargs['participating_foobar'] = 5

            eq_(index.dialect_options['participating']['foobar'], 5)
            eq_(index.kwargs['participating_foobar'], 5)

    def test_ad_hoc_nonparticipating_via_kwargs(self):
        """A value set through ``kwargs`` for a non-participating
        dialect is also readable through ``dialect_options``."""
        with self._fixture():
            index = Index('a', 'b', 'c')
            index.kwargs['nonparticipating_foobar'] = 5

            eq_(index.dialect_options['nonparticipating']['foobar'], 5)
            eq_(index.kwargs['nonparticipating_foobar'], 5)

    def test_ad_hoc_via_kwargs_invalid_key(self):
        """Setting a ``kwargs`` key that has no dialect prefix raises
        ArgumentError."""
        with self._fixture():
            index = Index('a', 'b', 'c')
            assert_raises_message(
                exc.ArgumentError,
                "Keys must be of the form <dialectname>_<argname>",
                index.kwargs.__setitem__, "foobar", 5)

    def test_ad_hoc_via_kwargs_invalid_dialect(self):
        """Setting a ``kwargs`` key for a dialect that can't be loaded
        raises ArgumentError."""
        with self._fixture():
            index = Index('a', 'b', 'c')
            assert_raises_message(
                exc.ArgumentError,
                "no dialect 'nonexistent'",
                index.kwargs.__setitem__, "nonexistent_foobar", 5)

    def test_add_new_arguments_participating(self):
        """``argument_for()`` adds an argument, with its default, to a
        construct the dialect already has an entry for."""
        with self._fixture():
            Index.argument_for("participating", "xyzqpr", False)

            with_value = Index('a', 'b', 'c', participating_xyzqpr=True)
            eq_(with_value.kwargs['participating_xyzqpr'], True)

            with_default = Index('a', 'b', 'c')
            eq_(
                with_default.dialect_options['participating']['xyzqpr'],
                False)

    def test_add_new_arguments_participating_no_existing(self):
        """``argument_for()`` also works for a construct the dialect
        has no entry for yet."""
        with self._fixture():
            PrimaryKeyConstraint.argument_for(
                "participating", "xyzqpr", False)

            with_value = PrimaryKeyConstraint(
                'a', 'b', 'c', participating_xyzqpr=True)
            eq_(with_value.kwargs['participating_xyzqpr'], True)

            with_default = PrimaryKeyConstraint('a', 'b', 'c')
            eq_(
                with_default.dialect_options['participating']['xyzqpr'],
                False)

    def test_add_new_arguments_nonparticipating(self):
        """``argument_for()`` is refused for a non-participating
        dialect."""
        expected_message = (
            "Dialect 'nonparticipating' does have keyword-argument "
            "validation and defaults enabled configured"
        )
        with self._fixture():
            assert_raises_message(
                exc.ArgumentError, expected_message,
                Index.argument_for, "nonparticipating", "xyzqpr", False)

    def test_add_new_arguments_invalid_dialect(self):
        """``argument_for()`` raises ArgumentError for a dialect that
        can't be loaded."""
        with self._fixture():
            assert_raises_message(
                exc.ArgumentError,
                "no dialect 'nonexistent'",
                Index.argument_for, "nonexistent", "foobar", 5)
4039
4040
class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests for constraint names produced from
    ``MetaData(naming_convention=...)``."""

    __dialect__ = 'default'

    def _fixture(self, naming_convention, table_schema=None):
        """Return a ``user`` table in a new MetaData that uses the given
        naming convention.

        The table has a composite primary key (``id``, ``version``) and
        a ``data`` column; ``table_schema`` is passed as its schema.
        """
        metadata = MetaData(naming_convention=naming_convention)
        return Table(
            'user', metadata,
            Column('id', Integer, primary_key=True),
            Column('version', Integer, primary_key=True),
            Column('data', String(30)),
            schema=table_schema
        )

    def test_uq_name(self):
        """A unique constraint is named from table and column name."""
        convention = {"uq": "uq_%(table_name)s_%(column_0_name)s"}
        user = self._fixture(naming_convention=convention)

        unique = UniqueConstraint(user.c.data)
        eq_(unique.name, "uq_user_data")

    def test_ck_name_required(self):
        """A convention using ``%(constraint_name)s`` needs the check
        constraint to be given a name."""
        convention = {"ck": "ck_%(table_name)s_%(constraint_name)s"}
        user = self._fixture(naming_convention=convention)

        named = CheckConstraint(user.c.data == 'x', name='mycheck')
        eq_(named.name, "ck_user_mycheck")

        # without a name, construction fails
        assert_raises_message(
            exc.InvalidRequestError,
            r"Naming convention including %\(constraint_name\)s token "
            "requires that constraint is explicitly named.",
            CheckConstraint, user.c.data == 'x'
        )

    def test_ck_name_deferred_required(self):
        """With a deferred None name the same error is raised when the
        constraint is compiled."""
        convention = {"ck": "ck_%(table_name)s_%(constraint_name)s"}
        user = self._fixture(naming_convention=convention)

        deferred = CheckConstraint(
            user.c.data == 'x', name=elements._defer_name(None))

        assert_raises_message(
            exc.InvalidRequestError,
            r"Naming convention including %\(constraint_name\)s token "
            "requires that constraint is explicitly named.",
            schema.AddConstraint(deferred).compile
        )

    def test_column_attached_ck_name(self):
        """A check constraint passed to a Column is named by the
        convention."""
        metadata = MetaData(naming_convention={
            "ck": "ck_%(table_name)s_%(constraint_name)s"})
        check = CheckConstraint('x > 5', name='x1')

        Table('t', metadata, Column('x', check))
        eq_(check.name, "ck_t_x1")

    def test_table_attached_ck_name(self):
        """A check constraint passed to a Table is named by the
        convention."""
        metadata = MetaData(naming_convention={
            "ck": "ck_%(table_name)s_%(constraint_name)s"})
        check = CheckConstraint('x > 5', name='x1')

        Table('t', metadata, Column('x', Integer), check)
        eq_(check.name, "ck_t_x1")

    def test_uq_name_already_conv(self):
        """A name wrapped in ``naming.conv()`` is kept as given."""
        metadata = MetaData(naming_convention={
            "uq": "uq_%(constraint_name)s_%(column_0_name)s"})
        table = Table('mytable', metadata)
        unique = UniqueConstraint(name=naming.conv('my_special_key'))

        table.append_constraint(unique)
        eq_(unique.name, "my_special_key")

    def test_fk_name_schema(self):
        """The generated foreign key name for a schema-qualified target
        does not include the schema."""
        convention = {
            "fk": "fk_%(table_name)s_%(column_0_name)s_"
            "%(referred_table_name)s_%(referred_column_0_name)s"
        }
        user = self._fixture(
            naming_convention=convention, table_schema="foo")

        address = Table(
            'address', user.metadata,
            Column('id', Integer, primary_key=True),
            Column('user_id', Integer),
            Column('user_version_id', Integer)
        )
        foreign_key = ForeignKeyConstraint(
            ['user_id', 'user_version_id'],
            ['foo.user.id', 'foo.user.version'])

        address.append_constraint(foreign_key)
        eq_(foreign_key.name, "fk_address_user_id_user_id")

    def test_fk_attrs(self):
        """A foreign key name is built from local and referred table and
        column names."""
        convention = {
            "fk": "fk_%(table_name)s_%(column_0_name)s_"
            "%(referred_table_name)s_%(referred_column_0_name)s"
        }
        user = self._fixture(naming_convention=convention)

        address = Table(
            'address', user.metadata,
            Column('id', Integer, primary_key=True),
            Column('user_id', Integer),
            Column('user_version_id', Integer)
        )
        foreign_key = ForeignKeyConstraint(
            ['user_id', 'user_version_id'],
            ['user.id', 'user.version'])

        address.append_constraint(foreign_key)
        eq_(foreign_key.name, "fk_address_user_id_user_id")

    def test_custom(self):
        """A callable stored in the convention supplies the value of
        the token named after its key."""
        def key_hash(const, table):
            return "HASH_%s" % table.name

        convention = {
            "fk": "fk_%(table_name)s_%(key_hash)s",
            "key_hash": key_hash,
        }
        user = self._fixture(naming_convention=convention)

        address = Table(
            'address', user.metadata,
            Column('id', Integer, primary_key=True),
            Column('user_id', Integer),
            Column('user_version_id', Integer)
        )
        foreign_key = ForeignKeyConstraint(
            ['user_id', 'user_version_id'],
            ['user.id', 'user.version'])

        address.append_constraint(foreign_key)
        eq_(foreign_key.name, "fk_address_HASH_address")

    def test_schematype_ck_name_boolean(self):
        """The check constraint of a named Boolean keeps the plain name
        on the object; the convention shows up in the compiled DDL."""
        metadata = MetaData(naming_convention={
            "ck": "ck_%(table_name)s_%(constraint_name)s"})
        user = Table('user', metadata, Column('x', Boolean(name='foo')))

        # constraint is not hit
        checks = [
            const for const in user.constraints
            if isinstance(const, CheckConstraint)
        ]
        eq_(checks[0].name, "foo")

        # but is hit at compile time
        self.assert_compile(
            schema.CreateTable(user),
            'CREATE TABLE "user" ('
            "x BOOLEAN, "
            "CONSTRAINT ck_user_foo CHECK (x IN (0, 1))"
            ")"
        )

    def test_schematype_ck_name_boolean_not_on_name(self):
        """An unnamed Boolean gets a conventional name in DDL when the
        convention doesn't use ``%(constraint_name)s``."""
        metadata = MetaData(naming_convention={
            "ck": "ck_%(table_name)s_%(column_0_name)s"})
        user = Table('user', metadata, Column('x', Boolean()))

        # constraint is not hit
        checks = [
            const for const in user.constraints
            if isinstance(const, CheckConstraint)
        ]
        eq_(checks[0].name, "_unnamed_")

        # but is hit at compile time
        self.assert_compile(
            schema.CreateTable(user),
            'CREATE TABLE "user" ('
            "x BOOLEAN, "
            "CONSTRAINT ck_user_x CHECK (x IN (0, 1))"
            ")"
        )

    def test_schematype_ck_name_enum(self):
        """The check constraint of a named Enum keeps the plain name on
        the object; the convention shows up in the compiled DDL."""
        metadata = MetaData(naming_convention={
            "ck": "ck_%(table_name)s_%(constraint_name)s"})
        user = Table(
            'user', metadata, Column('x', Enum('a', 'b', name='foo')))

        checks = [
            const for const in user.constraints
            if isinstance(const, CheckConstraint)
        ]
        eq_(checks[0].name, "foo")

        # but is hit at compile time
        self.assert_compile(
            schema.CreateTable(user),
            'CREATE TABLE "user" ('
            "x VARCHAR(1), "
            "CONSTRAINT ck_user_foo CHECK (x IN ('a', 'b'))"
            ")"
        )

    def test_schematype_ck_name_propagate_conv(self):
        """A ``naming.conv()`` name on an Enum is rendered as-is in the
        compiled DDL."""
        metadata = MetaData(naming_convention={
            "ck": "ck_%(table_name)s_%(constraint_name)s"})
        user = Table(
            'user', metadata,
            Column('x', Enum('a', 'b', name=naming.conv('foo'))))

        checks = [
            const for const in user.constraints
            if isinstance(const, CheckConstraint)
        ]
        eq_(checks[0].name, "foo")

        # but is hit at compile time
        self.assert_compile(
            schema.CreateTable(user),
            'CREATE TABLE "user" ('
            "x VARCHAR(1), "
            "CONSTRAINT foo CHECK (x IN ('a', 'b'))"
            ")"
        )

    def test_schematype_ck_name_boolean_no_name(self):
        """An unnamed Boolean under a ``%(constraint_name)s`` convention
        compiles on postgresql but raises on the default dialect."""
        metadata = MetaData(naming_convention={
            "ck": "ck_%(table_name)s_%(constraint_name)s"})
        user = Table('user', metadata, Column('x', Boolean()))

        # constraint gets special _defer_none_name
        checks = [
            const for const in user.constraints
            if isinstance(const, CheckConstraint)
        ]
        eq_(checks[0].name, "_unnamed_")

        # no issue with native boolean
        self.assert_compile(
            schema.CreateTable(user),
            'CREATE TABLE "user" ('
            "x BOOLEAN"
            ")",
            dialect='postgresql'
        )

        assert_raises_message(
            exc.InvalidRequestError,
            r"Naming convention including \%\(constraint_name\)s token "
            r"requires that constraint is explicitly named.",
            schema.CreateTable(user).compile,
            dialect=default.DefaultDialect()
        )

    def test_schematype_no_ck_name_boolean_no_name(self):
        """Without a naming convention an unnamed Boolean renders an
        anonymous CHECK."""
        metadata = MetaData()  # no naming convention
        user = Table('user', metadata, Column('x', Boolean()))

        # constraint gets special _defer_none_name
        checks = [
            const for const in user.constraints
            if isinstance(const, CheckConstraint)
        ]
        eq_(checks[0].name, "_unnamed_")

        self.assert_compile(
            schema.CreateTable(user),
            'CREATE TABLE "user" (x BOOLEAN, CHECK (x IN (0, 1)))'
        )

    def test_ck_constraint_redundant_event(self):
        """Appending the same constraint several times applies the
        convention to its name only once."""
        user = self._fixture(naming_convention={
            "ck": "ck_%(table_name)s_%(constraint_name)s"})
        check = CheckConstraint(user.c.version > 3, name='foo')

        for _ in range(3):
            user.append_constraint(check)

        eq_(check.name, "ck_user_foo")

    def test_pickle_metadata(self):
        """The naming convention survives a pickle round trip of the
        MetaData and is applied to tables created afterwards."""
        original = MetaData(naming_convention={"pk": "%(table_name)s_pk"})
        restored = pickle.loads(pickle.dumps(original))

        eq_(restored.naming_convention, {"pk": "%(table_name)s_pk"})

        from_original = Table(
            't2', original, Column('id', Integer, primary_key=True))
        from_restored = Table(
            't2', restored, Column('id', Integer, primary_key=True))

        eq_(from_original.primary_key.name, from_restored.primary_key.name)
        eq_(from_restored.primary_key.name, "t2_pk")
4321