1from sqlalchemy import and_ 2from sqlalchemy import bindparam 3from sqlalchemy import case 4from sqlalchemy import Column 5from sqlalchemy import exc 6from sqlalchemy import extract 7from sqlalchemy import ForeignKey 8from sqlalchemy import func 9from sqlalchemy import Integer 10from sqlalchemy import literal_column 11from sqlalchemy import MetaData 12from sqlalchemy import select 13from sqlalchemy import String 14from sqlalchemy import Table 15from sqlalchemy import testing 16from sqlalchemy import text 17from sqlalchemy import tuple_ 18from sqlalchemy import union 19from sqlalchemy.sql import ClauseElement 20from sqlalchemy.sql import column 21from sqlalchemy.sql import operators 22from sqlalchemy.sql import table 23from sqlalchemy.sql import util as sql_util 24from sqlalchemy.sql import visitors 25from sqlalchemy.sql.expression import _clone 26from sqlalchemy.sql.expression import _from_objects 27from sqlalchemy.sql.visitors import ClauseVisitor 28from sqlalchemy.sql.visitors import cloned_traverse 29from sqlalchemy.sql.visitors import CloningVisitor 30from sqlalchemy.sql.visitors import ReplacingCloningVisitor 31from sqlalchemy.testing import assert_raises 32from sqlalchemy.testing import assert_raises_message 33from sqlalchemy.testing import AssertsCompiledSQL 34from sqlalchemy.testing import AssertsExecutionResults 35from sqlalchemy.testing import eq_ 36from sqlalchemy.testing import fixtures 37from sqlalchemy.testing import is_ 38from sqlalchemy.testing import is_not_ 39 40A = B = t1 = t2 = t3 = table1 = table2 = table3 = table4 = None 41 42 43class TraversalTest(fixtures.TestBase, AssertsExecutionResults): 44 45 """test ClauseVisitor's traversal, particularly its 46 ability to copy and modify a ClauseElement in place.""" 47 48 @classmethod 49 def setup_class(cls): 50 global A, B 51 52 # establish two fictitious ClauseElements. 53 # define deep equality semantics as well as deep 54 # identity semantics. 55 class A(ClauseElement): 56 __visit_name__ = "a" 57 58 def __init__(self, expr): 59 self.expr = expr 60 61 def is_other(self, other): 62 return other is self 63 64 __hash__ = ClauseElement.__hash__ 65 66 def __eq__(self, other): 67 return other.expr == self.expr 68 69 def __ne__(self, other): 70 return other.expr != self.expr 71 72 def __str__(self): 73 return "A(%s)" % repr(self.expr) 74 75 class B(ClauseElement): 76 __visit_name__ = "b" 77 78 def __init__(self, *items): 79 self.items = items 80 81 def is_other(self, other): 82 if other is not self: 83 return False 84 for i1, i2 in zip(self.items, other.items): 85 if i1 is not i2: 86 return False 87 return True 88 89 __hash__ = ClauseElement.__hash__ 90 91 def __eq__(self, other): 92 for i1, i2 in zip(self.items, other.items): 93 if i1 != i2: 94 return False 95 return True 96 97 def __ne__(self, other): 98 for i1, i2 in zip(self.items, other.items): 99 if i1 != i2: 100 return True 101 return False 102 103 def _copy_internals(self, clone=_clone): 104 self.items = [clone(i) for i in self.items] 105 106 def get_children(self, **kwargs): 107 return self.items 108 109 def __str__(self): 110 return "B(%s)" % repr([str(i) for i in self.items]) 111 112 def test_test_classes(self): 113 a1 = A("expr1") 114 struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) 115 struct2 = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) 116 struct3 = B( 117 a1, A("expr2"), B(A("expr1b"), A("expr2bmodified")), A("expr3") 118 ) 119 120 assert a1.is_other(a1) 121 assert struct.is_other(struct) 122 assert struct == struct2 123 assert struct != struct3 124 assert not struct.is_other(struct2) 125 assert not struct.is_other(struct3) 126 127 def test_clone(self): 128 struct = B( 129 A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3") 130 ) 131 132 class Vis(CloningVisitor): 133 def visit_a(self, a): 134 pass 135 136 def visit_b(self, b): 137 pass 138 139 vis = Vis() 140 s2 = vis.traverse(struct) 141 assert struct == s2 142 assert not struct.is_other(s2) 143 144 def test_no_clone(self): 145 struct = B( 146 A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3") 147 ) 148 149 class Vis(ClauseVisitor): 150 def visit_a(self, a): 151 pass 152 153 def visit_b(self, b): 154 pass 155 156 vis = Vis() 157 s2 = vis.traverse(struct) 158 assert struct == s2 159 assert struct.is_other(s2) 160 161 def test_clone_anon_label(self): 162 from sqlalchemy.sql.elements import Grouping 163 164 c1 = Grouping(literal_column("q")) 165 s1 = select([c1]) 166 167 class Vis(CloningVisitor): 168 def visit_grouping(self, elem): 169 pass 170 171 vis = Vis() 172 s2 = vis.traverse(s1) 173 eq_(list(s2.inner_columns)[0].anon_label, c1.anon_label) 174 175 def test_change_in_place(self): 176 struct = B( 177 A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3") 178 ) 179 struct2 = B( 180 A("expr1"), 181 A("expr2modified"), 182 B(A("expr1b"), A("expr2b")), 183 A("expr3"), 184 ) 185 struct3 = B( 186 A("expr1"), 187 A("expr2"), 188 B(A("expr1b"), A("expr2bmodified")), 189 A("expr3"), 190 ) 191 192 class Vis(CloningVisitor): 193 def visit_a(self, a): 194 if a.expr == "expr2": 195 a.expr = "expr2modified" 196 197 def visit_b(self, b): 198 pass 199 200 vis = Vis() 201 s2 = vis.traverse(struct) 202 assert struct != s2 203 assert not struct.is_other(s2) 204 assert struct2 == s2 205 206 class Vis2(CloningVisitor): 207 def visit_a(self, a): 208 if a.expr == "expr2b": 209 a.expr = "expr2bmodified" 210 211 def visit_b(self, b): 212 pass 213 214 vis2 = Vis2() 215 s3 = vis2.traverse(struct) 216 assert struct != s3 217 assert struct3 == s3 218 219 def test_visit_name(self): 220 # override fns in testlib/schema.py 221 from sqlalchemy import Column 222 223 class CustomObj(Column): 224 pass 225 226 assert CustomObj.__visit_name__ == Column.__visit_name__ == "column" 227 228 foo, bar = CustomObj("foo", String), CustomObj("bar", String) 229 bin_ = foo == bar 230 set(ClauseVisitor().iterate(bin_)) 231 assert set(ClauseVisitor().iterate(bin_)) == set([foo, bar, bin_]) 232 233 234class BinaryEndpointTraversalTest(fixtures.TestBase): 235 236 """test the special binary product visit""" 237 238 def _assert_traversal(self, expr, expected): 239 canary = [] 240 241 def visit(binary, l, r): 242 canary.append((binary.operator, l, r)) 243 print(binary.operator, l, r) 244 245 sql_util.visit_binary_product(visit, expr) 246 eq_(canary, expected) 247 248 def test_basic(self): 249 a, b = column("a"), column("b") 250 self._assert_traversal(a == b, [(operators.eq, a, b)]) 251 252 def test_with_tuples(self): 253 a, b, c, d, b1, b1a, b1b, e, f = ( 254 column("a"), 255 column("b"), 256 column("c"), 257 column("d"), 258 column("b1"), 259 column("b1a"), 260 column("b1b"), 261 column("e"), 262 column("f"), 263 ) 264 expr = tuple_(a, b, b1 == tuple_(b1a, b1b == d), c) > tuple_( 265 func.go(e + f) 266 ) 267 self._assert_traversal( 268 expr, 269 [ 270 (operators.gt, a, e), 271 (operators.gt, a, f), 272 (operators.gt, b, e), 273 (operators.gt, b, f), 274 (operators.eq, b1, b1a), 275 (operators.eq, b1b, d), 276 (operators.gt, c, e), 277 (operators.gt, c, f), 278 ], 279 ) 280 281 def test_composed(self): 282 a, b, e, f, q, j, r = ( 283 column("a"), 284 column("b"), 285 column("e"), 286 column("f"), 287 column("q"), 288 column("j"), 289 column("r"), 290 ) 291 expr = and_((a + b) == q + func.sum(e + f), and_(j == r, f == q)) 292 self._assert_traversal( 293 expr, 294 [ 295 (operators.eq, a, q), 296 (operators.eq, a, e), 297 (operators.eq, a, f), 298 (operators.eq, b, q), 299 (operators.eq, b, e), 300 (operators.eq, b, f), 301 (operators.eq, j, r), 302 (operators.eq, f, q), 303 ], 304 ) 305 306 def test_subquery(self): 307 a, b, c = column("a"), column("b"), column("c") 308 subq = select([c]).where(c == a).as_scalar() 309 expr = and_(a == b, b == subq) 310 self._assert_traversal( 311 expr, [(operators.eq, a, b), (operators.eq, b, subq)] 312 ) 313 314 315class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): 316 317 """test copy-in-place behavior of various ClauseElements.""" 318 319 __dialect__ = "default" 320 321 @classmethod 322 def setup_class(cls): 323 global t1, t2, t3 324 t1 = table("table1", column("col1"), column("col2"), column("col3")) 325 t2 = table("table2", column("col1"), column("col2"), column("col3")) 326 t3 = Table( 327 "table3", 328 MetaData(), 329 Column("col1", Integer), 330 Column("col2", Integer), 331 ) 332 333 def test_binary(self): 334 clause = t1.c.col2 == t2.c.col2 335 eq_(str(clause), str(CloningVisitor().traverse(clause))) 336 337 def test_binary_anon_label_quirk(self): 338 t = table("t1", column("col1")) 339 340 f = t.c.col1 * 5 341 self.assert_compile( 342 select([f]), "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1" 343 ) 344 345 f.anon_label 346 347 a = t.alias() 348 f = sql_util.ClauseAdapter(a).traverse(f) 349 350 self.assert_compile( 351 select([f]), "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1" 352 ) 353 354 def test_join(self): 355 clause = t1.join(t2, t1.c.col2 == t2.c.col2) 356 c1 = str(clause) 357 assert str(clause) == str(CloningVisitor().traverse(clause)) 358 359 class Vis(CloningVisitor): 360 def visit_binary(self, binary): 361 binary.right = t2.c.col3 362 363 clause2 = Vis().traverse(clause) 364 assert c1 == str(clause) 365 assert str(clause2) == str(t1.join(t2, t1.c.col2 == t2.c.col3)) 366 367 def test_aliased_column_adapt(self): 368 clause = t1.select() 369 370 aliased = t1.select().alias() 371 aliased2 = t1.alias() 372 373 adapter = sql_util.ColumnAdapter(aliased) 374 375 f = select([adapter.columns[c] for c in aliased2.c]).select_from( 376 aliased 377 ) 378 379 s = select([aliased2]).select_from(aliased) 380 eq_(str(s), str(f)) 381 382 f = select([adapter.columns[func.count(aliased2.c.col1)]]).select_from( 383 aliased 384 ) 385 eq_( 386 str(select([func.count(aliased2.c.col1)]).select_from(aliased)), 387 str(f), 388 ) 389 390 def test_aliased_cloned_column_adapt_inner(self): 391 clause = select([t1.c.col1, func.foo(t1.c.col2).label("foo")]) 392 393 aliased1 = select([clause.c.col1, clause.c.foo]) 394 aliased2 = clause 395 aliased2.c.col1, aliased2.c.foo 396 aliased3 = cloned_traverse(aliased2, {}, {}) 397 398 # fixed by [ticket:2419]. the inside columns 399 # on aliased3 have _is_clone_of pointers to those of 400 # aliased2. corresponding_column checks these 401 # now. 402 adapter = sql_util.ColumnAdapter(aliased1) 403 f1 = select([adapter.columns[c] for c in aliased2._raw_columns]) 404 f2 = select([adapter.columns[c] for c in aliased3._raw_columns]) 405 eq_(str(f1), str(f2)) 406 407 def test_aliased_cloned_column_adapt_exported(self): 408 clause = select([t1.c.col1, func.foo(t1.c.col2).label("foo")]) 409 410 aliased1 = select([clause.c.col1, clause.c.foo]) 411 aliased2 = clause 412 aliased2.c.col1, aliased2.c.foo 413 aliased3 = cloned_traverse(aliased2, {}, {}) 414 415 # also fixed by [ticket:2419]. When we look at the 416 # *outside* columns of aliased3, they previously did not 417 # have an _is_clone_of pointer. But we now modified _make_proxy 418 # to assign this. 419 adapter = sql_util.ColumnAdapter(aliased1) 420 f1 = select([adapter.columns[c] for c in aliased2.c]) 421 f2 = select([adapter.columns[c] for c in aliased3.c]) 422 eq_(str(f1), str(f2)) 423 424 def test_aliased_cloned_schema_column_adapt_exported(self): 425 clause = select([t3.c.col1, func.foo(t3.c.col2).label("foo")]) 426 427 aliased1 = select([clause.c.col1, clause.c.foo]) 428 aliased2 = clause 429 aliased2.c.col1, aliased2.c.foo 430 aliased3 = cloned_traverse(aliased2, {}, {}) 431 432 # also fixed by [ticket:2419]. When we look at the 433 # *outside* columns of aliased3, they previously did not 434 # have an _is_clone_of pointer. But we now modified _make_proxy 435 # to assign this. 436 adapter = sql_util.ColumnAdapter(aliased1) 437 f1 = select([adapter.columns[c] for c in aliased2.c]) 438 f2 = select([adapter.columns[c] for c in aliased3.c]) 439 eq_(str(f1), str(f2)) 440 441 def test_labeled_expression_adapt(self): 442 lbl_x = (t3.c.col1 == 1).label("x") 443 t3_alias = t3.alias() 444 445 adapter = sql_util.ColumnAdapter(t3_alias) 446 447 lblx_adapted = adapter.traverse(lbl_x) 448 is_not_(lblx_adapted._element, lbl_x._element) 449 450 lblx_adapted = adapter.traverse(lbl_x) 451 self.assert_compile( 452 select([lblx_adapted.self_group()]), 453 "SELECT (table3_1.col1 = :col1_1) AS x FROM table3 AS table3_1", 454 ) 455 456 self.assert_compile( 457 select([lblx_adapted.is_(True)]), 458 "SELECT (table3_1.col1 = :col1_1) IS 1 AS anon_1 " 459 "FROM table3 AS table3_1", 460 ) 461 462 def test_cte_w_union(self): 463 t = select([func.values(1).label("n")]).cte("t", recursive=True) 464 t = t.union_all(select([t.c.n + 1]).where(t.c.n < 100)) 465 s = select([func.sum(t.c.n)]) 466 467 from sqlalchemy.sql.visitors import cloned_traverse 468 469 cloned = cloned_traverse(s, {}, {}) 470 471 self.assert_compile( 472 cloned, 473 "WITH RECURSIVE t(n) AS " 474 "(SELECT values(:values_1) AS n " 475 "UNION ALL SELECT t.n + :n_1 AS anon_1 " 476 "FROM t " 477 "WHERE t.n < :n_2) " 478 "SELECT sum(t.n) AS sum_1 FROM t", 479 ) 480 481 def test_aliased_cte_w_union(self): 482 t = ( 483 select([func.values(1).label("n")]) 484 .cte("t", recursive=True) 485 .alias("foo") 486 ) 487 t = t.union_all(select([t.c.n + 1]).where(t.c.n < 100)) 488 s = select([func.sum(t.c.n)]) 489 490 from sqlalchemy.sql.visitors import cloned_traverse 491 492 cloned = cloned_traverse(s, {}, {}) 493 494 self.assert_compile( 495 cloned, 496 "WITH RECURSIVE foo(n) AS (SELECT values(:values_1) AS n " 497 "UNION ALL SELECT foo.n + :n_1 AS anon_1 FROM foo " 498 "WHERE foo.n < :n_2) SELECT sum(foo.n) AS sum_1 FROM foo", 499 ) 500 501 def test_text(self): 502 clause = text( 503 "select * from table where foo=:bar", bindparams=[bindparam("bar")] 504 ) 505 c1 = str(clause) 506 507 class Vis(CloningVisitor): 508 def visit_textclause(self, text): 509 text.text = text.text + " SOME MODIFIER=:lala" 510 text._bindparams["lala"] = bindparam("lala") 511 512 clause2 = Vis().traverse(clause) 513 assert c1 == str(clause) 514 assert str(clause2) == c1 + " SOME MODIFIER=:lala" 515 assert list(clause._bindparams.keys()) == ["bar"] 516 assert set(clause2._bindparams.keys()) == set(["bar", "lala"]) 517 518 def test_select(self): 519 s2 = select([t1]) 520 s2_assert = str(s2) 521 s3_assert = str(select([t1], t1.c.col2 == 7)) 522 523 class Vis(CloningVisitor): 524 def visit_select(self, select): 525 select.append_whereclause(t1.c.col2 == 7) 526 527 s3 = Vis().traverse(s2) 528 assert str(s3) == s3_assert 529 assert str(s2) == s2_assert 530 print(str(s2)) 531 print(str(s3)) 532 533 class Vis(ClauseVisitor): 534 def visit_select(self, select): 535 select.append_whereclause(t1.c.col2 == 7) 536 537 Vis().traverse(s2) 538 assert str(s2) == s3_assert 539 540 s4_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col3 == 9))) 541 542 class Vis(CloningVisitor): 543 def visit_select(self, select): 544 select.append_whereclause(t1.c.col3 == 9) 545 546 s4 = Vis().traverse(s3) 547 print(str(s3)) 548 print(str(s4)) 549 assert str(s4) == s4_assert 550 assert str(s3) == s3_assert 551 552 s5_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col1 == 9))) 553 554 class Vis(CloningVisitor): 555 def visit_binary(self, binary): 556 if binary.left is t1.c.col3: 557 binary.left = t1.c.col1 558 binary.right = bindparam("col1", unique=True) 559 560 s5 = Vis().traverse(s4) 561 print(str(s4)) 562 print(str(s5)) 563 assert str(s5) == s5_assert 564 assert str(s4) == s4_assert 565 566 def test_union(self): 567 u = union(t1.select(), t2.select()) 568 u2 = CloningVisitor().traverse(u) 569 assert str(u) == str(u2) 570 assert [str(c) for c in u2.c] == [str(c) for c in u.c] 571 572 u = union(t1.select(), t2.select()) 573 cols = [str(c) for c in u.c] 574 u2 = CloningVisitor().traverse(u) 575 assert str(u) == str(u2) 576 assert [str(c) for c in u2.c] == cols 577 578 s1 = select([t1], t1.c.col1 == bindparam("id_param")) 579 s2 = select([t2]) 580 u = union(s1, s2) 581 582 u2 = u.params(id_param=7) 583 u3 = u.params(id_param=10) 584 assert str(u) == str(u2) == str(u3) 585 assert u2.compile().params == {"id_param": 7} 586 assert u3.compile().params == {"id_param": 10} 587 588 def test_in(self): 589 expr = t1.c.col1.in_(["foo", "bar"]) 590 expr2 = CloningVisitor().traverse(expr) 591 assert str(expr) == str(expr2) 592 593 def test_over(self): 594 expr = func.row_number().over(order_by=t1.c.col1) 595 expr2 = CloningVisitor().traverse(expr) 596 assert str(expr) == str(expr2) 597 598 assert expr in visitors.iterate(expr, {}) 599 600 def test_within_group(self): 601 expr = func.row_number().within_group(t1.c.col1) 602 expr2 = CloningVisitor().traverse(expr) 603 assert str(expr) == str(expr2) 604 605 assert expr in visitors.iterate(expr, {}) 606 607 def test_funcfilter(self): 608 expr = func.count(1).filter(t1.c.col1 > 1) 609 expr2 = CloningVisitor().traverse(expr) 610 assert str(expr) == str(expr2) 611 612 def test_adapt_union(self): 613 u = union( 614 t1.select().where(t1.c.col1 == 4), 615 t1.select().where(t1.c.col1 == 5), 616 ).alias() 617 618 assert sql_util.ClauseAdapter(u).traverse(t1) is u 619 620 def test_binds(self): 621 """test that unique bindparams change their name upon clone() 622 to prevent conflicts""" 623 624 s = select([t1], t1.c.col1 == bindparam(None, unique=True)).alias() 625 s2 = CloningVisitor().traverse(s).alias() 626 s3 = select([s], s.c.col2 == s2.c.col2) 627 628 self.assert_compile( 629 s3, 630 "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " 631 "(SELECT table1.col1 AS col1, table1.col2 AS col2, " 632 "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) " 633 "AS anon_1, " 634 "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 " 635 "AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 " 636 "WHERE anon_1.col2 = anon_2.col2", 637 ) 638 639 s = select([t1], t1.c.col1 == 4).alias() 640 s2 = CloningVisitor().traverse(s).alias() 641 s3 = select([s], s.c.col2 == s2.c.col2) 642 self.assert_compile( 643 s3, 644 "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " 645 "(SELECT table1.col1 AS col1, table1.col2 AS col2, " 646 "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) " 647 "AS anon_1, " 648 "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 " 649 "AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 " 650 "WHERE anon_1.col2 = anon_2.col2", 651 ) 652 653 def test_extract(self): 654 s = select([extract("foo", t1.c.col1).label("col1")]) 655 self.assert_compile( 656 s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1" 657 ) 658 659 s2 = CloningVisitor().traverse(s).alias() 660 s3 = select([s2.c.col1]) 661 self.assert_compile( 662 s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1" 663 ) 664 self.assert_compile( 665 s3, 666 "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM " 667 "table1.col1) AS col1 FROM table1) AS anon_1", 668 ) 669 670 @testing.emits_warning(".*replaced by another column with the same key") 671 def test_alias(self): 672 subq = t2.select().alias("subq") 673 s = select( 674 [t1.c.col1, subq.c.col1], 675 from_obj=[t1, subq, t1.join(subq, t1.c.col1 == subq.c.col2)], 676 ) 677 orig = str(s) 678 s2 = CloningVisitor().traverse(s) 679 assert orig == str(s) == str(s2) 680 681 s4 = CloningVisitor().traverse(s2) 682 assert orig == str(s) == str(s2) == str(s4) 683 684 s3 = sql_util.ClauseAdapter(table("foo")).traverse(s) 685 assert orig == str(s) == str(s3) 686 687 s4 = sql_util.ClauseAdapter(table("foo")).traverse(s3) 688 assert orig == str(s) == str(s3) == str(s4) 689 690 subq = subq.alias("subq") 691 s = select( 692 [t1.c.col1, subq.c.col1], 693 from_obj=[t1, subq, t1.join(subq, t1.c.col1 == subq.c.col2)], 694 ) 695 s5 = CloningVisitor().traverse(s) 696 assert orig == str(s) == str(s5) 697 698 def test_correlated_select(self): 699 s = select( 700 [literal_column("*")], t1.c.col1 == t2.c.col1, from_obj=[t1, t2] 701 ).correlate(t2) 702 703 class Vis(CloningVisitor): 704 def visit_select(self, select): 705 select.append_whereclause(t1.c.col2 == 7) 706 707 self.assert_compile( 708 select([t2]).where(t2.c.col1 == Vis().traverse(s)), 709 "SELECT table2.col1, table2.col2, table2.col3 " 710 "FROM table2 WHERE table2.col1 = " 711 "(SELECT * FROM table1 WHERE table1.col1 = table2.col1 " 712 "AND table1.col2 = :col2_1)", 713 ) 714 715 def test_this_thing(self): 716 s = select([t1]).where(t1.c.col1 == "foo").alias() 717 s2 = select([s.c.col1]) 718 719 self.assert_compile( 720 s2, 721 "SELECT anon_1.col1 FROM (SELECT " 722 "table1.col1 AS col1, table1.col2 AS col2, " 723 "table1.col3 AS col3 FROM table1 WHERE " 724 "table1.col1 = :col1_1) AS anon_1", 725 ) 726 t1a = t1.alias() 727 s2 = sql_util.ClauseAdapter(t1a).traverse(s2) 728 self.assert_compile( 729 s2, 730 "SELECT anon_1.col1 FROM (SELECT " 731 "table1_1.col1 AS col1, table1_1.col2 AS " 732 "col2, table1_1.col3 AS col3 FROM table1 " 733 "AS table1_1 WHERE table1_1.col1 = " 734 ":col1_1) AS anon_1", 735 ) 736 737 def test_select_fromtwice_one(self): 738 t1a = t1.alias() 739 740 s = select([1], t1.c.col1 == t1a.c.col1, from_obj=t1a).correlate(t1a) 741 s = select([t1]).where(t1.c.col1 == s) 742 self.assert_compile( 743 s, 744 "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " 745 "WHERE table1.col1 = " 746 "(SELECT 1 FROM table1, table1 AS table1_1 " 747 "WHERE table1.col1 = table1_1.col1)", 748 ) 749 s = CloningVisitor().traverse(s) 750 self.assert_compile( 751 s, 752 "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " 753 "WHERE table1.col1 = " 754 "(SELECT 1 FROM table1, table1 AS table1_1 " 755 "WHERE table1.col1 = table1_1.col1)", 756 ) 757 758 def test_select_fromtwice_two(self): 759 s = select([t1]).where(t1.c.col1 == "foo").alias() 760 761 s2 = select([1], t1.c.col1 == s.c.col1, from_obj=s).correlate(t1) 762 s3 = select([t1]).where(t1.c.col1 == s2) 763 self.assert_compile( 764 s3, 765 "SELECT table1.col1, table1.col2, table1.col3 " 766 "FROM table1 WHERE table1.col1 = " 767 "(SELECT 1 FROM " 768 "(SELECT table1.col1 AS col1, table1.col2 AS col2, " 769 "table1.col3 AS col3 FROM table1 " 770 "WHERE table1.col1 = :col1_1) " 771 "AS anon_1 WHERE table1.col1 = anon_1.col1)", 772 ) 773 774 s4 = ReplacingCloningVisitor().traverse(s3) 775 self.assert_compile( 776 s4, 777 "SELECT table1.col1, table1.col2, table1.col3 " 778 "FROM table1 WHERE table1.col1 = " 779 "(SELECT 1 FROM " 780 "(SELECT table1.col1 AS col1, table1.col2 AS col2, " 781 "table1.col3 AS col3 FROM table1 " 782 "WHERE table1.col1 = :col1_1) " 783 "AS anon_1 WHERE table1.col1 = anon_1.col1)", 784 ) 785 786 787class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): 788 __dialect__ = "default" 789 790 @classmethod 791 def setup_class(cls): 792 global t1, t2 793 t1 = table( 794 "table1", 795 column("col1"), 796 column("col2"), 797 column("col3"), 798 column("col4"), 799 ) 800 t2 = table("table2", column("col1"), column("col2"), column("col3")) 801 802 def test_traverse_memoizes_w_columns(self): 803 t1a = t1.alias() 804 adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) 805 806 expr = select([t1a.c.col1]).label("x") 807 expr_adapted = adapter.traverse(expr) 808 is_not_(expr, expr_adapted) 809 is_(adapter.columns[expr], expr_adapted) 810 811 def test_traverse_memoizes_w_itself(self): 812 t1a = t1.alias() 813 adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) 814 815 expr = select([t1a.c.col1]).label("x") 816 expr_adapted = adapter.traverse(expr) 817 is_not_(expr, expr_adapted) 818 is_(adapter.traverse(expr), expr_adapted) 819 820 def test_columns_memoizes_w_itself(self): 821 t1a = t1.alias() 822 adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) 823 824 expr = select([t1a.c.col1]).label("x") 825 expr_adapted = adapter.columns[expr] 826 is_not_(expr, expr_adapted) 827 is_(adapter.columns[expr], expr_adapted) 828 829 def test_wrapping_fallthrough(self): 830 t1a = t1.alias(name="t1a") 831 t2a = t2.alias(name="t2a") 832 a1 = sql_util.ColumnAdapter(t1a) 833 834 s1 = select([t1a.c.col1, t2a.c.col1]).apply_labels().alias() 835 a2 = sql_util.ColumnAdapter(s1) 836 a3 = a2.wrap(a1) 837 a4 = a1.wrap(a2) 838 a5 = a1.chain(a2) 839 840 # t1.c.col1 -> s1.c.t1a_col1 841 842 # adapted by a2 843 is_(a3.columns[t1.c.col1], s1.c.t1a_col1) 844 is_(a4.columns[t1.c.col1], s1.c.t1a_col1) 845 846 # chaining can't fall through because a1 grabs it 847 # first 848 is_(a5.columns[t1.c.col1], t1a.c.col1) 849 850 # t2.c.col1 -> s1.c.t2a_col1 851 852 # adapted by a2 853 is_(a3.columns[t2.c.col1], s1.c.t2a_col1) 854 is_(a4.columns[t2.c.col1], s1.c.t2a_col1) 855 # chaining, t2 hits s1 856 is_(a5.columns[t2.c.col1], s1.c.t2a_col1) 857 858 # t1.c.col2 -> t1a.c.col2 859 860 # fallthrough to a1 861 is_(a3.columns[t1.c.col2], t1a.c.col2) 862 is_(a4.columns[t1.c.col2], t1a.c.col2) 863 864 # chaining hits a1 865 is_(a5.columns[t1.c.col2], t1a.c.col2) 866 867 # t2.c.col2 -> t2.c.col2 868 869 # fallthrough to no adaption 870 is_(a3.columns[t2.c.col2], t2.c.col2) 871 is_(a4.columns[t2.c.col2], t2.c.col2) 872 873 def test_wrapping_ordering(self): 874 """illustrate an example where order of wrappers matters. 875 876 This test illustrates both the ordering being significant 877 as well as a scenario where multiple translations are needed 878 (e.g. wrapping vs. chaining). 879 880 """ 881 882 stmt = select([t1.c.col1, t2.c.col1]).apply_labels() 883 884 sa = stmt.alias() 885 stmt2 = select([t2, sa]) 886 887 a1 = sql_util.ColumnAdapter(stmt) 888 a2 = sql_util.ColumnAdapter(stmt2) 889 890 a2_to_a1 = a2.wrap(a1) 891 a1_to_a2 = a1.wrap(a2) 892 893 # when stmt2 and stmt represent the same column 894 # in different contexts, order of wrapping matters 895 896 # t2.c.col1 via a2 is stmt2.c.col1; then ignored by a1 897 is_(a2_to_a1.columns[t2.c.col1], stmt2.c.col1) 898 # t2.c.col1 via a1 is stmt.c.table2_col1; a2 then 899 # sends this to stmt2.c.table2_col1 900 is_(a1_to_a2.columns[t2.c.col1], stmt2.c.table2_col1) 901 902 # for mutually exclusive columns, order doesn't matter 903 is_(a2_to_a1.columns[t1.c.col1], stmt2.c.table1_col1) 904 is_(a1_to_a2.columns[t1.c.col1], stmt2.c.table1_col1) 905 is_(a2_to_a1.columns[t2.c.col2], stmt2.c.col2) 906 907 def test_wrapping_multiple(self): 908 """illustrate that wrapping runs both adapters""" 909 910 t1a = t1.alias(name="t1a") 911 t2a = t2.alias(name="t2a") 912 a1 = sql_util.ColumnAdapter(t1a) 913 a2 = sql_util.ColumnAdapter(t2a) 914 a3 = a2.wrap(a1) 915 916 stmt = select([t1.c.col1, t2.c.col2]) 917 918 self.assert_compile( 919 a3.traverse(stmt), 920 "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a", 921 ) 922 923 # chaining does too because these adapters don't share any 924 # columns 925 a4 = a2.chain(a1) 926 self.assert_compile( 927 a4.traverse(stmt), 928 "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a", 929 ) 930 931 def test_wrapping_inclusions(self): 932 """test wrapping and inclusion rules together, 933 taking into account multiple objects with equivalent hash identity.""" 934 935 t1a = t1.alias(name="t1a") 936 t2a = t2.alias(name="t2a") 937 a1 = sql_util.ColumnAdapter( 938 t1a, include_fn=lambda col: "a1" in col._annotations 939 ) 940 941 s1 = select([t1a, t2a]).apply_labels().alias() 942 a2 = sql_util.ColumnAdapter( 943 s1, include_fn=lambda col: "a2" in col._annotations 944 ) 945 a3 = a2.wrap(a1) 946 947 c1a1 = t1.c.col1._annotate(dict(a1=True)) 948 c1a2 = t1.c.col1._annotate(dict(a2=True)) 949 c1aa = t1.c.col1._annotate(dict(a1=True, a2=True)) 950 951 c2a1 = t2.c.col1._annotate(dict(a1=True)) 952 c2a2 = t2.c.col1._annotate(dict(a2=True)) 953 c2aa = t2.c.col1._annotate(dict(a1=True, a2=True)) 954 955 is_(a3.columns[c1a1], t1a.c.col1) 956 is_(a3.columns[c1a2], s1.c.t1a_col1) 957 is_(a3.columns[c1aa], s1.c.t1a_col1) 958 959 # not covered by a1, accepted by a2 960 is_(a3.columns[c2aa], s1.c.t2a_col1) 961 962 # not covered by a1, accepted by a2 963 is_(a3.columns[c2a2], s1.c.t2a_col1) 964 # not covered by a1, rejected by a2 965 is_(a3.columns[c2a1], c2a1) 966 967 968class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): 969 __dialect__ = "default" 970 971 @classmethod 972 def setup_class(cls): 973 global t1, t2 974 t1 = table("table1", column("col1"), column("col2"), column("col3")) 975 t2 = table("table2", column("col1"), column("col2"), column("col3")) 976 977 def test_correlation_on_clone(self): 978 t1alias = t1.alias("t1alias") 979 t2alias = t2.alias("t2alias") 980 vis = sql_util.ClauseAdapter(t1alias) 981 982 s = select( 983 [literal_column("*")], from_obj=[t1alias, t2alias] 984 ).as_scalar() 985 assert t2alias in s._froms 986 assert t1alias in s._froms 987 988 self.assert_compile( 989 select([literal_column("*")], t2alias.c.col1 == s), 990 "SELECT * FROM table2 AS t2alias WHERE " 991 "t2alias.col1 = (SELECT * FROM table1 AS " 992 "t1alias)", 993 ) 994 s = vis.traverse(s) 995 996 assert t2alias not in s._froms # not present because it's been 997 # cloned 998 assert t1alias in s._froms # present because the adapter placed 999 # it there 1000 1001 # correlate list on "s" needs to take into account the full 1002 # _cloned_set for each element in _froms when correlating 1003 1004 self.assert_compile( 1005 select([literal_column("*")], t2alias.c.col1 == s), 1006 "SELECT * FROM table2 AS t2alias WHERE " 1007 "t2alias.col1 = (SELECT * FROM table1 AS " 1008 "t1alias)", 1009 ) 1010 s = ( 1011 select([literal_column("*")], from_obj=[t1alias, t2alias]) 1012 .correlate(t2alias) 1013 .as_scalar() 1014 ) 1015 self.assert_compile( 1016 select([literal_column("*")], t2alias.c.col1 == s), 1017 "SELECT * FROM table2 AS t2alias WHERE " 1018 "t2alias.col1 = (SELECT * FROM table1 AS " 1019 "t1alias)", 1020 ) 1021 s = vis.traverse(s) 1022 self.assert_compile( 1023 select([literal_column("*")], t2alias.c.col1 == s), 1024 "SELECT * FROM table2 AS t2alias WHERE " 1025 "t2alias.col1 = (SELECT * FROM table1 AS " 1026 "t1alias)", 1027 ) 1028 s = CloningVisitor().traverse(s) 1029 self.assert_compile( 1030 select([literal_column("*")], t2alias.c.col1 == s), 1031 "SELECT * FROM table2 AS t2alias WHERE " 1032 "t2alias.col1 = (SELECT * FROM table1 AS " 1033 "t1alias)", 1034 ) 1035 1036 s = ( 1037 select([literal_column("*")]) 1038 .where(t1.c.col1 == t2.c.col1) 1039 .as_scalar() 1040 ) 1041 self.assert_compile( 1042 select([t1.c.col1, s]), 1043 "SELECT table1.col1, (SELECT * FROM table2 " 1044 "WHERE table1.col1 = table2.col1) AS " 1045 "anon_1 FROM table1", 1046 ) 1047 vis = sql_util.ClauseAdapter(t1alias) 1048 s = vis.traverse(s) 1049 self.assert_compile( 1050 select([t1alias.c.col1, s]), 1051 "SELECT t1alias.col1, (SELECT * FROM " 1052 "table2 WHERE t1alias.col1 = table2.col1) " 1053 "AS anon_1 FROM table1 AS t1alias", 1054 ) 1055 s = CloningVisitor().traverse(s) 1056 self.assert_compile( 1057 select([t1alias.c.col1, s]), 1058 "SELECT t1alias.col1, (SELECT * FROM " 1059 "table2 WHERE t1alias.col1 = table2.col1) " 1060 "AS anon_1 FROM table1 AS t1alias", 1061 ) 1062 s = ( 1063 select([literal_column("*")]) 1064 .where(t1.c.col1 == t2.c.col1) 1065 .correlate(t1) 1066 .as_scalar() 1067 ) 1068 self.assert_compile( 1069 select([t1.c.col1, s]), 1070 "SELECT table1.col1, (SELECT * FROM table2 " 1071 "WHERE table1.col1 = table2.col1) AS " 1072 "anon_1 FROM table1", 1073 ) 1074 vis = sql_util.ClauseAdapter(t1alias) 1075 s = vis.traverse(s) 1076 self.assert_compile( 1077 select([t1alias.c.col1, s]), 1078 "SELECT t1alias.col1, (SELECT * FROM " 1079 "table2 WHERE t1alias.col1 = table2.col1) " 1080 "AS anon_1 FROM table1 AS t1alias", 1081 ) 1082 s = CloningVisitor().traverse(s) 1083 self.assert_compile( 1084 select([t1alias.c.col1, s]), 1085 "SELECT t1alias.col1, (SELECT * FROM " 1086 "table2 WHERE t1alias.col1 = table2.col1) " 1087 "AS anon_1 FROM table1 AS t1alias", 1088 ) 1089 1090 @testing.fails_on_everything_except() 1091 def test_joins_dont_adapt(self): 1092 # adapting to a join, i.e. ClauseAdapter(t1.join(t2)), doesn't 1093 # make much sense. ClauseAdapter doesn't make any changes if 1094 # it's against a straight join. 1095 1096 users = table("users", column("id")) 1097 addresses = table("addresses", column("id"), column("user_id")) 1098 1099 ualias = users.alias() 1100 1101 s = select( 1102 [func.count(addresses.c.id)], users.c.id == addresses.c.user_id 1103 ).correlate(users) 1104 s = sql_util.ClauseAdapter(ualias).traverse(s) 1105 1106 j1 = addresses.join(ualias, addresses.c.user_id == ualias.c.id) 1107 1108 self.assert_compile( 1109 sql_util.ClauseAdapter(j1).traverse(s), 1110 "SELECT count(addresses.id) AS count_1 " 1111 "FROM addresses WHERE users_1.id = " 1112 "addresses.user_id", 1113 ) 1114 1115 def test_table_to_alias_1(self): 1116 t1alias = t1.alias("t1alias") 1117 1118 vis = sql_util.ClauseAdapter(t1alias) 1119 ff = vis.traverse(func.count(t1.c.col1).label("foo")) 1120 assert list(_from_objects(ff)) == [t1alias] 1121 1122 def test_table_to_alias_2(self): 1123 t1alias = t1.alias("t1alias") 1124 vis = sql_util.ClauseAdapter(t1alias) 1125 self.assert_compile( 1126 vis.traverse(select([literal_column("*")], from_obj=[t1])), 1127 "SELECT * FROM table1 AS t1alias", 1128 ) 1129 1130 def test_table_to_alias_3(self): 1131 t1alias = t1.alias("t1alias") 1132 vis = sql_util.ClauseAdapter(t1alias) 1133 self.assert_compile( 1134 select([literal_column("*")], t1.c.col1 == t2.c.col2), 1135 "SELECT * FROM table1, table2 WHERE table1.col1 = table2.col2", 1136 ) 1137 1138 def test_table_to_alias_4(self): 1139 t1alias = t1.alias("t1alias") 1140 vis = sql_util.ClauseAdapter(t1alias) 1141 self.assert_compile( 1142 vis.traverse( 1143 select([literal_column("*")], t1.c.col1 == t2.c.col2) 1144 ), 1145 "SELECT * FROM table1 AS t1alias, table2 " 1146 "WHERE t1alias.col1 = table2.col2", 1147 ) 1148 1149 def test_table_to_alias_5(self): 1150 t1alias = t1.alias("t1alias") 1151 vis = sql_util.ClauseAdapter(t1alias) 1152 self.assert_compile( 1153 vis.traverse( 1154 select( 1155 [literal_column("*")], 1156 t1.c.col1 == t2.c.col2, 1157 from_obj=[t1, t2], 1158 ) 1159 ), 1160 "SELECT * FROM table1 AS t1alias, table2 " 1161 "WHERE t1alias.col1 = table2.col2", 1162 ) 1163 1164 def test_table_to_alias_6(self): 1165 t1alias = t1.alias("t1alias") 1166 vis = sql_util.ClauseAdapter(t1alias) 1167 self.assert_compile( 1168 select([t1alias, t2]).where( 1169 t1alias.c.col1 1170 == vis.traverse( 1171 select( 1172 [literal_column("*")], 1173 t1.c.col1 == t2.c.col2, 1174 from_obj=[t1, t2], 1175 ).correlate(t1) 1176 ) 1177 ), 1178 "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " 1179 "table2.col1, table2.col2, table2.col3 " 1180 "FROM table1 AS t1alias, table2 WHERE t1alias.col1 = " 1181 "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)", 1182 ) 1183 1184 def test_table_to_alias_7(self): 1185 t1alias = t1.alias("t1alias") 1186 vis = sql_util.ClauseAdapter(t1alias) 1187 self.assert_compile( 1188 select([t1alias, t2]).where( 1189 t1alias.c.col1 1190 == vis.traverse( 1191 select( 1192 [literal_column("*")], 1193 t1.c.col1 == t2.c.col2, 1194 from_obj=[t1, t2], 1195 ).correlate(t2) 1196 ) 1197 ), 1198 "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " 1199 "table2.col1, table2.col2, table2.col3 " 1200 "FROM table1 AS t1alias, table2 " 1201 "WHERE t1alias.col1 = " 1202 "(SELECT * FROM table1 AS t1alias " 1203 "WHERE t1alias.col1 = table2.col2)", 1204 ) 1205 1206 def test_table_to_alias_8(self): 1207 t1alias = t1.alias("t1alias") 1208 vis = sql_util.ClauseAdapter(t1alias) 1209 self.assert_compile( 1210 vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)), 1211 "CASE WHEN (t1alias.col1 = :col1_1) THEN " 1212 "t1alias.col2 ELSE t1alias.col1 END", 1213 ) 1214 1215 def test_table_to_alias_9(self): 1216 t1alias = t1.alias("t1alias") 1217 vis = sql_util.ClauseAdapter(t1alias) 1218 self.assert_compile( 1219 vis.traverse( 1220 case([(5, t1.c.col2)], value=t1.c.col1, else_=t1.c.col1) 1221 ), 1222 "CASE t1alias.col1 WHEN :param_1 THEN " 1223 "t1alias.col2 ELSE t1alias.col1 END", 1224 ) 1225 1226 def test_table_to_alias_10(self): 1227 s = select([literal_column("*")], from_obj=[t1]).alias("foo") 1228 self.assert_compile( 1229 s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo" 1230 ) 1231 1232 def test_table_to_alias_11(self): 1233 s = select([literal_column("*")], from_obj=[t1]).alias("foo") 1234 t1alias = t1.alias("t1alias") 1235 vis = sql_util.ClauseAdapter(t1alias) 1236 self.assert_compile( 1237 vis.traverse(s.select()), 1238 "SELECT foo.* FROM (SELECT * FROM table1 " "AS t1alias) AS foo", 1239 ) 1240 1241 def test_table_to_alias_12(self): 1242 s = select([literal_column("*")], from_obj=[t1]).alias("foo") 1243 self.assert_compile( 1244 s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo" 1245 ) 1246 1247 def test_table_to_alias_13(self): 1248 t1alias = t1.alias("t1alias") 1249 vis = sql_util.ClauseAdapter(t1alias) 1250 ff = vis.traverse(func.count(t1.c.col1).label("foo")) 1251 self.assert_compile( 1252 select([ff]), 1253 "SELECT count(t1alias.col1) AS foo FROM " "table1 AS t1alias", 1254 ) 1255 assert list(_from_objects(ff)) == [t1alias] 1256 1257 # def test_table_to_alias_2(self): 1258 # TODO: self.assert_compile(vis.traverse(select([func.count(t1.c 1259 # .col1).l abel('foo')]), clone=True), "SELECT 1260 # count(t1alias.col1) AS foo FROM table1 AS t1alias") 1261 1262 def test_table_to_alias_14(self): 1263 t1alias = t1.alias("t1alias") 1264 vis = sql_util.ClauseAdapter(t1alias) 1265 t2alias = t2.alias("t2alias") 1266 vis.chain(sql_util.ClauseAdapter(t2alias)) 1267 self.assert_compile( 1268 vis.traverse( 1269 select([literal_column("*")], t1.c.col1 == t2.c.col2) 1270 ), 1271 "SELECT * FROM table1 AS t1alias, table2 " 1272 "AS t2alias WHERE t1alias.col1 = " 1273 "t2alias.col2", 1274 ) 1275 1276 def test_table_to_alias_15(self): 1277 t1alias = t1.alias("t1alias") 1278 vis = sql_util.ClauseAdapter(t1alias) 1279 t2alias = t2.alias("t2alias") 1280 vis.chain(sql_util.ClauseAdapter(t2alias)) 1281 self.assert_compile( 1282 vis.traverse( 1283 select(["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]) 1284 ), 1285 "SELECT * FROM table1 AS t1alias, table2 " 1286 "AS t2alias WHERE t1alias.col1 = " 1287 "t2alias.col2", 1288 ) 1289 1290 def test_table_to_alias_16(self): 1291 t1alias = t1.alias("t1alias") 1292 vis = sql_util.ClauseAdapter(t1alias) 1293 t2alias = t2.alias("t2alias") 1294 vis.chain(sql_util.ClauseAdapter(t2alias)) 1295 self.assert_compile( 1296 select([t1alias, t2alias]).where( 1297 t1alias.c.col1 1298 == vis.traverse( 1299 select( 1300 ["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2] 1301 ).correlate(t1) 1302 ) 1303 ), 1304 "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " 1305 "t2alias.col1, t2alias.col2, t2alias.col3 " 1306 "FROM table1 AS t1alias, table2 AS t2alias " 1307 "WHERE t1alias.col1 = " 1308 "(SELECT * FROM table2 AS t2alias " 1309 "WHERE t1alias.col1 = t2alias.col2)", 1310 ) 1311 1312 def test_table_to_alias_17(self): 1313 t1alias = t1.alias("t1alias") 1314 vis = sql_util.ClauseAdapter(t1alias) 1315 t2alias = t2.alias("t2alias") 1316 vis.chain(sql_util.ClauseAdapter(t2alias)) 1317 self.assert_compile( 1318 t2alias.select().where( 1319 t2alias.c.col2 1320 == vis.traverse( 1321 select( 1322 ["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2] 1323 ).correlate(t2) 1324 ) 1325 ), 1326 "SELECT t2alias.col1, t2alias.col2, t2alias.col3 " 1327 "FROM table2 AS t2alias WHERE t2alias.col2 = " 1328 "(SELECT * FROM table1 AS t1alias WHERE " 1329 "t1alias.col1 = t2alias.col2)", 1330 ) 1331 1332 def test_include_exclude(self): 1333 m = MetaData() 1334 a = Table( 1335 "a", 1336 m, 1337 Column("id", Integer, primary_key=True), 1338 Column( 1339 "xxx_id", 1340 Integer, 1341 ForeignKey("a.id", name="adf", use_alter=True), 1342 ), 1343 ) 1344 1345 e = a.c.id == a.c.xxx_id 1346 assert str(e) == "a.id = a.xxx_id" 1347 b = a.alias() 1348 1349 e = sql_util.ClauseAdapter( 1350 b, 1351 include_fn=lambda x: x in set([a.c.id]), 1352 equivalents={a.c.id: set([a.c.id])}, 1353 ).traverse(e) 1354 1355 assert str(e) == "a_1.id = a.xxx_id" 1356 1357 def test_recursive_equivalents(self): 1358 m = MetaData() 1359 a = Table("a", m, Column("x", Integer), Column("y", Integer)) 1360 b = Table("b", m, Column("x", Integer), Column("y", Integer)) 1361 c = Table("c", m, Column("x", Integer), Column("y", Integer)) 1362 1363 # force a recursion overflow, by linking a.c.x<->c.c.x, and 1364 # asking for a nonexistent col. corresponding_column should prevent 1365 # endless depth. 1366 adapt = sql_util.ClauseAdapter( 1367 b, equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])} 1368 ) 1369 assert adapt._corresponding_column(a.c.x, False) is None 1370 1371 def test_multilevel_equivalents(self): 1372 m = MetaData() 1373 a = Table("a", m, Column("x", Integer), Column("y", Integer)) 1374 b = Table("b", m, Column("x", Integer), Column("y", Integer)) 1375 c = Table("c", m, Column("x", Integer), Column("y", Integer)) 1376 1377 alias = select([a]).select_from(a.join(b, a.c.x == b.c.x)).alias() 1378 1379 # two levels of indirection from c.x->b.x->a.x, requires recursive 1380 # corresponding_column call 1381 adapt = sql_util.ClauseAdapter( 1382 alias, equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])} 1383 ) 1384 assert adapt._corresponding_column(a.c.x, False) is alias.c.x 1385 assert adapt._corresponding_column(c.c.x, False) is alias.c.x 1386 1387 def test_join_to_alias(self): 1388 metadata = MetaData() 1389 a = Table("a", metadata, Column("id", Integer, primary_key=True)) 1390 b = Table( 1391 "b", 1392 metadata, 1393 Column("id", Integer, primary_key=True), 1394 Column("aid", Integer, ForeignKey("a.id")), 1395 ) 1396 c = Table( 1397 "c", 1398 metadata, 1399 Column("id", Integer, primary_key=True), 1400 Column("bid", Integer, ForeignKey("b.id")), 1401 ) 1402 1403 d = Table( 1404 "d", 1405 metadata, 1406 Column("id", Integer, primary_key=True), 1407 Column("aid", Integer, ForeignKey("a.id")), 1408 ) 1409 1410 j1 = a.outerjoin(b) 1411 j2 = select([j1], use_labels=True) 1412 1413 j3 = c.join(j2, j2.c.b_id == c.c.bid) 1414 1415 j4 = j3.outerjoin(d) 1416 self.assert_compile( 1417 j4, 1418 "c JOIN (SELECT a.id AS a_id, b.id AS " 1419 "b_id, b.aid AS b_aid FROM a LEFT OUTER " 1420 "JOIN b ON a.id = b.aid) ON b_id = c.bid " 1421 "LEFT OUTER JOIN d ON a_id = d.aid", 1422 ) 1423 j5 = j3.alias("foo") 1424 j6 = sql_util.ClauseAdapter(j5).copy_and_process([j4])[0] 1425 1426 # this statement takes c join(a join b), wraps it inside an 1427 # aliased "select * from c join(a join b) AS foo". the outermost 1428 # right side "left outer join d" stays the same, except "d" 1429 # joins against foo.a_id instead of plain "a_id" 1430 1431 self.assert_compile( 1432 j6, 1433 "(SELECT c.id AS c_id, c.bid AS c_bid, " 1434 "a_id AS a_id, b_id AS b_id, b_aid AS " 1435 "b_aid FROM c JOIN (SELECT a.id AS a_id, " 1436 "b.id AS b_id, b.aid AS b_aid FROM a LEFT " 1437 "OUTER JOIN b ON a.id = b.aid) ON b_id = " 1438 "c.bid) AS foo LEFT OUTER JOIN d ON " 1439 "foo.a_id = d.aid", 1440 ) 1441 1442 def test_derived_from(self): 1443 assert select([t1]).is_derived_from(t1) 1444 assert not select([t2]).is_derived_from(t1) 1445 assert not t1.is_derived_from(select([t1])) 1446 assert t1.alias().is_derived_from(t1) 1447 1448 s1 = select([t1, t2]).alias("foo") 1449 s2 = select([s1]).limit(5).offset(10).alias() 1450 assert s2.is_derived_from(s1) 1451 s2 = s2._clone() 1452 assert s2.is_derived_from(s1) 1453 1454 def test_aliasedselect_to_aliasedselect_straight(self): 1455 1456 # original issue from ticket #904 1457 1458 s1 = select([t1]).alias("foo") 1459 s2 = select([s1]).limit(5).offset(10).alias() 1460 self.assert_compile( 1461 sql_util.ClauseAdapter(s2).traverse(s1), 1462 "SELECT foo.col1, foo.col2, foo.col3 FROM " 1463 "(SELECT table1.col1 AS col1, table1.col2 " 1464 "AS col2, table1.col3 AS col3 FROM table1) " 1465 "AS foo LIMIT :param_1 OFFSET :param_2", 1466 {"param_1": 5, "param_2": 10}, 1467 ) 1468 1469 def test_aliasedselect_to_aliasedselect_join(self): 1470 s1 = select([t1]).alias("foo") 1471 s2 = select([s1]).limit(5).offset(10).alias() 1472 j = s1.outerjoin(t2, s1.c.col1 == t2.c.col1) 1473 self.assert_compile( 1474 sql_util.ClauseAdapter(s2).traverse(j).select(), 1475 "SELECT anon_1.col1, anon_1.col2, " 1476 "anon_1.col3, table2.col1, table2.col2, " 1477 "table2.col3 FROM (SELECT foo.col1 AS " 1478 "col1, foo.col2 AS col2, foo.col3 AS col3 " 1479 "FROM (SELECT table1.col1 AS col1, " 1480 "table1.col2 AS col2, table1.col3 AS col3 " 1481 "FROM table1) AS foo LIMIT :param_1 OFFSET " 1482 ":param_2) AS anon_1 LEFT OUTER JOIN " 1483 "table2 ON anon_1.col1 = table2.col1", 1484 {"param_1": 5, "param_2": 10}, 1485 ) 1486 1487 def test_aliasedselect_to_aliasedselect_join_nested_table(self): 1488 s1 = select([t1]).alias("foo") 1489 s2 = select([s1]).limit(5).offset(10).alias() 1490 talias = t1.alias("bar") 1491 1492 assert not s2.is_derived_from(talias) 1493 j = s1.outerjoin(talias, s1.c.col1 == talias.c.col1) 1494 1495 self.assert_compile( 1496 sql_util.ClauseAdapter(s2).traverse(j).select(), 1497 "SELECT anon_1.col1, anon_1.col2, " 1498 "anon_1.col3, bar.col1, bar.col2, bar.col3 " 1499 "FROM (SELECT foo.col1 AS col1, foo.col2 " 1500 "AS col2, foo.col3 AS col3 FROM (SELECT " 1501 "table1.col1 AS col1, table1.col2 AS col2, " 1502 "table1.col3 AS col3 FROM table1) AS foo " 1503 "LIMIT :param_1 OFFSET :param_2) AS anon_1 " 1504 "LEFT OUTER JOIN table1 AS bar ON " 1505 "anon_1.col1 = bar.col1", 1506 {"param_1": 5, "param_2": 10}, 1507 ) 1508 1509 def test_functions(self): 1510 self.assert_compile( 1511 sql_util.ClauseAdapter(t1.alias()).traverse(func.count(t1.c.col1)), 1512 "count(table1_1.col1)", 1513 ) 1514 s = select([func.count(t1.c.col1)]) 1515 self.assert_compile( 1516 sql_util.ClauseAdapter(t1.alias()).traverse(s), 1517 "SELECT count(table1_1.col1) AS count_1 " 1518 "FROM table1 AS table1_1", 1519 ) 1520 1521 def test_recursive(self): 1522 metadata = MetaData() 1523 a = Table("a", metadata, Column("id", Integer, primary_key=True)) 1524 b = Table( 1525 "b", 1526 metadata, 1527 Column("id", Integer, primary_key=True), 1528 Column("aid", Integer, ForeignKey("a.id")), 1529 ) 1530 c = Table( 1531 "c", 1532 metadata, 1533 Column("id", Integer, primary_key=True), 1534 Column("bid", Integer, ForeignKey("b.id")), 1535 ) 1536 1537 d = Table( 1538 "d", 1539 metadata, 1540 Column("id", Integer, primary_key=True), 1541 Column("aid", Integer, ForeignKey("a.id")), 1542 ) 1543 1544 u = union( 1545 a.join(b).select().apply_labels(), 1546 a.join(d).select().apply_labels(), 1547 ).alias() 1548 1549 self.assert_compile( 1550 sql_util.ClauseAdapter(u).traverse( 1551 select([c.c.bid]).where(c.c.bid == u.c.b_aid) 1552 ), 1553 "SELECT c.bid " 1554 "FROM c, (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid " 1555 "FROM a JOIN b ON a.id = b.aid UNION SELECT a.id AS a_id, d.id " 1556 "AS d_id, d.aid AS d_aid " 1557 "FROM a JOIN d ON a.id = d.aid) AS anon_1 " 1558 "WHERE c.bid = anon_1.b_aid", 1559 ) 1560 1561 t1 = table("table1", column("col1"), column("col2"), column("col3")) 1562 t2 = table("table2", column("col1"), column("col2"), column("col3")) 1563 1564 def test_label_anonymize_one(self): 1565 t1a = t1.alias() 1566 adapter = sql_util.ClauseAdapter(t1a, anonymize_labels=True) 1567 1568 expr = select([t1.c.col2]).where(t1.c.col3 == 5).label("expr") 1569 expr_adapted = adapter.traverse(expr) 1570 1571 stmt = select([expr, expr_adapted]).order_by(expr, expr_adapted) 1572 self.assert_compile( 1573 stmt, 1574 "SELECT " 1575 "(SELECT table1.col2 FROM table1 WHERE table1.col3 = :col3_1) " 1576 "AS expr, " 1577 "(SELECT table1_1.col2 FROM table1 AS table1_1 " 1578 "WHERE table1_1.col3 = :col3_2) AS anon_1 " 1579 "ORDER BY expr, anon_1", 1580 ) 1581 1582 def test_label_anonymize_two(self): 1583 t1a = t1.alias() 1584 adapter = sql_util.ClauseAdapter(t1a, anonymize_labels=True) 1585 1586 expr = select([t1.c.col2]).where(t1.c.col3 == 5).label(None) 1587 expr_adapted = adapter.traverse(expr) 1588 1589 stmt = select([expr, expr_adapted]).order_by(expr, expr_adapted) 1590 self.assert_compile( 1591 stmt, 1592 "SELECT " 1593 "(SELECT table1.col2 FROM table1 WHERE table1.col3 = :col3_1) " 1594 "AS anon_1, " 1595 "(SELECT table1_1.col2 FROM table1 AS table1_1 " 1596 "WHERE table1_1.col3 = :col3_2) AS anon_2 " 1597 "ORDER BY anon_1, anon_2", 1598 ) 1599 1600 def test_label_anonymize_three(self): 1601 t1a = t1.alias() 1602 adapter = sql_util.ColumnAdapter( 1603 t1a, anonymize_labels=True, allow_label_resolve=False 1604 ) 1605 1606 expr = select([t1.c.col2]).where(t1.c.col3 == 5).label(None) 1607 l1 = expr 1608 is_(l1._order_by_label_element, l1) 1609 eq_(l1._allow_label_resolve, True) 1610 1611 expr_adapted = adapter.traverse(expr) 1612 l2 = expr_adapted 1613 is_(l2._order_by_label_element, l2) 1614 eq_(l2._allow_label_resolve, False) 1615 1616 l3 = adapter.traverse(expr) 1617 is_(l3._order_by_label_element, l3) 1618 eq_(l3._allow_label_resolve, False) 1619 1620 1621class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL): 1622 __dialect__ = "default" 1623 1624 @classmethod 1625 def setup_class(cls): 1626 global table1, table2, table3, table4 1627 1628 def _table(name): 1629 return table(name, column("col1"), column("col2"), column("col3")) 1630 1631 table1, table2, table3, table4 = [ 1632 _table(name) for name in ("table1", "table2", "table3", "table4") 1633 ] 1634 1635 def test_splice(self): 1636 t1, t2, t3, t4 = table1, table2, table1.alias(), table2.alias() 1637 j = ( 1638 t1.join(t2, t1.c.col1 == t2.c.col1) 1639 .join(t3, t2.c.col1 == t3.c.col1) 1640 .join(t4, t4.c.col1 == t1.c.col1) 1641 ) 1642 s = select([t1]).where(t1.c.col2 < 5).alias() 1643 self.assert_compile( 1644 sql_util.splice_joins(s, j), 1645 "(SELECT table1.col1 AS col1, table1.col2 " 1646 "AS col2, table1.col3 AS col3 FROM table1 " 1647 "WHERE table1.col2 < :col2_1) AS anon_1 " 1648 "JOIN table2 ON anon_1.col1 = table2.col1 " 1649 "JOIN table1 AS table1_1 ON table2.col1 = " 1650 "table1_1.col1 JOIN table2 AS table2_1 ON " 1651 "table2_1.col1 = anon_1.col1", 1652 ) 1653 1654 def test_stop_on(self): 1655 t1, t2, t3 = table1, table2, table3 1656 j1 = t1.join(t2, t1.c.col1 == t2.c.col1) 1657 j2 = j1.join(t3, t2.c.col1 == t3.c.col1) 1658 s = select([t1]).select_from(j1).alias() 1659 self.assert_compile( 1660 sql_util.splice_joins(s, j2), 1661 "(SELECT table1.col1 AS col1, table1.col2 " 1662 "AS col2, table1.col3 AS col3 FROM table1 " 1663 "JOIN table2 ON table1.col1 = table2.col1) " 1664 "AS anon_1 JOIN table2 ON anon_1.col1 = " 1665 "table2.col1 JOIN table3 ON table2.col1 = " 1666 "table3.col1", 1667 ) 1668 self.assert_compile( 1669 sql_util.splice_joins(s, j2, j1), 1670 "(SELECT table1.col1 AS col1, table1.col2 " 1671 "AS col2, table1.col3 AS col3 FROM table1 " 1672 "JOIN table2 ON table1.col1 = table2.col1) " 1673 "AS anon_1 JOIN table3 ON table2.col1 = " 1674 "table3.col1", 1675 ) 1676 1677 def test_splice_2(self): 1678 t2a = table2.alias() 1679 t3a = table3.alias() 1680 j1 = table1.join(t2a, table1.c.col1 == t2a.c.col1).join( 1681 t3a, t2a.c.col2 == t3a.c.col2 1682 ) 1683 t2b = table4.alias() 1684 j2 = table1.join(t2b, table1.c.col3 == t2b.c.col3) 1685 self.assert_compile( 1686 sql_util.splice_joins(table1, j1), 1687 "table1 JOIN table2 AS table2_1 ON " 1688 "table1.col1 = table2_1.col1 JOIN table3 " 1689 "AS table3_1 ON table2_1.col2 = " 1690 "table3_1.col2", 1691 ) 1692 self.assert_compile( 1693 sql_util.splice_joins(table1, j2), 1694 "table1 JOIN table4 AS table4_1 ON " "table1.col3 = table4_1.col3", 1695 ) 1696 self.assert_compile( 1697 sql_util.splice_joins(sql_util.splice_joins(table1, j1), j2), 1698 "table1 JOIN table2 AS table2_1 ON " 1699 "table1.col1 = table2_1.col1 JOIN table3 " 1700 "AS table3_1 ON table2_1.col2 = " 1701 "table3_1.col2 JOIN table4 AS table4_1 ON " 1702 "table1.col3 = table4_1.col3", 1703 ) 1704 1705 1706class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 1707 1708 """tests the generative capability of Select""" 1709 1710 __dialect__ = "default" 1711 1712 @classmethod 1713 def setup_class(cls): 1714 global t1, t2 1715 t1 = table("table1", column("col1"), column("col2"), column("col3")) 1716 t2 = table("table2", column("col1"), column("col2"), column("col3")) 1717 1718 def test_columns(self): 1719 s = t1.select() 1720 self.assert_compile( 1721 s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" 1722 ) 1723 select_copy = s.column(column("yyy")) 1724 self.assert_compile( 1725 select_copy, 1726 "SELECT table1.col1, table1.col2, " "table1.col3, yyy FROM table1", 1727 ) 1728 assert s.columns is not select_copy.columns 1729 assert s._columns is not select_copy._columns 1730 assert s._raw_columns is not select_copy._raw_columns 1731 self.assert_compile( 1732 s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" 1733 ) 1734 1735 def test_froms(self): 1736 s = t1.select() 1737 self.assert_compile( 1738 s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" 1739 ) 1740 select_copy = s.select_from(t2) 1741 self.assert_compile( 1742 select_copy, 1743 "SELECT table1.col1, table1.col2, " 1744 "table1.col3 FROM table1, table2", 1745 ) 1746 assert s._froms is not select_copy._froms 1747 self.assert_compile( 1748 s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" 1749 ) 1750 1751 def test_prefixes(self): 1752 s = t1.select() 1753 self.assert_compile( 1754 s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" 1755 ) 1756 select_copy = s.prefix_with("FOOBER") 1757 self.assert_compile( 1758 select_copy, 1759 "SELECT FOOBER table1.col1, table1.col2, " 1760 "table1.col3 FROM table1", 1761 ) 1762 self.assert_compile( 1763 s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" 1764 ) 1765 1766 def test_execution_options(self): 1767 s = select().execution_options(foo="bar") 1768 s2 = s.execution_options(bar="baz") 1769 s3 = s.execution_options(foo="not bar") 1770 # The original select should not be modified. 1771 assert s._execution_options == dict(foo="bar") 1772 # s2 should have its execution_options based on s, though. 1773 assert s2._execution_options == dict(foo="bar", bar="baz") 1774 assert s3._execution_options == dict(foo="not bar") 1775 1776 def test_invalid_options(self): 1777 assert_raises( 1778 exc.ArgumentError, select().execution_options, compiled_cache={} 1779 ) 1780 1781 assert_raises( 1782 exc.ArgumentError, 1783 select().execution_options, 1784 isolation_level="READ_COMMITTED", 1785 ) 1786 1787 # this feature not available yet 1788 def _NOTYET_test_execution_options_in_kwargs(self): 1789 s = select(execution_options=dict(foo="bar")) 1790 s2 = s.execution_options(bar="baz") 1791 # The original select should not be modified. 1792 assert s._execution_options == dict(foo="bar") 1793 # s2 should have its execution_options based on s, though. 1794 assert s2._execution_options == dict(foo="bar", bar="baz") 1795 1796 # this feature not available yet 1797 def _NOTYET_test_execution_options_in_text(self): 1798 s = text("select 42", execution_options=dict(foo="bar")) 1799 assert s._execution_options == dict(foo="bar") 1800 1801 1802class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): 1803 1804 """Tests the generative capability of Insert, Update""" 1805 1806 __dialect__ = "default" 1807 1808 # fixme: consolidate converage from elsewhere here and expand 1809 1810 @classmethod 1811 def setup_class(cls): 1812 global t1, t2 1813 t1 = table("table1", column("col1"), column("col2"), column("col3")) 1814 t2 = table("table2", column("col1"), column("col2"), column("col3")) 1815 1816 def test_prefixes(self): 1817 i = t1.insert() 1818 self.assert_compile( 1819 i, 1820 "INSERT INTO table1 (col1, col2, col3) " 1821 "VALUES (:col1, :col2, :col3)", 1822 ) 1823 1824 gen = i.prefix_with("foober") 1825 self.assert_compile( 1826 gen, 1827 "INSERT foober INTO table1 (col1, col2, col3) " 1828 "VALUES (:col1, :col2, :col3)", 1829 ) 1830 1831 self.assert_compile( 1832 i, 1833 "INSERT INTO table1 (col1, col2, col3) " 1834 "VALUES (:col1, :col2, :col3)", 1835 ) 1836 1837 i2 = t1.insert(prefixes=["squiznart"]) 1838 self.assert_compile( 1839 i2, 1840 "INSERT squiznart INTO table1 (col1, col2, col3) " 1841 "VALUES (:col1, :col2, :col3)", 1842 ) 1843 1844 gen2 = i2.prefix_with("quux") 1845 self.assert_compile( 1846 gen2, 1847 "INSERT squiznart quux INTO " 1848 "table1 (col1, col2, col3) " 1849 "VALUES (:col1, :col2, :col3)", 1850 ) 1851 1852 def test_add_kwarg(self): 1853 i = t1.insert() 1854 eq_(i.parameters, None) 1855 i = i.values(col1=5) 1856 eq_(i.parameters, {"col1": 5}) 1857 i = i.values(col2=7) 1858 eq_(i.parameters, {"col1": 5, "col2": 7}) 1859 1860 def test_via_tuple_single(self): 1861 i = t1.insert() 1862 eq_(i.parameters, None) 1863 i = i.values((5, 6, 7)) 1864 eq_(i.parameters, {"col1": 5, "col2": 6, "col3": 7}) 1865 1866 def test_kw_and_dict_simultaneously_single(self): 1867 i = t1.insert() 1868 i = i.values({"col1": 5}, col2=7) 1869 eq_(i.parameters, {"col1": 5, "col2": 7}) 1870 1871 def test_via_tuple_multi(self): 1872 i = t1.insert() 1873 eq_(i.parameters, None) 1874 i = i.values([(5, 6, 7), (8, 9, 10)]) 1875 eq_( 1876 i.parameters, 1877 [ 1878 {"col1": 5, "col2": 6, "col3": 7}, 1879 {"col1": 8, "col2": 9, "col3": 10}, 1880 ], 1881 ) 1882 1883 def test_inline_values_single(self): 1884 i = t1.insert(values={"col1": 5}) 1885 eq_(i.parameters, {"col1": 5}) 1886 is_(i._has_multi_parameters, False) 1887 1888 def test_inline_values_multi(self): 1889 i = t1.insert(values=[{"col1": 5}, {"col1": 6}]) 1890 eq_(i.parameters, [{"col1": 5}, {"col1": 6}]) 1891 is_(i._has_multi_parameters, True) 1892 1893 def test_add_dictionary(self): 1894 i = t1.insert() 1895 eq_(i.parameters, None) 1896 i = i.values({"col1": 5}) 1897 eq_(i.parameters, {"col1": 5}) 1898 is_(i._has_multi_parameters, False) 1899 1900 i = i.values({"col1": 6}) 1901 # note replaces 1902 eq_(i.parameters, {"col1": 6}) 1903 is_(i._has_multi_parameters, False) 1904 1905 i = i.values({"col2": 7}) 1906 eq_(i.parameters, {"col1": 6, "col2": 7}) 1907 is_(i._has_multi_parameters, False) 1908 1909 def test_add_kwarg_disallowed_multi(self): 1910 i = t1.insert() 1911 i = i.values([{"col1": 5}, {"col1": 7}]) 1912 assert_raises_message( 1913 exc.InvalidRequestError, 1914 "This construct already has multiple parameter sets.", 1915 i.values, 1916 col2=7, 1917 ) 1918 1919 def test_cant_mix_single_multi_formats_dict_to_list(self): 1920 i = t1.insert().values(col1=5) 1921 assert_raises_message( 1922 exc.ArgumentError, 1923 "Can't mix single-values and multiple values " 1924 "formats in one statement", 1925 i.values, 1926 [{"col1": 6}], 1927 ) 1928 1929 def test_cant_mix_single_multi_formats_list_to_dict(self): 1930 i = t1.insert().values([{"col1": 6}]) 1931 assert_raises_message( 1932 exc.ArgumentError, 1933 "Can't mix single-values and multiple values " 1934 "formats in one statement", 1935 i.values, 1936 {"col1": 5}, 1937 ) 1938 1939 def test_erroneous_multi_args_dicts(self): 1940 i = t1.insert() 1941 assert_raises_message( 1942 exc.ArgumentError, 1943 "Only a single dictionary/tuple or list of " 1944 "dictionaries/tuples is accepted positionally.", 1945 i.values, 1946 {"col1": 5}, 1947 {"col1": 7}, 1948 ) 1949 1950 def test_erroneous_multi_args_tuples(self): 1951 i = t1.insert() 1952 assert_raises_message( 1953 exc.ArgumentError, 1954 "Only a single dictionary/tuple or list of " 1955 "dictionaries/tuples is accepted positionally.", 1956 i.values, 1957 (5, 6, 7), 1958 (8, 9, 10), 1959 ) 1960 1961 def test_erroneous_multi_args_plus_kw(self): 1962 i = t1.insert() 1963 assert_raises_message( 1964 exc.ArgumentError, 1965 "Can't pass kwargs and multiple parameter sets simultaneously", 1966 i.values, 1967 [{"col1": 5}], 1968 col2=7, 1969 ) 1970 1971 def test_update_no_support_multi_values(self): 1972 u = t1.update() 1973 assert_raises_message( 1974 exc.InvalidRequestError, 1975 "This construct does not support multiple parameter sets.", 1976 u.values, 1977 [{"col1": 5}, {"col1": 7}], 1978 ) 1979 1980 def test_update_no_support_multi_constructor(self): 1981 assert_raises_message( 1982 exc.InvalidRequestError, 1983 "This construct does not support multiple parameter sets.", 1984 t1.update, 1985 values=[{"col1": 5}, {"col1": 7}], 1986 ) 1987