1# -*- encoding: utf-8
2from sqlalchemy import Column
3from sqlalchemy import delete
4from sqlalchemy import extract
5from sqlalchemy import func
6from sqlalchemy import Index
7from sqlalchemy import insert
8from sqlalchemy import Integer
9from sqlalchemy import literal
10from sqlalchemy import MetaData
11from sqlalchemy import PrimaryKeyConstraint
12from sqlalchemy import schema
13from sqlalchemy import select
14from sqlalchemy import Sequence
15from sqlalchemy import sql
16from sqlalchemy import String
17from sqlalchemy import Table
18from sqlalchemy import testing
19from sqlalchemy import union
20from sqlalchemy import UniqueConstraint
21from sqlalchemy import update
22from sqlalchemy.dialects import mssql
23from sqlalchemy.dialects.mssql import base
24from sqlalchemy.dialects.mssql import mxodbc
25from sqlalchemy.sql import column
26from sqlalchemy.sql import quoted_name
27from sqlalchemy.sql import table
28from sqlalchemy.testing import AssertsCompiledSQL
29from sqlalchemy.testing import eq_
30from sqlalchemy.testing import fixtures
31from sqlalchemy.testing import is_
32
33
34class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
35    __dialect__ = mssql.dialect()
36
37    def test_true_false(self):
38        self.assert_compile(sql.false(), "0")
39        self.assert_compile(sql.true(), "1")
40
41    def test_select(self):
42        t = table("sometable", column("somecolumn"))
43        self.assert_compile(
44            t.select(), "SELECT sometable.somecolumn FROM sometable"
45        )
46
47    def test_select_with_nolock(self):
48        t = table("sometable", column("somecolumn"))
49        self.assert_compile(
50            t.select().with_hint(t, "WITH (NOLOCK)"),
51            "SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)",
52        )
53
54    def test_select_with_nolock_schema(self):
55        m = MetaData()
56        t = Table(
57            "sometable", m, Column("somecolumn", Integer), schema="test_schema"
58        )
59        self.assert_compile(
60            t.select().with_hint(t, "WITH (NOLOCK)"),
61            "SELECT test_schema.sometable.somecolumn "
62            "FROM test_schema.sometable WITH (NOLOCK)",
63        )
64
65    def test_select_w_order_by_collate(self):
66        m = MetaData()
67        t = Table("sometable", m, Column("somecolumn", String))
68
69        self.assert_compile(
70            select([t]).order_by(
71                t.c.somecolumn.collate("Latin1_General_CS_AS_KS_WS_CI").asc()
72            ),
73            "SELECT sometable.somecolumn FROM sometable "
74            "ORDER BY sometable.somecolumn COLLATE "
75            "Latin1_General_CS_AS_KS_WS_CI ASC",
76        )
77
78    def test_join_with_hint(self):
79        t1 = table(
80            "t1",
81            column("a", Integer),
82            column("b", String),
83            column("c", String),
84        )
85        t2 = table(
86            "t2",
87            column("a", Integer),
88            column("b", Integer),
89            column("c", Integer),
90        )
91        join = (
92            t1.join(t2, t1.c.a == t2.c.a)
93            .select()
94            .with_hint(t1, "WITH (NOLOCK)")
95        )
96        self.assert_compile(
97            join,
98            "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c "
99            "FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a",
100        )
101
102    def test_insert(self):
103        t = table("sometable", column("somecolumn"))
104        self.assert_compile(
105            t.insert(),
106            "INSERT INTO sometable (somecolumn) VALUES " "(:somecolumn)",
107        )
108
109    def test_update(self):
110        t = table("sometable", column("somecolumn"))
111        self.assert_compile(
112            t.update(t.c.somecolumn == 7),
113            "UPDATE sometable SET somecolumn=:somecolum"
114            "n WHERE sometable.somecolumn = "
115            ":somecolumn_1",
116            dict(somecolumn=10),
117        )
118
119    def test_insert_hint(self):
120        t = table("sometable", column("somecolumn"))
121        for targ in (None, t):
122            for darg in ("*", "mssql"):
123                self.assert_compile(
124                    t.insert()
125                    .values(somecolumn="x")
126                    .with_hint(
127                        "WITH (PAGLOCK)", selectable=targ, dialect_name=darg
128                    ),
129                    "INSERT INTO sometable WITH (PAGLOCK) "
130                    "(somecolumn) VALUES (:somecolumn)",
131                )
132
133    def test_update_hint(self):
134        t = table("sometable", column("somecolumn"))
135        for targ in (None, t):
136            for darg in ("*", "mssql"):
137                self.assert_compile(
138                    t.update()
139                    .where(t.c.somecolumn == "q")
140                    .values(somecolumn="x")
141                    .with_hint(
142                        "WITH (PAGLOCK)", selectable=targ, dialect_name=darg
143                    ),
144                    "UPDATE sometable WITH (PAGLOCK) "
145                    "SET somecolumn=:somecolumn "
146                    "WHERE sometable.somecolumn = :somecolumn_1",
147                )
148
149    def test_update_exclude_hint(self):
150        t = table("sometable", column("somecolumn"))
151        self.assert_compile(
152            t.update()
153            .where(t.c.somecolumn == "q")
154            .values(somecolumn="x")
155            .with_hint("XYZ", "mysql"),
156            "UPDATE sometable SET somecolumn=:somecolumn "
157            "WHERE sometable.somecolumn = :somecolumn_1",
158        )
159
160    def test_delete_hint(self):
161        t = table("sometable", column("somecolumn"))
162        for targ in (None, t):
163            for darg in ("*", "mssql"):
164                self.assert_compile(
165                    t.delete()
166                    .where(t.c.somecolumn == "q")
167                    .with_hint(
168                        "WITH (PAGLOCK)", selectable=targ, dialect_name=darg
169                    ),
170                    "DELETE FROM sometable WITH (PAGLOCK) "
171                    "WHERE sometable.somecolumn = :somecolumn_1",
172                )
173
174    def test_delete_exclude_hint(self):
175        t = table("sometable", column("somecolumn"))
176        self.assert_compile(
177            t.delete()
178            .where(t.c.somecolumn == "q")
179            .with_hint("XYZ", dialect_name="mysql"),
180            "DELETE FROM sometable WHERE "
181            "sometable.somecolumn = :somecolumn_1",
182        )
183
184    def test_delete_extra_froms(self):
185        t1 = table("t1", column("c1"))
186        t2 = table("t2", column("c1"))
187        q = sql.delete(t1).where(t1.c.c1 == t2.c.c1)
188        self.assert_compile(
189            q, "DELETE FROM t1 FROM t1, t2 WHERE t1.c1 = t2.c1"
190        )
191
192    def test_delete_extra_froms_alias(self):
193        a1 = table("t1", column("c1")).alias("a1")
194        t2 = table("t2", column("c1"))
195        q = sql.delete(a1).where(a1.c.c1 == t2.c.c1)
196        self.assert_compile(
197            q, "DELETE FROM a1 FROM t1 AS a1, t2 WHERE a1.c1 = t2.c1"
198        )
199        self.assert_compile(sql.delete(a1), "DELETE FROM t1 AS a1")
200
201    def test_update_from(self):
202        metadata = MetaData()
203        table1 = Table(
204            "mytable",
205            metadata,
206            Column("myid", Integer),
207            Column("name", String(30)),
208            Column("description", String(50)),
209        )
210        table2 = Table(
211            "myothertable",
212            metadata,
213            Column("otherid", Integer),
214            Column("othername", String(30)),
215        )
216
217        mt = table1.alias()
218
219        u = (
220            table1.update()
221            .values(name="foo")
222            .where(table2.c.otherid == table1.c.myid)
223        )
224
225        # testing mssql.base.MSSQLCompiler.update_from_clause
226        self.assert_compile(
227            u,
228            "UPDATE mytable SET name=:name "
229            "FROM mytable, myothertable WHERE "
230            "myothertable.otherid = mytable.myid",
231        )
232
233        self.assert_compile(
234            u.where(table2.c.othername == mt.c.name),
235            "UPDATE mytable SET name=:name "
236            "FROM mytable, myothertable, mytable AS mytable_1 "
237            "WHERE myothertable.otherid = mytable.myid "
238            "AND myothertable.othername = mytable_1.name",
239        )
240
241    def test_update_from_hint(self):
242        t = table("sometable", column("somecolumn"))
243        t2 = table("othertable", column("somecolumn"))
244        for darg in ("*", "mssql"):
245            self.assert_compile(
246                t.update()
247                .where(t.c.somecolumn == t2.c.somecolumn)
248                .values(somecolumn="x")
249                .with_hint("WITH (PAGLOCK)", selectable=t2, dialect_name=darg),
250                "UPDATE sometable SET somecolumn=:somecolumn "
251                "FROM sometable, othertable WITH (PAGLOCK) "
252                "WHERE sometable.somecolumn = othertable.somecolumn",
253            )
254
255    def test_update_to_select_schema(self):
256        meta = MetaData()
257        table = Table(
258            "sometable",
259            meta,
260            Column("sym", String),
261            Column("val", Integer),
262            schema="schema",
263        )
264        other = Table(
265            "#other", meta, Column("sym", String), Column("newval", Integer)
266        )
267        stmt = table.update().values(
268            val=select([other.c.newval])
269            .where(table.c.sym == other.c.sym)
270            .as_scalar()
271        )
272
273        self.assert_compile(
274            stmt,
275            "UPDATE [schema].sometable SET val="
276            "(SELECT [#other].newval FROM [#other] "
277            "WHERE [schema].sometable.sym = [#other].sym)",
278        )
279
280        stmt = (
281            table.update()
282            .values(val=other.c.newval)
283            .where(table.c.sym == other.c.sym)
284        )
285        self.assert_compile(
286            stmt,
287            "UPDATE [schema].sometable SET val="
288            "[#other].newval FROM [schema].sometable, "
289            "[#other] WHERE [schema].sometable.sym = [#other].sym",
290        )
291
292    # TODO: not supported yet.
293    # def test_delete_from_hint(self):
294    #    t = table('sometable', column('somecolumn'))
295    #    t2 = table('othertable', column('somecolumn'))
296    #    for darg in ("*", "mssql"):
297    #        self.assert_compile(
298    #            t.delete().where(t.c.somecolumn==t2.c.somecolumn).
299    #                    with_hint("WITH (PAGLOCK)",
300    #                            selectable=t2,
301    #                            dialect_name=darg),
302    #            ""
303    #        )
304
305    def test_strict_binds(self):
306        """test the 'strict' compiler binds."""
307
308        from sqlalchemy.dialects.mssql.base import MSSQLStrictCompiler
309
310        mxodbc_dialect = mxodbc.dialect()
311        mxodbc_dialect.statement_compiler = MSSQLStrictCompiler
312
313        t = table("sometable", column("foo"))
314
315        for expr, compiled in [
316            (
317                select([literal("x"), literal("y")]),
318                "SELECT 'x' AS anon_1, 'y' AS anon_2",
319            ),
320            (
321                select([t]).where(t.c.foo.in_(["x", "y", "z"])),
322                "SELECT sometable.foo FROM sometable WHERE sometable.foo "
323                "IN ('x', 'y', 'z')",
324            ),
325            (t.c.foo.in_([None]), "sometable.foo IN (NULL)"),
326        ]:
327            self.assert_compile(expr, compiled, dialect=mxodbc_dialect)
328
329    def test_in_with_subqueries(self):
330        """Test removal of legacy behavior that converted "x==subquery"
331        to use IN.
332
333        """
334
335        t = table("sometable", column("somecolumn"))
336        self.assert_compile(
337            t.select().where(t.c.somecolumn == t.select()),
338            "SELECT sometable.somecolumn FROM "
339            "sometable WHERE sometable.somecolumn = "
340            "(SELECT sometable.somecolumn FROM "
341            "sometable)",
342        )
343        self.assert_compile(
344            t.select().where(t.c.somecolumn != t.select()),
345            "SELECT sometable.somecolumn FROM "
346            "sometable WHERE sometable.somecolumn != "
347            "(SELECT sometable.somecolumn FROM "
348            "sometable)",
349        )
350
351    @testing.uses_deprecated
352    def test_count(self):
353        t = table("sometable", column("somecolumn"))
354        self.assert_compile(
355            t.count(),
356            "SELECT count(sometable.somecolumn) AS "
357            "tbl_row_count FROM sometable",
358        )
359
360    def test_noorderby_insubquery(self):
361        """test that the ms-sql dialect removes ORDER BY clauses from
362        subqueries"""
363
364        table1 = table(
365            "mytable",
366            column("myid", Integer),
367            column("name", String),
368            column("description", String),
369        )
370
371        q = select([table1.c.myid], order_by=[table1.c.myid]).alias("foo")
372        crit = q.c.myid == table1.c.myid
373        self.assert_compile(
374            select(["*"], crit),
375            "SELECT * FROM (SELECT mytable.myid AS "
376            "myid FROM mytable) AS foo, mytable WHERE "
377            "foo.myid = mytable.myid",
378        )
379
380    def test_force_schema_quoted_name_w_dot_case_insensitive(self):
381        metadata = MetaData()
382        tbl = Table(
383            "test",
384            metadata,
385            Column("id", Integer, primary_key=True),
386            schema=quoted_name("foo.dbo", True),
387        )
388        self.assert_compile(
389            select([tbl]), "SELECT [foo.dbo].test.id FROM [foo.dbo].test"
390        )
391
392    def test_force_schema_quoted_w_dot_case_insensitive(self):
393        metadata = MetaData()
394        tbl = Table(
395            "test",
396            metadata,
397            Column("id", Integer, primary_key=True),
398            schema=quoted_name("foo.dbo", True),
399        )
400        self.assert_compile(
401            select([tbl]), "SELECT [foo.dbo].test.id FROM [foo.dbo].test"
402        )
403
404    def test_force_schema_quoted_name_w_dot_case_sensitive(self):
405        metadata = MetaData()
406        tbl = Table(
407            "test",
408            metadata,
409            Column("id", Integer, primary_key=True),
410            schema=quoted_name("Foo.dbo", True),
411        )
412        self.assert_compile(
413            select([tbl]), "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test"
414        )
415
416    def test_force_schema_quoted_w_dot_case_sensitive(self):
417        metadata = MetaData()
418        tbl = Table(
419            "test",
420            metadata,
421            Column("id", Integer, primary_key=True),
422            schema="[Foo.dbo]",
423        )
424        self.assert_compile(
425            select([tbl]), "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test"
426        )
427
428    def test_schema_autosplit_w_dot_case_insensitive(self):
429        metadata = MetaData()
430        tbl = Table(
431            "test",
432            metadata,
433            Column("id", Integer, primary_key=True),
434            schema="foo.dbo",
435        )
436        self.assert_compile(
437            select([tbl]), "SELECT foo.dbo.test.id FROM foo.dbo.test"
438        )
439
440    def test_schema_autosplit_w_dot_case_sensitive(self):
441        metadata = MetaData()
442        tbl = Table(
443            "test",
444            metadata,
445            Column("id", Integer, primary_key=True),
446            schema="Foo.dbo",
447        )
448        self.assert_compile(
449            select([tbl]), "SELECT [Foo].dbo.test.id FROM [Foo].dbo.test"
450        )
451
452    def test_owner_database_pairs(self):
453        dialect = mssql.dialect()
454
455        for identifier, expected_schema, expected_owner in [
456            ("foo", None, "foo"),
457            ("foo.bar", "foo", "bar"),
458            ("Foo.Bar", "Foo", "Bar"),
459            ("[Foo.Bar]", None, "Foo.Bar"),
460            ("[Foo.Bar].[bat]", "Foo.Bar", "bat"),
461        ]:
462            schema, owner = base._owner_plus_db(dialect, identifier)
463
464            eq_(owner, expected_owner)
465            eq_(schema, expected_schema)
466
467    def test_delete_schema(self):
468        metadata = MetaData()
469        tbl = Table(
470            "test",
471            metadata,
472            Column("id", Integer, primary_key=True),
473            schema="paj",
474        )
475        self.assert_compile(
476            tbl.delete(tbl.c.id == 1),
477            "DELETE FROM paj.test WHERE paj.test.id = " ":id_1",
478        )
479        s = select([tbl.c.id]).where(tbl.c.id == 1)
480        self.assert_compile(
481            tbl.delete().where(tbl.c.id.in_(s)),
482            "DELETE FROM paj.test WHERE paj.test.id IN "
483            "(SELECT paj.test.id FROM paj.test "
484            "WHERE paj.test.id = :id_1)",
485        )
486
487    def test_delete_schema_multipart(self):
488        metadata = MetaData()
489        tbl = Table(
490            "test",
491            metadata,
492            Column("id", Integer, primary_key=True),
493            schema="banana.paj",
494        )
495        self.assert_compile(
496            tbl.delete(tbl.c.id == 1),
497            "DELETE FROM banana.paj.test WHERE " "banana.paj.test.id = :id_1",
498        )
499        s = select([tbl.c.id]).where(tbl.c.id == 1)
500        self.assert_compile(
501            tbl.delete().where(tbl.c.id.in_(s)),
502            "DELETE FROM banana.paj.test WHERE "
503            "banana.paj.test.id IN (SELECT banana.paj.test.id "
504            "FROM banana.paj.test WHERE "
505            "banana.paj.test.id = :id_1)",
506        )
507
508    def test_delete_schema_multipart_needs_quoting(self):
509        metadata = MetaData()
510        tbl = Table(
511            "test",
512            metadata,
513            Column("id", Integer, primary_key=True),
514            schema="banana split.paj",
515        )
516        self.assert_compile(
517            tbl.delete(tbl.c.id == 1),
518            "DELETE FROM [banana split].paj.test WHERE "
519            "[banana split].paj.test.id = :id_1",
520        )
521        s = select([tbl.c.id]).where(tbl.c.id == 1)
522        self.assert_compile(
523            tbl.delete().where(tbl.c.id.in_(s)),
524            "DELETE FROM [banana split].paj.test WHERE "
525            "[banana split].paj.test.id IN ("
526            "SELECT [banana split].paj.test.id FROM "
527            "[banana split].paj.test WHERE "
528            "[banana split].paj.test.id = :id_1)",
529        )
530
531    def test_delete_schema_multipart_both_need_quoting(self):
532        metadata = MetaData()
533        tbl = Table(
534            "test",
535            metadata,
536            Column("id", Integer, primary_key=True),
537            schema="banana split.paj with a space",
538        )
539        self.assert_compile(
540            tbl.delete(tbl.c.id == 1),
541            "DELETE FROM [banana split].[paj with a "
542            "space].test WHERE [banana split].[paj "
543            "with a space].test.id = :id_1",
544        )
545        s = select([tbl.c.id]).where(tbl.c.id == 1)
546        self.assert_compile(
547            tbl.delete().where(tbl.c.id.in_(s)),
548            "DELETE FROM [banana split].[paj with a space].test "
549            "WHERE [banana split].[paj with a space].test.id IN "
550            "(SELECT [banana split].[paj with a space].test.id "
551            "FROM [banana split].[paj with a space].test "
552            "WHERE [banana split].[paj with a space].test.id = :id_1)",
553        )
554
555    def test_union(self):
556        t1 = table(
557            "t1",
558            column("col1"),
559            column("col2"),
560            column("col3"),
561            column("col4"),
562        )
563        t2 = table(
564            "t2",
565            column("col1"),
566            column("col2"),
567            column("col3"),
568            column("col4"),
569        )
570        s1, s2 = (
571            select(
572                [t1.c.col3.label("col3"), t1.c.col4.label("col4")],
573                t1.c.col2.in_(["t1col2r1", "t1col2r2"]),
574            ),
575            select(
576                [t2.c.col3.label("col3"), t2.c.col4.label("col4")],
577                t2.c.col2.in_(["t2col2r2", "t2col2r3"]),
578            ),
579        )
580        u = union(s1, s2, order_by=["col3", "col4"])
581        self.assert_compile(
582            u,
583            "SELECT t1.col3 AS col3, t1.col4 AS col4 "
584            "FROM t1 WHERE t1.col2 IN (:col2_1, "
585            ":col2_2) UNION SELECT t2.col3 AS col3, "
586            "t2.col4 AS col4 FROM t2 WHERE t2.col2 IN "
587            "(:col2_3, :col2_4) ORDER BY col3, col4",
588        )
589        self.assert_compile(
590            u.alias("bar").select(),
591            "SELECT bar.col3, bar.col4 FROM (SELECT "
592            "t1.col3 AS col3, t1.col4 AS col4 FROM t1 "
593            "WHERE t1.col2 IN (:col2_1, :col2_2) UNION "
594            "SELECT t2.col3 AS col3, t2.col4 AS col4 "
595            "FROM t2 WHERE t2.col2 IN (:col2_3, "
596            ":col2_4)) AS bar",
597        )
598
599    def test_function(self):
600        self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)")
601        self.assert_compile(func.current_time(), "CURRENT_TIME")
602        self.assert_compile(func.foo(), "foo()")
603        m = MetaData()
604        t = Table(
605            "sometable", m, Column("col1", Integer), Column("col2", Integer)
606        )
607        self.assert_compile(
608            select([func.max(t.c.col1)]),
609            "SELECT max(sometable.col1) AS max_1 FROM " "sometable",
610        )
611
612    def test_function_overrides(self):
613        self.assert_compile(func.current_date(), "GETDATE()")
614        self.assert_compile(func.length(3), "LEN(:length_1)")
615
616    def test_extract(self):
617        t = table("t", column("col1"))
618
619        for field in "day", "month", "year":
620            self.assert_compile(
621                select([extract(field, t.c.col1)]),
622                "SELECT DATEPART(%s, t.col1) AS anon_1 FROM t" % field,
623            )
624
625    def test_update_returning(self):
626        table1 = table(
627            "mytable",
628            column("myid", Integer),
629            column("name", String(128)),
630            column("description", String(128)),
631        )
632        u = update(table1, values=dict(name="foo")).returning(
633            table1.c.myid, table1.c.name
634        )
635        self.assert_compile(
636            u,
637            "UPDATE mytable SET name=:name OUTPUT "
638            "inserted.myid, inserted.name",
639        )
640        u = update(table1, values=dict(name="foo")).returning(table1)
641        self.assert_compile(
642            u,
643            "UPDATE mytable SET name=:name OUTPUT "
644            "inserted.myid, inserted.name, "
645            "inserted.description",
646        )
647        u = (
648            update(table1, values=dict(name="foo"))
649            .returning(table1)
650            .where(table1.c.name == "bar")
651        )
652        self.assert_compile(
653            u,
654            "UPDATE mytable SET name=:name OUTPUT "
655            "inserted.myid, inserted.name, "
656            "inserted.description WHERE mytable.name = "
657            ":name_1",
658        )
659        u = update(table1, values=dict(name="foo")).returning(
660            func.length(table1.c.name)
661        )
662        self.assert_compile(
663            u,
664            "UPDATE mytable SET name=:name OUTPUT "
665            "LEN(inserted.name) AS length_1",
666        )
667
668    def test_delete_returning(self):
669        table1 = table(
670            "mytable",
671            column("myid", Integer),
672            column("name", String(128)),
673            column("description", String(128)),
674        )
675        d = delete(table1).returning(table1.c.myid, table1.c.name)
676        self.assert_compile(
677            d, "DELETE FROM mytable OUTPUT deleted.myid, " "deleted.name"
678        )
679        d = (
680            delete(table1)
681            .where(table1.c.name == "bar")
682            .returning(table1.c.myid, table1.c.name)
683        )
684        self.assert_compile(
685            d,
686            "DELETE FROM mytable OUTPUT deleted.myid, "
687            "deleted.name WHERE mytable.name = :name_1",
688        )
689
690    def test_insert_returning(self):
691        table1 = table(
692            "mytable",
693            column("myid", Integer),
694            column("name", String(128)),
695            column("description", String(128)),
696        )
697        i = insert(table1, values=dict(name="foo")).returning(
698            table1.c.myid, table1.c.name
699        )
700        self.assert_compile(
701            i,
702            "INSERT INTO mytable (name) OUTPUT "
703            "inserted.myid, inserted.name VALUES "
704            "(:name)",
705        )
706        i = insert(table1, values=dict(name="foo")).returning(table1)
707        self.assert_compile(
708            i,
709            "INSERT INTO mytable (name) OUTPUT "
710            "inserted.myid, inserted.name, "
711            "inserted.description VALUES (:name)",
712        )
713        i = insert(table1, values=dict(name="foo")).returning(
714            func.length(table1.c.name)
715        )
716        self.assert_compile(
717            i,
718            "INSERT INTO mytable (name) OUTPUT "
719            "LEN(inserted.name) AS length_1 VALUES "
720            "(:name)",
721        )
722
723    def test_limit_using_top(self):
724        t = table("t", column("x", Integer), column("y", Integer))
725
726        s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(10)
727
728        self.assert_compile(
729            s,
730            "SELECT TOP 10 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y",
731            checkparams={"x_1": 5},
732        )
733
734    def test_limit_zero_using_top(self):
735        t = table("t", column("x", Integer), column("y", Integer))
736
737        s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(0)
738
739        self.assert_compile(
740            s,
741            "SELECT TOP 0 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y",
742            checkparams={"x_1": 5},
743        )
744        c = s.compile(dialect=mssql.dialect())
745        eq_(len(c._result_columns), 2)
746        assert t.c.x in set(c._create_result_map()["x"][1])
747
748    def test_offset_using_window(self):
749        t = table("t", column("x", Integer), column("y", Integer))
750
751        s = select([t]).where(t.c.x == 5).order_by(t.c.y).offset(20)
752
753        # test that the select is not altered with subsequent compile
754        # calls
755        for i in range(2):
756            self.assert_compile(
757                s,
758                "SELECT anon_1.x, anon_1.y FROM (SELECT t.x AS x, t.y "
759                "AS y, ROW_NUMBER() OVER (ORDER BY t.y) AS "
760                "mssql_rn FROM t WHERE t.x = :x_1) AS "
761                "anon_1 WHERE mssql_rn > :param_1",
762                checkparams={"param_1": 20, "x_1": 5},
763            )
764
765            c = s.compile(dialect=mssql.dialect())
766            eq_(len(c._result_columns), 2)
767            assert t.c.x in set(c._create_result_map()["x"][1])
768
769    def test_limit_offset_using_window(self):
770        t = table("t", column("x", Integer), column("y", Integer))
771
772        s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(10).offset(20)
773
774        self.assert_compile(
775            s,
776            "SELECT anon_1.x, anon_1.y "
777            "FROM (SELECT t.x AS x, t.y AS y, "
778            "ROW_NUMBER() OVER (ORDER BY t.y) AS mssql_rn "
779            "FROM t "
780            "WHERE t.x = :x_1) AS anon_1 "
781            "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1",
782            checkparams={"param_1": 20, "param_2": 10, "x_1": 5},
783        )
784        c = s.compile(dialect=mssql.dialect())
785        eq_(len(c._result_columns), 2)
786        assert t.c.x in set(c._create_result_map()["x"][1])
787        assert t.c.y in set(c._create_result_map()["y"][1])
788
789    def test_limit_offset_w_ambiguous_cols(self):
790        t = table("t", column("x", Integer), column("y", Integer))
791
792        cols = [t.c.x, t.c.x.label("q"), t.c.x.label("p"), t.c.y]
793        s = select(cols).where(t.c.x == 5).order_by(t.c.y).limit(10).offset(20)
794
795        self.assert_compile(
796            s,
797            "SELECT anon_1.x, anon_1.q, anon_1.p, anon_1.y "
798            "FROM (SELECT t.x AS x, t.x AS q, t.x AS p, t.y AS y, "
799            "ROW_NUMBER() OVER (ORDER BY t.y) AS mssql_rn "
800            "FROM t "
801            "WHERE t.x = :x_1) AS anon_1 "
802            "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1",
803            checkparams={"param_1": 20, "param_2": 10, "x_1": 5},
804        )
805        c = s.compile(dialect=mssql.dialect())
806        eq_(len(c._result_columns), 4)
807
808        result_map = c._create_result_map()
809
810        for col in cols:
811            is_(result_map[col.key][1][0], col)
812
813    def test_limit_offset_with_correlated_order_by(self):
814        t1 = table("t1", column("x", Integer), column("y", Integer))
815        t2 = table("t2", column("x", Integer), column("y", Integer))
816
817        order_by = select([t2.c.y]).where(t1.c.x == t2.c.x).as_scalar()
818        s = (
819            select([t1])
820            .where(t1.c.x == 5)
821            .order_by(order_by)
822            .limit(10)
823            .offset(20)
824        )
825
826        self.assert_compile(
827            s,
828            "SELECT anon_1.x, anon_1.y "
829            "FROM (SELECT t1.x AS x, t1.y AS y, "
830            "ROW_NUMBER() OVER (ORDER BY "
831            "(SELECT t2.y FROM t2 WHERE t1.x = t2.x)"
832            ") AS mssql_rn "
833            "FROM t1 "
834            "WHERE t1.x = :x_1) AS anon_1 "
835            "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1",
836            checkparams={"param_1": 20, "param_2": 10, "x_1": 5},
837        )
838
839        c = s.compile(dialect=mssql.dialect())
840        eq_(len(c._result_columns), 2)
841        assert t1.c.x in set(c._create_result_map()["x"][1])
842        assert t1.c.y in set(c._create_result_map()["y"][1])
843
844    def test_offset_dont_misapply_labelreference(self):
845        m = MetaData()
846
847        t = Table("t", m, Column("x", Integer))
848
849        expr1 = func.foo(t.c.x).label("x")
850        expr2 = func.foo(t.c.x).label("y")
851
852        stmt1 = select([expr1]).order_by(expr1.desc()).offset(1)
853        stmt2 = select([expr2]).order_by(expr2.desc()).offset(1)
854
855        self.assert_compile(
856            stmt1,
857            "SELECT anon_1.x FROM (SELECT foo(t.x) AS x, "
858            "ROW_NUMBER() OVER (ORDER BY foo(t.x) DESC) AS mssql_rn FROM t) "
859            "AS anon_1 WHERE mssql_rn > :param_1",
860        )
861
862        self.assert_compile(
863            stmt2,
864            "SELECT anon_1.y FROM (SELECT foo(t.x) AS y, "
865            "ROW_NUMBER() OVER (ORDER BY foo(t.x) DESC) AS mssql_rn FROM t) "
866            "AS anon_1 WHERE mssql_rn > :param_1",
867        )
868
869    def test_limit_zero_offset_using_window(self):
870        t = table("t", column("x", Integer), column("y", Integer))
871
872        s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(0).offset(0)
873
874        # render the LIMIT of zero, but not the OFFSET
875        # of zero, so produces TOP 0
876        self.assert_compile(
877            s,
878            "SELECT TOP 0 t.x, t.y FROM t " "WHERE t.x = :x_1 ORDER BY t.y",
879            checkparams={"x_1": 5},
880        )
881
882    def test_sequence_start_0(self):
883        metadata = MetaData()
884        tbl = Table(
885            "test",
886            metadata,
887            Column("id", Integer, Sequence("", 0), primary_key=True),
888        )
889        self.assert_compile(
890            schema.CreateTable(tbl),
891            "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(0,1), "
892            "PRIMARY KEY (id))",
893        )
894
895    def test_sequence_non_primary_key(self):
896        metadata = MetaData()
897        tbl = Table(
898            "test",
899            metadata,
900            Column("id", Integer, Sequence(""), primary_key=False),
901        )
902        self.assert_compile(
903            schema.CreateTable(tbl),
904            "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))",
905        )
906
907    def test_sequence_ignore_nullability(self):
908        metadata = MetaData()
909        tbl = Table(
910            "test",
911            metadata,
912            Column("id", Integer, Sequence(""), nullable=True),
913        )
914        self.assert_compile(
915            schema.CreateTable(tbl),
916            "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))",
917        )
918
919    def test_table_pkc_clustering(self):
920        metadata = MetaData()
921        tbl = Table(
922            "test",
923            metadata,
924            Column("x", Integer, autoincrement=False),
925            Column("y", Integer, autoincrement=False),
926            PrimaryKeyConstraint("x", "y", mssql_clustered=True),
927        )
928        self.assert_compile(
929            schema.CreateTable(tbl),
930            "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, "
931            "PRIMARY KEY CLUSTERED (x, y))",
932        )
933
934    def test_table_pkc_explicit_nonclustered(self):
935        metadata = MetaData()
936        tbl = Table(
937            "test",
938            metadata,
939            Column("x", Integer, autoincrement=False),
940            Column("y", Integer, autoincrement=False),
941            PrimaryKeyConstraint("x", "y", mssql_clustered=False),
942        )
943        self.assert_compile(
944            schema.CreateTable(tbl),
945            "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, "
946            "PRIMARY KEY NONCLUSTERED (x, y))",
947        )
948
949    def test_table_idx_explicit_nonclustered(self):
950        metadata = MetaData()
951        tbl = Table(
952            "test",
953            metadata,
954            Column("x", Integer, autoincrement=False),
955            Column("y", Integer, autoincrement=False),
956        )
957
958        idx = Index("myidx", tbl.c.x, tbl.c.y, mssql_clustered=False)
959        self.assert_compile(
960            schema.CreateIndex(idx),
961            "CREATE NONCLUSTERED INDEX myidx ON test (x, y)",
962        )
963
964    def test_table_uc_explicit_nonclustered(self):
965        metadata = MetaData()
966        tbl = Table(
967            "test",
968            metadata,
969            Column("x", Integer, autoincrement=False),
970            Column("y", Integer, autoincrement=False),
971            UniqueConstraint("x", "y", mssql_clustered=False),
972        )
973        self.assert_compile(
974            schema.CreateTable(tbl),
975            "CREATE TABLE test (x INTEGER NULL, y INTEGER NULL, "
976            "UNIQUE NONCLUSTERED (x, y))",
977        )
978
979    def test_table_uc_clustering(self):
980        metadata = MetaData()
981        tbl = Table(
982            "test",
983            metadata,
984            Column("x", Integer, autoincrement=False),
985            Column("y", Integer, autoincrement=False),
986            PrimaryKeyConstraint("x"),
987            UniqueConstraint("y", mssql_clustered=True),
988        )
989        self.assert_compile(
990            schema.CreateTable(tbl),
991            "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NULL, "
992            "PRIMARY KEY (x), UNIQUE CLUSTERED (y))",
993        )
994
995    def test_index_clustering(self):
996        metadata = MetaData()
997        tbl = Table("test", metadata, Column("id", Integer))
998        idx = Index("foo", tbl.c.id, mssql_clustered=True)
999        self.assert_compile(
1000            schema.CreateIndex(idx), "CREATE CLUSTERED INDEX foo ON test (id)"
1001        )
1002
1003    def test_index_ordering(self):
1004        metadata = MetaData()
1005        tbl = Table(
1006            "test",
1007            metadata,
1008            Column("x", Integer),
1009            Column("y", Integer),
1010            Column("z", Integer),
1011        )
1012        idx = Index("foo", tbl.c.x.desc(), "y")
1013        self.assert_compile(
1014            schema.CreateIndex(idx), "CREATE INDEX foo ON test (x DESC, y)"
1015        )
1016
1017    def test_create_index_expr(self):
1018        m = MetaData()
1019        t1 = Table("foo", m, Column("x", Integer))
1020        self.assert_compile(
1021            schema.CreateIndex(Index("bar", t1.c.x > 5)),
1022            "CREATE INDEX bar ON foo (x > 5)",
1023        )
1024
1025    def test_drop_index_w_schema(self):
1026        m = MetaData()
1027        t1 = Table("foo", m, Column("x", Integer), schema="bar")
1028        self.assert_compile(
1029            schema.DropIndex(Index("idx_foo", t1.c.x)),
1030            "DROP INDEX idx_foo ON bar.foo",
1031        )
1032
1033    def test_index_extra_include_1(self):
1034        metadata = MetaData()
1035        tbl = Table(
1036            "test",
1037            metadata,
1038            Column("x", Integer),
1039            Column("y", Integer),
1040            Column("z", Integer),
1041        )
1042        idx = Index("foo", tbl.c.x, mssql_include=["y"])
1043        self.assert_compile(
1044            schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)"
1045        )
1046
1047    def test_index_extra_include_2(self):
1048        metadata = MetaData()
1049        tbl = Table(
1050            "test",
1051            metadata,
1052            Column("x", Integer),
1053            Column("y", Integer),
1054            Column("z", Integer),
1055        )
1056        idx = Index("foo", tbl.c.x, mssql_include=[tbl.c.y])
1057        self.assert_compile(
1058            schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)"
1059        )
1060
1061
class SchemaTest(fixtures.TestBase):
    """Column specification strings from the MSSQL DDL compiler.

    Each test sets ``nullable`` on a String column (or leaves it alone)
    and checks the text returned by ``get_column_specification``.
    """

    def setup(self):
        # Two-column table; only ``test_column`` is examined by the tests.
        metadata = MetaData()
        sometable = Table(
            "sometable",
            metadata,
            Column("pk_column", Integer),
            Column("test_column", String),
        )
        self.column = sometable.c.test_column

        mssql_dialect = mssql.dialect()
        create_stmt = schema.CreateTable(sometable)
        self.ddl_compiler = mssql_dialect.ddl_compiler(
            mssql_dialect, create_stmt
        )

    def _column_spec(self):
        """Return the compiler's column specification for ``self.column``."""
        compiler = self.ddl_compiler
        return compiler.get_column_specification(self.column)

    def test_that_mssql_default_nullability_emits_null(self):
        """With ``nullable`` left untouched the spec ends in ``NULL``."""
        spec = self._column_spec()
        eq_("test_column VARCHAR(max) NULL", spec)

    def test_that_mssql_none_nullability_does_not_emit_nullability(self):
        """``nullable = None`` yields a spec with no NULL / NOT NULL."""
        self.column.nullable = None
        spec = self._column_spec()
        eq_("test_column VARCHAR(max)", spec)

    def test_that_mssql_specified_nullable_emits_null(self):
        """``nullable = True`` yields a spec ending in ``NULL``."""
        self.column.nullable = True
        spec = self._column_spec()
        eq_("test_column VARCHAR(max) NULL", spec)

    def test_that_mssql_specified_not_nullable_emits_not_null(self):
        """``nullable = False`` yields a spec ending in ``NOT NULL``."""
        self.column.nullable = False
        spec = self._column_spec()
        eq_("test_column VARCHAR(max) NOT NULL", spec)
1094