# coding: utf-8

from sqlalchemy import BLOB
from sqlalchemy import BOOLEAN
from sqlalchemy import Boolean
from sqlalchemy import cast
from sqlalchemy import CHAR
from sqlalchemy import CLOB
from sqlalchemy import Column
from sqlalchemy import DATE
from sqlalchemy import Date
from sqlalchemy import DATETIME
from sqlalchemy import DateTime
from sqlalchemy import DECIMAL
from sqlalchemy import exc
from sqlalchemy import extract
from sqlalchemy import FLOAT
from sqlalchemy import Float
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import Interval
from sqlalchemy import LargeBinary
from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy import NCHAR
from sqlalchemy import NUMERIC
from sqlalchemy import Numeric
from sqlalchemy import NVARCHAR
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy import schema
from sqlalchemy import select
from sqlalchemy import SmallInteger
from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import TEXT
from sqlalchemy import TIME
from sqlalchemy import Time
from sqlalchemy import TIMESTAMP
from sqlalchemy import types as sqltypes
from sqlalchemy import Unicode
from sqlalchemy import UnicodeText
from sqlalchemy import VARCHAR
from sqlalchemy.dialects.mysql import base as mysql
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.sql import column
from sqlalchemy.sql import table
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures


class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
    """Compile DDL and statements with the MySQL dialect and check the SQL."""

    __dialect__ = mysql.dialect()

    def test_reserved_words(self):
        """The ``master_ssl_verify_server_cert`` column is backtick-quoted."""
        reserved_tbl = Table(
            "mysql_table",
            MetaData(),
            Column("col1", Integer),
            Column("master_ssl_verify_server_cert", Integer),
        )
        cols = reserved_tbl.c
        stmt = select([cols.col1, cols.master_ssl_verify_server_cert])

        self.assert_compile(
            stmt,
            "SELECT mysql_table.col1, "
            "mysql_table.`master_ssl_verify_server_cert` FROM mysql_table",
        )

    def test_create_index_simple(self):
        """A plain single-column index renders without MySQL extras."""
        metadata = MetaData()
        testtbl = Table("testtbl", metadata, Column("data", String(255)))
        plain_idx = Index("test_idx1", testtbl.c.data)

        ddl = schema.CreateIndex(plain_idx)
        self.assert_compile(ddl, "CREATE INDEX test_idx1 ON testtbl (data)")

    def test_create_index_with_prefix(self):
        """``mysql_prefix`` is emitted between CREATE and INDEX."""
        metadata = MetaData()
        testtbl = Table("testtbl", metadata, Column("data", String(255)))
        fulltext_idx = Index(
            "test_idx1",
            testtbl.c.data,
            mysql_length=10,
            mysql_prefix="FULLTEXT",
        )

        self.assert_compile(
            schema.CreateIndex(fulltext_idx),
            "CREATE FULLTEXT INDEX test_idx1 ON testtbl (data(10))",
        )

    def test_create_index_with_length(self):
        """An integer ``mysql_length`` is rendered after the column name."""
        metadata = MetaData()
        testtbl = Table("testtbl", metadata, Column("data", String(255)))
        data_col = testtbl.c.data
        len10_idx = Index("test_idx1", data_col, mysql_length=10)
        len5_idx = Index("test_idx2", data_col, mysql_length=5)

        self.assert_compile(
            schema.CreateIndex(len10_idx),
            "CREATE INDEX test_idx1 ON testtbl (data(10))",
        )
        self.assert_compile(
            schema.CreateIndex(len5_idx),
            "CREATE INDEX test_idx2 ON testtbl (data(5))",
        )

    def test_create_index_with_length_quoted(self):
        """The length follows the closing backtick of a quoted column."""
        metadata = MetaData()
        quoted_col = Column("some quoted data", String(255), key="s")
        testtbl = Table("testtbl", metadata, quoted_col)
        quoted_idx = Index("test_idx1", testtbl.c.s, mysql_length=10)

        self.assert_compile(
            schema.CreateIndex(quoted_idx),
            "CREATE INDEX test_idx1 ON testtbl (`some quoted data`(10))",
        )

    def test_create_composite_index_with_length_quoted(self):
        """A ``mysql_length`` dict keyed by column name, with quoted names."""
        metadata = MetaData()
        testtbl = Table(
            "testtbl",
            metadata,
            Column("some Quoted a", String(255), key="a"),
            Column("some Quoted b", String(255), key="b"),
        )
        per_column_lengths = {"some Quoted a": 10, "some Quoted b": 20}
        composite_idx = Index(
            "test_idx1",
            testtbl.c.a,
            testtbl.c.b,
            mysql_length=per_column_lengths,
        )

        self.assert_compile(
            schema.CreateIndex(composite_idx),
            "CREATE INDEX test_idx1 ON testtbl "
            "(`some Quoted a`(10), `some Quoted b`(20))",
        )

    def test_create_composite_index_with_length_quoted_3085_workaround(self):
        """A ``mysql_length`` dict keyed by already-backticked names."""
        metadata = MetaData()
        testtbl = Table(
            "testtbl",
            metadata,
            Column("some quoted a", String(255), key="a"),
            Column("some quoted b", String(255), key="b"),
        )
        # keys carry the backticks themselves in this variant
        per_column_lengths = {"`some quoted a`": 10, "`some quoted b`": 20}
        composite_idx = Index(
            "test_idx1",
            testtbl.c.a,
            testtbl.c.b,
            mysql_length=per_column_lengths,
        )

        self.assert_compile(
            schema.CreateIndex(composite_idx),
            "CREATE INDEX test_idx1 ON testtbl "
            "(`some quoted a`(10), `some quoted b`(20))",
        )

    def test_create_composite_index_with_length(self):
        """Full dict, partial dict and integer forms of ``mysql_length``."""
        metadata = MetaData()
        testtbl = Table(
            "testtbl",
            metadata,
            Column("a", String(255)),
            Column("b", String(255)),
        )
        col_a, col_b = testtbl.c.a, testtbl.c.b

        both_idx = Index(
            "test_idx1", col_a, col_b, mysql_length={"a": 10, "b": 20}
        )
        only_a_idx = Index("test_idx2", col_a, col_b, mysql_length={"a": 15})
        shared_idx = Index("test_idx3", col_a, col_b, mysql_length=30)

        self.assert_compile(
            schema.CreateIndex(both_idx),
            "CREATE INDEX test_idx1 ON testtbl (a(10), b(20))",
        )
        self.assert_compile(
            schema.CreateIndex(only_a_idx),
            "CREATE INDEX test_idx2 ON testtbl (a(15), b)",
        )
        self.assert_compile(
            schema.CreateIndex(shared_idx),
            "CREATE INDEX test_idx3 ON testtbl (a(30), b(30))",
        )

    def test_create_index_with_using(self):
        """``mysql_using`` appends a USING clause to CREATE INDEX."""
        metadata = MetaData()
        testtbl = Table("testtbl", metadata, Column("data", String(255)))
        data_col = testtbl.c.data
        btree_idx = Index("test_idx1", data_col, mysql_using="btree")
        hash_idx = Index("test_idx2", data_col, mysql_using="hash")

        self.assert_compile(
            schema.CreateIndex(btree_idx),
            "CREATE INDEX test_idx1 ON testtbl (data) USING btree",
        )
        self.assert_compile(
            schema.CreateIndex(hash_idx),
            "CREATE INDEX test_idx2 ON testtbl (data) USING hash",
        )

    def test_create_pk_plain(self):
        """A PrimaryKeyConstraint without options renders PRIMARY KEY."""
        metadata = MetaData()
        testtbl = Table(
            "testtbl",
            metadata,
            Column("data", String(255)),
            PrimaryKeyConstraint("data"),
        )

        self.assert_compile(
            schema.CreateTable(testtbl),
            "CREATE TABLE testtbl (data VARCHAR(255) NOT NULL, "
            "PRIMARY KEY (data))",
        )

    def test_create_pk_with_using(self):
        """``mysql_using`` on a primary key appends USING to the constraint."""
        metadata = MetaData()
        btree_pk = PrimaryKeyConstraint("data", mysql_using="btree")
        testtbl = Table(
            "testtbl", metadata, Column("data", String(255)), btree_pk
        )

        self.assert_compile(
            schema.CreateTable(testtbl),
            "CREATE TABLE testtbl (data VARCHAR(255) NOT NULL, "
            "PRIMARY KEY (data) USING btree)",
        )

    def test_create_index_expr(self):
        """An index over an expression renders the expression inline."""
        metadata = MetaData()
        foo = Table("foo", metadata, Column("x", Integer))
        expr_idx = Index("bar", foo.c.x > 5)

        self.assert_compile(
            schema.CreateIndex(expr_idx), "CREATE INDEX bar ON foo (x > 5)"
        )

    def test_deferrable_initially_kw_not_ignored(self):
        """DEFERRABLE / INITIALLY on a foreign key appear in CREATE TABLE."""
        metadata = MetaData()
        Table("t1", metadata, Column("id", Integer, primary_key=True))
        deferred_fk = ForeignKey("t1.id", deferrable=True, initially="XYZ")
        child = Table(
            "t2",
            metadata,
            Column("id", Integer, deferred_fk, primary_key=True),
        )

        self.assert_compile(
            schema.CreateTable(child),
            "CREATE TABLE t2 (id INTEGER NOT NULL, "
            "PRIMARY KEY (id), FOREIGN KEY(id) REFERENCES t1 (id) "
            "DEFERRABLE INITIALLY XYZ)",
        )

    def test_match_kw_raises(self):
        """A foreign key with ``match`` raises CompileError on compile."""
        metadata = MetaData()
        Table("t1", metadata, Column("id", Integer, primary_key=True))
        match_fk = ForeignKey("t1.id", match="XYZ")
        child = Table(
            "t2",
            metadata,
            Column("id", Integer, match_fk, primary_key=True),
        )

        create_child = schema.CreateTable(child)
        assert_raises_message(
            exc.CompileError,
            "MySQL ignores the 'MATCH' keyword while at the same time causes "
            "ON UPDATE/ON DELETE clauses to be ignored.",
            create_child.compile,
            dialect=mysql.dialect(),
        )

    def test_match(self):
        """``.match()`` renders MATCH ... AGAINST in boolean mode."""
        matchtable = table("matchtable", column("title", String))
        match_expr = matchtable.c.title.match("somstr")

        self.assert_compile(
            match_expr,
            "MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)",
        )

    def test_match_compile_kw(self):
        """``literal_binds`` is applied to both sides of MATCH ... AGAINST."""
        match_expr = literal("x").match(literal("y"))

        self.assert_compile(
            match_expr,
            "MATCH ('x') AGAINST ('y' IN BOOLEAN MODE)",
            literal_binds=True,
        )

    def test_concat_compile_kw(self):
        """String addition renders concat() and honors ``literal_binds``."""
        left = literal("x", type_=String)
        right = literal("y", type_=String)

        self.assert_compile(
            left + right, "concat('x', 'y')", literal_binds=True
        )

    def test_for_update(self):
        """Default and ``read=True`` locking clauses on a SELECT."""
        mytable = table(
            "mytable", column("myid"), column("name"), column("description")
        )

        def locking_select(**lock_kw):
            # build a fresh SELECT for every assertion
            return mytable.select(mytable.c.myid == 7).with_for_update(
                **lock_kw
            )

        self.assert_compile(
            locking_select(),
            "SELECT mytable.myid, mytable.name, mytable.description "
            "FROM mytable WHERE mytable.myid = %s FOR UPDATE",
        )

        self.assert_compile(
            locking_select(read=True),
            "SELECT mytable.myid, mytable.name, mytable.description "
            "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE",
        )

    def test_delete_extra_froms(self):
        """A DELETE whose WHERE names a second table renders USING."""
        target = table("t1", column("c1"))
        other = table("t2", column("c1"))
        stmt = sql.delete(target).where(target.c.c1 == other.c.c1)

        self.assert_compile(
            stmt, "DELETE FROM t1 USING t1, t2 WHERE t1.c1 = t2.c1"
        )

    def test_delete_extra_froms_alias(self):
        """DELETE against an alias, with and without an extra FROM table."""
        aliased = table("t1", column("c1")).alias("a1")
        other = table("t2", column("c1"))
        stmt = sql.delete(aliased).where(aliased.c.c1 == other.c.c1)

        self.assert_compile(
            stmt, "DELETE FROM a1 USING t1 AS a1, t2 WHERE a1.c1 = t2.c1"
        )
        self.assert_compile(sql.delete(aliased), "DELETE FROM t1 AS a1")


class SQLTest(fixtures.TestBase, AssertsCompiledSQL):
    """MySQL-dialect specific compilation of expressions and tables."""

    __dialect__ = mysql.dialect()

    def _partitioned_table(self, **table_opts):
        """Build the two-column ``testtable`` used by the partition tests."""
        return Table(
            "testtable",
            MetaData(),
            Column("id", Integer(), primary_key=True, autoincrement=True),
            Column(
                "other_id", Integer(), primary_key=True, autoincrement=False
            ),
            **table_opts
        )

    def test_precolumns(self):
        """DISTINCT and prefixes render between SELECT and the columns."""
        mysql_dialect = self.__dialect__

        def gen(distinct=None, prefixes=None):
            # forward only the options that were actually given
            candidates = (("distinct", distinct), ("prefixes", prefixes))
            kw = dict(
                (name, value)
                for name, value in candidates
                if value is not None
            )
            stmt = select([column("q")], **kw)
            return str(stmt.compile(dialect=mysql_dialect))

        eq_(gen(None), "SELECT q")
        eq_(gen(True), "SELECT DISTINCT q")

        eq_(gen(prefixes=["ALL"]), "SELECT ALL q")
        eq_(gen(prefixes=["DISTINCTROW"]), "SELECT DISTINCTROW q")

        # combining DISTINCT with MySQL prefix keywords
        eq_(gen(None, ["straight_join"]), "SELECT straight_join q")
        eq_(
            gen(False, ["HIGH_PRIORITY", "SQL_SMALL_RESULT", "ALL"]),
            "SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL q",
        )
        eq_(
            gen(True, ["high_priority", sql.text("sql_cache")]),
            "SELECT high_priority sql_cache DISTINCT q",
        )

    def test_backslash_escaping(self):
        """The ESCAPE backslash is doubled unless ``_backslash_escapes`` is off."""
        like_expr = sql.column("foo").like("bar", escape="\\")
        self.assert_compile(like_expr, "foo LIKE %s ESCAPE '\\\\'")

        no_escape_dialect = mysql.dialect()
        no_escape_dialect._backslash_escapes = False
        self.assert_compile(
            sql.column("foo").like("bar", escape="\\"),
            "foo LIKE %s ESCAPE '\\'",
            dialect=no_escape_dialect,
        )

    def test_limit(self):
        """LIMIT rendering for limit+offset, limit only and offset only."""
        tbl = sql.table("t", sql.column("col1"), sql.column("col2"))

        limit_and_offset = select([tbl]).limit(10).offset(20)
        self.assert_compile(
            limit_and_offset,
            "SELECT t.col1, t.col2 FROM t LIMIT %s, %s",
            {"param_1": 20, "param_2": 10},
        )

        limit_only = select([tbl]).limit(10)
        self.assert_compile(
            limit_only,
            "SELECT t.col1, t.col2 FROM t LIMIT %s",
            {"param_1": 10},
        )

        # offset without limit renders a fixed large row count
        offset_only = select([tbl]).offset(10)
        self.assert_compile(
            offset_only,
            "SELECT t.col1, t.col2 FROM t LIMIT %s, 18446744073709551615",
            {"param_1": 10},
        )

    def test_varchar_raise(self):
        """Length-less string types raise CompileError, alone or in a table."""
        lengthless = (
            String,
            VARCHAR,
            String(),
            VARCHAR(),
            NVARCHAR(),
            Unicode,
            Unicode(),
        )
        for type_spec in lengthless:
            type_ = sqltypes.to_instance(type_spec)
            assert_raises_message(
                exc.CompileError,
                "VARCHAR requires a length on dialect mysql",
                type_.compile,
                dialect=mysql.dialect(),
            )

            sometable = Table(
                "sometable", MetaData(), Column("somecolumn", type_)
            )
            create_stmt = schema.CreateTable(sometable)
            assert_raises_message(
                exc.CompileError,
                r"\(in table 'sometable', column 'somecolumn'\)\: "
                r"(?:N)?VARCHAR requires a length on dialect mysql",
                create_stmt.compile,
                dialect=mysql.dialect(),
            )

    def test_update_limit(self):
        """``mysql_limit`` adds LIMIT to UPDATE; ``None`` adds nothing."""
        tbl = sql.table("t", sql.column("col1"), sql.column("col2"))

        no_limit = tbl.update(values={"col1": 123})
        self.assert_compile(no_limit, "UPDATE t SET col1=%s")

        limit_five = tbl.update(values={"col1": 123}, mysql_limit=5)
        self.assert_compile(limit_five, "UPDATE t SET col1=%s LIMIT 5")

        limit_none = tbl.update(values={"col1": 123}, mysql_limit=None)
        self.assert_compile(limit_none, "UPDATE t SET col1=%s")

        filtered = tbl.update(
            tbl.c.col2 == 456, values={"col1": 123}, mysql_limit=1
        )
        self.assert_compile(
            filtered, "UPDATE t SET col1=%s WHERE t.col2 = %s LIMIT 1"
        )

    def test_utc_timestamp(self):
        """``func.utc_timestamp()`` with no arguments."""
        self.assert_compile(func.utc_timestamp(), "utc_timestamp()")

    def test_utc_timestamp_fsp(self):
        """``func.utc_timestamp(5)`` passes its argument as a bind parameter."""
        with_fsp = func.utc_timestamp(5)
        self.assert_compile(
            with_fsp,
            "utc_timestamp(%s)",
            checkparams={"utc_timestamp_1": 5},
        )

    def test_sysdate(self):
        """``func.sysdate()`` renders in upper case with parentheses."""
        self.assert_compile(func.sysdate(), "SYSDATE()")

    def test_cast(self):
        """Each listed type maps to the expected CAST target."""
        tbl = sql.table("t", sql.column("col"))
        m = mysql

        # expected renderings, named once and reused below
        signed = "CAST(t.col AS SIGNED INTEGER)"
        unsigned = "CAST(t.col AS UNSIGNED INTEGER)"
        decimal = "CAST(t.col AS DECIMAL)"
        datetime_ = "CAST(t.col AS DATETIME)"
        date_ = "CAST(t.col AS DATE)"
        time_ = "CAST(t.col AS TIME)"
        char = "CAST(t.col AS CHAR)"
        char_utf8 = "CAST(t.col AS CHAR CHARACTER SET utf8)"
        char_32 = "CAST(t.col AS CHAR(32))"
        binary = "CAST(t.col AS BINARY)"

        specs = [
            (Integer, signed),
            (INT, signed),
            (m.MSInteger, signed),
            (m.MSInteger(unsigned=True), unsigned),
            (SmallInteger, signed),
            (m.MSSmallInteger, signed),
            (m.MSTinyInteger, signed),
            # 'SIGNED INTEGER' is a bigint, so this is ok.
            (m.MSBigInteger, signed),
            (m.MSBigInteger(unsigned=False), signed),
            (m.MSBigInteger(unsigned=True), unsigned),
            # original note: this is kind of sucky, attributed to the
            # types' default arguments
            (NUMERIC, decimal),
            (DECIMAL, decimal),
            (Numeric, decimal),
            (m.MSNumeric, decimal),
            (m.MSDecimal, decimal),
            (TIMESTAMP, datetime_),
            (DATETIME, datetime_),
            (DATE, date_),
            (TIME, time_),
            (DateTime, datetime_),
            (Date, date_),
            (Time, time_),
            # NOTE(review): DateTime and Date are listed a second time
            # here in the original; kept as-is.
            (DateTime, datetime_),
            (Date, date_),
            (m.MSTime, time_),
            (m.MSTimeStamp, datetime_),
            (String, char),
            (Unicode, char),
            (UnicodeText, char),
            (VARCHAR, char),
            (NCHAR, char),
            (CHAR, char),
            (m.CHAR(charset="utf8"), char_utf8),
            (CLOB, char),
            (TEXT, char),
            (m.TEXT(charset="utf8"), char_utf8),
            (String(32), char_32),
            (Unicode(32), char_32),
            (CHAR(32), char_32),
            (m.MSString, char),
            (m.MSText, char),
            (m.MSTinyText, char),
            (m.MSMediumText, char),
            (m.MSLongText, char),
            (m.MSNChar, char),
            (m.MSNVarChar, char),
            (LargeBinary, binary),
            (BLOB, binary),
            (m.MSBlob, binary),
            (m.MSBlob(32), binary),
            (m.MSTinyBlob, binary),
            (m.MSMediumBlob, binary),
            (m.MSLongBlob, binary),
            (m.MSBinary, binary),
            (m.MSBinary(32), binary),
            (m.MSVarBinary, binary),
            (m.MSVarBinary(32), binary),
            (Interval, datetime_),
        ]

        for target_type, expected_sql in specs:
            self.assert_compile(cast(tbl.c.col, target_type), expected_sql)

    def test_cast_type_decorator(self):
        """A TypeDecorator over Integer casts like Integer itself."""

        class MyInteger(sqltypes.TypeDecorator):
            impl = Integer

        decorated = MyInteger()
        tbl = sql.table("t", sql.column("col"))

        self.assert_compile(
            cast(tbl.c.col, decorated), "CAST(t.col AS SIGNED INTEGER)"
        )

    def test_cast_literal_bind(self):
        """``literal_binds`` inlines the value inside a CAST."""
        summed = column("foo", Integer) + 5
        cast_expr = cast(summed, Integer())

        self.assert_compile(
            cast_expr, "CAST(foo + 5 AS SIGNED INTEGER)", literal_binds=True
        )

    def test_unsupported_cast_literal_bind(self):
        """When CAST is skipped, the grouped literal expression remains."""
        cast_expr = cast(column("foo", Integer) + 5, Float)

        with expect_warnings("Datatype FLOAT does not support CAST on MySQL;"):
            self.assert_compile(cast_expr, "(foo + 5)", literal_binds=True)

        old_dialect = mysql.MySQLDialect()
        old_dialect.server_version_info = (3, 9, 8)
        with expect_warnings("Current MySQL version does not support CAST"):
            compiled = cast_expr.compile(
                dialect=old_dialect, compile_kwargs={"literal_binds": True}
            )
            eq_(str(compiled), "(foo + 5)")

    def test_unsupported_casts(self):
        """Listed types warn and compile to the bare column, no CAST."""
        tbl = sql.table("t", sql.column("col"))
        m = mysql

        # every entry is expected to compile to the same bare column
        bare_column = "t.col"
        uncastable = [
            m.MSBit,
            FLOAT,
            Float,
            m.MSFloat,
            m.MSDouble,
            m.MSReal,
            m.MSYear,
            m.MSYear(2),
            Boolean,
            BOOLEAN,
            m.MSEnum,
            m.MSEnum("1", "2"),
            m.MSSet,
            m.MSSet("1", "2"),
        ]

        for target_type in uncastable:
            with expect_warnings(
                "Datatype .* does not support CAST on MySQL;"
            ):
                self.assert_compile(cast(tbl.c.col, target_type), bare_column)

    def test_no_cast_pre_4(self):
        """With server version (3, 2, 3) the CAST is dropped with a warning."""
        self.assert_compile(
            cast(Column("foo", Integer), String), "CAST(foo AS CHAR)"
        )

        old_dialect = mysql.dialect()
        old_dialect.server_version_info = (3, 2, 3)
        with expect_warnings("Current MySQL version does not support CAST;"):
            self.assert_compile(
                cast(Column("foo", Integer), String),
                "foo",
                dialect=old_dialect,
            )

    def test_cast_grouped_expression_non_castable(self):
        """A skipped CAST to Float keeps the inner expression in parens."""
        summed = sql.column("x") + sql.column("y")

        with expect_warnings("Datatype FLOAT does not support CAST on MySQL;"):
            self.assert_compile(cast(summed, Float), "(x + y)")

    def test_cast_grouped_expression_pre_4(self):
        """A CAST skipped for server version (3, 2, 3) keeps the parens."""
        old_dialect = mysql.dialect()
        old_dialect.server_version_info = (3, 2, 3)
        summed = sql.column("x") + sql.column("y")

        with expect_warnings("Current MySQL version does not support CAST;"):
            self.assert_compile(
                cast(summed, Integer), "(x + y)", dialect=old_dialect
            )

    def test_extract(self):
        """EXTRACT field names, including the ``milliseconds`` rename."""
        tbl = sql.table("t", sql.column("col1"))

        for field in "year", "month", "day":
            extract_stmt = select([extract(field, tbl.c.col1)])
            self.assert_compile(
                extract_stmt,
                "SELECT EXTRACT(%s FROM t.col1) AS anon_1 FROM t" % field,
            )

        # the plural "milliseconds" is rendered as singular "millisecond"
        self.assert_compile(
            select([extract("milliseconds", tbl.c.col1)]),
            "SELECT EXTRACT(millisecond FROM t.col1) AS anon_1 FROM t",
        )

    def test_too_long_index(self):
        """An auto-generated index name renders as the expected ``_5cd2`` form."""
        expected_name = (
            "ix_zyrenian_zyme_zyzzogeton_zyzzogeton_zyrenian_zyme_zyz_5cd2"
        )
        tname = "zyrenian_zyme_zyzzogeton_zyzzogeton"
        cname = "zyrenian_zyme_zyzzogeton_zo"

        long_tbl = Table(tname, MetaData(), Column(cname, Integer, index=True))
        # the table carries exactly one index, created by index=True
        auto_idx = next(iter(long_tbl.indexes))

        self.assert_compile(
            schema.CreateIndex(auto_idx),
            "CREATE INDEX %s ON %s (%s)" % (expected_name, tname, cname),
        )

    def test_innodb_autoincrement(self):
        """The AUTO_INCREMENT column comes first in a composite InnoDB key."""
        auto_second = Table(
            "sometable",
            MetaData(),
            Column(
                "assigned_id", Integer(), primary_key=True, autoincrement=False
            ),
            Column("id", Integer(), primary_key=True, autoincrement=True),
            mysql_engine="InnoDB",
        )
        self.assert_compile(
            schema.CreateTable(auto_second),
            "CREATE TABLE sometable ("
            "assigned_id INTEGER NOT NULL, "
            "id INTEGER NOT NULL AUTO_INCREMENT, "
            "PRIMARY KEY (id, assigned_id)"
            ")ENGINE=InnoDB",
        )

        auto_first = Table(
            "sometable",
            MetaData(),
            Column(
                "assigned_id", Integer(), primary_key=True, autoincrement=True
            ),
            Column("id", Integer(), primary_key=True, autoincrement=False),
            mysql_engine="InnoDB",
        )
        self.assert_compile(
            schema.CreateTable(auto_first),
            "CREATE TABLE sometable ("
            "assigned_id INTEGER NOT NULL AUTO_INCREMENT, "
            "id INTEGER NOT NULL, "
            "PRIMARY KEY (assigned_id, id)"
            ")ENGINE=InnoDB",
        )

    def test_innodb_autoincrement_reserved_word_column_name(self):
        """Same ordering when the AUTO_INCREMENT column is named ``order``."""
        sometable = Table(
            "sometable",
            MetaData(),
            Column("id", Integer(), primary_key=True, autoincrement=False),
            Column("order", Integer(), primary_key=True, autoincrement=True),
            mysql_engine="InnoDB",
        )

        self.assert_compile(
            schema.CreateTable(sometable),
            "CREATE TABLE sometable ("
            "id INTEGER NOT NULL, "
            "`order` INTEGER NOT NULL AUTO_INCREMENT, "
            "PRIMARY KEY (`order`, id)"
            ")ENGINE=InnoDB",
        )

    def test_create_table_with_partition(self):
        """``mysql_partition_by`` with KEY plus ``mysql_partitions``."""
        testtable = self._partitioned_table(
            mysql_partitions="2", mysql_partition_by="KEY(other_id)"
        )

        self.assert_compile(
            schema.CreateTable(testtable),
            "CREATE TABLE testtable ("
            "id INTEGER NOT NULL AUTO_INCREMENT, "
            "other_id INTEGER NOT NULL, "
            "PRIMARY KEY (id, other_id)"
            ")PARTITION BY KEY(other_id) PARTITIONS 2",
        )

    def test_create_table_with_subpartition(self):
        """Subpartition options render after the partition options."""
        testtable = self._partitioned_table(
            mysql_partitions="2",
            mysql_partition_by="KEY(other_id)",
            mysql_subpartition_by="HASH(some_expr)",
            mysql_subpartitions="2",
        )

        self.assert_compile(
            schema.CreateTable(testtable),
            "CREATE TABLE testtable ("
            "id INTEGER NOT NULL AUTO_INCREMENT, "
            "other_id INTEGER NOT NULL, "
            "PRIMARY KEY (id, other_id)"
            ")PARTITION BY KEY(other_id) PARTITIONS 2 "
            "SUBPARTITION BY HASH(some_expr) SUBPARTITIONS 2",
        )

    def test_create_table_with_partition_hash(self):
        """``mysql_partition_by`` with HASH plus ``mysql_partitions``."""
        testtable = self._partitioned_table(
            mysql_partitions="2", mysql_partition_by="HASH(other_id)"
        )

        self.assert_compile(
            schema.CreateTable(testtable),
            "CREATE TABLE testtable ("
            "id INTEGER NOT NULL AUTO_INCREMENT, "
            "other_id INTEGER NOT NULL, "
            "PRIMARY KEY (id, other_id)"
            ")PARTITION BY HASH(other_id) PARTITIONS 2",
        )

    def test_create_table_with_partition_and_other_opts(self):
        """STATS_SAMPLE_PAGES renders before the partition clauses."""
        testtable = self._partitioned_table(
            mysql_stats_sample_pages="2",
            mysql_partitions="2",
            mysql_partition_by="HASH(other_id)",
        )

        self.assert_compile(
            schema.CreateTable(testtable),
            "CREATE TABLE testtable ("
            "id INTEGER NOT NULL AUTO_INCREMENT, "
            "other_id INTEGER NOT NULL, "
            "PRIMARY KEY (id, other_id)"
            ")STATS_SAMPLE_PAGES=2 PARTITION BY HASH(other_id) PARTITIONS 2",
        )

    def test_inner_join(self):
        """``join()`` renders as INNER JOIN."""
        left = table("t1", column("x"))
        right = table("t2", column("y"))
        joined = left.join(right, left.c.x == right.c.y)

        self.assert_compile(joined, "t1 INNER JOIN t2 ON t1.x = t2.y")

    def test_outer_join(self):
        """``outerjoin()`` renders as LEFT OUTER JOIN."""
        left = table("t1", column("x"))
        right = table("t2", column("y"))
        joined = left.outerjoin(right, left.c.x == right.c.y)

        self.assert_compile(joined, "t1 LEFT OUTER JOIN t2 ON t1.x = t2.y")

    def test_full_outer_join(self):
        """``outerjoin(full=True)`` renders as FULL OUTER JOIN."""
        left = table("t1", column("x"))
        right = table("t2", column("y"))
        joined = left.outerjoin(right, left.c.x == right.c.y, full=True)

        self.assert_compile(joined, "t1 FULL OUTER JOIN t2 ON t1.x = t2.y")


class InsertOnDuplicateTest(fixtures.TestBase, AssertsCompiledSQL):
    """Compilation of MySQL INSERT ... ON DUPLICATE KEY UPDATE."""

    __dialect__ = mysql.dialect()

    def setup(self):
        """Create the ``foos`` table that every test inserts into."""
        self.table = Table(
            "foos",
            MetaData(),
            Column("id", Integer, primary_key=True),
            Column("bar", String(10)),
            Column("baz", String(10)),
        )

    def _two_row_insert(self):
        """Return a fresh two-row INSERT into ``self.table``."""
        rows = [{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
        return insert(self.table).values(rows)

    def test_from_values(self):
        """``stmt.inserted`` columns render as VALUES(col)."""
        base_stmt = self._two_row_insert()
        upsert = base_stmt.on_duplicate_key_update(
            bar=base_stmt.inserted.bar, baz=base_stmt.inserted.baz
        )

        self.assert_compile(
            upsert,
            "INSERT INTO foos (id, bar) VALUES (%s, %s), (%s, %s) "
            "ON DUPLICATE KEY UPDATE bar = VALUES(bar), baz = VALUES(baz)",
        )

    def test_from_literal(self):
        """A ``literal_column`` value is rendered inline in the UPDATE."""
        upsert = self._two_row_insert().on_duplicate_key_update(
            bar=literal_column("bb")
        )

        self.assert_compile(
            upsert,
            "INSERT INTO foos (id, bar) VALUES (%s, %s), (%s, %s) "
            "ON DUPLICATE KEY UPDATE bar = bb",
        )

    def test_python_values(self):
        """A plain Python value becomes a bind parameter in the UPDATE."""
        upsert = self._two_row_insert().on_duplicate_key_update(bar="foobar")

        self.assert_compile(
            upsert,
            "INSERT INTO foos (id, bar) VALUES (%s, %s), (%s, %s) "
            "ON DUPLICATE KEY UPDATE bar = %s",
        )