1# coding: utf-8 2 3from sqlalchemy.testing import eq_, is_ 4from sqlalchemy import Column, Table, DDL, MetaData, TIMESTAMP, \ 5 DefaultClause, String, Integer, Text, UnicodeText, SmallInteger,\ 6 NCHAR, LargeBinary, DateTime, select, UniqueConstraint, Unicode,\ 7 BigInteger 8from sqlalchemy import event 9from sqlalchemy import sql 10from sqlalchemy import inspect 11from sqlalchemy.dialects.mysql import base as mysql 12from sqlalchemy.testing import fixtures, AssertsExecutionResults 13from sqlalchemy import testing 14 15 16class TypeReflectionTest(fixtures.TestBase): 17 __only_on__ = 'mysql' 18 __backend__ = True 19 20 @testing.provide_metadata 21 def _run_test(self, specs, attributes): 22 columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)] 23 24 # Early 5.0 releases seem to report more "general" for columns 25 # in a view, e.g. char -> varchar, tinyblob -> mediumblob 26 use_views = testing.db.dialect.server_version_info > (5, 0, 10) 27 28 m = self.metadata 29 Table('mysql_types', m, *columns) 30 31 if use_views: 32 event.listen( 33 m, 'after_create', 34 DDL( 35 'CREATE OR REPLACE VIEW mysql_types_v ' 36 'AS SELECT * from mysql_types') 37 ) 38 event.listen( 39 m, 'before_drop', 40 DDL("DROP VIEW IF EXISTS mysql_types_v") 41 ) 42 m.create_all() 43 44 m2 = MetaData(testing.db) 45 tables = [ 46 Table('mysql_types', m2, autoload=True) 47 ] 48 if use_views: 49 tables.append(Table('mysql_types_v', m2, autoload=True)) 50 51 for table in tables: 52 for i, (reflected_col, spec) in enumerate(zip(table.c, specs)): 53 expected_spec = spec[1] 54 reflected_type = reflected_col.type 55 is_(type(reflected_type), type(expected_spec)) 56 57 for attr in attributes: 58 eq_( 59 getattr(reflected_type, attr), 60 getattr(expected_spec, attr), 61 "Column %s: Attribute %s value of %s does not " 62 "match %s for type %s" % ( 63 "c%i" % (i + 1), 64 attr, 65 getattr(reflected_type, attr), 66 getattr(expected_spec, attr), 67 spec[0] 68 ) 69 ) 70 71 def test_time_types(self): 72 specs = [] 73 74 if testing.requires.mysql_fsp.enabled: 75 fsps = [None, 0, 5] 76 else: 77 fsps = [None] 78 79 for type_ in (mysql.TIMESTAMP, mysql.DATETIME, mysql.TIME): 80 # MySQL defaults fsp to 0, and if 0 does not report it. 81 # we don't actually render 0 right now in DDL but even if we do, 82 # it comes back blank 83 for fsp in fsps: 84 if fsp: 85 specs.append((type_(fsp=fsp), type_(fsp=fsp))) 86 else: 87 specs.append((type_(), type_())) 88 89 specs.extend([ 90 (TIMESTAMP(), mysql.TIMESTAMP()), 91 (DateTime(), mysql.DATETIME()), 92 ]) 93 94 # note 'timezone' should always be None on both 95 self._run_test(specs, ['fsp', 'timezone']) 96 97 def test_year_types(self): 98 specs = [ 99 (mysql.YEAR(), mysql.YEAR(display_width=4)), 100 (mysql.YEAR(display_width=2), mysql.YEAR(display_width=2)), 101 (mysql.YEAR(display_width=4), mysql.YEAR(display_width=4)), 102 ] 103 104 self._run_test(specs, ['display_width']) 105 106 def test_string_types(self): 107 specs = [ 108 (String(1), mysql.MSString(1)), 109 (String(3), mysql.MSString(3)), 110 (Text(), mysql.MSText()), 111 (Unicode(1), mysql.MSString(1)), 112 (Unicode(3), mysql.MSString(3)), 113 (UnicodeText(), mysql.MSText()), 114 (mysql.MSChar(1), mysql.MSChar(1)), 115 (mysql.MSChar(3), mysql.MSChar(3)), 116 (NCHAR(2), mysql.MSChar(2)), 117 (mysql.MSNChar(2), mysql.MSChar(2)), 118 (mysql.MSNVarChar(22), mysql.MSString(22),), 119 ] 120 self._run_test(specs, ['length']) 121 122 def test_integer_types(self): 123 specs = [] 124 for type_ in [ 125 mysql.TINYINT, mysql.SMALLINT, 126 mysql.MEDIUMINT, mysql.INTEGER, mysql.BIGINT]: 127 for display_width in [None, 4, 7]: 128 for unsigned in [False, True]: 129 for zerofill in [None, True]: 130 kw = {} 131 if display_width: 132 kw['display_width'] = display_width 133 if unsigned is not None: 134 kw['unsigned'] = unsigned 135 if zerofill is not None: 136 kw['zerofill'] = zerofill 137 138 zerofill = bool(zerofill) 139 source_type = type_(**kw) 140 141 if display_width is None: 142 display_width = { 143 mysql.MEDIUMINT: 9, 144 mysql.SMALLINT: 6, 145 mysql.TINYINT: 4, 146 mysql.INTEGER: 11, 147 mysql.BIGINT: 20 148 }[type_] 149 150 if zerofill: 151 unsigned = True 152 153 expected_type = type_( 154 display_width=display_width, 155 unsigned=unsigned, 156 zerofill=zerofill 157 ) 158 specs.append( 159 (source_type, expected_type) 160 ) 161 162 specs.extend([ 163 (SmallInteger(), mysql.SMALLINT(display_width=6)), 164 (Integer(), mysql.INTEGER(display_width=11)), 165 (BigInteger, mysql.BIGINT(display_width=20)) 166 ]) 167 self._run_test(specs, ['display_width', 'unsigned', 'zerofill']) 168 169 def test_binary_types(self): 170 specs = [ 171 (LargeBinary(3), mysql.TINYBLOB(), ), 172 (LargeBinary(), mysql.BLOB()), 173 (mysql.MSBinary(3), mysql.MSBinary(3), ), 174 (mysql.MSVarBinary(3), mysql.MSVarBinary(3)), 175 (mysql.MSTinyBlob(), mysql.MSTinyBlob()), 176 (mysql.MSBlob(), mysql.MSBlob()), 177 (mysql.MSBlob(1234), mysql.MSBlob()), 178 (mysql.MSMediumBlob(), mysql.MSMediumBlob()), 179 (mysql.MSLongBlob(), mysql.MSLongBlob()), 180 ] 181 self._run_test(specs, []) 182 183 @testing.uses_deprecated('Manually quoting ENUM value literals') 184 def test_legacy_enum_types(self): 185 186 specs = [ 187 (mysql.ENUM("''","'fleem'"), mysql.ENUM("''","'fleem'")), # noqa 188 ] 189 190 self._run_test(specs, ['enums']) 191 192 193class ReflectionTest(fixtures.TestBase, AssertsExecutionResults): 194 195 __only_on__ = 'mysql' 196 __backend__ = True 197 198 def test_default_reflection(self): 199 """Test reflection of column defaults.""" 200 201 from sqlalchemy.dialects.mysql import VARCHAR 202 def_table = Table( 203 'mysql_def', 204 MetaData(testing.db), 205 Column('c1', VARCHAR(10, collation='utf8_unicode_ci'), 206 DefaultClause(''), nullable=False), 207 Column('c2', String(10), DefaultClause('0')), 208 Column('c3', String(10), DefaultClause('abc')), 209 Column('c4', TIMESTAMP, DefaultClause('2009-04-05 12:00:00')), 210 Column('c5', TIMESTAMP), 211 Column('c6', TIMESTAMP, 212 DefaultClause(sql.text("CURRENT_TIMESTAMP " 213 "ON UPDATE CURRENT_TIMESTAMP"))), 214 ) 215 def_table.create() 216 try: 217 reflected = Table('mysql_def', MetaData(testing.db), 218 autoload=True) 219 finally: 220 def_table.drop() 221 assert def_table.c.c1.server_default.arg == '' 222 assert def_table.c.c2.server_default.arg == '0' 223 assert def_table.c.c3.server_default.arg == 'abc' 224 assert def_table.c.c4.server_default.arg \ 225 == '2009-04-05 12:00:00' 226 assert str(reflected.c.c1.server_default.arg) == "''" 227 assert str(reflected.c.c2.server_default.arg) == "'0'" 228 assert str(reflected.c.c3.server_default.arg) == "'abc'" 229 assert str(reflected.c.c4.server_default.arg) \ 230 == "'2009-04-05 12:00:00'" 231 assert reflected.c.c5.default is None 232 assert reflected.c.c5.server_default is None 233 assert reflected.c.c6.default is None 234 eq_( 235 str(reflected.c.c6.server_default.arg).upper(), 236 "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" 237 ) 238 reflected.create() 239 try: 240 reflected2 = Table('mysql_def', MetaData(testing.db), 241 autoload=True) 242 finally: 243 reflected.drop() 244 assert str(reflected2.c.c1.server_default.arg) == "''" 245 assert str(reflected2.c.c2.server_default.arg) == "'0'" 246 assert str(reflected2.c.c3.server_default.arg) == "'abc'" 247 assert str(reflected2.c.c4.server_default.arg) \ 248 == "'2009-04-05 12:00:00'" 249 assert reflected.c.c5.default is None 250 assert reflected.c.c5.server_default is None 251 assert reflected.c.c6.default is None 252 eq_( 253 str(reflected.c.c6.server_default.arg).upper(), 254 "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP" 255 ) 256 257 def test_reflection_with_table_options(self): 258 comment = r"""Comment types type speedily ' " \ '' Fun!""" 259 260 def_table = Table( 261 'mysql_def', MetaData(testing.db), 262 Column('c1', Integer()), 263 mysql_engine='MEMORY', 264 mysql_comment=comment, 265 mysql_default_charset='utf8', 266 mysql_auto_increment='5', 267 mysql_avg_row_length='3', 268 mysql_password='secret', 269 mysql_connection='fish', 270 ) 271 272 def_table.create() 273 try: 274 reflected = Table( 275 'mysql_def', MetaData(testing.db), 276 autoload=True) 277 finally: 278 def_table.drop() 279 280 assert def_table.kwargs['mysql_engine'] == 'MEMORY' 281 assert def_table.kwargs['mysql_comment'] == comment 282 assert def_table.kwargs['mysql_default_charset'] == 'utf8' 283 assert def_table.kwargs['mysql_auto_increment'] == '5' 284 assert def_table.kwargs['mysql_avg_row_length'] == '3' 285 assert def_table.kwargs['mysql_password'] == 'secret' 286 assert def_table.kwargs['mysql_connection'] == 'fish' 287 288 assert reflected.kwargs['mysql_engine'] == 'MEMORY' 289 assert reflected.kwargs['mysql_comment'] == comment 290 assert reflected.kwargs['mysql_default charset'] == 'utf8' 291 assert reflected.kwargs['mysql_avg_row_length'] == '3' 292 assert reflected.kwargs['mysql_connection'] == 'fish' 293 294 # This field doesn't seem to be returned by mysql itself. 295 # assert reflected.kwargs['mysql_password'] == 'secret' 296 297 # This is explicitly ignored when reflecting schema. 298 # assert reflected.kwargs['mysql_auto_increment'] == '5' 299 300 def test_reflection_on_include_columns(self): 301 """Test reflection of include_columns to be sure they respect case.""" 302 303 case_table = Table( 304 'mysql_case', MetaData(testing.db), 305 Column('c1', String(10)), 306 Column('C2', String(10)), 307 Column('C3', String(10))) 308 309 try: 310 case_table.create() 311 reflected = Table('mysql_case', MetaData(testing.db), 312 autoload=True, include_columns=['c1', 'C2']) 313 for t in case_table, reflected: 314 assert 'c1' in t.c.keys() 315 assert 'C2' in t.c.keys() 316 reflected2 = Table( 317 'mysql_case', MetaData(testing.db), 318 autoload=True, include_columns=['c1', 'c2']) 319 assert 'c1' in reflected2.c.keys() 320 for c in ['c2', 'C2', 'C3']: 321 assert c not in reflected2.c.keys() 322 finally: 323 case_table.drop() 324 325 def test_autoincrement(self): 326 meta = MetaData(testing.db) 327 try: 328 Table('ai_1', meta, 329 Column('int_y', Integer, primary_key=True), 330 Column('int_n', Integer, DefaultClause('0'), 331 primary_key=True), 332 mysql_engine='MyISAM') 333 Table('ai_2', meta, 334 Column('int_y', Integer, primary_key=True), 335 Column('int_n', Integer, DefaultClause('0'), 336 primary_key=True), 337 mysql_engine='MyISAM') 338 Table('ai_3', meta, 339 Column('int_n', Integer, DefaultClause('0'), 340 primary_key=True, autoincrement=False), 341 Column('int_y', Integer, primary_key=True), 342 mysql_engine='MyISAM') 343 Table('ai_4', meta, 344 Column('int_n', Integer, DefaultClause('0'), 345 primary_key=True, autoincrement=False), 346 Column('int_n2', Integer, DefaultClause('0'), 347 primary_key=True, autoincrement=False), 348 mysql_engine='MyISAM') 349 Table('ai_5', meta, 350 Column('int_y', Integer, primary_key=True), 351 Column('int_n', Integer, DefaultClause('0'), 352 primary_key=True, autoincrement=False), 353 mysql_engine='MyISAM') 354 Table('ai_6', meta, 355 Column('o1', String(1), DefaultClause('x'), 356 primary_key=True), 357 Column('int_y', Integer, primary_key=True), 358 mysql_engine='MyISAM') 359 Table('ai_7', meta, 360 Column('o1', String(1), DefaultClause('x'), 361 primary_key=True), 362 Column('o2', String(1), DefaultClause('x'), 363 primary_key=True), 364 Column('int_y', Integer, primary_key=True), 365 mysql_engine='MyISAM') 366 Table('ai_8', meta, 367 Column('o1', String(1), DefaultClause('x'), 368 primary_key=True), 369 Column('o2', String(1), DefaultClause('x'), 370 primary_key=True), 371 mysql_engine='MyISAM') 372 meta.create_all() 373 374 table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', 375 'ai_5', 'ai_6', 'ai_7', 'ai_8'] 376 mr = MetaData(testing.db) 377 mr.reflect(only=table_names) 378 379 for tbl in [mr.tables[name] for name in table_names]: 380 for c in tbl.c: 381 if c.name.startswith('int_y'): 382 assert c.autoincrement 383 elif c.name.startswith('int_n'): 384 assert not c.autoincrement 385 tbl.insert().execute() 386 if 'int_y' in tbl.c: 387 assert select([tbl.c.int_y]).scalar() == 1 388 assert list(tbl.select().execute().first()).count(1) == 1 389 else: 390 assert 1 not in list(tbl.select().execute().first()) 391 finally: 392 meta.drop_all() 393 394 @testing.provide_metadata 395 def test_view_reflection(self): 396 Table('x', self.metadata, Column('a', Integer), Column('b', String(50))) 397 self.metadata.create_all() 398 399 with testing.db.connect() as conn: 400 conn.execute("CREATE VIEW v1 AS SELECT * FROM x") 401 conn.execute( 402 "CREATE ALGORITHM=MERGE VIEW v2 AS SELECT * FROM x") 403 conn.execute( 404 "CREATE ALGORITHM=UNDEFINED VIEW v3 AS SELECT * FROM x") 405 conn.execute( 406 "CREATE DEFINER=CURRENT_USER VIEW v4 AS SELECT * FROM x") 407 408 @event.listens_for(self.metadata, "before_drop") 409 def cleanup(*arg, **kw): 410 with testing.db.connect() as conn: 411 for v in ['v1', 'v2', 'v3', 'v4']: 412 conn.execute("DROP VIEW %s" % v) 413 414 insp = inspect(testing.db) 415 for v in ['v1', 'v2', 'v3', 'v4']: 416 eq_( 417 [ 418 (col['name'], col['type'].__class__) 419 for col in insp.get_columns(v) 420 ], 421 [('a', mysql.INTEGER), ('b', mysql.VARCHAR)] 422 ) 423 424 425 @testing.exclude('mysql', '<', (5, 0, 0), 'no information_schema support') 426 def test_system_views(self): 427 dialect = testing.db.dialect 428 connection = testing.db.connect() 429 view_names = dialect.get_view_names(connection, "information_schema") 430 self.assert_('TABLES' in view_names) 431 432 @testing.provide_metadata 433 def test_nullable_reflection(self): 434 """test reflection of NULL/NOT NULL, in particular with TIMESTAMP 435 defaults where MySQL is inconsistent in how it reports CREATE TABLE. 436 437 """ 438 meta = self.metadata 439 440 # this is ideally one table, but older MySQL versions choke 441 # on the multiple TIMESTAMP columns 442 443 reflected = [] 444 for idx, cols in enumerate([ 445 [ 446 "x INTEGER NULL", 447 "y INTEGER NOT NULL", 448 "z INTEGER", 449 "q TIMESTAMP NULL" 450 ], 451 452 ["p TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP"], 453 ["r TIMESTAMP NOT NULL"], 454 ["s TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP"], 455 ["t TIMESTAMP"], 456 ["u TIMESTAMP DEFAULT CURRENT_TIMESTAMP"] 457 ]): 458 Table("nn_t%d" % idx, meta) # to allow DROP 459 460 testing.db.execute(""" 461 CREATE TABLE nn_t%d ( 462 %s 463 ) 464 """ % (idx, ", \n".join(cols))) 465 466 reflected.extend( 467 { 468 "name": d['name'], "nullable": d['nullable'], 469 "default": d['default']} 470 for d in inspect(testing.db).get_columns("nn_t%d" % idx) 471 ) 472 473 eq_( 474 reflected, 475 [ 476 {'name': 'x', 'nullable': True, 'default': None}, 477 {'name': 'y', 'nullable': False, 'default': None}, 478 {'name': 'z', 'nullable': True, 'default': None}, 479 {'name': 'q', 'nullable': True, 'default': None}, 480 {'name': 'p', 'nullable': True, 481 'default': 'CURRENT_TIMESTAMP'}, 482 {'name': 'r', 'nullable': False, 483 'default': "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"}, 484 {'name': 's', 'nullable': False, 485 'default': 'CURRENT_TIMESTAMP'}, 486 {'name': 't', 'nullable': False, 487 'default': "CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"}, 488 {'name': 'u', 'nullable': False, 489 'default': 'CURRENT_TIMESTAMP'}, 490 ] 491 ) 492 493 @testing.provide_metadata 494 def test_reflection_with_unique_constraint(self): 495 insp = inspect(testing.db) 496 497 meta = self.metadata 498 uc_table = Table('mysql_uc', meta, 499 Column('a', String(10)), 500 UniqueConstraint('a', name='uc_a')) 501 502 uc_table.create() 503 504 # MySQL converts unique constraints into unique indexes. 505 # separately we get both 506 indexes = dict((i['name'], i) for i in insp.get_indexes('mysql_uc')) 507 constraints = set(i['name'] 508 for i in insp.get_unique_constraints('mysql_uc')) 509 510 self.assert_('uc_a' in indexes) 511 self.assert_(indexes['uc_a']['unique']) 512 self.assert_('uc_a' in constraints) 513 514 # reflection here favors the unique index, as that's the 515 # more "official" MySQL construct 516 reflected = Table('mysql_uc', MetaData(testing.db), autoload=True) 517 518 indexes = dict((i.name, i) for i in reflected.indexes) 519 constraints = set(uc.name for uc in reflected.constraints) 520 521 self.assert_('uc_a' in indexes) 522 self.assert_(indexes['uc_a'].unique) 523 self.assert_('uc_a' not in constraints) 524 525 526class RawReflectionTest(fixtures.TestBase): 527 def setup(self): 528 dialect = mysql.dialect() 529 self.parser = mysql.MySQLTableDefinitionParser( 530 dialect, dialect.identifier_preparer) 531 532 def test_key_reflection(self): 533 regex = self.parser._re_key 534 535 assert regex.match(' PRIMARY KEY (`id`),') 536 assert regex.match(' PRIMARY KEY USING BTREE (`id`),') 537 assert regex.match(' PRIMARY KEY (`id`) USING BTREE,') 538 assert regex.match(' PRIMARY KEY (`id`)') 539 assert regex.match(' PRIMARY KEY USING BTREE (`id`)') 540 assert regex.match(' PRIMARY KEY (`id`) USING BTREE') 541 assert regex.match( 542 ' PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE 16') 543 assert regex.match( 544 ' PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE=16') 545 assert regex.match( 546 ' PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE = 16') 547 assert not regex.match( 548 ' PRIMARY KEY (`id`) USING BTREE KEY_BLOCK_SIZE = = 16') 549 550 def test_fk_reflection(self): 551 regex = self.parser._re_constraint 552 553 m = regex.match(' CONSTRAINT `addresses_user_id_fkey` ' 554 'FOREIGN KEY (`user_id`) ' 555 'REFERENCES `users` (`id`) ' 556 'ON DELETE CASCADE ON UPDATE CASCADE') 557 eq_(m.groups(), ('addresses_user_id_fkey', '`user_id`', 558 '`users`', '`id`', None, 'CASCADE', 'CASCADE')) 559 560 561 m = regex.match(' CONSTRAINT `addresses_user_id_fkey` ' 562 'FOREIGN KEY (`user_id`) ' 563 'REFERENCES `users` (`id`) ' 564 'ON DELETE CASCADE ON UPDATE SET NULL') 565 eq_(m.groups(), ('addresses_user_id_fkey', '`user_id`', 566 '`users`', '`id`', None, 'CASCADE', 'SET NULL')) 567 568 569