1# coding: utf-8
2
3from sqlalchemy import BLOB
4from sqlalchemy import BOOLEAN
5from sqlalchemy import Boolean
6from sqlalchemy import cast
7from sqlalchemy import CHAR
8from sqlalchemy import CLOB
9from sqlalchemy import Column
10from sqlalchemy import DATE
11from sqlalchemy import Date
12from sqlalchemy import DATETIME
13from sqlalchemy import DateTime
14from sqlalchemy import DECIMAL
15from sqlalchemy import exc
16from sqlalchemy import extract
17from sqlalchemy import FLOAT
18from sqlalchemy import Float
19from sqlalchemy import ForeignKey
20from sqlalchemy import func
21from sqlalchemy import Index
22from sqlalchemy import INT
23from sqlalchemy import Integer
24from sqlalchemy import Interval
25from sqlalchemy import LargeBinary
26from sqlalchemy import literal
27from sqlalchemy import MetaData
28from sqlalchemy import NCHAR
29from sqlalchemy import NUMERIC
30from sqlalchemy import Numeric
31from sqlalchemy import NVARCHAR
32from sqlalchemy import PrimaryKeyConstraint
33from sqlalchemy import schema
34from sqlalchemy import select
35from sqlalchemy import SmallInteger
36from sqlalchemy import sql
37from sqlalchemy import String
38from sqlalchemy import Table
39from sqlalchemy import TEXT
40from sqlalchemy import TIME
41from sqlalchemy import Time
42from sqlalchemy import TIMESTAMP
43from sqlalchemy import types as sqltypes
44from sqlalchemy import Unicode
45from sqlalchemy import UnicodeText
46from sqlalchemy import VARCHAR
47from sqlalchemy.dialects.mysql import base as mysql
48from sqlalchemy.dialects.mysql import insert
49from sqlalchemy.sql import column
50from sqlalchemy.sql import table
51from sqlalchemy.sql.expression import literal_column
52from sqlalchemy.testing import assert_raises_message
53from sqlalchemy.testing import AssertsCompiledSQL
54from sqlalchemy.testing import eq_
55from sqlalchemy.testing import expect_warnings
56from sqlalchemy.testing import fixtures
57
58
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
    """Render DDL and DML constructs with the MySQL dialect.

    Each test builds a construct and compares its compiled form (or the
    error raised while compiling) against a literal expectation.
    """

    __dialect__ = mysql.dialect()

    def test_reserved_words(self):
        """``master_ssl_verify_server_cert`` is rendered backtick-quoted."""
        mysql_table = Table(
            "mysql_table",
            MetaData(),
            Column("col1", Integer),
            Column("master_ssl_verify_server_cert", Integer),
        )
        cols = mysql_table.c
        stmt = select([cols.col1, cols.master_ssl_verify_server_cert])

        expected = (
            "SELECT mysql_table.col1, "
            "mysql_table.`master_ssl_verify_server_cert` FROM mysql_table"
        )
        self.assert_compile(stmt, expected)

    def test_create_index_simple(self):
        """An index with no MySQL options renders as plain CREATE INDEX."""
        metadata = MetaData()
        testtbl = Table("testtbl", metadata, Column("data", String(255)))
        index = Index("test_idx1", testtbl.c.data)

        ddl = schema.CreateIndex(index)
        self.assert_compile(ddl, "CREATE INDEX test_idx1 ON testtbl (data)")

    def test_create_index_with_prefix(self):
        """``mysql_prefix`` is emitted between CREATE and INDEX."""
        metadata = MetaData()
        testtbl = Table("testtbl", metadata, Column("data", String(255)))
        fulltext_idx = Index(
            "test_idx1",
            testtbl.c.data,
            mysql_prefix="FULLTEXT",
            mysql_length=10,
        )

        expected = "CREATE FULLTEXT INDEX test_idx1 ON testtbl (data(10))"
        self.assert_compile(schema.CreateIndex(fulltext_idx), expected)

    def test_create_index_with_length(self):
        """An integer ``mysql_length`` is rendered after the column name."""
        metadata = MetaData()
        testtbl = Table("testtbl", metadata, Column("data", String(255)))
        data_col = testtbl.c.data

        # Build every index up front, then compile them one by one.
        cases = [
            (
                Index("test_idx1", data_col, mysql_length=10),
                "CREATE INDEX test_idx1 ON testtbl (data(10))",
            ),
            (
                Index("test_idx2", data_col, mysql_length=5),
                "CREATE INDEX test_idx2 ON testtbl (data(5))",
            ),
        ]
        for index, expected in cases:
            self.assert_compile(schema.CreateIndex(index), expected)

    def test_create_index_with_length_quoted(self):
        """The length suffix follows the closing backtick of a quoted name."""
        metadata = MetaData()
        quoted_col = Column("some quoted data", String(255), key="s")
        testtbl = Table("testtbl", metadata, quoted_col)
        index = Index("test_idx1", testtbl.c.s, mysql_length=10)

        expected = "CREATE INDEX test_idx1 ON testtbl (`some quoted data`(10))"
        self.assert_compile(schema.CreateIndex(index), expected)

    def test_create_composite_index_with_length_quoted(self):
        """Per-column lengths keyed by column name apply to quoted columns."""
        metadata = MetaData()
        testtbl = Table(
            "testtbl",
            metadata,
            Column("some Quoted a", String(255), key="a"),
            Column("some Quoted b", String(255), key="b"),
        )
        lengths = {"some Quoted a": 10, "some Quoted b": 20}
        index = Index(
            "test_idx1", testtbl.c.a, testtbl.c.b, mysql_length=lengths
        )

        expected = (
            "CREATE INDEX test_idx1 ON testtbl "
            "(`some Quoted a`(10), `some Quoted b`(20))"
        )
        self.assert_compile(schema.CreateIndex(index), expected)

    def test_create_composite_index_with_length_quoted_3085_workaround(self):
        """Lengths keyed by the backtick-quoted column names also apply."""
        metadata = MetaData()
        testtbl = Table(
            "testtbl",
            metadata,
            Column("some quoted a", String(255), key="a"),
            Column("some quoted b", String(255), key="b"),
        )
        # The dictionary keys carry the backticks themselves.
        lengths = {"`some quoted a`": 10, "`some quoted b`": 20}
        index = Index(
            "test_idx1", testtbl.c.a, testtbl.c.b, mysql_length=lengths
        )

        expected = (
            "CREATE INDEX test_idx1 ON testtbl "
            "(`some quoted a`(10), `some quoted b`(20))"
        )
        self.assert_compile(schema.CreateIndex(index), expected)

    def test_create_composite_index_with_length(self):
        """Lengths may be given for all, some, or (as an int) every column."""
        metadata = MetaData()
        testtbl = Table(
            "testtbl",
            metadata,
            Column("a", String(255)),
            Column("b", String(255)),
        )
        col_a, col_b = testtbl.c.a, testtbl.c.b

        cases = [
            (
                Index(
                    "test_idx1", col_a, col_b, mysql_length={"a": 10, "b": 20}
                ),
                "CREATE INDEX test_idx1 ON testtbl (a(10), b(20))",
            ),
            (
                Index("test_idx2", col_a, col_b, mysql_length={"a": 15}),
                "CREATE INDEX test_idx2 ON testtbl (a(15), b)",
            ),
            (
                Index("test_idx3", col_a, col_b, mysql_length=30),
                "CREATE INDEX test_idx3 ON testtbl (a(30), b(30))",
            ),
        ]
        for index, expected in cases:
            self.assert_compile(schema.CreateIndex(index), expected)

    def test_create_index_with_using(self):
        """``mysql_using`` appends a USING clause to CREATE INDEX."""
        metadata = MetaData()
        testtbl = Table("testtbl", metadata, Column("data", String(255)))
        data_col = testtbl.c.data

        cases = [
            (
                Index("test_idx1", data_col, mysql_using="btree"),
                "CREATE INDEX test_idx1 ON testtbl (data) USING btree",
            ),
            (
                Index("test_idx2", data_col, mysql_using="hash"),
                "CREATE INDEX test_idx2 ON testtbl (data) USING hash",
            ),
        ]
        for index, expected in cases:
            self.assert_compile(schema.CreateIndex(index), expected)

    def test_create_pk_plain(self):
        """A primary key constraint without options renders no USING."""
        metadata = MetaData()
        testtbl = Table(
            "testtbl",
            metadata,
            Column("data", String(255)),
            PrimaryKeyConstraint("data"),
        )

        expected = (
            "CREATE TABLE testtbl (data VARCHAR(255) NOT NULL, "
            "PRIMARY KEY (data))"
        )
        self.assert_compile(schema.CreateTable(testtbl), expected)

    def test_create_pk_with_using(self):
        """``mysql_using`` on the primary key constraint renders USING."""
        metadata = MetaData()
        pk = PrimaryKeyConstraint("data", mysql_using="btree")
        testtbl = Table("testtbl", metadata, Column("data", String(255)), pk)

        expected = (
            "CREATE TABLE testtbl (data VARCHAR(255) NOT NULL, "
            "PRIMARY KEY (data) USING btree)"
        )
        self.assert_compile(schema.CreateTable(testtbl), expected)

    def test_create_index_expr(self):
        """An index over a SQL expression renders that expression."""
        metadata = MetaData()
        foo = Table("foo", metadata, Column("x", Integer))
        expr_index = Index("bar", foo.c.x > 5)

        self.assert_compile(
            schema.CreateIndex(expr_index), "CREATE INDEX bar ON foo (x > 5)"
        )

    def test_deferrable_initially_kw_not_ignored(self):
        """DEFERRABLE / INITIALLY given on a ForeignKey appear in the DDL."""
        metadata = MetaData()
        Table("t1", metadata, Column("id", Integer, primary_key=True))
        fk = ForeignKey("t1.id", deferrable=True, initially="XYZ")
        child = Table(
            "t2", metadata, Column("id", Integer, fk, primary_key=True)
        )

        expected = (
            "CREATE TABLE t2 (id INTEGER NOT NULL, "
            "PRIMARY KEY (id), FOREIGN KEY(id) REFERENCES t1 (id) "
            "DEFERRABLE INITIALLY XYZ)"
        )
        self.assert_compile(schema.CreateTable(child), expected)

    def test_match_kw_raises(self):
        """A ForeignKey using ``match`` raises CompileError on CREATE TABLE."""
        metadata = MetaData()
        Table("t1", metadata, Column("id", Integer, primary_key=True))
        fk = ForeignKey("t1.id", match="XYZ")
        child = Table(
            "t2", metadata, Column("id", Integer, fk, primary_key=True)
        )

        message = (
            "MySQL ignores the 'MATCH' keyword while at the same time causes "
            "ON UPDATE/ON DELETE clauses to be ignored."
        )
        create_stmt = schema.CreateTable(child)
        assert_raises_message(
            exc.CompileError,
            message,
            create_stmt.compile,
            dialect=mysql.dialect(),
        )

    def test_match(self):
        """``.match()`` renders MATCH ... AGAINST in boolean mode."""
        matchtable = table("matchtable", column("title", String))
        criterion = matchtable.c.title.match("somstr")

        self.assert_compile(
            criterion, "MATCH (matchtable.title) AGAINST (%s IN BOOLEAN MODE)"
        )

    def test_match_compile_kw(self):
        """``literal_binds`` is applied to both sides of MATCH ... AGAINST."""
        criterion = literal("x").match(literal("y"))

        self.assert_compile(
            criterion,
            "MATCH ('x') AGAINST ('y' IN BOOLEAN MODE)",
            literal_binds=True,
        )

    def test_concat_compile_kw(self):
        """String ``+`` renders as concat() with literal binds inlined."""
        left = literal("x", type_=String)
        right = literal("y", type_=String)

        self.assert_compile(
            left + right, "concat('x', 'y')", literal_binds=True
        )

    def test_for_update(self):
        """with_for_update() renders FOR UPDATE or LOCK IN SHARE MODE."""
        mytable = table(
            "mytable", column("myid"), column("name"), column("description")
        )
        select_sql = (
            "SELECT mytable.myid, mytable.name, mytable.description "
            "FROM mytable WHERE mytable.myid = %s "
        )

        stmt = mytable.select(mytable.c.myid == 7).with_for_update()
        self.assert_compile(stmt, select_sql + "FOR UPDATE")

        stmt = mytable.select(mytable.c.myid == 7).with_for_update(read=True)
        self.assert_compile(stmt, select_sql + "LOCK IN SHARE MODE")

    def test_delete_extra_froms(self):
        """A DELETE that references a second table renders a USING list."""
        target = table("t1", column("c1"))
        other = table("t2", column("c1"))
        stmt = sql.delete(target).where(target.c.c1 == other.c.c1)

        expected = "DELETE FROM t1 USING t1, t2 WHERE t1.c1 = t2.c1"
        self.assert_compile(stmt, expected)

    def test_delete_extra_froms_alias(self):
        """DELETE against an alias renders it with and without extra FROMs."""
        aliased = table("t1", column("c1")).alias("a1")
        other = table("t2", column("c1"))

        joined_delete = sql.delete(aliased).where(
            aliased.c.c1 == other.c.c1
        )
        self.assert_compile(
            joined_delete,
            "DELETE FROM a1 USING t1 AS a1, t2 WHERE a1.c1 = t2.c1",
        )

        plain_delete = sql.delete(aliased)
        self.assert_compile(plain_delete, "DELETE FROM t1 AS a1")
341
342
class SQLTest(fixtures.TestBase, AssertsCompiledSQL):

    """Tests MySQL-dialect specific compilation."""

    __dialect__ = mysql.dialect()

    def test_precolumns(self):
        """SELECT prefixes render before DISTINCT, in the order given."""
        dialect = self.__dialect__

        def gen(distinct=None, prefixes=None):
            # Only forward the arguments that were actually supplied.
            kw = {}
            if distinct is not None:
                kw["distinct"] = distinct
            if prefixes is not None:
                kw["prefixes"] = prefixes
            return str(select([column("q")], **kw).compile(dialect=dialect))

        eq_(gen(None), "SELECT q")
        eq_(gen(True), "SELECT DISTINCT q")

        eq_(gen(prefixes=["ALL"]), "SELECT ALL q")
        eq_(gen(prefixes=["DISTINCTROW"]), "SELECT DISTINCTROW q")

        # Interaction with MySQL prefix extensions
        eq_(gen(None, ["straight_join"]), "SELECT straight_join q")
        eq_(
            gen(False, ["HIGH_PRIORITY", "SQL_SMALL_RESULT", "ALL"]),
            "SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL q",
        )
        eq_(
            gen(True, ["high_priority", sql.text("sql_cache")]),
            "SELECT high_priority sql_cache DISTINCT q",
        )

    def test_backslash_escaping(self):
        """The LIKE escape backslash is doubled unless escapes are disabled."""
        self.assert_compile(
            sql.column("foo").like("bar", escape="\\"),
            "foo LIKE %s ESCAPE '\\\\'",
        )

        # With _backslash_escapes turned off the character is emitted as-is.
        dialect = mysql.dialect()
        dialect._backslash_escapes = False
        self.assert_compile(
            sql.column("foo").like("bar", escape="\\"),
            "foo LIKE %s ESCAPE '\\'",
            dialect=dialect,
        )

    def test_limit(self):
        """LIMIT/OFFSET render in MySQL's ``LIMIT offset, count`` form."""
        t = sql.table("t", sql.column("col1"), sql.column("col2"))

        self.assert_compile(
            select([t]).limit(10).offset(20),
            "SELECT t.col1, t.col2 FROM t  LIMIT %s, %s",
            {"param_1": 20, "param_2": 10},
        )
        self.assert_compile(
            select([t]).limit(10),
            "SELECT t.col1, t.col2 FROM t  LIMIT %s",
            {"param_1": 10},
        )

        # An OFFSET alone is rendered with a fixed large count as the limit.
        self.assert_compile(
            select([t]).offset(10),
            "SELECT t.col1, t.col2 FROM t  LIMIT %s, 18446744073709551615",
            {"param_1": 10},
        )

    def test_varchar_raise(self):
        """String types without a length raise CompileError on MySQL."""
        for type_ in (
            String,
            VARCHAR,
            String(),
            VARCHAR(),
            NVARCHAR(),
            Unicode,
            Unicode(),
        ):
            type_ = sqltypes.to_instance(type_)
            # Compiling the bare type fails ...
            assert_raises_message(
                exc.CompileError,
                "VARCHAR requires a length on dialect mysql",
                type_.compile,
                dialect=mysql.dialect(),
            )

            # ... and within CREATE TABLE the message names table and column.
            t1 = Table("sometable", MetaData(), Column("somecolumn", type_))
            assert_raises_message(
                exc.CompileError,
                r"\(in table 'sometable', column 'somecolumn'\)\: "
                r"(?:N)?VARCHAR requires a length on dialect mysql",
                schema.CreateTable(t1).compile,
                dialect=mysql.dialect(),
            )

    def test_update_limit(self):
        """``mysql_limit`` on UPDATE renders a trailing LIMIT unless None."""
        t = sql.table("t", sql.column("col1"), sql.column("col2"))

        self.assert_compile(
            t.update(values={"col1": 123}), "UPDATE t SET col1=%s"
        )
        self.assert_compile(
            t.update(values={"col1": 123}, mysql_limit=5),
            "UPDATE t SET col1=%s LIMIT 5",
        )
        self.assert_compile(
            t.update(values={"col1": 123}, mysql_limit=None),
            "UPDATE t SET col1=%s",
        )
        self.assert_compile(
            t.update(t.c.col2 == 456, values={"col1": 123}, mysql_limit=1),
            "UPDATE t SET col1=%s WHERE t.col2 = %s LIMIT 1",
        )

    def test_utc_timestamp(self):
        """``func.utc_timestamp()`` renders with empty parentheses."""
        self.assert_compile(func.utc_timestamp(), "utc_timestamp()")

    def test_utc_timestamp_fsp(self):
        """An argument to ``utc_timestamp`` is passed as a bound parameter."""
        self.assert_compile(
            func.utc_timestamp(5),
            "utc_timestamp(%s)",
            checkparams={"utc_timestamp_1": 5},
        )

    def test_sysdate(self):
        """``func.sysdate()`` renders as upper-case ``SYSDATE()``."""
        self.assert_compile(func.sysdate(), "SYSDATE()")

    def test_cast(self):
        """Each castable type maps to the expected CAST target keyword."""
        t = sql.table("t", sql.column("col"))
        m = mysql

        # (type or type instance, expected compiled CAST expression)
        specs = [
            (Integer, "CAST(t.col AS SIGNED INTEGER)"),
            (INT, "CAST(t.col AS SIGNED INTEGER)"),
            (m.MSInteger, "CAST(t.col AS SIGNED INTEGER)"),
            (m.MSInteger(unsigned=True), "CAST(t.col AS UNSIGNED INTEGER)"),
            (SmallInteger, "CAST(t.col AS SIGNED INTEGER)"),
            (m.MSSmallInteger, "CAST(t.col AS SIGNED INTEGER)"),
            (m.MSTinyInteger, "CAST(t.col AS SIGNED INTEGER)"),
            # 'SIGNED INTEGER' is a bigint, so this is ok.
            (m.MSBigInteger, "CAST(t.col AS SIGNED INTEGER)"),
            (m.MSBigInteger(unsigned=False), "CAST(t.col AS SIGNED INTEGER)"),
            (m.MSBigInteger(unsigned=True), "CAST(t.col AS UNSIGNED INTEGER)"),
            # this is kind of sucky.  thank you default arguments!
            (NUMERIC, "CAST(t.col AS DECIMAL)"),
            (DECIMAL, "CAST(t.col AS DECIMAL)"),
            (Numeric, "CAST(t.col AS DECIMAL)"),
            (m.MSNumeric, "CAST(t.col AS DECIMAL)"),
            (m.MSDecimal, "CAST(t.col AS DECIMAL)"),
            (TIMESTAMP, "CAST(t.col AS DATETIME)"),
            (DATETIME, "CAST(t.col AS DATETIME)"),
            (DATE, "CAST(t.col AS DATE)"),
            (TIME, "CAST(t.col AS TIME)"),
            (DateTime, "CAST(t.col AS DATETIME)"),
            (Date, "CAST(t.col AS DATE)"),
            (Time, "CAST(t.col AS TIME)"),
            (m.MSTime, "CAST(t.col AS TIME)"),
            (m.MSTimeStamp, "CAST(t.col AS DATETIME)"),
            (String, "CAST(t.col AS CHAR)"),
            (Unicode, "CAST(t.col AS CHAR)"),
            (UnicodeText, "CAST(t.col AS CHAR)"),
            (VARCHAR, "CAST(t.col AS CHAR)"),
            (NCHAR, "CAST(t.col AS CHAR)"),
            (CHAR, "CAST(t.col AS CHAR)"),
            (m.CHAR(charset="utf8"), "CAST(t.col AS CHAR CHARACTER SET utf8)"),
            (CLOB, "CAST(t.col AS CHAR)"),
            (TEXT, "CAST(t.col AS CHAR)"),
            (m.TEXT(charset="utf8"), "CAST(t.col AS CHAR CHARACTER SET utf8)"),
            (String(32), "CAST(t.col AS CHAR(32))"),
            (Unicode(32), "CAST(t.col AS CHAR(32))"),
            (CHAR(32), "CAST(t.col AS CHAR(32))"),
            (m.MSString, "CAST(t.col AS CHAR)"),
            (m.MSText, "CAST(t.col AS CHAR)"),
            (m.MSTinyText, "CAST(t.col AS CHAR)"),
            (m.MSMediumText, "CAST(t.col AS CHAR)"),
            (m.MSLongText, "CAST(t.col AS CHAR)"),
            (m.MSNChar, "CAST(t.col AS CHAR)"),
            (m.MSNVarChar, "CAST(t.col AS CHAR)"),
            (LargeBinary, "CAST(t.col AS BINARY)"),
            (BLOB, "CAST(t.col AS BINARY)"),
            (m.MSBlob, "CAST(t.col AS BINARY)"),
            (m.MSBlob(32), "CAST(t.col AS BINARY)"),
            (m.MSTinyBlob, "CAST(t.col AS BINARY)"),
            (m.MSMediumBlob, "CAST(t.col AS BINARY)"),
            (m.MSLongBlob, "CAST(t.col AS BINARY)"),
            (m.MSBinary, "CAST(t.col AS BINARY)"),
            (m.MSBinary(32), "CAST(t.col AS BINARY)"),
            (m.MSVarBinary, "CAST(t.col AS BINARY)"),
            (m.MSVarBinary(32), "CAST(t.col AS BINARY)"),
            (Interval, "CAST(t.col AS DATETIME)"),
        ]

        for type_, expected in specs:
            self.assert_compile(cast(t.c.col, type_), expected)

    def test_cast_type_decorator(self):
        """A TypeDecorator over Integer casts the same way Integer does."""

        class MyInteger(sqltypes.TypeDecorator):
            impl = Integer

        type_ = MyInteger()
        t = sql.table("t", sql.column("col"))
        self.assert_compile(
            cast(t.c.col, type_), "CAST(t.col AS SIGNED INTEGER)"
        )

    def test_cast_literal_bind(self):
        """``literal_binds`` inlines the literal inside a CAST expression."""
        expr = cast(column("foo", Integer) + 5, Integer())

        self.assert_compile(
            expr, "CAST(foo + 5 AS SIGNED INTEGER)", literal_binds=True
        )

    def test_unsupported_cast_literal_bind(self):
        """A skipped CAST still renders its inner expression with literals."""
        expr = cast(column("foo", Integer) + 5, Float)

        # Unsupported target type: warning plus the grouped inner expression.
        with expect_warnings("Datatype FLOAT does not support CAST on MySQL;"):
            self.assert_compile(expr, "(foo + 5)", literal_binds=True)

        # A server version of 3.9.8 skips the CAST as well.
        dialect = mysql.MySQLDialect()
        dialect.server_version_info = (3, 9, 8)
        with expect_warnings("Current MySQL version does not support CAST"):
            eq_(
                str(
                    expr.compile(
                        dialect=dialect, compile_kwargs={"literal_binds": True}
                    )
                ),
                "(foo + 5)",
            )

    def test_unsupported_casts(self):
        """Types that cannot be CAST emit a warning and the bare column."""

        t = sql.table("t", sql.column("col"))
        m = mysql

        specs = [
            (m.MSBit, "t.col"),
            (FLOAT, "t.col"),
            (Float, "t.col"),
            (m.MSFloat, "t.col"),
            (m.MSDouble, "t.col"),
            (m.MSReal, "t.col"),
            (m.MSYear, "t.col"),
            (m.MSYear(2), "t.col"),
            (Boolean, "t.col"),
            (BOOLEAN, "t.col"),
            (m.MSEnum, "t.col"),
            (m.MSEnum("1", "2"), "t.col"),
            (m.MSSet, "t.col"),
            (m.MSSet("1", "2"), "t.col"),
        ]

        for type_, expected in specs:
            with expect_warnings(
                "Datatype .* does not support CAST on MySQL;"
            ):
                self.assert_compile(cast(t.c.col, type_), expected)

    def test_no_cast_pre_4(self):
        """With server version 3.2.3 the CAST is dropped with a warning."""
        self.assert_compile(
            cast(Column("foo", Integer), String), "CAST(foo AS CHAR)"
        )
        dialect = mysql.dialect()
        dialect.server_version_info = (3, 2, 3)
        with expect_warnings("Current MySQL version does not support CAST;"):
            self.assert_compile(
                cast(Column("foo", Integer), String), "foo", dialect=dialect
            )

    def test_cast_grouped_expression_non_castable(self):
        """A skipped CAST of a binary expression keeps it parenthesized."""
        with expect_warnings("Datatype FLOAT does not support CAST on MySQL;"):
            self.assert_compile(
                cast(sql.column("x") + sql.column("y"), Float), "(x + y)"
            )

    def test_cast_grouped_expression_pre_4(self):
        """With version 3.2.3 a skipped CAST keeps the expression grouped."""
        dialect = mysql.dialect()
        dialect.server_version_info = (3, 2, 3)
        with expect_warnings("Current MySQL version does not support CAST;"):
            self.assert_compile(
                cast(sql.column("x") + sql.column("y"), Integer),
                "(x + y)",
                dialect=dialect,
            )

    def test_extract(self):
        """EXTRACT fields pass through; "milliseconds" becomes singular."""
        t = sql.table("t", sql.column("col1"))

        for field in "year", "month", "day":
            self.assert_compile(
                select([extract(field, t.c.col1)]),
                "SELECT EXTRACT(%s FROM t.col1) AS anon_1 FROM t" % field,
            )

        # the plural "milliseconds" is rendered as singular "millisecond"
        self.assert_compile(
            select([extract("milliseconds", t.c.col1)]),
            "SELECT EXTRACT(millisecond FROM t.col1) AS anon_1 FROM t",
        )

    def test_too_long_index(self):
        """An auto-generated index name that is too long gets shortened."""
        # Expected name: a prefix of the generated name plus a "_5cd2" suffix.
        exp = "ix_zyrenian_zyme_zyzzogeton_zyzzogeton_zyrenian_zyme_zyz_5cd2"
        tname = "zyrenian_zyme_zyzzogeton_zyzzogeton"
        cname = "zyrenian_zyme_zyzzogeton_zo"

        t1 = Table(tname, MetaData(), Column(cname, Integer, index=True))
        ix1 = list(t1.indexes)[0]

        self.assert_compile(
            schema.CreateIndex(ix1),
            "CREATE INDEX %s ON %s (%s)" % (exp, tname, cname),
        )

    def test_innodb_autoincrement(self):
        """The AUTO_INCREMENT column is listed first in PRIMARY KEY."""
        # Autoincrement column declared second: it moves to the front.
        t1 = Table(
            "sometable",
            MetaData(),
            Column(
                "assigned_id", Integer(), primary_key=True, autoincrement=False
            ),
            Column("id", Integer(), primary_key=True, autoincrement=True),
            mysql_engine="InnoDB",
        )
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE sometable (assigned_id "
            "INTEGER NOT NULL, id INTEGER NOT NULL "
            "AUTO_INCREMENT, PRIMARY KEY (id, assigned_id)"
            ")ENGINE=InnoDB",
        )

        # Autoincrement column declared first: declaration order is kept.
        t1 = Table(
            "sometable",
            MetaData(),
            Column(
                "assigned_id", Integer(), primary_key=True, autoincrement=True
            ),
            Column("id", Integer(), primary_key=True, autoincrement=False),
            mysql_engine="InnoDB",
        )
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE sometable (assigned_id "
            "INTEGER NOT NULL AUTO_INCREMENT, id "
            "INTEGER NOT NULL, PRIMARY KEY "
            "(assigned_id, id))ENGINE=InnoDB",
        )

    def test_innodb_autoincrement_reserved_word_column_name(self):
        """A quoted autoincrement column stays quoted when moved first."""
        t1 = Table(
            "sometable",
            MetaData(),
            Column("id", Integer(), primary_key=True, autoincrement=False),
            Column("order", Integer(), primary_key=True, autoincrement=True),
            mysql_engine="InnoDB",
        )
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE sometable ("
            "id INTEGER NOT NULL, "
            "`order` INTEGER NOT NULL AUTO_INCREMENT, "
            "PRIMARY KEY (`order`, id)"
            ")ENGINE=InnoDB",
        )

    def test_create_table_with_partition(self):
        """PARTITION BY KEY and PARTITIONS render after the column list."""
        t1 = Table(
            "testtable",
            MetaData(),
            Column("id", Integer(), primary_key=True, autoincrement=True),
            Column(
                "other_id", Integer(), primary_key=True, autoincrement=False
            ),
            mysql_partitions="2",
            mysql_partition_by="KEY(other_id)",
        )
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE testtable ("
            "id INTEGER NOT NULL AUTO_INCREMENT, "
            "other_id INTEGER NOT NULL, "
            "PRIMARY KEY (id, other_id)"
            ")PARTITION BY KEY(other_id) PARTITIONS 2",
        )

    def test_create_table_with_subpartition(self):
        """Subpartition options render after the partition options."""
        t1 = Table(
            "testtable",
            MetaData(),
            Column("id", Integer(), primary_key=True, autoincrement=True),
            Column(
                "other_id", Integer(), primary_key=True, autoincrement=False
            ),
            mysql_partitions="2",
            mysql_partition_by="KEY(other_id)",
            mysql_subpartition_by="HASH(some_expr)",
            mysql_subpartitions="2",
        )
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE testtable ("
            "id INTEGER NOT NULL AUTO_INCREMENT, "
            "other_id INTEGER NOT NULL, "
            "PRIMARY KEY (id, other_id)"
            ")PARTITION BY KEY(other_id) PARTITIONS 2 "
            "SUBPARTITION BY HASH(some_expr) SUBPARTITIONS 2",
        )

    def test_create_table_with_partition_hash(self):
        """PARTITION BY HASH renders the same way as PARTITION BY KEY."""
        t1 = Table(
            "testtable",
            MetaData(),
            Column("id", Integer(), primary_key=True, autoincrement=True),
            Column(
                "other_id", Integer(), primary_key=True, autoincrement=False
            ),
            mysql_partitions="2",
            mysql_partition_by="HASH(other_id)",
        )
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE testtable ("
            "id INTEGER NOT NULL AUTO_INCREMENT, "
            "other_id INTEGER NOT NULL, "
            "PRIMARY KEY (id, other_id)"
            ")PARTITION BY HASH(other_id) PARTITIONS 2",
        )

    def test_create_table_with_partition_and_other_opts(self):
        """Other table options render ahead of the partition options."""
        t1 = Table(
            "testtable",
            MetaData(),
            Column("id", Integer(), primary_key=True, autoincrement=True),
            Column(
                "other_id", Integer(), primary_key=True, autoincrement=False
            ),
            mysql_stats_sample_pages="2",
            mysql_partitions="2",
            mysql_partition_by="HASH(other_id)",
        )
        self.assert_compile(
            schema.CreateTable(t1),
            "CREATE TABLE testtable ("
            "id INTEGER NOT NULL AUTO_INCREMENT, "
            "other_id INTEGER NOT NULL, "
            "PRIMARY KEY (id, other_id)"
            ")STATS_SAMPLE_PAGES=2 PARTITION BY HASH(other_id) PARTITIONS 2",
        )

    def test_inner_join(self):
        """``join()`` renders an explicit INNER JOIN."""
        t1 = table("t1", column("x"))
        t2 = table("t2", column("y"))

        self.assert_compile(
            t1.join(t2, t1.c.x == t2.c.y), "t1 INNER JOIN t2 ON t1.x = t2.y"
        )

    def test_outer_join(self):
        """``outerjoin()`` renders LEFT OUTER JOIN."""
        t1 = table("t1", column("x"))
        t2 = table("t2", column("y"))

        self.assert_compile(
            t1.outerjoin(t2, t1.c.x == t2.c.y),
            "t1 LEFT OUTER JOIN t2 ON t1.x = t2.y",
        )

    def test_full_outer_join(self):
        """``outerjoin(full=True)`` renders FULL OUTER JOIN."""
        t1 = table("t1", column("x"))
        t2 = table("t2", column("y"))

        self.assert_compile(
            t1.outerjoin(t2, t1.c.x == t2.c.y, full=True),
            "t1 FULL OUTER JOIN t2 ON t1.x = t2.y",
        )
819
820
class InsertOnDuplicateTest(fixtures.TestBase, AssertsCompiledSQL):
    """Compile ``INSERT ... ON DUPLICATE KEY UPDATE`` with the MySQL dialect."""

    __dialect__ = mysql.dialect()

    def setup(self):
        """Build the ``foos`` table and store it on ``self.table``."""
        columns = (
            Column("id", Integer, primary_key=True),
            Column("bar", String(10)),
            Column("baz", String(10)),
        )
        self.table = Table("foos", MetaData(), *columns)

    def _two_row_insert(self):
        """Return a MySQL ``insert()`` of two rows that set id and bar."""
        rows = [{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
        return insert(self.table).values(rows)

    def test_from_values(self):
        """Columns taken from ``stmt.inserted`` render as ``VALUES(col)``."""
        base = self._two_row_insert()
        inserted = base.inserted
        upsert = base.on_duplicate_key_update(
            bar=inserted.bar, baz=inserted.baz
        )

        self.assert_compile(
            upsert,
            "INSERT INTO foos (id, bar) VALUES (%s, %s), (%s, %s) "
            "ON DUPLICATE KEY UPDATE bar = VALUES(bar), baz = VALUES(baz)",
        )

    def test_from_literal(self):
        """A ``literal_column`` update value is rendered verbatim."""
        base = self._two_row_insert()
        upsert = base.on_duplicate_key_update(bar=literal_column("bb"))

        self.assert_compile(
            upsert,
            "INSERT INTO foos (id, bar) VALUES (%s, %s), (%s, %s) "
            "ON DUPLICATE KEY UPDATE bar = bb",
        )

    def test_python_values(self):
        """A plain Python update value is rendered as a bound parameter."""
        base = self._two_row_insert()
        upsert = base.on_duplicate_key_update(bar="foobar")

        self.assert_compile(
            upsert,
            "INSERT INTO foos (id, bar) VALUES (%s, %s), (%s, %s) "
            "ON DUPLICATE KEY UPDATE bar = %s",
        )
867