1# -*- encoding: utf-8 2from sqlalchemy import Column 3from sqlalchemy import delete 4from sqlalchemy import extract 5from sqlalchemy import func 6from sqlalchemy import Index 7from sqlalchemy import insert 8from sqlalchemy import Integer 9from sqlalchemy import literal 10from sqlalchemy import MetaData 11from sqlalchemy import PrimaryKeyConstraint 12from sqlalchemy import schema 13from sqlalchemy import select 14from sqlalchemy import Sequence 15from sqlalchemy import sql 16from sqlalchemy import String 17from sqlalchemy import Table 18from sqlalchemy import testing 19from sqlalchemy import union 20from sqlalchemy import UniqueConstraint 21from sqlalchemy import update 22from sqlalchemy.dialects import mssql 23from sqlalchemy.dialects.mssql import base 24from sqlalchemy.dialects.mssql import mxodbc 25from sqlalchemy.sql import column 26from sqlalchemy.sql import quoted_name 27from sqlalchemy.sql import table 28from sqlalchemy.testing import AssertsCompiledSQL 29from sqlalchemy.testing import eq_ 30from sqlalchemy.testing import fixtures 31from sqlalchemy.testing import is_ 32 33 34class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 35 __dialect__ = mssql.dialect() 36 37 def test_true_false(self): 38 self.assert_compile(sql.false(), "0") 39 self.assert_compile(sql.true(), "1") 40 41 def test_select(self): 42 t = table("sometable", column("somecolumn")) 43 self.assert_compile( 44 t.select(), "SELECT sometable.somecolumn FROM sometable" 45 ) 46 47 def test_select_with_nolock(self): 48 t = table("sometable", column("somecolumn")) 49 self.assert_compile( 50 t.select().with_hint(t, "WITH (NOLOCK)"), 51 "SELECT sometable.somecolumn FROM sometable WITH (NOLOCK)", 52 ) 53 54 def test_select_with_nolock_schema(self): 55 m = MetaData() 56 t = Table( 57 "sometable", m, Column("somecolumn", Integer), schema="test_schema" 58 ) 59 self.assert_compile( 60 t.select().with_hint(t, "WITH (NOLOCK)"), 61 "SELECT test_schema.sometable.somecolumn " 62 "FROM test_schema.sometable WITH (NOLOCK)", 63 ) 64 65 def test_select_w_order_by_collate(self): 66 m = MetaData() 67 t = Table("sometable", m, Column("somecolumn", String)) 68 69 self.assert_compile( 70 select([t]).order_by( 71 t.c.somecolumn.collate("Latin1_General_CS_AS_KS_WS_CI").asc() 72 ), 73 "SELECT sometable.somecolumn FROM sometable " 74 "ORDER BY sometable.somecolumn COLLATE " 75 "Latin1_General_CS_AS_KS_WS_CI ASC", 76 ) 77 78 def test_join_with_hint(self): 79 t1 = table( 80 "t1", 81 column("a", Integer), 82 column("b", String), 83 column("c", String), 84 ) 85 t2 = table( 86 "t2", 87 column("a", Integer), 88 column("b", Integer), 89 column("c", Integer), 90 ) 91 join = ( 92 t1.join(t2, t1.c.a == t2.c.a) 93 .select() 94 .with_hint(t1, "WITH (NOLOCK)") 95 ) 96 self.assert_compile( 97 join, 98 "SELECT t1.a, t1.b, t1.c, t2.a, t2.b, t2.c " 99 "FROM t1 WITH (NOLOCK) JOIN t2 ON t1.a = t2.a", 100 ) 101 102 def test_insert(self): 103 t = table("sometable", column("somecolumn")) 104 self.assert_compile( 105 t.insert(), 106 "INSERT INTO sometable (somecolumn) VALUES " "(:somecolumn)", 107 ) 108 109 def test_update(self): 110 t = table("sometable", column("somecolumn")) 111 self.assert_compile( 112 t.update(t.c.somecolumn == 7), 113 "UPDATE sometable SET somecolumn=:somecolum" 114 "n WHERE sometable.somecolumn = " 115 ":somecolumn_1", 116 dict(somecolumn=10), 117 ) 118 119 def test_insert_hint(self): 120 t = table("sometable", column("somecolumn")) 121 for targ in (None, t): 122 for darg in ("*", "mssql"): 123 self.assert_compile( 124 t.insert() 125 .values(somecolumn="x") 126 .with_hint( 127 "WITH (PAGLOCK)", selectable=targ, dialect_name=darg 128 ), 129 "INSERT INTO sometable WITH (PAGLOCK) " 130 "(somecolumn) VALUES (:somecolumn)", 131 ) 132 133 def test_update_hint(self): 134 t = table("sometable", column("somecolumn")) 135 for targ in (None, t): 136 for darg in ("*", "mssql"): 137 self.assert_compile( 138 t.update() 139 .where(t.c.somecolumn == "q") 140 .values(somecolumn="x") 141 .with_hint( 142 "WITH (PAGLOCK)", selectable=targ, dialect_name=darg 143 ), 144 "UPDATE sometable WITH (PAGLOCK) " 145 "SET somecolumn=:somecolumn " 146 "WHERE sometable.somecolumn = :somecolumn_1", 147 ) 148 149 def test_update_exclude_hint(self): 150 t = table("sometable", column("somecolumn")) 151 self.assert_compile( 152 t.update() 153 .where(t.c.somecolumn == "q") 154 .values(somecolumn="x") 155 .with_hint("XYZ", "mysql"), 156 "UPDATE sometable SET somecolumn=:somecolumn " 157 "WHERE sometable.somecolumn = :somecolumn_1", 158 ) 159 160 def test_delete_hint(self): 161 t = table("sometable", column("somecolumn")) 162 for targ in (None, t): 163 for darg in ("*", "mssql"): 164 self.assert_compile( 165 t.delete() 166 .where(t.c.somecolumn == "q") 167 .with_hint( 168 "WITH (PAGLOCK)", selectable=targ, dialect_name=darg 169 ), 170 "DELETE FROM sometable WITH (PAGLOCK) " 171 "WHERE sometable.somecolumn = :somecolumn_1", 172 ) 173 174 def test_delete_exclude_hint(self): 175 t = table("sometable", column("somecolumn")) 176 self.assert_compile( 177 t.delete() 178 .where(t.c.somecolumn == "q") 179 .with_hint("XYZ", dialect_name="mysql"), 180 "DELETE FROM sometable WHERE " 181 "sometable.somecolumn = :somecolumn_1", 182 ) 183 184 def test_delete_extra_froms(self): 185 t1 = table("t1", column("c1")) 186 t2 = table("t2", column("c1")) 187 q = sql.delete(t1).where(t1.c.c1 == t2.c.c1) 188 self.assert_compile( 189 q, "DELETE FROM t1 FROM t1, t2 WHERE t1.c1 = t2.c1" 190 ) 191 192 def test_delete_extra_froms_alias(self): 193 a1 = table("t1", column("c1")).alias("a1") 194 t2 = table("t2", column("c1")) 195 q = sql.delete(a1).where(a1.c.c1 == t2.c.c1) 196 self.assert_compile( 197 q, "DELETE FROM a1 FROM t1 AS a1, t2 WHERE a1.c1 = t2.c1" 198 ) 199 self.assert_compile(sql.delete(a1), "DELETE FROM t1 AS a1") 200 201 def test_update_from(self): 202 metadata = MetaData() 203 table1 = Table( 204 "mytable", 205 metadata, 206 Column("myid", Integer), 207 Column("name", String(30)), 208 Column("description", String(50)), 209 ) 210 table2 = Table( 211 "myothertable", 212 metadata, 213 Column("otherid", Integer), 214 Column("othername", String(30)), 215 ) 216 217 mt = table1.alias() 218 219 u = ( 220 table1.update() 221 .values(name="foo") 222 .where(table2.c.otherid == table1.c.myid) 223 ) 224 225 # testing mssql.base.MSSQLCompiler.update_from_clause 226 self.assert_compile( 227 u, 228 "UPDATE mytable SET name=:name " 229 "FROM mytable, myothertable WHERE " 230 "myothertable.otherid = mytable.myid", 231 ) 232 233 self.assert_compile( 234 u.where(table2.c.othername == mt.c.name), 235 "UPDATE mytable SET name=:name " 236 "FROM mytable, myothertable, mytable AS mytable_1 " 237 "WHERE myothertable.otherid = mytable.myid " 238 "AND myothertable.othername = mytable_1.name", 239 ) 240 241 def test_update_from_hint(self): 242 t = table("sometable", column("somecolumn")) 243 t2 = table("othertable", column("somecolumn")) 244 for darg in ("*", "mssql"): 245 self.assert_compile( 246 t.update() 247 .where(t.c.somecolumn == t2.c.somecolumn) 248 .values(somecolumn="x") 249 .with_hint("WITH (PAGLOCK)", selectable=t2, dialect_name=darg), 250 "UPDATE sometable SET somecolumn=:somecolumn " 251 "FROM sometable, othertable WITH (PAGLOCK) " 252 "WHERE sometable.somecolumn = othertable.somecolumn", 253 ) 254 255 def test_update_to_select_schema(self): 256 meta = MetaData() 257 table = Table( 258 "sometable", 259 meta, 260 Column("sym", String), 261 Column("val", Integer), 262 schema="schema", 263 ) 264 other = Table( 265 "#other", meta, Column("sym", String), Column("newval", Integer) 266 ) 267 stmt = table.update().values( 268 val=select([other.c.newval]) 269 .where(table.c.sym == other.c.sym) 270 .as_scalar() 271 ) 272 273 self.assert_compile( 274 stmt, 275 "UPDATE [schema].sometable SET val=" 276 "(SELECT [#other].newval FROM [#other] " 277 "WHERE [schema].sometable.sym = [#other].sym)", 278 ) 279 280 stmt = ( 281 table.update() 282 .values(val=other.c.newval) 283 .where(table.c.sym == other.c.sym) 284 ) 285 self.assert_compile( 286 stmt, 287 "UPDATE [schema].sometable SET val=" 288 "[#other].newval FROM [schema].sometable, " 289 "[#other] WHERE [schema].sometable.sym = [#other].sym", 290 ) 291 292 # TODO: not supported yet. 293 # def test_delete_from_hint(self): 294 # t = table('sometable', column('somecolumn')) 295 # t2 = table('othertable', column('somecolumn')) 296 # for darg in ("*", "mssql"): 297 # self.assert_compile( 298 # t.delete().where(t.c.somecolumn==t2.c.somecolumn). 299 # with_hint("WITH (PAGLOCK)", 300 # selectable=t2, 301 # dialect_name=darg), 302 # "" 303 # ) 304 305 def test_strict_binds(self): 306 """test the 'strict' compiler binds.""" 307 308 from sqlalchemy.dialects.mssql.base import MSSQLStrictCompiler 309 310 mxodbc_dialect = mxodbc.dialect() 311 mxodbc_dialect.statement_compiler = MSSQLStrictCompiler 312 313 t = table("sometable", column("foo")) 314 315 for expr, compiled in [ 316 ( 317 select([literal("x"), literal("y")]), 318 "SELECT 'x' AS anon_1, 'y' AS anon_2", 319 ), 320 ( 321 select([t]).where(t.c.foo.in_(["x", "y", "z"])), 322 "SELECT sometable.foo FROM sometable WHERE sometable.foo " 323 "IN ('x', 'y', 'z')", 324 ), 325 (t.c.foo.in_([None]), "sometable.foo IN (NULL)"), 326 ]: 327 self.assert_compile(expr, compiled, dialect=mxodbc_dialect) 328 329 def test_in_with_subqueries(self): 330 """Test removal of legacy behavior that converted "x==subquery" 331 to use IN. 332 333 """ 334 335 t = table("sometable", column("somecolumn")) 336 self.assert_compile( 337 t.select().where(t.c.somecolumn == t.select()), 338 "SELECT sometable.somecolumn FROM " 339 "sometable WHERE sometable.somecolumn = " 340 "(SELECT sometable.somecolumn FROM " 341 "sometable)", 342 ) 343 self.assert_compile( 344 t.select().where(t.c.somecolumn != t.select()), 345 "SELECT sometable.somecolumn FROM " 346 "sometable WHERE sometable.somecolumn != " 347 "(SELECT sometable.somecolumn FROM " 348 "sometable)", 349 ) 350 351 @testing.uses_deprecated 352 def test_count(self): 353 t = table("sometable", column("somecolumn")) 354 self.assert_compile( 355 t.count(), 356 "SELECT count(sometable.somecolumn) AS " 357 "tbl_row_count FROM sometable", 358 ) 359 360 def test_noorderby_insubquery(self): 361 """test that the ms-sql dialect removes ORDER BY clauses from 362 subqueries""" 363 364 table1 = table( 365 "mytable", 366 column("myid", Integer), 367 column("name", String), 368 column("description", String), 369 ) 370 371 q = select([table1.c.myid], order_by=[table1.c.myid]).alias("foo") 372 crit = q.c.myid == table1.c.myid 373 self.assert_compile( 374 select(["*"], crit), 375 "SELECT * FROM (SELECT mytable.myid AS " 376 "myid FROM mytable) AS foo, mytable WHERE " 377 "foo.myid = mytable.myid", 378 ) 379 380 def test_force_schema_quoted_name_w_dot_case_insensitive(self): 381 metadata = MetaData() 382 tbl = Table( 383 "test", 384 metadata, 385 Column("id", Integer, primary_key=True), 386 schema=quoted_name("foo.dbo", True), 387 ) 388 self.assert_compile( 389 select([tbl]), "SELECT [foo.dbo].test.id FROM [foo.dbo].test" 390 ) 391 392 def test_force_schema_quoted_w_dot_case_insensitive(self): 393 metadata = MetaData() 394 tbl = Table( 395 "test", 396 metadata, 397 Column("id", Integer, primary_key=True), 398 schema=quoted_name("foo.dbo", True), 399 ) 400 self.assert_compile( 401 select([tbl]), "SELECT [foo.dbo].test.id FROM [foo.dbo].test" 402 ) 403 404 def test_force_schema_quoted_name_w_dot_case_sensitive(self): 405 metadata = MetaData() 406 tbl = Table( 407 "test", 408 metadata, 409 Column("id", Integer, primary_key=True), 410 schema=quoted_name("Foo.dbo", True), 411 ) 412 self.assert_compile( 413 select([tbl]), "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" 414 ) 415 416 def test_force_schema_quoted_w_dot_case_sensitive(self): 417 metadata = MetaData() 418 tbl = Table( 419 "test", 420 metadata, 421 Column("id", Integer, primary_key=True), 422 schema="[Foo.dbo]", 423 ) 424 self.assert_compile( 425 select([tbl]), "SELECT [Foo.dbo].test.id FROM [Foo.dbo].test" 426 ) 427 428 def test_schema_autosplit_w_dot_case_insensitive(self): 429 metadata = MetaData() 430 tbl = Table( 431 "test", 432 metadata, 433 Column("id", Integer, primary_key=True), 434 schema="foo.dbo", 435 ) 436 self.assert_compile( 437 select([tbl]), "SELECT foo.dbo.test.id FROM foo.dbo.test" 438 ) 439 440 def test_schema_autosplit_w_dot_case_sensitive(self): 441 metadata = MetaData() 442 tbl = Table( 443 "test", 444 metadata, 445 Column("id", Integer, primary_key=True), 446 schema="Foo.dbo", 447 ) 448 self.assert_compile( 449 select([tbl]), "SELECT [Foo].dbo.test.id FROM [Foo].dbo.test" 450 ) 451 452 def test_owner_database_pairs(self): 453 dialect = mssql.dialect() 454 455 for identifier, expected_schema, expected_owner in [ 456 ("foo", None, "foo"), 457 ("foo.bar", "foo", "bar"), 458 ("Foo.Bar", "Foo", "Bar"), 459 ("[Foo.Bar]", None, "Foo.Bar"), 460 ("[Foo.Bar].[bat]", "Foo.Bar", "bat"), 461 ]: 462 schema, owner = base._owner_plus_db(dialect, identifier) 463 464 eq_(owner, expected_owner) 465 eq_(schema, expected_schema) 466 467 def test_delete_schema(self): 468 metadata = MetaData() 469 tbl = Table( 470 "test", 471 metadata, 472 Column("id", Integer, primary_key=True), 473 schema="paj", 474 ) 475 self.assert_compile( 476 tbl.delete(tbl.c.id == 1), 477 "DELETE FROM paj.test WHERE paj.test.id = " ":id_1", 478 ) 479 s = select([tbl.c.id]).where(tbl.c.id == 1) 480 self.assert_compile( 481 tbl.delete().where(tbl.c.id.in_(s)), 482 "DELETE FROM paj.test WHERE paj.test.id IN " 483 "(SELECT paj.test.id FROM paj.test " 484 "WHERE paj.test.id = :id_1)", 485 ) 486 487 def test_delete_schema_multipart(self): 488 metadata = MetaData() 489 tbl = Table( 490 "test", 491 metadata, 492 Column("id", Integer, primary_key=True), 493 schema="banana.paj", 494 ) 495 self.assert_compile( 496 tbl.delete(tbl.c.id == 1), 497 "DELETE FROM banana.paj.test WHERE " "banana.paj.test.id = :id_1", 498 ) 499 s = select([tbl.c.id]).where(tbl.c.id == 1) 500 self.assert_compile( 501 tbl.delete().where(tbl.c.id.in_(s)), 502 "DELETE FROM banana.paj.test WHERE " 503 "banana.paj.test.id IN (SELECT banana.paj.test.id " 504 "FROM banana.paj.test WHERE " 505 "banana.paj.test.id = :id_1)", 506 ) 507 508 def test_delete_schema_multipart_needs_quoting(self): 509 metadata = MetaData() 510 tbl = Table( 511 "test", 512 metadata, 513 Column("id", Integer, primary_key=True), 514 schema="banana split.paj", 515 ) 516 self.assert_compile( 517 tbl.delete(tbl.c.id == 1), 518 "DELETE FROM [banana split].paj.test WHERE " 519 "[banana split].paj.test.id = :id_1", 520 ) 521 s = select([tbl.c.id]).where(tbl.c.id == 1) 522 self.assert_compile( 523 tbl.delete().where(tbl.c.id.in_(s)), 524 "DELETE FROM [banana split].paj.test WHERE " 525 "[banana split].paj.test.id IN (" 526 "SELECT [banana split].paj.test.id FROM " 527 "[banana split].paj.test WHERE " 528 "[banana split].paj.test.id = :id_1)", 529 ) 530 531 def test_delete_schema_multipart_both_need_quoting(self): 532 metadata = MetaData() 533 tbl = Table( 534 "test", 535 metadata, 536 Column("id", Integer, primary_key=True), 537 schema="banana split.paj with a space", 538 ) 539 self.assert_compile( 540 tbl.delete(tbl.c.id == 1), 541 "DELETE FROM [banana split].[paj with a " 542 "space].test WHERE [banana split].[paj " 543 "with a space].test.id = :id_1", 544 ) 545 s = select([tbl.c.id]).where(tbl.c.id == 1) 546 self.assert_compile( 547 tbl.delete().where(tbl.c.id.in_(s)), 548 "DELETE FROM [banana split].[paj with a space].test " 549 "WHERE [banana split].[paj with a space].test.id IN " 550 "(SELECT [banana split].[paj with a space].test.id " 551 "FROM [banana split].[paj with a space].test " 552 "WHERE [banana split].[paj with a space].test.id = :id_1)", 553 ) 554 555 def test_union(self): 556 t1 = table( 557 "t1", 558 column("col1"), 559 column("col2"), 560 column("col3"), 561 column("col4"), 562 ) 563 t2 = table( 564 "t2", 565 column("col1"), 566 column("col2"), 567 column("col3"), 568 column("col4"), 569 ) 570 s1, s2 = ( 571 select( 572 [t1.c.col3.label("col3"), t1.c.col4.label("col4")], 573 t1.c.col2.in_(["t1col2r1", "t1col2r2"]), 574 ), 575 select( 576 [t2.c.col3.label("col3"), t2.c.col4.label("col4")], 577 t2.c.col2.in_(["t2col2r2", "t2col2r3"]), 578 ), 579 ) 580 u = union(s1, s2, order_by=["col3", "col4"]) 581 self.assert_compile( 582 u, 583 "SELECT t1.col3 AS col3, t1.col4 AS col4 " 584 "FROM t1 WHERE t1.col2 IN (:col2_1, " 585 ":col2_2) UNION SELECT t2.col3 AS col3, " 586 "t2.col4 AS col4 FROM t2 WHERE t2.col2 IN " 587 "(:col2_3, :col2_4) ORDER BY col3, col4", 588 ) 589 self.assert_compile( 590 u.alias("bar").select(), 591 "SELECT bar.col3, bar.col4 FROM (SELECT " 592 "t1.col3 AS col3, t1.col4 AS col4 FROM t1 " 593 "WHERE t1.col2 IN (:col2_1, :col2_2) UNION " 594 "SELECT t2.col3 AS col3, t2.col4 AS col4 " 595 "FROM t2 WHERE t2.col2 IN (:col2_3, " 596 ":col2_4)) AS bar", 597 ) 598 599 def test_function(self): 600 self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)") 601 self.assert_compile(func.current_time(), "CURRENT_TIME") 602 self.assert_compile(func.foo(), "foo()") 603 m = MetaData() 604 t = Table( 605 "sometable", m, Column("col1", Integer), Column("col2", Integer) 606 ) 607 self.assert_compile( 608 select([func.max(t.c.col1)]), 609 "SELECT max(sometable.col1) AS max_1 FROM " "sometable", 610 ) 611 612 def test_function_overrides(self): 613 self.assert_compile(func.current_date(), "GETDATE()") 614 self.assert_compile(func.length(3), "LEN(:length_1)") 615 616 def test_extract(self): 617 t = table("t", column("col1")) 618 619 for field in "day", "month", "year": 620 self.assert_compile( 621 select([extract(field, t.c.col1)]), 622 "SELECT DATEPART(%s, t.col1) AS anon_1 FROM t" % field, 623 ) 624 625 def test_update_returning(self): 626 table1 = table( 627 "mytable", 628 column("myid", Integer), 629 column("name", String(128)), 630 column("description", String(128)), 631 ) 632 u = update(table1, values=dict(name="foo")).returning( 633 table1.c.myid, table1.c.name 634 ) 635 self.assert_compile( 636 u, 637 "UPDATE mytable SET name=:name OUTPUT " 638 "inserted.myid, inserted.name", 639 ) 640 u = update(table1, values=dict(name="foo")).returning(table1) 641 self.assert_compile( 642 u, 643 "UPDATE mytable SET name=:name OUTPUT " 644 "inserted.myid, inserted.name, " 645 "inserted.description", 646 ) 647 u = ( 648 update(table1, values=dict(name="foo")) 649 .returning(table1) 650 .where(table1.c.name == "bar") 651 ) 652 self.assert_compile( 653 u, 654 "UPDATE mytable SET name=:name OUTPUT " 655 "inserted.myid, inserted.name, " 656 "inserted.description WHERE mytable.name = " 657 ":name_1", 658 ) 659 u = update(table1, values=dict(name="foo")).returning( 660 func.length(table1.c.name) 661 ) 662 self.assert_compile( 663 u, 664 "UPDATE mytable SET name=:name OUTPUT " 665 "LEN(inserted.name) AS length_1", 666 ) 667 668 def test_delete_returning(self): 669 table1 = table( 670 "mytable", 671 column("myid", Integer), 672 column("name", String(128)), 673 column("description", String(128)), 674 ) 675 d = delete(table1).returning(table1.c.myid, table1.c.name) 676 self.assert_compile( 677 d, "DELETE FROM mytable OUTPUT deleted.myid, " "deleted.name" 678 ) 679 d = ( 680 delete(table1) 681 .where(table1.c.name == "bar") 682 .returning(table1.c.myid, table1.c.name) 683 ) 684 self.assert_compile( 685 d, 686 "DELETE FROM mytable OUTPUT deleted.myid, " 687 "deleted.name WHERE mytable.name = :name_1", 688 ) 689 690 def test_insert_returning(self): 691 table1 = table( 692 "mytable", 693 column("myid", Integer), 694 column("name", String(128)), 695 column("description", String(128)), 696 ) 697 i = insert(table1, values=dict(name="foo")).returning( 698 table1.c.myid, table1.c.name 699 ) 700 self.assert_compile( 701 i, 702 "INSERT INTO mytable (name) OUTPUT " 703 "inserted.myid, inserted.name VALUES " 704 "(:name)", 705 ) 706 i = insert(table1, values=dict(name="foo")).returning(table1) 707 self.assert_compile( 708 i, 709 "INSERT INTO mytable (name) OUTPUT " 710 "inserted.myid, inserted.name, " 711 "inserted.description VALUES (:name)", 712 ) 713 i = insert(table1, values=dict(name="foo")).returning( 714 func.length(table1.c.name) 715 ) 716 self.assert_compile( 717 i, 718 "INSERT INTO mytable (name) OUTPUT " 719 "LEN(inserted.name) AS length_1 VALUES " 720 "(:name)", 721 ) 722 723 def test_limit_using_top(self): 724 t = table("t", column("x", Integer), column("y", Integer)) 725 726 s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(10) 727 728 self.assert_compile( 729 s, 730 "SELECT TOP 10 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y", 731 checkparams={"x_1": 5}, 732 ) 733 734 def test_limit_zero_using_top(self): 735 t = table("t", column("x", Integer), column("y", Integer)) 736 737 s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(0) 738 739 self.assert_compile( 740 s, 741 "SELECT TOP 0 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y", 742 checkparams={"x_1": 5}, 743 ) 744 c = s.compile(dialect=mssql.dialect()) 745 eq_(len(c._result_columns), 2) 746 assert t.c.x in set(c._create_result_map()["x"][1]) 747 748 def test_offset_using_window(self): 749 t = table("t", column("x", Integer), column("y", Integer)) 750 751 s = select([t]).where(t.c.x == 5).order_by(t.c.y).offset(20) 752 753 # test that the select is not altered with subsequent compile 754 # calls 755 for i in range(2): 756 self.assert_compile( 757 s, 758 "SELECT anon_1.x, anon_1.y FROM (SELECT t.x AS x, t.y " 759 "AS y, ROW_NUMBER() OVER (ORDER BY t.y) AS " 760 "mssql_rn FROM t WHERE t.x = :x_1) AS " 761 "anon_1 WHERE mssql_rn > :param_1", 762 checkparams={"param_1": 20, "x_1": 5}, 763 ) 764 765 c = s.compile(dialect=mssql.dialect()) 766 eq_(len(c._result_columns), 2) 767 assert t.c.x in set(c._create_result_map()["x"][1]) 768 769 def test_limit_offset_using_window(self): 770 t = table("t", column("x", Integer), column("y", Integer)) 771 772 s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(10).offset(20) 773 774 self.assert_compile( 775 s, 776 "SELECT anon_1.x, anon_1.y " 777 "FROM (SELECT t.x AS x, t.y AS y, " 778 "ROW_NUMBER() OVER (ORDER BY t.y) AS mssql_rn " 779 "FROM t " 780 "WHERE t.x = :x_1) AS anon_1 " 781 "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1", 782 checkparams={"param_1": 20, "param_2": 10, "x_1": 5}, 783 ) 784 c = s.compile(dialect=mssql.dialect()) 785 eq_(len(c._result_columns), 2) 786 assert t.c.x in set(c._create_result_map()["x"][1]) 787 assert t.c.y in set(c._create_result_map()["y"][1]) 788 789 def test_limit_offset_w_ambiguous_cols(self): 790 t = table("t", column("x", Integer), column("y", Integer)) 791 792 cols = [t.c.x, t.c.x.label("q"), t.c.x.label("p"), t.c.y] 793 s = select(cols).where(t.c.x == 5).order_by(t.c.y).limit(10).offset(20) 794 795 self.assert_compile( 796 s, 797 "SELECT anon_1.x, anon_1.q, anon_1.p, anon_1.y " 798 "FROM (SELECT t.x AS x, t.x AS q, t.x AS p, t.y AS y, " 799 "ROW_NUMBER() OVER (ORDER BY t.y) AS mssql_rn " 800 "FROM t " 801 "WHERE t.x = :x_1) AS anon_1 " 802 "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1", 803 checkparams={"param_1": 20, "param_2": 10, "x_1": 5}, 804 ) 805 c = s.compile(dialect=mssql.dialect()) 806 eq_(len(c._result_columns), 4) 807 808 result_map = c._create_result_map() 809 810 for col in cols: 811 is_(result_map[col.key][1][0], col) 812 813 def test_limit_offset_with_correlated_order_by(self): 814 t1 = table("t1", column("x", Integer), column("y", Integer)) 815 t2 = table("t2", column("x", Integer), column("y", Integer)) 816 817 order_by = select([t2.c.y]).where(t1.c.x == t2.c.x).as_scalar() 818 s = ( 819 select([t1]) 820 .where(t1.c.x == 5) 821 .order_by(order_by) 822 .limit(10) 823 .offset(20) 824 ) 825 826 self.assert_compile( 827 s, 828 "SELECT anon_1.x, anon_1.y " 829 "FROM (SELECT t1.x AS x, t1.y AS y, " 830 "ROW_NUMBER() OVER (ORDER BY " 831 "(SELECT t2.y FROM t2 WHERE t1.x = t2.x)" 832 ") AS mssql_rn " 833 "FROM t1 " 834 "WHERE t1.x = :x_1) AS anon_1 " 835 "WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1", 836 checkparams={"param_1": 20, "param_2": 10, "x_1": 5}, 837 ) 838 839 c = s.compile(dialect=mssql.dialect()) 840 eq_(len(c._result_columns), 2) 841 assert t1.c.x in set(c._create_result_map()["x"][1]) 842 assert t1.c.y in set(c._create_result_map()["y"][1]) 843 844 def test_offset_dont_misapply_labelreference(self): 845 m = MetaData() 846 847 t = Table("t", m, Column("x", Integer)) 848 849 expr1 = func.foo(t.c.x).label("x") 850 expr2 = func.foo(t.c.x).label("y") 851 852 stmt1 = select([expr1]).order_by(expr1.desc()).offset(1) 853 stmt2 = select([expr2]).order_by(expr2.desc()).offset(1) 854 855 self.assert_compile( 856 stmt1, 857 "SELECT anon_1.x FROM (SELECT foo(t.x) AS x, " 858 "ROW_NUMBER() OVER (ORDER BY foo(t.x) DESC) AS mssql_rn FROM t) " 859 "AS anon_1 WHERE mssql_rn > :param_1", 860 ) 861 862 self.assert_compile( 863 stmt2, 864 "SELECT anon_1.y FROM (SELECT foo(t.x) AS y, " 865 "ROW_NUMBER() OVER (ORDER BY foo(t.x) DESC) AS mssql_rn FROM t) " 866 "AS anon_1 WHERE mssql_rn > :param_1", 867 ) 868 869 def test_limit_zero_offset_using_window(self): 870 t = table("t", column("x", Integer), column("y", Integer)) 871 872 s = select([t]).where(t.c.x == 5).order_by(t.c.y).limit(0).offset(0) 873 874 # render the LIMIT of zero, but not the OFFSET 875 # of zero, so produces TOP 0 876 self.assert_compile( 877 s, 878 "SELECT TOP 0 t.x, t.y FROM t " "WHERE t.x = :x_1 ORDER BY t.y", 879 checkparams={"x_1": 5}, 880 ) 881 882 def test_sequence_start_0(self): 883 metadata = MetaData() 884 tbl = Table( 885 "test", 886 metadata, 887 Column("id", Integer, Sequence("", 0), primary_key=True), 888 ) 889 self.assert_compile( 890 schema.CreateTable(tbl), 891 "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(0,1), " 892 "PRIMARY KEY (id))", 893 ) 894 895 def test_sequence_non_primary_key(self): 896 metadata = MetaData() 897 tbl = Table( 898 "test", 899 metadata, 900 Column("id", Integer, Sequence(""), primary_key=False), 901 ) 902 self.assert_compile( 903 schema.CreateTable(tbl), 904 "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))", 905 ) 906 907 def test_sequence_ignore_nullability(self): 908 metadata = MetaData() 909 tbl = Table( 910 "test", 911 metadata, 912 Column("id", Integer, Sequence(""), nullable=True), 913 ) 914 self.assert_compile( 915 schema.CreateTable(tbl), 916 "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(1,1))", 917 ) 918 919 def test_table_pkc_clustering(self): 920 metadata = MetaData() 921 tbl = Table( 922 "test", 923 metadata, 924 Column("x", Integer, autoincrement=False), 925 Column("y", Integer, autoincrement=False), 926 PrimaryKeyConstraint("x", "y", mssql_clustered=True), 927 ) 928 self.assert_compile( 929 schema.CreateTable(tbl), 930 "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, " 931 "PRIMARY KEY CLUSTERED (x, y))", 932 ) 933 934 def test_table_pkc_explicit_nonclustered(self): 935 metadata = MetaData() 936 tbl = Table( 937 "test", 938 metadata, 939 Column("x", Integer, autoincrement=False), 940 Column("y", Integer, autoincrement=False), 941 PrimaryKeyConstraint("x", "y", mssql_clustered=False), 942 ) 943 self.assert_compile( 944 schema.CreateTable(tbl), 945 "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NOT NULL, " 946 "PRIMARY KEY NONCLUSTERED (x, y))", 947 ) 948 949 def test_table_idx_explicit_nonclustered(self): 950 metadata = MetaData() 951 tbl = Table( 952 "test", 953 metadata, 954 Column("x", Integer, autoincrement=False), 955 Column("y", Integer, autoincrement=False), 956 ) 957 958 idx = Index("myidx", tbl.c.x, tbl.c.y, mssql_clustered=False) 959 self.assert_compile( 960 schema.CreateIndex(idx), 961 "CREATE NONCLUSTERED INDEX myidx ON test (x, y)", 962 ) 963 964 def test_table_uc_explicit_nonclustered(self): 965 metadata = MetaData() 966 tbl = Table( 967 "test", 968 metadata, 969 Column("x", Integer, autoincrement=False), 970 Column("y", Integer, autoincrement=False), 971 UniqueConstraint("x", "y", mssql_clustered=False), 972 ) 973 self.assert_compile( 974 schema.CreateTable(tbl), 975 "CREATE TABLE test (x INTEGER NULL, y INTEGER NULL, " 976 "UNIQUE NONCLUSTERED (x, y))", 977 ) 978 979 def test_table_uc_clustering(self): 980 metadata = MetaData() 981 tbl = Table( 982 "test", 983 metadata, 984 Column("x", Integer, autoincrement=False), 985 Column("y", Integer, autoincrement=False), 986 PrimaryKeyConstraint("x"), 987 UniqueConstraint("y", mssql_clustered=True), 988 ) 989 self.assert_compile( 990 schema.CreateTable(tbl), 991 "CREATE TABLE test (x INTEGER NOT NULL, y INTEGER NULL, " 992 "PRIMARY KEY (x), UNIQUE CLUSTERED (y))", 993 ) 994 995 def test_index_clustering(self): 996 metadata = MetaData() 997 tbl = Table("test", metadata, Column("id", Integer)) 998 idx = Index("foo", tbl.c.id, mssql_clustered=True) 999 self.assert_compile( 1000 schema.CreateIndex(idx), "CREATE CLUSTERED INDEX foo ON test (id)" 1001 ) 1002 1003 def test_index_ordering(self): 1004 metadata = MetaData() 1005 tbl = Table( 1006 "test", 1007 metadata, 1008 Column("x", Integer), 1009 Column("y", Integer), 1010 Column("z", Integer), 1011 ) 1012 idx = Index("foo", tbl.c.x.desc(), "y") 1013 self.assert_compile( 1014 schema.CreateIndex(idx), "CREATE INDEX foo ON test (x DESC, y)" 1015 ) 1016 1017 def test_create_index_expr(self): 1018 m = MetaData() 1019 t1 = Table("foo", m, Column("x", Integer)) 1020 self.assert_compile( 1021 schema.CreateIndex(Index("bar", t1.c.x > 5)), 1022 "CREATE INDEX bar ON foo (x > 5)", 1023 ) 1024 1025 def test_drop_index_w_schema(self): 1026 m = MetaData() 1027 t1 = Table("foo", m, Column("x", Integer), schema="bar") 1028 self.assert_compile( 1029 schema.DropIndex(Index("idx_foo", t1.c.x)), 1030 "DROP INDEX idx_foo ON bar.foo", 1031 ) 1032 1033 def test_index_extra_include_1(self): 1034 metadata = MetaData() 1035 tbl = Table( 1036 "test", 1037 metadata, 1038 Column("x", Integer), 1039 Column("y", Integer), 1040 Column("z", Integer), 1041 ) 1042 idx = Index("foo", tbl.c.x, mssql_include=["y"]) 1043 self.assert_compile( 1044 schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)" 1045 ) 1046 1047 def test_index_extra_include_2(self): 1048 metadata = MetaData() 1049 tbl = Table( 1050 "test", 1051 metadata, 1052 Column("x", Integer), 1053 Column("y", Integer), 1054 Column("z", Integer), 1055 ) 1056 idx = Index("foo", tbl.c.x, mssql_include=[tbl.c.y]) 1057 self.assert_compile( 1058 schema.CreateIndex(idx), "CREATE INDEX foo ON test (x) INCLUDE (y)" 1059 ) 1060 1061 1062class SchemaTest(fixtures.TestBase): 1063 def setup(self): 1064 t = Table( 1065 "sometable", 1066 MetaData(), 1067 Column("pk_column", Integer), 1068 Column("test_column", String), 1069 ) 1070 self.column = t.c.test_column 1071 1072 dialect = mssql.dialect() 1073 self.ddl_compiler = dialect.ddl_compiler( 1074 dialect, schema.CreateTable(t) 1075 ) 1076 1077 def _column_spec(self): 1078 return self.ddl_compiler.get_column_specification(self.column) 1079 1080 def test_that_mssql_default_nullability_emits_null(self): 1081 eq_("test_column VARCHAR(max) NULL", self._column_spec()) 1082 1083 def test_that_mssql_none_nullability_does_not_emit_nullability(self): 1084 self.column.nullable = None 1085 eq_("test_column VARCHAR(max)", self._column_spec()) 1086 1087 def test_that_mssql_specified_nullable_emits_null(self): 1088 self.column.nullable = True 1089 eq_("test_column VARCHAR(max) NULL", self._column_spec()) 1090 1091 def test_that_mssql_specified_not_nullable_emits_not_null(self): 1092 self.column.nullable = False 1093 eq_("test_column VARCHAR(max) NOT NULL", self._column_spec()) 1094