1# coding: utf-8
2
3from sqlalchemy.testing import eq_, is_
4from sqlalchemy import Column, Table, DDL, MetaData, TIMESTAMP, \
5    DefaultClause, String, Integer, Text, UnicodeText, SmallInteger,\
6    NCHAR, LargeBinary, DateTime, select, UniqueConstraint, Unicode,\
7    BigInteger
8from sqlalchemy import event
9from sqlalchemy import sql
10from sqlalchemy import inspect
11from sqlalchemy.dialects.mysql import base as mysql
12from sqlalchemy.testing import fixtures, AssertsExecutionResults
13from sqlalchemy import testing
14
15
16class TypeReflectionTest(fixtures.TestBase):
17    __only_on__ = 'mysql'
18    __backend__ = True
19
20    @testing.provide_metadata
21    def _run_test(self, specs, attributes):
22        columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]
23
24        # Early 5.0 releases seem to report more "general" for columns
25        # in a view, e.g. char -> varchar, tinyblob -> mediumblob
26        use_views = testing.db.dialect.server_version_info > (5, 0, 10)
27
28        m = self.metadata
29        Table('mysql_types', m, *columns)
30
31        if use_views:
32            event.listen(
33                m, 'after_create',
34                DDL(
35                    'CREATE OR REPLACE VIEW mysql_types_v '
36                    'AS SELECT * from mysql_types')
37            )
38            event.listen(
39                m, 'before_drop',
40                DDL("DROP VIEW IF EXISTS mysql_types_v")
41            )
42        m.create_all()
43
44        m2 = MetaData(testing.db)
45        tables = [
46            Table('mysql_types', m2, autoload=True)
47        ]
48        if use_views:
49            tables.append(Table('mysql_types_v', m2, autoload=True))
50
51        for table in tables:
52            for i, (reflected_col, spec) in enumerate(zip(table.c, specs)):
53                expected_spec = spec[1]
54                reflected_type = reflected_col.type
55                is_(type(reflected_type), type(expected_spec))
56
57                for attr in attributes:
58                    eq_(
59                        getattr(reflected_type, attr),
60                        getattr(expected_spec, attr),
61                        "Column %s: Attribute %s value of %s does not "
62                        "match %s for type %s" % (
63                            "c%i" % (i + 1),
64                            attr,
65                            getattr(reflected_type, attr),
66                            getattr(expected_spec, attr),
67                            spec[0]
68                        )
69                    )
70
71    def test_time_types(self):
72        specs = []
73
74        if testing.requires.mysql_fsp.enabled:
75            fsps = [None, 0, 5]
76        else:
77            fsps = [None]
78
79        for type_ in (mysql.TIMESTAMP, mysql.DATETIME, mysql.TIME):
80            # MySQL defaults fsp to 0, and if 0 does not report it.
81            # we don't actually render 0 right now in DDL but even if we do,
82            # it comes back blank
83            for fsp in fsps:
84                if fsp:
85                    specs.append((type_(fsp=fsp), type_(fsp=fsp)))
86                else:
87                    specs.append((type_(), type_()))
88
89        specs.extend([
90            (TIMESTAMP(), mysql.TIMESTAMP()),
91            (DateTime(), mysql.DATETIME()),
92        ])
93
94        # note 'timezone' should always be None on both
95        self._run_test(specs, ['fsp', 'timezone'])
96
97    def test_year_types(self):
98        specs = [
99            (mysql.YEAR(), mysql.YEAR(display_width=4)),
100            (mysql.YEAR(display_width=2), mysql.YEAR(display_width=2)),
101            (mysql.YEAR(display_width=4), mysql.YEAR(display_width=4)),
102        ]
103
104        self._run_test(specs, ['display_width'])
105
106    def test_string_types(self):
107        specs = [
108            (String(1), mysql.MSString(1)),
109            (String(3), mysql.MSString(3)),
110            (Text(), mysql.MSText()),
111            (Unicode(1), mysql.MSString(1)),
112            (Unicode(3), mysql.MSString(3)),
113            (UnicodeText(), mysql.MSText()),
114            (mysql.MSChar(1), mysql.MSChar(1)),
115            (mysql.MSChar(3), mysql.MSChar(3)),
116            (NCHAR(2), mysql.MSChar(2)),
117            (mysql.MSNChar(2), mysql.MSChar(2)),
118            (mysql.MSNVarChar(22), mysql.MSString(22),),
119        ]
120        self._run_test(specs, ['length'])
121
122    def test_integer_types(self):
123        specs = []
124        for type_ in [
125                mysql.TINYINT, mysql.SMALLINT,
126                mysql.MEDIUMINT, mysql.INTEGER, mysql.BIGINT]:
127            for display_width in [None, 4, 7]:
128                for unsigned in [False, True]:
129                    for zerofill in [None, True]:
130                        kw = {}
131                        if display_width:
132                            kw['display_width'] = display_width
133                        if unsigned is not None:
134                            kw['unsigned'] = unsigned
135                        if zerofill is not None:
136                            kw['zerofill'] = zerofill
137
138                        zerofill = bool(zerofill)
139                        source_type = type_(**kw)
140
141                        if display_width is None:
142                            display_width = {
143                                mysql.MEDIUMINT: 9,
144                                mysql.SMALLINT: 6,
145                                mysql.TINYINT: 4,
146                                mysql.INTEGER: 11,
147                                mysql.BIGINT: 20
148                            }[type_]
149
150                        if zerofill:
151                            unsigned = True
152
153                        expected_type = type_(
154                            display_width=display_width,
155                            unsigned=unsigned,
156                            zerofill=zerofill
157                        )
158                        specs.append(
159                            (source_type, expected_type)
160                        )
161
162        specs.extend([
163            (SmallInteger(), mysql.SMALLINT(display_width=6)),
164            (Integer(), mysql.INTEGER(display_width=11)),
165            (BigInteger, mysql.BIGINT(display_width=20))
166        ])
167        self._run_test(specs, ['display_width', 'unsigned', 'zerofill'])
168
169    def test_binary_types(self):
170        specs = [
171            (LargeBinary(3), mysql.TINYBLOB(), ),
172            (LargeBinary(), mysql.BLOB()),
173            (mysql.MSBinary(3), mysql.MSBinary(3), ),
174            (mysql.MSVarBinary(3), mysql.MSVarBinary(3)),
175            (mysql.MSTinyBlob(), mysql.MSTinyBlob()),
176            (mysql.MSBlob(), mysql.MSBlob()),
177            (mysql.MSBlob(1234), mysql.MSBlob()),
178            (mysql.MSMediumBlob(), mysql.MSMediumBlob()),
179            (mysql.MSLongBlob(), mysql.MSLongBlob()),
180        ]
181        self._run_test(specs, [])
182
183    @testing.uses_deprecated('Manually quoting ENUM value literals')
184    def test_legacy_enum_types(self):
185
186        specs = [
187            (mysql.ENUM("''","'fleem'"), mysql.ENUM("''","'fleem'")),  # noqa
188        ]
189
190        self._run_test(specs, ['enums'])
191
192
193class ReflectionTest(fixtures.TestBase, AssertsExecutionResults):
194
195    __only_on__ = 'mysql'
196    __backend__ = True
197
198    def test_default_reflection(self):
199        """Test reflection of column defaults."""
200
201        from sqlalchemy.dialects.mysql import VARCHAR
202        def_table = Table(
203            'mysql_def',
204            MetaData(testing.db),
205            Column('c1', VARCHAR(10, collation='utf8_unicode_ci'),
206                   DefaultClause(''), nullable=False),
207            Column('c2', String(10), DefaultClause('0')),
208            Column('c3', String(10), DefaultClause('abc')),
209            Column('c4', TIMESTAMP, DefaultClause('2009-04-05 12:00:00')),
210            Column('c5', TIMESTAMP),
211            Column('c6', TIMESTAMP,
212                   DefaultClause(sql.text("CURRENT_TIMESTAMP "
213                                          "ON UPDATE CURRENT_TIMESTAMP"))),
214        )
215        def_table.create()
216        try:
217            reflected = Table('mysql_def', MetaData(testing.db),
218                              autoload=True)
219        finally:
220            def_table.drop()
221        assert def_table.c.c1.server_default.arg == ''
222        assert def_table.c.c2.server_default.arg == '0'
223        assert def_table.c.c3.server_default.arg == 'abc'
224        assert def_table.c.c4.server_default.arg \
225            == '2009-04-05 12:00:00'
226        assert str(reflected.c.c1.server_default.arg) == "''"
227        assert str(reflected.c.c2.server_default.arg) == "'0'"
228        assert str(reflected.c.c3.server_default.arg) == "'abc'"
229        assert str(reflected.c.c4.server_default.arg) \
230            == "'2009-04-05 12:00:00'"
231        assert reflected.c.c5.default is None
232        assert reflected.c.c5.server_default is None
233        assert reflected.c.c6.default is None
234        eq_(
235            str(reflected.c.c6.server_default.arg).upper(),
236            "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
237        )
238        reflected.create()
239        try:
240            reflected2 = Table('mysql_def', MetaData(testing.db),
241                               autoload=True)
242        finally:
243            reflected.drop()
244        assert str(reflected2.c.c1.server_default.arg) == "''"
245        assert str(reflected2.c.c2.server_default.arg) == "'0'"
246        assert str(reflected2.c.c3.server_default.arg) == "'abc'"
247        assert str(reflected2.c.c4.server_default.arg) \
248            == "'2009-04-05 12:00:00'"
249        assert reflected.c.c5.default is None
250        assert reflected.c.c5.server_default is None
251        assert reflected.c.c6.default is None
252        eq_(
253            str(reflected.c.c6.server_default.arg).upper(),
254            "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"
255        )
256
257    def test_reflection_with_table_options(self):
258        comment = r"""Comment types type speedily ' " \ '' Fun!"""
259
260        def_table = Table(
261            'mysql_def', MetaData(testing.db),
262            Column('c1', Integer()),
263            mysql_engine='MEMORY',
264            mysql_comment=comment,
265            mysql_default_charset='utf8',
266            mysql_auto_increment='5',
267            mysql_avg_row_length='3',
268            mysql_password='secret',
269            mysql_connection='fish',
270        )
271
272        def_table.create()
273        try:
274            reflected = Table(
275                'mysql_def', MetaData(testing.db),
276                autoload=True)
277        finally:
278            def_table.drop()
279
280        assert def_table.kwargs['mysql_engine'] == 'MEMORY'
281        assert def_table.kwargs['mysql_comment'] == comment
282        assert def_table.kwargs['mysql_default_charset'] == 'utf8'
283        assert def_table.kwargs['mysql_auto_increment'] == '5'
284        assert def_table.kwargs['mysql_avg_row_length'] == '3'
285        assert def_table.kwargs['mysql_password'] == 'secret'
286        assert def_table.kwargs['mysql_connection'] == 'fish'
287
288        assert reflected.kwargs['mysql_engine'] == 'MEMORY'
289        assert reflected.kwargs['mysql_comment'] == comment
290        assert reflected.kwargs['mysql_default charset'] == 'utf8'
291        assert reflected.kwargs['mysql_avg_row_length'] == '3'
292        assert reflected.kwargs['mysql_connection'] == 'fish'
293
294        # This field doesn't seem to be returned by mysql itself.
295        # assert reflected.kwargs['mysql_password'] == 'secret'
296
297        # This is explicitly ignored when reflecting schema.
298        # assert reflected.kwargs['mysql_auto_increment'] == '5'
299
300    def test_reflection_on_include_columns(self):
301        """Test reflection of include_columns to be sure they respect case."""
302
303        case_table = Table(
304            'mysql_case', MetaData(testing.db),
305            Column('c1', String(10)),
306            Column('C2', String(10)),
307            Column('C3', String(10)))
308
309        try:
310            case_table.create()
311            reflected = Table('mysql_case', MetaData(testing.db),
312                              autoload=True, include_columns=['c1', 'C2'])
313            for t in case_table, reflected:
314                assert 'c1' in t.c.keys()
315                assert 'C2' in t.c.keys()
316            reflected2 = Table(
317                'mysql_case', MetaData(testing.db),
318                autoload=True, include_columns=['c1', 'c2'])
319            assert 'c1' in reflected2.c.keys()
320            for c in ['c2', 'C2', 'C3']:
321                assert c not in reflected2.c.keys()
322        finally:
323            case_table.drop()
324
325    def test_autoincrement(self):
326        meta = MetaData(testing.db)
327        try:
328            Table('ai_1', meta,
329                  Column('int_y', Integer, primary_key=True),
330                  Column('int_n', Integer, DefaultClause('0'),
331                         primary_key=True),
332                         mysql_engine='MyISAM')
333            Table('ai_2', meta,
334                  Column('int_y', Integer, primary_key=True),
335                  Column('int_n', Integer, DefaultClause('0'),
336                         primary_key=True),
337                         mysql_engine='MyISAM')
338            Table('ai_3', meta,
339                  Column('int_n', Integer, DefaultClause('0'),
340                         primary_key=True, autoincrement=False),
341                  Column('int_y', Integer, primary_key=True),
342                         mysql_engine='MyISAM')
343            Table('ai_4', meta,
344                  Column('int_n', Integer, DefaultClause('0'),
345                         primary_key=True, autoincrement=False),
346                  Column('int_n2', Integer, DefaultClause('0'),
347                         primary_key=True, autoincrement=False),
348                         mysql_engine='MyISAM')
349            Table('ai_5', meta,
350                  Column('int_y', Integer, primary_key=True),
351                  Column('int_n', Integer, DefaultClause('0'),
352                         primary_key=True, autoincrement=False),
353                         mysql_engine='MyISAM')
354            Table('ai_6', meta,
355                  Column('o1', String(1), DefaultClause('x'),
356                         primary_key=True),
357                  Column('int_y', Integer, primary_key=True),
358                         mysql_engine='MyISAM')
359            Table('ai_7', meta,
360                  Column('o1', String(1), DefaultClause('x'),
361                         primary_key=True),
362                  Column('o2', String(1), DefaultClause('x'),
363                         primary_key=True),
364                  Column('int_y', Integer, primary_key=True),
365                         mysql_engine='MyISAM')
366            Table('ai_8', meta,
367                  Column('o1', String(1), DefaultClause('x'),
368                         primary_key=True),
369                  Column('o2', String(1), DefaultClause('x'),
370                         primary_key=True),
371                         mysql_engine='MyISAM')
372            meta.create_all()
373
374            table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
375                           'ai_5', 'ai_6', 'ai_7', 'ai_8']
376            mr = MetaData(testing.db)
377            mr.reflect(only=table_names)
378
379            for tbl in [mr.tables[name] for name in table_names]:
380                for c in tbl.c:
381                    if c.name.startswith('int_y'):
382                        assert c.autoincrement
383                    elif c.name.startswith('int_n'):
384                        assert not c.autoincrement
385                tbl.insert().execute()
386                if 'int_y' in tbl.c:
387                    assert select([tbl.c.int_y]).scalar() == 1
388                    assert list(tbl.select().execute().first()).count(1) == 1
389                else:
390                    assert 1 not in list(tbl.select().execute().first())
391        finally:
392            meta.drop_all()
393
394    @testing.provide_metadata
395    def test_view_reflection(self):
396        Table('x', self.metadata, Column('a', Integer), Column('b', String(50)))
397        self.metadata.create_all()
398
399        with testing.db.connect() as conn:
400            conn.execute("CREATE VIEW v1 AS SELECT * FROM x")
401            conn.execute(
402                "CREATE ALGORITHM=MERGE VIEW v2 AS SELECT * FROM x")
403            conn.execute(
404                "CREATE ALGORITHM=UNDEFINED VIEW v3 AS SELECT * FROM x")
405            conn.execute(
406                "CREATE DEFINER=CURRENT_USER VIEW v4 AS SELECT * FROM x")
407
408        @event.listens_for(self.metadata, "before_drop")
409        def cleanup(*arg, **kw):
410            with testing.db.connect() as conn:
411                for v in ['v1', 'v2', 'v3', 'v4']:
412                    conn.execute("DROP VIEW %s" % v)
413
414        insp = inspect(testing.db)
415        for v in ['v1', 'v2', 'v3', 'v4']:
416            eq_(
417                [
418                    (col['name'], col['type'].__class__)
419                    for col in insp.get_columns(v)
420                ],
421                [('a', mysql.INTEGER), ('b', mysql.VARCHAR)]
422            )
423
424
425    @testing.exclude('mysql', '<', (5, 0, 0), 'no information_schema support')
426    def test_system_views(self):
427        dialect = testing.db.dialect
428        connection = testing.db.connect()
429        view_names = dialect.get_view_names(connection, "information_schema")
430        self.assert_('TABLES' in view_names)
431
432    @testing.provide_metadata
433    def test_nullable_reflection(self):
434        """test reflection of NULL/NOT NULL, in particular with TIMESTAMP
435        defaults where MySQL is inconsistent in how it reports CREATE TABLE.
436
437        """
438        meta = self.metadata
439
440        # this is ideally one table, but older MySQL versions choke
441        # on the multiple TIMESTAMP columns
442
443        reflected = []
444        for idx, cols in enumerate([
445            [
446                "x INTEGER NULL",
447                "y INTEGER NOT NULL",
448                "z INTEGER",
449                "q TIMESTAMP NULL"
450            ],
451
452            ["p TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP"],
453            ["r TIMESTAMP NOT NULL"],
454            ["s TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP"],
455            ["t TIMESTAMP"],
456            ["u TIMESTAMP DEFAULT CURRENT_TIMESTAMP"]
457        ]):
458            Table("nn_t%d" % idx, meta)  # to allow DROP
459
460            testing.db.execute("""
461                CREATE TABLE nn_t%d (
462                    %s
463                )
464            """ % (idx, ", \n".join(cols)))
465
466            reflected.extend(
467                {
468                    "name": d['name'], "nullable": d['nullable'],
469                    "default": d['default']}
470                for d in inspect(testing.db).get_columns("nn_t%d" % idx)
471            )
472
473        eq_(
474            reflected,
475            [
476                {'name': 'x', 'nullable': True, 'default': None},
477                {'name': 'y', 'nullable': False, 'default': None},
478                {'name': 'z', 'nullable': True, 'default': None},
479                {'name': 'q', 'nullable': True, 'default': None},
480                {'name': 'p', 'nullable': True,
481                 'default': 'CURRENT_TIMESTAMP'},
482                {'name': 'r', 'nullable': False,
483                 'default': "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"},
484                {'name': 's', 'nullable': False,
485                 'default': 'CURRENT_TIMESTAMP'},
486                {'name': 't', 'nullable': False,
487                 'default': "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"},
488                {'name': 'u', 'nullable': False,
489                 'default': 'CURRENT_TIMESTAMP'},
490            ]
491        )
492
493    @testing.provide_metadata
494    def test_reflection_with_unique_constraint(self):
495        insp = inspect(testing.db)
496
497        meta = self.metadata
498        uc_table = Table('mysql_uc', meta,
499                         Column('a', String(10)),
500                         UniqueConstraint('a', name='uc_a'))
501
502        uc_table.create()
503
504        # MySQL converts unique constraints into unique indexes.
505        # separately we get both
506        indexes = dict((i['name'], i) for i in insp.get_indexes('mysql_uc'))
507        constraints = set(i['name']
508                          for i in insp.get_unique_constraints('mysql_uc'))
509
510        self.assert_('uc_a' in indexes)
511        self.assert_(indexes['uc_a']['unique'])
512        self.assert_('uc_a' in constraints)
513
514        # reflection here favors the unique index, as that's the
515        # more "official" MySQL construct
516        reflected = Table('mysql_uc', MetaData(testing.db), autoload=True)
517
518        indexes = dict((i.name, i) for i in reflected.indexes)
519        constraints = set(uc.name for uc in reflected.constraints)
520
521        self.assert_('uc_a' in indexes)
522        self.assert_(indexes['uc_a'].unique)
523        self.assert_('uc_a' not in constraints)
524
525
526class RawReflectionTest(fixtures.TestBase):
527    def setup(self):
528        dialect = mysql.dialect()
529        self.parser = mysql.MySQLTableDefinitionParser(
530            dialect, dialect.identifier_preparer)
531
532    def test_key_reflection(self):
533        regex = self.parser._re_key
534
535        assert regex.match('  PRIMARY KEY (`id`),')
536        assert regex.match('  PRIMARY KEY USING BTREE (`id`),')
537        assert regex.match('  PRIMARY KEY (`id`) USING BTREE,')
538        assert regex.match('  PRIMARY KEY (`id`)')
539        assert regex.match('  PRIMARY KEY USING BTREE (`id`)')
540        assert regex.match('  PRIMARY KEY (`id`) USING BTREE')
541        assert regex.match(
542            '  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE 16')
543        assert regex.match(
544            '  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE=16')
545        assert regex.match(
546            '  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE  = 16')
547        assert not regex.match(
548            '  PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE = = 16')
549
550    def test_fk_reflection(self):
551        regex = self.parser._re_constraint
552
553        m = regex.match('  CONSTRAINT `addresses_user_id_fkey` '
554                        'FOREIGN KEY (`user_id`) '
555                        'REFERENCES `users` (`id`) '
556                        'ON DELETE CASCADE ON UPDATE CASCADE')
557        eq_(m.groups(), ('addresses_user_id_fkey', '`user_id`',
558                            '`users`', '`id`', None, 'CASCADE', 'CASCADE'))
559
560
561        m = regex.match('  CONSTRAINT `addresses_user_id_fkey` '
562                        'FOREIGN KEY (`user_id`) '
563                        'REFERENCES `users` (`id`) '
564                        'ON DELETE CASCADE ON UPDATE SET NULL')
565        eq_(m.groups(), ('addresses_user_id_fkey', '`user_id`',
566                            '`users`', '`id`', None, 'CASCADE', 'SET NULL'))
567
568
569