"""Test the TextClause and related constructs."""

from sqlalchemy import and_
from sqlalchemy import asc
from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import desc
from sqlalchemy import exc
from sqlalchemy import extract
from sqlalchemy import Float
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import literal_column
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import union
from sqlalchemy import util
from sqlalchemy.sql import column
from sqlalchemy.sql import LABEL_STYLE_TABLENAME_PLUS_COL
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import sqltypes
from sqlalchemy.sql import table
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql.selectable import LABEL_STYLE_DISAMBIGUATE_ONLY
from sqlalchemy.sql.selectable import LABEL_STYLE_NONE
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import expect_raises_message
from sqlalchemy.types import NullType

# Lightweight table() fixtures shared by the test classes below.
table1 = table(
    "mytable",
    column("myid", Integer),
    column("name", String),
    column("description", String),
)

table2 = table(
    "myothertable", column("otherid", Integer), column("othername", String)
)


class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
    """Basic compilation of standalone text() constructs."""

    __dialect__ = "default"

    def test_basic(self):
        """A plain text() renders its string unchanged."""
        self.assert_compile(
            text("select * from foo where lala = bar"),
            "select * from foo where lala = bar",
        )

    def test_text_adds_to_result_map(self):
        """text() elements in select() appear in compiled._result_columns."""
        t1, t2 = text("t1"), text("t2")

        stmt = select(t1, t2)
        compiled = stmt.compile()
        eq_(
            compiled._result_columns,
            [
                (None, None, (t1,), sqltypes.NULLTYPE),
                (None, None, (t2,), sqltypes.NULLTYPE),
            ],
        )


class SelectCompositionTest(fixtures.TestBase, AssertsCompiledSQL):

    """test the usage of text() implicit within the select() construct
    when strings are passed."""

    __dialect__ = "default"

    def test_select_composition_one(self):
        """text() is usable as both the WHERE and the FROM of a select()."""
        self.assert_compile(
            select(
                literal_column("foobar(a)"),
                literal_column("pk_foo_bar(syslaal)"),
            )
            .where(text("a = 12"))
            .select_from(
                text("foobar left outer join lala on foobar.foo = lala.foo")
            ),
            "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
            "left outer join lala on foobar.foo = lala.foo WHERE a = 12",
        )

    def test_select_composition_two(self):
        """Generatively built select() mixing column() and text() parts."""
        s = select()
        s = s.add_columns(column("column1"), column("column2"))
        s = s.where(text("column1=12"))
        s = s.where(text("column2=19"))
        s = s.order_by("column1")
        s = s.select_from(text("table1"))
        self.assert_compile(
            s,
            "SELECT column1, column2 FROM table1 WHERE "
            "column1=12 AND column2=19 ORDER BY column1",
        )

    def test_select_composition_three(self):
        """Table-less column() objects are exported by a named alias."""
        self.assert_compile(
            select(column("column1"), column("column2"))
            .select_from(table1)
            .alias("somealias")
            .select(),
            "SELECT somealias.column1, somealias.column2 FROM "
            "(SELECT column1, column2 FROM mytable) AS somealias",
        )

    def test_select_composition_four(self):
        # test that use_labels doesn't interfere with literal columns
        self.assert_compile(
            select(
                text("column1"),
                column("column2"),
                column("column3").label("bar"),
                table1.c.myid,
            )
            .select_from(table1)
            .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
            "SELECT column1, column2, column3 AS bar, "
            "mytable.myid AS mytable_myid "
            "FROM mytable",
        )

    def test_select_composition_five(self):
        # test that use_labels doesn't interfere
        # with literal columns that have textual labels
        self.assert_compile(
            select(
                text("column1 AS foobar"),
                text("column2 AS hoho"),
                table1.c.myid,
            )
            .select_from(table1)
            .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
            "SELECT column1 AS foobar, column2 AS hoho, "
            "mytable.myid AS mytable_myid FROM mytable",
        )

    def test_select_composition_six(self):
        # test that "auto-labeling of subquery columns"
        # doesn't interfere with literal columns,
        # exported columns don't get quoted.
        # [ticket:4730] refines this but for the moment the behavior with
        # no columns is being maintained.
        self.assert_compile(
            select(
                literal_column("column1 AS foobar"),
                literal_column("column2 AS hoho"),
                table1.c.myid,
            )
            .select_from(table1)
            .subquery()
            .select(),
            "SELECT anon_1.column1 AS foobar, anon_1.column2 AS hoho, "
            "anon_1.myid FROM "
            "(SELECT column1 AS foobar, column2 AS hoho, "
            "mytable.myid AS myid FROM mytable) AS anon_1",
        )

    def test_select_composition_seven(self):
        """Compiling the alias on its own renders the inner SELECT only."""
        self.assert_compile(
            select(literal_column("col1"), literal_column("col2"))
            .select_from(table("tablename"))
            .alias("myalias"),
            "SELECT col1, col2 FROM tablename",
        )

    def test_select_composition_eight(self):
        """A text() FROM element is rendered alongside a table alias."""
        self.assert_compile(
            select(table1.alias("t"), text("foo.f"))
            .where(text("foo.f = t.id"))
            .select_from(text("(select f from bar where lala=heyhey) foo")),
            "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
            "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id",
        )

    def test_expression_element_role(self):
        """test #7287"""

        self.assert_compile(
            extract("year", text("some_date + :param")),
            "EXTRACT(year FROM some_date + :param)",
        )

    @testing.combinations(
        (
            None,
            "SELECT mytable.myid, whatever FROM mytable "
            "UNION SELECT mytable.myid, whatever FROM mytable",
        ),
        (
            LABEL_STYLE_NONE,
            "SELECT mytable.myid, whatever FROM mytable "
            "UNION SELECT mytable.myid, whatever FROM mytable",
        ),
        (
            LABEL_STYLE_DISAMBIGUATE_ONLY,
            "SELECT mytable.myid, whatever FROM mytable "
            "UNION SELECT mytable.myid, whatever FROM mytable",
        ),
        (
            LABEL_STYLE_TABLENAME_PLUS_COL,
            "SELECT mytable.myid AS mytable_myid, whatever FROM mytable "
            "UNION SELECT mytable.myid AS mytable_myid, whatever FROM mytable",
        ),
    )
    def test_select_composition_nine(self, label_style, expected):
        """UNION of selects containing text(), for each label style."""

        s1 = select(table1.c.myid, text("whatever"))
        if label_style:
            s1 = s1.set_label_style(label_style)

        s2 = select(table1.c.myid, text("whatever"))

        if label_style:
            s2 = s2.set_label_style(label_style)

        stmt = s1.union(s2)

        self.assert_compile(stmt, expected)

    @testing.combinations(
        (
            None,
            "SELECT anon_1.myid FROM (SELECT mytable.myid AS myid, "
            "whatever FROM mytable UNION SELECT mytable.myid AS myid, "
            "whatever FROM mytable) AS anon_1",
        ),
        (
            LABEL_STYLE_NONE,
            "SELECT anon_1.myid FROM (SELECT mytable.myid AS myid, "
            "whatever FROM mytable UNION SELECT mytable.myid AS myid, "
            "whatever FROM mytable) AS anon_1",
        ),
        (
            LABEL_STYLE_DISAMBIGUATE_ONLY,
            "SELECT anon_1.myid FROM (SELECT mytable.myid AS myid, "
            "whatever FROM mytable UNION SELECT mytable.myid AS myid, "
            "whatever FROM mytable) AS anon_1",
        ),
        (
            LABEL_STYLE_TABLENAME_PLUS_COL,
            "SELECT anon_1.mytable_myid FROM "
            "(SELECT mytable.myid AS mytable_myid, whatever FROM mytable "
            "UNION SELECT mytable.myid AS mytable_myid, whatever "
            "FROM mytable) AS anon_1",
        ),
    )
    def test_select_composition_ten(self, label_style, expected):
        """Same UNION as composition_nine, selected from as a subquery."""

        s1 = select(table1.c.myid, text("whatever"))
        if label_style:
            s1 = s1.set_label_style(label_style)

        s2 = select(table1.c.myid, text("whatever"))

        if label_style:
            s2 = s2.set_label_style(label_style)

        stmt = s1.union(s2).subquery().select()

        self.assert_compile(stmt, expected)

    @testing.combinations(
        (None, "SELECT mytable.myid, whatever FROM mytable"),
        (LABEL_STYLE_NONE, "SELECT mytable.myid, whatever FROM mytable"),
        (
            LABEL_STYLE_DISAMBIGUATE_ONLY,
            "SELECT mytable.myid, whatever FROM mytable",
        ),
        (
            LABEL_STYLE_TABLENAME_PLUS_COL,
            "SELECT mytable.myid AS mytable_myid, whatever FROM mytable",
        ),
    )
    def test_select_composition_eleven(self, label_style, expected):
        """Single select() containing text(), for each label style."""

        stmt = select(table1.c.myid, text("whatever"))
        if label_style:
            stmt = stmt.set_label_style(label_style)

        self.assert_compile(stmt, expected)

    @testing.combinations(
        (None, ["myid", "description"]),
        (LABEL_STYLE_NONE, ["myid", "description"]),
        (LABEL_STYLE_DISAMBIGUATE_ONLY, ["myid", "description"]),
        (
            LABEL_STYLE_TABLENAME_PLUS_COL,
            ["mytable_myid", "mytable_description"],
        ),
    )
    def test_select_selected_columns_ignores_text(self, label_style, expected):
        """selected_columns keys skip the text() element in the select."""

        stmt = select(table1.c.myid, text("whatever"), table1.c.description)
        if label_style:
            stmt = stmt.set_label_style(label_style)

        eq_(stmt.selected_columns.keys(), expected)

    def test_select_bundle_columns(self):
        """text() mixed with tables/columns in the columns clause and and_()."""
        self.assert_compile(
            select(
                table1,
                table2.c.otherid,
                text("sysdate()"),
                text("foo, bar, lala"),
            ).where(
                and_(
                    text("foo.id = foofoo(lala)"),
                    text("datetime(foo) = Today"),
                    table1.c.myid == table2.c.otherid,
                ),
            ),
            "SELECT mytable.myid, mytable.name, mytable.description, "
            "myothertable.otherid, sysdate(), foo, bar, lala "
            "FROM mytable, myothertable WHERE foo.id = foofoo(lala) AND "
            "datetime(foo) = Today AND mytable.myid = myothertable.otherid",
        )


class BindParamTest(fixtures.TestBase, AssertsCompiledSQL):
    """Bound parameter, colon and percent-sign handling within text()."""

    __dialect__ = "default"

    def test_positional(self):
        """bindparams() accepts bindparam() objects positionally."""
        t = text("select * from foo where lala=:bar and hoho=:whee")
        t = t.bindparams(bindparam("bar", 4), bindparam("whee", 7))

        self.assert_compile(
            t,
            "select * from foo where lala=:bar and hoho=:whee",
            checkparams={"bar": 4, "whee": 7},
        )

    def test_kw(self):
        """bindparams() accepts values as keyword arguments."""
        t = text("select * from foo where lala=:bar and hoho=:whee")
        t = t.bindparams(bar=4, whee=7)

        self.assert_compile(
            t,
            "select * from foo where lala=:bar and hoho=:whee",
            checkparams={"bar": 4, "whee": 7},
        )

    def test_positional_plus_kw(self):
        """Positional bindparam() objects and keywords can be combined."""
        t = text("select * from foo where lala=:bar and hoho=:whee")
        t = t.bindparams(bindparam("bar", 4), whee=7)

        self.assert_compile(
            t,
            "select * from foo where lala=:bar and hoho=:whee",
            checkparams={"bar": 4, "whee": 7},
        )

    def test_literal_binds(self):
        """With literal_binds=True the values are rendered inline."""
        t = text("select * from foo where lala=:bar and hoho=:whee")
        t = t.bindparams(bindparam("bar", 4), whee="whee")

        self.assert_compile(
            t,
            "select * from foo where lala=4 and hoho='whee'",
            checkparams={},
            literal_binds=True,
        )

    def _assert_type_map(self, t, compare):
        """Assert the bind types of ``t`` match ``compare`` by affinity.

        :param t: a text() construct; its ``_bindparams`` are inspected.
        :param compare: dict of bind key -> type instance; only the
         ``_type_affinity`` of each type is compared.
        """
        map_ = {b.key: b.type for b in t._bindparams.values()}
        for key, expected_type in compare.items():
            assert expected_type._type_affinity is map_[key]._type_affinity

    def test_typing_construction(self):
        """Each bindparams() call updates only the named bind's type."""
        t = text("select * from table :foo :bar :bat")

        self._assert_type_map(
            t, {"foo": NullType(), "bar": NullType(), "bat": NullType()}
        )

        t = t.bindparams(bindparam("foo", type_=String))

        self._assert_type_map(
            t, {"foo": String(), "bar": NullType(), "bat": NullType()}
        )

        t = t.bindparams(bindparam("bar", type_=Integer))

        self._assert_type_map(
            t, {"foo": String(), "bar": Integer(), "bat": NullType()}
        )

        # a plain float value gives the bind a Float affinity
        t = t.bindparams(bat=45.564)

        self._assert_type_map(
            t, {"foo": String(), "bar": Integer(), "bat": Float()}
        )

    def test_binds_compiled_named(self):
        """Binds render in the postgresql dialect's %(name)s style."""
        self.assert_compile(
            text(
                "select * from foo where lala=:bar and hoho=:whee"
            ).bindparams(bar=4, whee=7),
            "select * from foo where lala=%(bar)s and hoho=%(whee)s",
            checkparams={"bar": 4, "whee": 7},
            dialect="postgresql",
        )

    def test_unique_binds(self):
        # unique binds can be used in text() however they uniquify across
        # multiple text() constructs only, not within a single text

        t1 = text("select :foo").bindparams(bindparam("foo", 5, unique=True))
        t2 = text("select :foo").bindparams(bindparam("foo", 10, unique=True))
        stmt = select(t1, t2)
        self.assert_compile(
            stmt,
            "SELECT select :foo_1, select :foo_2",
            checkparams={"foo_1": 5, "foo_2": 10},
        )

    def test_binds_compiled_positional(self):
        """Binds render as ``?`` for the sqlite dialect."""
        self.assert_compile(
            text(
                "select * from foo where lala=:bar and hoho=:whee"
            ).bindparams(bar=4, whee=7),
            "select * from foo where lala=? and hoho=?",
            checkparams={"bar": 4, "whee": 7},
            dialect="sqlite",
        )

    def test_missing_bind_kw(self):
        """A keyword naming an undefined bind raises ArgumentError."""
        assert_raises_message(
            exc.ArgumentError,
            r"This text\(\) construct doesn't define "
            r"a bound parameter named 'bar'",
            text(":foo").bindparams,
            foo=5,
            bar=7,
        )

    def test_missing_bind_posn(self):
        """A bindparam() naming an undefined bind raises ArgumentError."""
        assert_raises_message(
            exc.ArgumentError,
            r"This text\(\) construct doesn't define "
            r"a bound parameter named 'bar'",
            text(":foo").bindparams,
            bindparam("foo", value=5),
            bindparam("bar", value=7),
        )

    def test_escaping_colons(self):
        # test escaping out text() params with a backslash
        self.assert_compile(
            text(
                r"select * from foo where clock='05:06:07' "
                r"and mork='\:mindy'"
            ),
            "select * from foo where clock='05:06:07' and mork=':mindy'",
            checkparams={},
            params={},
            dialect="postgresql",
        )

    def test_escaping_double_colons(self):
        """Backslash-escaped ``::`` after a bind renders as a literal ``::``."""
        self.assert_compile(
            text(
                r"SELECT * FROM pg_attribute WHERE "
                r"attrelid = :tab\:\:regclass"
            ),
            "SELECT * FROM pg_attribute WHERE attrelid = %(tab)s::regclass",
            params={"tab": None},
            dialect="postgresql",
        )

    def test_double_colons_dont_actually_need_escaping(self):
        # this is news to me. bound param won't work but you can put the
        # double colons in
        self.assert_compile(
            text(
                r"SELECT * FROM pg_attribute WHERE "
                r"attrelid = foo::regclass"
            ),
            "SELECT * FROM pg_attribute WHERE attrelid = foo::regclass",
            params={},
            dialect="postgresql",
        )

    def test_text_in_select_nonfrom(self):
        """Binds of a text() used via select_from() are settable by params()."""

        generate_series = text(
            "generate_series(:x, :y, :z) as s(a)"
        ).bindparams(x=None, y=None, z=None)

        s = select(
            (func.current_date() + literal_column("s.a")).label("dates")
        ).select_from(generate_series)

        self.assert_compile(
            s,
            "SELECT CURRENT_DATE + s.a AS dates FROM "
            "generate_series(:x, :y, :z) as s(a)",
            checkparams={"y": None, "x": None, "z": None},
        )

        self.assert_compile(
            s.params(x=5, y=6, z=7),
            "SELECT CURRENT_DATE + s.a AS dates FROM "
            "generate_series(:x, :y, :z) as s(a)",
            checkparams={"y": 6, "x": 5, "z": 7},
        )

    def test_escaping_percent_signs(self):
        """Percent signs are doubled for mysql and untouched for sqlite."""
        stmt = text("select '%' where foo like '%bar%'")
        self.assert_compile(
            stmt, "select '%' where foo like '%bar%'", dialect="sqlite"
        )

        self.assert_compile(
            stmt, "select '%%' where foo like '%%bar%%'", dialect="mysql"
        )

    def test_percent_signs_literal_binds(self):
        """Percent doubling also applies to literal-rendered bind values."""
        stmt = select(literal("percent % signs %%"))
        self.assert_compile(
            stmt,
            "SELECT 'percent % signs %%' AS anon_1",
            dialect="sqlite",
            literal_binds=True,
        )

        self.assert_compile(
            stmt,
            "SELECT 'percent %% signs %%%%' AS anon_1",
            dialect="mysql",
            literal_binds=True,
        )


class AsFromTest(fixtures.TestBase, AssertsCompiledSQL):
    """text().columns() used as a selectable (subquery, alias, CTE)."""

    __dialect__ = "default"

    def test_basic_toplevel_resultmap_positional(self):
        """Result map for columns() given positional column() objects."""
        t = text("select id, name from user").columns(
            column("id", Integer), column("name")
        )

        col_pos = {col.name: idx for idx, col in enumerate(t.selected_columns)}

        compiled = t.compile()
        eq_(
            compiled._create_result_map(),
            {
                "id": (
                    "id",
                    (t.selected_columns.id, "id", "id", "id"),
                    t.selected_columns.id.type,
                    col_pos["id"],
                ),
                "name": (
                    "name",
                    (t.selected_columns.name, "name", "name", "name"),
                    t.selected_columns.name.type,
                    col_pos["name"],
                ),
            },
        )

    def test_basic_toplevel_resultmap(self):
        """Result map for columns() given name=type keyword arguments."""
        t = text("select id, name from user").columns(id=Integer, name=String)

        col_pos = {col.name: idx for idx, col in enumerate(t.selected_columns)}

        compiled = t.compile()
        eq_(
            compiled._create_result_map(),
            {
                "id": (
                    "id",
                    (t.selected_columns.id, "id", "id", "id"),
                    t.selected_columns.id.type,
                    col_pos["id"],
                ),
                "name": (
                    "name",
                    (t.selected_columns.name, "name", "name", "name"),
                    t.selected_columns.name.type,
                    col_pos["name"],
                ),
            },
        )

    def test_basic_subquery_resultmap(self):
        """Only the outer select's column appears in the result map."""
        t = (
            text("select id, name from user")
            .columns(id=Integer, name=String)
            .subquery()
        )

        stmt = select(table1.c.myid).select_from(
            table1.join(t, table1.c.myid == t.c.id)
        )
        compiled = stmt.compile()
        eq_(
            compiled._create_result_map(),
            {
                "myid": (
                    "myid",
                    (table1.c.myid, "myid", "myid", "mytable_myid"),
                    table1.c.myid.type,
                    0,
                )
            },
        )

    def test_column_collection_ordered(self):
        """selected_columns keeps the order passed to columns()."""
        t = text("select a, b, c from foo").columns(
            column("a"), column("b"), column("c")
        )
        eq_(t.selected_columns.keys(), ["a", "b", "c"])

    def test_column_collection_pos_plus_bykey(self):
        # overlapping positional names + type names
        t = text("select a, b, c from foo").columns(
            column("a"), column("b"), b=Integer, c=String
        )
        eq_(t.selected_columns.keys(), ["a", "b", "c"])
        eq_(t.selected_columns.b.type._type_affinity, Integer)
        eq_(t.selected_columns.c.type._type_affinity, String)

    def _xy_table_fixture(self):
        """Return a fresh Table "t" with Integer columns "x" and "y"."""
        m = MetaData()
        t = Table("t", m, Column("x", Integer), Column("y", Integer))
        return t

    def _mapping(self, stmt):
        """Compile ``stmt`` and invert its result map.

        Returns a dict mapping each element of a result map entry's
        second member (``entry[1]``) to the result map key it was found
        under.
        """
        compiled = stmt.compile()
        return {
            elem: key
            for key, elements in compiled._create_result_map().items()
            for elem in elements[1]
        }

    def test_select_label_alt_name(self):
        """Labels passed to columns() are mapped; the base column is not."""
        t = self._xy_table_fixture()
        l1, l2 = t.c.x.label("a"), t.c.y.label("b")
        s = text("select x AS a, y AS b FROM t").columns(l1, l2)
        mapping = self._mapping(s)
        assert l1 in mapping

        assert t.c.x not in mapping

    def test_select_alias_label_alt_name(self):
        """Same as test_select_label_alt_name, via .alias()."""
        t = self._xy_table_fixture()
        l1, l2 = t.c.x.label("a"), t.c.y.label("b")
        s = text("select x AS a, y AS b FROM t").columns(l1, l2).alias()
        mapping = self._mapping(s)
        assert l1 in mapping

        assert t.c.x not in mapping

    def test_select_column(self):
        """Table columns passed to columns() are present in the mapping."""
        t = self._xy_table_fixture()
        x, y = t.c.x, t.c.y
        s = text("select x, y FROM t").columns(x, y)
        mapping = self._mapping(s)
        assert t.c.x in mapping

    def test_select_alias_column(self):
        """Same as test_select_column, via .alias()."""
        t = self._xy_table_fixture()
        x, y = t.c.x, t.c.y
        s = text("select x, y FROM t").columns(x, y).alias()
        mapping = self._mapping(s)
        assert t.c.x in mapping

    def test_select_table_alias_column(self):
        """Columns of a table alias do not map the un-aliased column."""
        t = self._xy_table_fixture()
        x = t.c.x

        ta = t.alias()
        s = text("select ta.x, ta.y FROM t AS ta").columns(ta.c.x, ta.c.y)
        mapping = self._mapping(s)
        assert x not in mapping

    def test_select_label_alt_name_table_alias_column(self):
        """Labels of aliased columns map only the label objects."""
        t = self._xy_table_fixture()
        x = t.c.x

        ta = t.alias()
        l1, l2 = ta.c.x.label("a"), ta.c.y.label("b")

        s = text("SELECT ta.x AS a, ta.y AS b FROM t AS ta").columns(l1, l2)
        mapping = self._mapping(s)
        assert x not in mapping
        assert l1 in mapping
        assert ta.c.x not in mapping

    def test_cte(self):
        """text().columns().cte() renders as a WITH clause."""
        t = (
            text("select id, name from user")
            .columns(id=Integer, name=String)
            .cte("t")
        )

        s = select(table1).where(table1.c.myid == t.c.id)
        self.assert_compile(
            s,
            "WITH t AS (select id, name from user) "
            "SELECT mytable.myid, mytable.name, mytable.description "
            "FROM mytable, t WHERE mytable.myid = t.id",
        )

    def test_subquery(self):
        """text().columns().subquery() is joinable and gets an anon name."""
        t = (
            text("select id, name from user")
            .columns(id=Integer, name=String)
            .subquery()
        )

        stmt = (
            select(table1.c.myid)
            .select_from(table1.join(t, table1.c.myid == t.c.id))
            .order_by(t.c.name)
        )

        self.assert_compile(
            stmt,
            "SELECT mytable.myid FROM mytable JOIN "
            "(select id, name from user) AS anon_1 "
            "ON mytable.myid = anon_1.id ORDER BY anon_1.name",
        )

    def test_alias(self):
        """text().columns().alias("t") renders as a named derived table."""
        t = (
            text("select id, name from user")
            .columns(id=Integer, name=String)
            .alias("t")
        )

        s = select(table1).where(table1.c.myid == t.c.id)
        self.assert_compile(
            s,
            "SELECT mytable.myid, mytable.name, mytable.description "
            "FROM mytable, (select id, name from user) AS t "
            "WHERE mytable.myid = t.id",
        )

    def test_scalar_subquery(self):
        """scalar_subquery() takes its type affinity from the column."""
        t = text("select id from user").columns(id=Integer)
        subq = t.scalar_subquery()

        assert subq.type._type_affinity is Integer()._type_affinity

        s = select(table1.c.myid, subq).where(table1.c.myid == subq)
        self.assert_compile(
            s,
            "SELECT mytable.myid, (select id from user) AS anon_1 "
            "FROM mytable WHERE mytable.myid = (select id from user)",
        )

    def test_build_bindparams(self):
        """bindparams() keeps working after columns() has been applied."""
        t = text("select id from user :foo :bar :bat")
        t = t.bindparams(bindparam("foo", type_=Integer))
        t = t.columns(id=Integer)
        t = t.bindparams(bar=String)
        t = t.bindparams(bindparam("bat", value="bat"))

        eq_(set(t.element._bindparams), {"bat", "foo", "bar"})


class TextErrorsTest(fixtures.TestBase, AssertsCompiledSQL):
    """Plain strings passed where text() is required raise ArgumentError."""

    __dialect__ = "default"

    def _test(self, fn, arg, offending_clause):
        """Assert that ``fn(*arg)`` raises the "use text()" ArgumentError.

        :param fn: callable under test.
        :param arg: a single argument or a list of arguments for ``fn``;
         normalized with ``util.to_list()``.
        :param offending_clause: the string expected to be quoted in the
         error message (passed through ``util.ellipses_string()``).
        """
        arg = util.to_list(arg)

        assert_raises_message(
            exc.ArgumentError,
            r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
            r"explicitly declared (?:with|as) text\(%(stmt)r\)"
            % {"stmt": util.ellipses_string(offending_clause)},
            fn,
            *arg
        )

    def test_where(self):
        self._test(select(table1.c.myid).where, "myid == 5", "myid == 5")

    def test_column(self):
        self._test(select, ["myid"], "myid")

    def test_having(self):
        self._test(select(table1.c.myid).having, "myid == 5", "myid == 5")

    def test_from(self):
        self._test(select(table1.c.myid).select_from, "mytable", "mytable")


class OrderByLabelResolutionTest(fixtures.TestBase, AssertsCompiledSQL):
    """String names given to order_by() / group_by() / distinct() / over()
    are resolved against labels and columns of the statement."""

    __dialect__ = "default"

    def _test_exception(self, stmt, offending_clause, dialect=None):
        """Assert compiling ``stmt`` fails to resolve ``offending_clause``.

        :param stmt: statement whose ``compile()`` is expected to raise
         ``exc.CompileError``.
        :param offending_clause: the unresolvable string; its ``repr()``
         is interpolated into the expected message pattern.
        :param dialect: optional dialect passed through to ``compile()``.
        """
        assert_raises_message(
            exc.CompileError,
            r"Can't resolve label reference for ORDER BY / GROUP BY / "
            "DISTINCT etc. "
            "Textual SQL "
            "expression %r should be explicitly "
            r"declared as text\(%r\)" % (offending_clause, offending_clause),
            stmt.compile,
            dialect=dialect,
        )

    def test_order_by_label(self):
        stmt = select(table1.c.myid.label("foo")).order_by("foo")
        self.assert_compile(
            stmt, "SELECT mytable.myid AS foo FROM mytable ORDER BY foo"
        )

    def test_no_order_by_text(self):
        """A text() column named "foo" is not a resolvable label."""
        stmt = select(text("foo")).order_by("foo")

        with expect_raises_message(
            exc.CompileError,
            r"Can't resolve label reference for ORDER BY / GROUP BY / ",
        ):
            stmt.compile()

    def test_order_by_colname(self):
        """A name not in the columns clause resolves to the table column."""
        stmt = select(table1.c.myid).order_by("name")
        self.assert_compile(
            stmt, "SELECT mytable.myid FROM mytable ORDER BY mytable.name"
        )

    def test_order_by_alias_colname(self):
        t1 = table1.alias()
        stmt = (
            select(t1.c.myid)
            .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
            .order_by("name")
        )
        self.assert_compile(
            stmt,
            "SELECT mytable_1.myid AS mytable_1_myid "
            "FROM mytable AS mytable_1 ORDER BY mytable_1.name",
        )

    @testing.combinations(
        ((column("q") + 5).label("a"), "a", ()),
        (column("q").op("+")(5).label("a"), "a", ()),
        ((column("q") + 5).label("a"), "a DESC", (desc,)),
        (column("q").op("+")(5).label("a"), "a DESC", (desc,)),
    )
    def test_order_by_expr(self, case, expected, modifiers):
        """Ordering by a labeled expression renders the label name."""

        order_by = case
        for mod in modifiers:
            order_by = mod(order_by)

        stmt = select(case).order_by(order_by)

        col_expr = str(case)
        self.assert_compile(
            stmt, "SELECT %s AS a ORDER BY %s" % (col_expr, expected)
        )

    def test_order_by_named_label_from_anon_label(self):
        s1 = select(table1.c.myid.label(None).label("foo"), table1.c.name)
        stmt = s1.order_by("foo")
        self.assert_compile(
            stmt,
            "SELECT mytable.myid AS foo, mytable.name "
            "FROM mytable ORDER BY foo",
        )

    def test_order_by_outermost_label(self):
        # test [ticket:3335], assure that order_by("foo")
        # catches the label named "foo" in the columns clause only,
        # and not the label named "foo" in the FROM clause
        s1 = select(table1.c.myid.label("foo"), table1.c.name).alias()
        stmt = select(s1.c.name, func.bar().label("foo")).order_by("foo")

        self.assert_compile(
            stmt,
            "SELECT anon_1.name, bar() AS foo FROM "
            "(SELECT mytable.myid AS foo, mytable.name AS name "
            "FROM mytable) AS anon_1 ORDER BY foo",
        )

    def test_unresolvable_warning_order_by(self):
        stmt = select(table1.c.myid).order_by("foobar")
        self._test_exception(stmt, "foobar")

    def test_distinct_label(self):

        stmt = select(table1.c.myid.label("foo")).distinct("foo")
        self.assert_compile(
            stmt,
            "SELECT DISTINCT ON (foo) mytable.myid AS foo FROM mytable",
            dialect="postgresql",
        )

    def test_distinct_label_keyword(self):
        # NOTE(review): this body is identical to test_distinct_label;
        # presumably it once exercised a keyword-argument form of
        # DISTINCT -- confirm whether the duplicate is still wanted.

        stmt = select(table1.c.myid.label("foo")).distinct("foo")
        self.assert_compile(
            stmt,
            "SELECT DISTINCT ON (foo) mytable.myid AS foo FROM mytable",
            dialect="postgresql",
        )

    def test_unresolvable_distinct_label(self):
        from sqlalchemy.dialects import postgresql

        stmt = select(table1.c.myid.label("foo")).distinct("not a label")
        self._test_exception(stmt, "not a label", dialect=postgresql.dialect())

    def test_group_by_label(self):
        stmt = select(table1.c.myid.label("foo")).group_by("foo")
        self.assert_compile(
            stmt, "SELECT mytable.myid AS foo FROM mytable GROUP BY foo"
        )

    def test_group_by_colname(self):
        stmt = select(table1.c.myid).group_by("name")
        self.assert_compile(
            stmt, "SELECT mytable.myid FROM mytable GROUP BY mytable.name"
        )

    def test_unresolvable_warning_group_by(self):
        stmt = select(table1.c.myid).group_by("foobar")
        self._test_exception(stmt, "foobar")

    def test_asc(self):
        """asc("name") and a bare string both resolve to table columns."""
        stmt = select(table1.c.myid).order_by(asc("name"), "description")
        self.assert_compile(
            stmt,
            "SELECT mytable.myid FROM mytable "
            "ORDER BY mytable.name ASC, mytable.description",
        )

    def test_group_by_subquery(self):
        stmt = select(table1).alias()
        stmt = (
            select(stmt)
            .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
            .group_by("myid")
        )
        self.assert_compile(
            stmt,
            "SELECT anon_1.myid AS anon_1_myid, anon_1.name AS anon_1_name, "
            "anon_1.description AS anon_1_description FROM "
            "(SELECT mytable.myid AS myid, mytable.name AS name, "
            "mytable.description AS description FROM mytable) AS anon_1 "
            "GROUP BY anon_1.myid",
        )

    def test_order_by_literal_col_quoting_one(self):
        col = literal_column("SUM(ABC)").label("SUM(ABC)")
        tbl = table("my_table")
        query = select(col).select_from(tbl).order_by(col)
        self.assert_compile(
            query,
            'SELECT SUM(ABC) AS "SUM(ABC)" FROM my_table ORDER BY "SUM(ABC)"',
        )

    def test_order_by_literal_col_quoting_two(self):
        col = literal_column("SUM(ABC)").label("SUM(ABC)_")
        tbl = table("my_table")
        query = select(col).select_from(tbl).order_by(col)
        self.assert_compile(
            query,
            'SELECT SUM(ABC) AS "SUM(ABC)_" FROM my_table ORDER BY '
            '"SUM(ABC)_"',
        )

    def test_order_by_literal_col_quoting_one_explict_quote(self):
        col = literal_column("SUM(ABC)").label(quoted_name("SUM(ABC)", True))
        tbl = table("my_table")
        query = select(col).select_from(tbl).order_by(col)
        self.assert_compile(
            query,
            'SELECT SUM(ABC) AS "SUM(ABC)" FROM my_table ORDER BY "SUM(ABC)"',
        )

    def test_order_by_literal_col_quoting_two_explicit_quote(self):
        col = literal_column("SUM(ABC)").label(quoted_name("SUM(ABC)_", True))
        tbl = table("my_table")
        query = select(col).select_from(tbl).order_by(col)
        self.assert_compile(
            query,
            'SELECT SUM(ABC) AS "SUM(ABC)_" FROM my_table ORDER BY '
            '"SUM(ABC)_"',
        )

    def test_order_by_func_label_desc(self):
        stmt = select(func.foo("bar").label("fb"), table1).order_by(desc("fb"))

        self.assert_compile(
            stmt,
            "SELECT foo(:foo_1) AS fb, mytable.myid, mytable.name, "
            "mytable.description FROM mytable ORDER BY fb DESC",
        )

    def test_pg_distinct(self):
        stmt = select(table1).distinct("name")
        self.assert_compile(
            stmt,
            "SELECT DISTINCT ON (mytable.name) mytable.myid, "
            "mytable.name, mytable.description FROM mytable",
            dialect="postgresql",
        )

    def test_over(self):
        """String names in over() resolve to the FROM subquery's columns."""
        stmt = select(column("foo"), column("bar")).subquery()
        stmt = select(
            func.row_number().over(order_by="foo", partition_by="bar")
        ).select_from(stmt)

        self.assert_compile(
            stmt,
            "SELECT row_number() OVER "
            "(PARTITION BY anon_2.bar ORDER BY anon_2.foo) "
            "AS anon_1 FROM (SELECT foo, bar) AS anon_2",
        )

    def test_union_column(self):
        s1 = select(table1)
        s2 = select(table1)
        stmt = union(s1, s2).order_by("name")
        self.assert_compile(
            stmt,
            "SELECT mytable.myid, mytable.name, mytable.description FROM "
            "mytable UNION SELECT mytable.myid, mytable.name, "
            "mytable.description FROM mytable ORDER BY name",
        )

    def test_union_label(self):
        s1 = select(func.foo("hoho").label("x"))
        s2 = select(func.foo("Bar").label("y"))
        stmt = union(s1, s2).order_by("x")
        self.assert_compile(
            stmt,
            "SELECT foo(:foo_1) AS x UNION SELECT foo(:foo_2) AS y ORDER BY x",
        )

    def test_standalone_units_stringable(self):
        self.assert_compile(desc("somelabel"), "somelabel DESC")

    def test_columnadapter_anonymized(self):
        """test issue #3148

        Testing the anonymization applied from the ColumnAdapter.columns
        collection, typically as used in eager loading.

        """
        exprs = [
            table1.c.myid,
            table1.c.name.label("t1name"),
            func.foo("hoho").label("x"),
        ]

        ta = table1.alias()
        adapter = sql_util.ColumnAdapter(ta, anonymize_labels=True)

        s1 = (
            select(*[adapter.columns[expr] for expr in exprs])
            .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
            .order_by("myid", "t1name", "x")
        )

        assert_raises_message(
            exc.CompileError,
            r"Can't resolve label reference for ORDER BY / GROUP BY / "
            "DISTINCT etc. "
            "Textual SQL "
            "expression 't1name' should be explicitly "
            r"declared as text\('t1name'\)",
            s1.compile,
        )

    def test_columnadapter_non_anonymized(self):
        """test issue #3148

        Counterpart to test_columnadapter_anonymized: with a ColumnAdapter
        created without anonymize_labels, the explicit label names are
        kept and can be referenced by string in order_by().

        """
        exprs = [
            table1.c.myid,
            table1.c.name.label("t1name"),
            func.foo("hoho").label("x"),
        ]

        ta = table1.alias()
        adapter = sql_util.ColumnAdapter(ta)

        s1 = (
            select(*[adapter.columns[expr] for expr in exprs])
            .set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
            .order_by("myid", "t1name", "x")
        )

        # labels are maintained
        self.assert_compile(
            s1,
            "SELECT mytable_1.myid AS mytable_1_myid, "
            "mytable_1.name AS t1name, foo(:foo_1) AS x "
            "FROM mytable AS mytable_1 ORDER BY mytable_1.myid, t1name, x",
        )