1from sqlalchemy import and_
2from sqlalchemy import bindparam
3from sqlalchemy import case
4from sqlalchemy import Column
5from sqlalchemy import exc
6from sqlalchemy import extract
7from sqlalchemy import ForeignKey
8from sqlalchemy import func
9from sqlalchemy import Integer
10from sqlalchemy import literal_column
11from sqlalchemy import MetaData
12from sqlalchemy import select
13from sqlalchemy import String
14from sqlalchemy import Table
15from sqlalchemy import testing
16from sqlalchemy import text
17from sqlalchemy import tuple_
18from sqlalchemy import union
19from sqlalchemy.sql import ClauseElement
20from sqlalchemy.sql import column
21from sqlalchemy.sql import operators
22from sqlalchemy.sql import table
23from sqlalchemy.sql import util as sql_util
24from sqlalchemy.sql import visitors
25from sqlalchemy.sql.expression import _clone
26from sqlalchemy.sql.expression import _from_objects
27from sqlalchemy.sql.visitors import ClauseVisitor
28from sqlalchemy.sql.visitors import cloned_traverse
29from sqlalchemy.sql.visitors import CloningVisitor
30from sqlalchemy.sql.visitors import ReplacingCloningVisitor
31from sqlalchemy.testing import assert_raises
32from sqlalchemy.testing import assert_raises_message
33from sqlalchemy.testing import AssertsCompiledSQL
34from sqlalchemy.testing import AssertsExecutionResults
35from sqlalchemy.testing import eq_
36from sqlalchemy.testing import fixtures
37from sqlalchemy.testing import is_
38from sqlalchemy.testing import is_not_
39
# Module-level placeholders.  The ``setup_class`` hooks of the test classes
# rebind the names they need through ``global`` (A/B and t1/t2/t3 in the
# classes below); table1..table4 are presumably bound by fixtures further
# down in this module.
A = B = t1 = t2 = t3 = table1 = table2 = table3 = table4 = None
41
42
class TraversalTest(fixtures.TestBase, AssertsExecutionResults):

    """test ClauseVisitor's traversal, particularly its
    ability to copy and modify a ClauseElement in place."""

    @classmethod
    def setup_class(cls):
        """Define the fictitious ``A`` and ``B`` elements as module globals."""
        global A, B

        # establish two fictitious ClauseElements.
        # define deep equality semantics as well as deep
        # identity semantics.
        class A(ClauseElement):
            """Leaf element holding a single value in ``expr``."""

            __visit_name__ = "a"

            def __init__(self, expr):
                self.expr = expr

            def is_other(self, other):
                """Return True only if ``other`` is this very object."""
                return other is self

            # a class that defines __eq__ loses its inherited __hash__ on
            # Python 3 unless it is restored explicitly
            __hash__ = ClauseElement.__hash__

            def __eq__(self, other):
                return other.expr == self.expr

            def __ne__(self, other):
                return other.expr != self.expr

            def __str__(self):
                return "A(%s)" % repr(self.expr)

        class B(ClauseElement):
            """Container element holding child elements in ``items``."""

            __visit_name__ = "b"

            def __init__(self, *items):
                self.items = items

            def is_other(self, other):
                """Return True if ``other`` is this object, item for item."""
                if other is not self:
                    return False
                for i1, i2 in zip(self.items, other.items):
                    if i1 is not i2:
                        return False
                return True

            # see note on A: keep the inherited hash alongside __eq__
            __hash__ = ClauseElement.__hash__

            def __eq__(self, other):
                # zip() stops at the shorter sequence, so structures of
                # different length must be rejected up front.  len() is
                # used rather than comparing the containers directly since
                # a cloned B holds a list while the original holds a tuple.
                if len(self.items) != len(other.items):
                    return False
                for i1, i2 in zip(self.items, other.items):
                    if i1 != i2:
                        return False
                return True

            def __ne__(self, other):
                return not self.__eq__(other)

            def _copy_internals(self, clone=_clone):
                # replace each child with its clone; called on the copy
                self.items = [clone(i) for i in self.items]

            def get_children(self, **kwargs):
                return self.items

            def __str__(self):
                return "B(%s)" % repr([str(i) for i in self.items])

    def test_test_classes(self):
        """Sanity-check the equality / identity semantics of A and B."""
        a1 = A("expr1")
        struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
        struct2 = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))
        struct3 = B(
            a1, A("expr2"), B(A("expr1b"), A("expr2bmodified")), A("expr3")
        )

        assert a1.is_other(a1)
        assert struct.is_other(struct)
        assert struct == struct2
        assert struct != struct3
        assert not struct.is_other(struct2)
        assert not struct.is_other(struct3)

    def test_clone(self):
        """CloningVisitor returns an equal but distinct structure."""
        struct = B(
            A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")
        )

        class Vis(CloningVisitor):
            def visit_a(self, a):
                pass

            def visit_b(self, b):
                pass

        vis = Vis()
        s2 = vis.traverse(struct)
        assert struct == s2
        assert not struct.is_other(s2)

    def test_no_clone(self):
        """A plain ClauseVisitor hands back the very same structure."""
        struct = B(
            A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")
        )

        class Vis(ClauseVisitor):
            def visit_a(self, a):
                pass

            def visit_b(self, b):
                pass

        vis = Vis()
        s2 = vis.traverse(struct)
        assert struct == s2
        assert struct.is_other(s2)

    def test_clone_anon_label(self):
        """A cloned Grouping keeps the anon_label of the original."""
        from sqlalchemy.sql.elements import Grouping

        c1 = Grouping(literal_column("q"))
        s1 = select([c1])

        class Vis(CloningVisitor):
            def visit_grouping(self, elem):
                pass

        vis = Vis()
        s2 = vis.traverse(s1)
        eq_(list(s2.inner_columns)[0].anon_label, c1.anon_label)

    def test_change_in_place(self):
        """Visitor mutations land on the clone; the original is untouched."""
        struct = B(
            A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")
        )
        struct2 = B(
            A("expr1"),
            A("expr2modified"),
            B(A("expr1b"), A("expr2b")),
            A("expr3"),
        )
        struct3 = B(
            A("expr1"),
            A("expr2"),
            B(A("expr1b"), A("expr2bmodified")),
            A("expr3"),
        )

        class Vis(CloningVisitor):
            def visit_a(self, a):
                if a.expr == "expr2":
                    a.expr = "expr2modified"

            def visit_b(self, b):
                pass

        vis = Vis()
        s2 = vis.traverse(struct)
        assert struct != s2
        assert not struct.is_other(s2)
        assert struct2 == s2

        class Vis2(CloningVisitor):
            def visit_a(self, a):
                if a.expr == "expr2b":
                    a.expr = "expr2bmodified"

            def visit_b(self, b):
                pass

        vis2 = Vis2()
        # traversing the original again shows the first pass did not
        # modify it: only the nested "expr2b" differs this time
        s3 = vis2.traverse(struct)
        assert struct != s3
        assert struct3 == s3

    def test_visit_name(self):
        """A Column subclass inherits the "column" visit name."""
        # override fns in testlib/schema.py
        from sqlalchemy import Column

        class CustomObj(Column):
            pass

        assert CustomObj.__visit_name__ == Column.__visit_name__ == "column"

        foo, bar = CustomObj("foo", String), CustomObj("bar", String)
        bin_ = foo == bar
        assert set(ClauseVisitor().iterate(bin_)) == {foo, bar, bin_}
232
233
class BinaryEndpointTraversalTest(fixtures.TestBase):

    """test the special binary product visit"""

    def _assert_traversal(self, expr, expected):
        """Assert what ``visit_binary_product`` reports for ``expr``.

        :param expr: the expression handed to ``visit_binary_product``.
        :param expected: list of ``(operator, left, right)`` tuples, in the
         order the visit callback is expected to receive them.
        """
        canary = []

        def visit(binary, left, right):
            canary.append((binary.operator, left, right))

        sql_util.visit_binary_product(visit, expr)
        eq_(canary, expected)

    def test_basic(self):
        """A single comparison yields exactly one pair."""
        a, b = column("a"), column("b")
        self._assert_traversal(a == b, [(operators.eq, a, b)])

    def test_with_tuples(self):
        """Tuple members on both sides are paired, nested binaries too."""
        a, b, c, d, b1, b1a, b1b, e, f = (
            column("a"),
            column("b"),
            column("c"),
            column("d"),
            column("b1"),
            column("b1a"),
            column("b1b"),
            column("e"),
            column("f"),
        )
        expr = tuple_(a, b, b1 == tuple_(b1a, b1b == d), c) > tuple_(
            func.go(e + f)
        )
        self._assert_traversal(
            expr,
            [
                (operators.gt, a, e),
                (operators.gt, a, f),
                (operators.gt, b, e),
                (operators.gt, b, f),
                (operators.eq, b1, b1a),
                (operators.eq, b1b, d),
                (operators.gt, c, e),
                (operators.gt, c, f),
            ],
        )

    def test_composed(self):
        """Operands of arithmetic and functions are expanded into pairs."""
        a, b, e, f, q, j, r = (
            column("a"),
            column("b"),
            column("e"),
            column("f"),
            column("q"),
            column("j"),
            column("r"),
        )
        expr = and_((a + b) == q + func.sum(e + f), and_(j == r, f == q))
        self._assert_traversal(
            expr,
            [
                (operators.eq, a, q),
                (operators.eq, a, e),
                (operators.eq, a, f),
                (operators.eq, b, q),
                (operators.eq, b, e),
                (operators.eq, b, f),
                (operators.eq, j, r),
                (operators.eq, f, q),
            ],
        )

    def test_subquery(self):
        """A scalar subquery is reported as one endpoint, not descended."""
        a, b, c = column("a"), column("b"), column("c")
        subq = select([c]).where(c == a).as_scalar()
        expr = and_(a == b, b == subq)
        self._assert_traversal(
            expr, [(operators.eq, a, b), (operators.eq, b, subq)]
        )
313
314
class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):

    """test copy-in-place behavior of various ClauseElements."""

    __dialect__ = "default"

    @classmethod
    def setup_class(cls):
        """Bind the module-level ``t1``, ``t2`` and ``t3`` tables."""
        global t1, t2, t3
        t1 = table("table1", column("col1"), column("col2"), column("col3"))
        t2 = table("table2", column("col1"), column("col2"), column("col3"))
        # unlike t1 / t2, t3 is a schema-level Table with typed Columns
        t3 = Table(
            "table3",
            MetaData(),
            Column("col1", Integer),
            Column("col2", Integer),
        )

    def test_binary(self):
        """A cloned binary expression stringifies like the original."""
        clause = t1.c.col2 == t2.c.col2
        eq_(str(clause), str(CloningVisitor().traverse(clause)))

    def test_binary_anon_label_quirk(self):
        """An adapted binary still renders with an anonymous label."""
        t = table("t1", column("col1"))

        f = t.c.col1 * 5
        self.assert_compile(
            select([f]), "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1"
        )

        # deliberately access anon_label before adapting; the value itself
        # is discarded (presumably this memoizes it on the original)
        f.anon_label

        a = t.alias()
        f = sql_util.ClauseAdapter(a).traverse(f)

        self.assert_compile(
            select([f]), "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1"
        )

    def test_join(self):
        """Changing a cloned join's ON clause leaves the original intact."""
        clause = t1.join(t2, t1.c.col2 == t2.c.col2)
        c1 = str(clause)
        assert str(clause) == str(CloningVisitor().traverse(clause))

        class Vis(CloningVisitor):
            def visit_binary(self, binary):
                binary.right = t2.c.col3

        clause2 = Vis().traverse(clause)
        assert c1 == str(clause)
        assert str(clause2) == str(t1.join(t2, t1.c.col2 == t2.c.col3))

    def test_aliased_column_adapt(self):
        """ColumnAdapter.columns resolves columns and function expressions."""
        aliased = t1.select().alias()
        aliased2 = t1.alias()

        adapter = sql_util.ColumnAdapter(aliased)

        f = select([adapter.columns[c] for c in aliased2.c]).select_from(
            aliased
        )

        s = select([aliased2]).select_from(aliased)
        eq_(str(s), str(f))

        f = select([adapter.columns[func.count(aliased2.c.col1)]]).select_from(
            aliased
        )
        eq_(
            str(select([func.count(aliased2.c.col1)]).select_from(aliased)),
            str(f),
        )

    def test_aliased_cloned_column_adapt_inner(self):
        """Raw columns of a cloned select adapt like the original's."""
        clause = select([t1.c.col1, func.foo(t1.c.col2).label("foo")])

        aliased1 = select([clause.c.col1, clause.c.foo])
        aliased2 = clause
        # touch the exported columns before cloning; result is discarded
        aliased2.c.col1, aliased2.c.foo
        aliased3 = cloned_traverse(aliased2, {}, {})

        # fixed by [ticket:2419].   the inside columns
        # on aliased3 have _is_clone_of pointers to those of
        # aliased2.  corresponding_column checks these
        # now.
        adapter = sql_util.ColumnAdapter(aliased1)
        f1 = select([adapter.columns[c] for c in aliased2._raw_columns])
        f2 = select([adapter.columns[c] for c in aliased3._raw_columns])
        eq_(str(f1), str(f2))

    def test_aliased_cloned_column_adapt_exported(self):
        """Exported columns of a cloned select adapt like the original's."""
        clause = select([t1.c.col1, func.foo(t1.c.col2).label("foo")])

        aliased1 = select([clause.c.col1, clause.c.foo])
        aliased2 = clause
        # touch the exported columns before cloning; result is discarded
        aliased2.c.col1, aliased2.c.foo
        aliased3 = cloned_traverse(aliased2, {}, {})

        # also fixed by [ticket:2419].  When we look at the
        # *outside* columns of aliased3, they previously did not
        # have an _is_clone_of pointer.   But we now modified _make_proxy
        # to assign this.
        adapter = sql_util.ColumnAdapter(aliased1)
        f1 = select([adapter.columns[c] for c in aliased2.c])
        f2 = select([adapter.columns[c] for c in aliased3.c])
        eq_(str(f1), str(f2))

    def test_aliased_cloned_schema_column_adapt_exported(self):
        """Same as the exported-column test, against the schema Table t3."""
        clause = select([t3.c.col1, func.foo(t3.c.col2).label("foo")])

        aliased1 = select([clause.c.col1, clause.c.foo])
        aliased2 = clause
        # touch the exported columns before cloning; result is discarded
        aliased2.c.col1, aliased2.c.foo
        aliased3 = cloned_traverse(aliased2, {}, {})

        # also fixed by [ticket:2419].  When we look at the
        # *outside* columns of aliased3, they previously did not
        # have an _is_clone_of pointer.   But we now modified _make_proxy
        # to assign this.
        adapter = sql_util.ColumnAdapter(aliased1)
        f1 = select([adapter.columns[c] for c in aliased2.c])
        f2 = select([adapter.columns[c] for c in aliased3.c])
        eq_(str(f1), str(f2))

    def test_labeled_expression_adapt(self):
        """Adapting a labeled expression adapts the element inside it."""
        lbl_x = (t3.c.col1 == 1).label("x")
        t3_alias = t3.alias()

        adapter = sql_util.ColumnAdapter(t3_alias)

        lblx_adapted = adapter.traverse(lbl_x)
        is_not_(lblx_adapted._element, lbl_x._element)

        # traverse a second time and compile from that result
        lblx_adapted = adapter.traverse(lbl_x)
        self.assert_compile(
            select([lblx_adapted.self_group()]),
            "SELECT (table3_1.col1 = :col1_1) AS x FROM table3 AS table3_1",
        )

        self.assert_compile(
            select([lblx_adapted.is_(True)]),
            "SELECT (table3_1.col1 = :col1_1) IS 1 AS anon_1 "
            "FROM table3 AS table3_1",
        )

    def test_cte_w_union(self):
        """A select against a recursive CTE survives cloned_traverse."""
        t = select([func.values(1).label("n")]).cte("t", recursive=True)
        t = t.union_all(select([t.c.n + 1]).where(t.c.n < 100))
        s = select([func.sum(t.c.n)])

        cloned = cloned_traverse(s, {}, {})

        self.assert_compile(
            cloned,
            "WITH RECURSIVE t(n) AS "
            "(SELECT values(:values_1) AS n "
            "UNION ALL SELECT t.n + :n_1 AS anon_1 "
            "FROM t "
            "WHERE t.n < :n_2) "
            "SELECT sum(t.n) AS sum_1 FROM t",
        )

    def test_aliased_cte_w_union(self):
        """Same as test_cte_w_union, with the CTE aliased as "foo"."""
        t = (
            select([func.values(1).label("n")])
            .cte("t", recursive=True)
            .alias("foo")
        )
        t = t.union_all(select([t.c.n + 1]).where(t.c.n < 100))
        s = select([func.sum(t.c.n)])

        cloned = cloned_traverse(s, {}, {})

        self.assert_compile(
            cloned,
            "WITH RECURSIVE foo(n) AS (SELECT values(:values_1) AS n "
            "UNION ALL SELECT foo.n + :n_1 AS anon_1 FROM foo "
            "WHERE foo.n < :n_2) SELECT sum(foo.n) AS sum_1 FROM foo",
        )

    def test_text(self):
        """Changes to a cloned text() do not leak into the original."""
        clause = text(
            "select * from table where foo=:bar", bindparams=[bindparam("bar")]
        )
        c1 = str(clause)

        class Vis(CloningVisitor):
            def visit_textclause(self, textclause):
                textclause.text = textclause.text + " SOME MODIFIER=:lala"
                textclause._bindparams["lala"] = bindparam("lala")

        clause2 = Vis().traverse(clause)
        assert c1 == str(clause)
        assert str(clause2) == c1 + " SOME MODIFIER=:lala"
        assert list(clause._bindparams.keys()) == ["bar"]
        assert set(clause2._bindparams.keys()) == {"bar", "lala"}

    def test_select(self):
        """Cloning visitors copy a select; a plain visitor mutates it."""
        s2 = select([t1])
        s2_assert = str(s2)
        s3_assert = str(select([t1], t1.c.col2 == 7))

        class AppendCol2(CloningVisitor):
            def visit_select(self, select_):
                select_.append_whereclause(t1.c.col2 == 7)

        s3 = AppendCol2().traverse(s2)
        assert str(s3) == s3_assert
        assert str(s2) == s2_assert

        # the non-cloning visitor modifies s2 itself
        class AppendCol2InPlace(ClauseVisitor):
            def visit_select(self, select_):
                select_.append_whereclause(t1.c.col2 == 7)

        AppendCol2InPlace().traverse(s2)
        assert str(s2) == s3_assert

        s4_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col3 == 9)))

        class AppendCol3(CloningVisitor):
            def visit_select(self, select_):
                select_.append_whereclause(t1.c.col3 == 9)

        s4 = AppendCol3().traverse(s3)
        assert str(s4) == s4_assert
        assert str(s3) == s3_assert

        s5_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col1 == 9)))

        class SwapCol3ForCol1(CloningVisitor):
            def visit_binary(self, binary):
                if binary.left is t1.c.col3:
                    binary.left = t1.c.col1
                    binary.right = bindparam("col1", unique=True)

        s5 = SwapCol3ForCol1().traverse(s4)
        assert str(s5) == s5_assert
        assert str(s4) == s4_assert

    def test_union(self):
        """Cloned unions keep their SQL and columns; params() is per-copy."""
        u = union(t1.select(), t2.select())
        u2 = CloningVisitor().traverse(u)
        assert str(u) == str(u2)
        assert [str(c) for c in u2.c] == [str(c) for c in u.c]

        # same check, but with the original's columns read before cloning
        u = union(t1.select(), t2.select())
        cols = [str(c) for c in u.c]
        u2 = CloningVisitor().traverse(u)
        assert str(u) == str(u2)
        assert [str(c) for c in u2.c] == cols

        s1 = select([t1], t1.c.col1 == bindparam("id_param"))
        s2 = select([t2])
        u = union(s1, s2)

        u2 = u.params(id_param=7)
        u3 = u.params(id_param=10)
        assert str(u) == str(u2) == str(u3)
        assert u2.compile().params == {"id_param": 7}
        assert u3.compile().params == {"id_param": 10}

    def test_in(self):
        """A cloned IN expression stringifies like the original."""
        expr = t1.c.col1.in_(["foo", "bar"])
        expr2 = CloningVisitor().traverse(expr)
        assert str(expr) == str(expr2)

    def test_over(self):
        """OVER clones cleanly and is yielded by visitors.iterate()."""
        expr = func.row_number().over(order_by=t1.c.col1)
        expr2 = CloningVisitor().traverse(expr)
        assert str(expr) == str(expr2)

        assert expr in visitors.iterate(expr, {})

    def test_within_group(self):
        """WITHIN GROUP clones cleanly and is yielded by iterate()."""
        expr = func.row_number().within_group(t1.c.col1)
        expr2 = CloningVisitor().traverse(expr)
        assert str(expr) == str(expr2)

        assert expr in visitors.iterate(expr, {})

    def test_funcfilter(self):
        """A cloned FILTER expression stringifies like the original."""
        expr = func.count(1).filter(t1.c.col1 > 1)
        expr2 = CloningVisitor().traverse(expr)
        assert str(expr) == str(expr2)

    def test_adapt_union(self):
        """Adapting t1 against an aliased union of t1 yields the alias."""
        u = union(
            t1.select().where(t1.c.col1 == 4),
            t1.select().where(t1.c.col1 == 5),
        ).alias()

        assert sql_util.ClauseAdapter(u).traverse(t1) is u

    def test_binds(self):
        """test that unique bindparams change their name upon clone()
        to prevent conflicts"""

        s = select([t1], t1.c.col1 == bindparam(None, unique=True)).alias()
        s2 = CloningVisitor().traverse(s).alias()
        s3 = select([s], s.c.col2 == s2.c.col2)

        self.assert_compile(
            s3,
            "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "
            "(SELECT table1.col1 AS col1, table1.col2 AS col2, "
            "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) "
            "AS anon_1, "
            "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 "
            "AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 "
            "WHERE anon_1.col2 = anon_2.col2",
        )

        # same again, with a bind generated from a literal value
        s = select([t1], t1.c.col1 == 4).alias()
        s2 = CloningVisitor().traverse(s).alias()
        s3 = select([s], s.c.col2 == s2.c.col2)
        self.assert_compile(
            s3,
            "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "
            "(SELECT table1.col1 AS col1, table1.col2 AS col2, "
            "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) "
            "AS anon_1, "
            "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 "
            "AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 "
            "WHERE anon_1.col2 = anon_2.col2",
        )

    def test_extract(self):
        """A select containing EXTRACT can be cloned and then aliased."""
        s = select([extract("foo", t1.c.col1).label("col1")])
        self.assert_compile(
            s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1"
        )

        s2 = CloningVisitor().traverse(s).alias()
        s3 = select([s2.c.col1])
        # the original still compiles the same after being cloned
        self.assert_compile(
            s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1"
        )
        self.assert_compile(
            s3,
            "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM "
            "table1.col1) AS col1 FROM table1) AS anon_1",
        )

    @testing.emits_warning(".*replaced by another column with the same key")
    def test_alias(self):
        """Selects over an aliased subquery survive repeated traversal."""
        subq = t2.select().alias("subq")
        s = select(
            [t1.c.col1, subq.c.col1],
            from_obj=[t1, subq, t1.join(subq, t1.c.col1 == subq.c.col2)],
        )
        orig = str(s)
        s2 = CloningVisitor().traverse(s)
        assert orig == str(s) == str(s2)

        s4 = CloningVisitor().traverse(s2)
        assert orig == str(s) == str(s2) == str(s4)

        # adapting against an unrelated table leaves the statement as is
        s3 = sql_util.ClauseAdapter(table("foo")).traverse(s)
        assert orig == str(s) == str(s3)

        s4 = sql_util.ClauseAdapter(table("foo")).traverse(s3)
        assert orig == str(s) == str(s3) == str(s4)

        subq = subq.alias("subq")
        s = select(
            [t1.c.col1, subq.c.col1],
            from_obj=[t1, subq, t1.join(subq, t1.c.col1 == subq.c.col2)],
        )
        s5 = CloningVisitor().traverse(s)
        assert orig == str(s) == str(s5)

    def test_correlated_select(self):
        """A cloned, modified select keeps its correlation to table2."""
        s = select(
            [literal_column("*")], t1.c.col1 == t2.c.col1, from_obj=[t1, t2]
        ).correlate(t2)

        class Vis(CloningVisitor):
            def visit_select(self, select_):
                select_.append_whereclause(t1.c.col2 == 7)

        self.assert_compile(
            select([t2]).where(t2.c.col1 == Vis().traverse(s)),
            "SELECT table2.col1, table2.col2, table2.col3 "
            "FROM table2 WHERE table2.col1 = "
            "(SELECT * FROM table1 WHERE table1.col1 = table2.col1 "
            "AND table1.col2 = :col2_1)",
        )

    def test_this_thing(self):
        """ClauseAdapter reaches into the aliased inner select."""
        s = select([t1]).where(t1.c.col1 == "foo").alias()
        s2 = select([s.c.col1])

        self.assert_compile(
            s2,
            "SELECT anon_1.col1 FROM (SELECT "
            "table1.col1 AS col1, table1.col2 AS col2, "
            "table1.col3 AS col3 FROM table1 WHERE "
            "table1.col1 = :col1_1) AS anon_1",
        )
        t1a = t1.alias()
        s2 = sql_util.ClauseAdapter(t1a).traverse(s2)
        self.assert_compile(
            s2,
            "SELECT anon_1.col1 FROM (SELECT "
            "table1_1.col1 AS col1, table1_1.col2 AS "
            "col2, table1_1.col3 AS col3 FROM table1 "
            "AS table1_1 WHERE table1_1.col1 = "
            ":col1_1) AS anon_1",
        )

    def test_select_fromtwice_one(self):
        """Compiled SQL is unchanged after cloning (alias correlated)."""
        t1a = t1.alias()

        s = select([1], t1.c.col1 == t1a.c.col1, from_obj=t1a).correlate(t1a)
        s = select([t1]).where(t1.c.col1 == s)
        self.assert_compile(
            s,
            "SELECT table1.col1, table1.col2, table1.col3 FROM table1 "
            "WHERE table1.col1 = "
            "(SELECT 1 FROM table1, table1 AS table1_1 "
            "WHERE table1.col1 = table1_1.col1)",
        )
        s = CloningVisitor().traverse(s)
        self.assert_compile(
            s,
            "SELECT table1.col1, table1.col2, table1.col3 FROM table1 "
            "WHERE table1.col1 = "
            "(SELECT 1 FROM table1, table1 AS table1_1 "
            "WHERE table1.col1 = table1_1.col1)",
        )

    def test_select_fromtwice_two(self):
        """Compiled SQL is unchanged after ReplacingCloningVisitor."""
        s = select([t1]).where(t1.c.col1 == "foo").alias()

        s2 = select([1], t1.c.col1 == s.c.col1, from_obj=s).correlate(t1)
        s3 = select([t1]).where(t1.c.col1 == s2)
        self.assert_compile(
            s3,
            "SELECT table1.col1, table1.col2, table1.col3 "
            "FROM table1 WHERE table1.col1 = "
            "(SELECT 1 FROM "
            "(SELECT table1.col1 AS col1, table1.col2 AS col2, "
            "table1.col3 AS col3 FROM table1 "
            "WHERE table1.col1 = :col1_1) "
            "AS anon_1 WHERE table1.col1 = anon_1.col1)",
        )

        s4 = ReplacingCloningVisitor().traverse(s3)
        self.assert_compile(
            s4,
            "SELECT table1.col1, table1.col2, table1.col3 "
            "FROM table1 WHERE table1.col1 = "
            "(SELECT 1 FROM "
            "(SELECT table1.col1 AS col1, table1.col2 AS col2, "
            "table1.col3 AS col3 FROM table1 "
            "WHERE table1.col1 = :col1_1) "
            "AS anon_1 WHERE table1.col1 = anon_1.col1)",
        )
785
786
787class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
788    __dialect__ = "default"
789
790    @classmethod
791    def setup_class(cls):
792        global t1, t2
793        t1 = table(
794            "table1",
795            column("col1"),
796            column("col2"),
797            column("col3"),
798            column("col4"),
799        )
800        t2 = table("table2", column("col1"), column("col2"), column("col3"))
801
802    def test_traverse_memoizes_w_columns(self):
803        t1a = t1.alias()
804        adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True)
805
806        expr = select([t1a.c.col1]).label("x")
807        expr_adapted = adapter.traverse(expr)
808        is_not_(expr, expr_adapted)
809        is_(adapter.columns[expr], expr_adapted)
810
811    def test_traverse_memoizes_w_itself(self):
812        t1a = t1.alias()
813        adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True)
814
815        expr = select([t1a.c.col1]).label("x")
816        expr_adapted = adapter.traverse(expr)
817        is_not_(expr, expr_adapted)
818        is_(adapter.traverse(expr), expr_adapted)
819
820    def test_columns_memoizes_w_itself(self):
821        t1a = t1.alias()
822        adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True)
823
824        expr = select([t1a.c.col1]).label("x")
825        expr_adapted = adapter.columns[expr]
826        is_not_(expr, expr_adapted)
827        is_(adapter.columns[expr], expr_adapted)
828
829    def test_wrapping_fallthrough(self):
830        t1a = t1.alias(name="t1a")
831        t2a = t2.alias(name="t2a")
832        a1 = sql_util.ColumnAdapter(t1a)
833
834        s1 = select([t1a.c.col1, t2a.c.col1]).apply_labels().alias()
835        a2 = sql_util.ColumnAdapter(s1)
836        a3 = a2.wrap(a1)
837        a4 = a1.wrap(a2)
838        a5 = a1.chain(a2)
839
840        # t1.c.col1 -> s1.c.t1a_col1
841
842        # adapted by a2
843        is_(a3.columns[t1.c.col1], s1.c.t1a_col1)
844        is_(a4.columns[t1.c.col1], s1.c.t1a_col1)
845
846        # chaining can't fall through because a1 grabs it
847        # first
848        is_(a5.columns[t1.c.col1], t1a.c.col1)
849
850        # t2.c.col1 -> s1.c.t2a_col1
851
852        # adapted by a2
853        is_(a3.columns[t2.c.col1], s1.c.t2a_col1)
854        is_(a4.columns[t2.c.col1], s1.c.t2a_col1)
855        # chaining, t2 hits s1
856        is_(a5.columns[t2.c.col1], s1.c.t2a_col1)
857
858        # t1.c.col2 -> t1a.c.col2
859
860        # fallthrough to a1
861        is_(a3.columns[t1.c.col2], t1a.c.col2)
862        is_(a4.columns[t1.c.col2], t1a.c.col2)
863
864        # chaining hits a1
865        is_(a5.columns[t1.c.col2], t1a.c.col2)
866
867        # t2.c.col2 -> t2.c.col2
868
869        # fallthrough to no adaption
870        is_(a3.columns[t2.c.col2], t2.c.col2)
871        is_(a4.columns[t2.c.col2], t2.c.col2)
872
873    def test_wrapping_ordering(self):
874        """illustrate an example where order of wrappers matters.
875
876        This test illustrates both the ordering being significant
877        as well as a scenario where multiple translations are needed
878        (e.g. wrapping vs. chaining).
879
880        """
881
882        stmt = select([t1.c.col1, t2.c.col1]).apply_labels()
883
884        sa = stmt.alias()
885        stmt2 = select([t2, sa])
886
887        a1 = sql_util.ColumnAdapter(stmt)
888        a2 = sql_util.ColumnAdapter(stmt2)
889
890        a2_to_a1 = a2.wrap(a1)
891        a1_to_a2 = a1.wrap(a2)
892
893        # when stmt2 and stmt represent the same column
894        # in different contexts, order of wrapping matters
895
896        # t2.c.col1 via a2 is stmt2.c.col1; then ignored by a1
897        is_(a2_to_a1.columns[t2.c.col1], stmt2.c.col1)
898        # t2.c.col1 via a1 is stmt.c.table2_col1; a2 then
899        # sends this to stmt2.c.table2_col1
900        is_(a1_to_a2.columns[t2.c.col1], stmt2.c.table2_col1)
901
902        # for mutually exclusive columns, order doesn't matter
903        is_(a2_to_a1.columns[t1.c.col1], stmt2.c.table1_col1)
904        is_(a1_to_a2.columns[t1.c.col1], stmt2.c.table1_col1)
905        is_(a2_to_a1.columns[t2.c.col2], stmt2.c.col2)
906
907    def test_wrapping_multiple(self):
908        """illustrate that wrapping runs both adapters"""
909
910        t1a = t1.alias(name="t1a")
911        t2a = t2.alias(name="t2a")
912        a1 = sql_util.ColumnAdapter(t1a)
913        a2 = sql_util.ColumnAdapter(t2a)
914        a3 = a2.wrap(a1)
915
916        stmt = select([t1.c.col1, t2.c.col2])
917
918        self.assert_compile(
919            a3.traverse(stmt),
920            "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a",
921        )
922
923        # chaining does too because these adapters don't share any
924        # columns
925        a4 = a2.chain(a1)
926        self.assert_compile(
927            a4.traverse(stmt),
928            "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a",
929        )
930
931    def test_wrapping_inclusions(self):
932        """test wrapping and inclusion rules together,
933        taking into account multiple objects with equivalent hash identity."""
934
935        t1a = t1.alias(name="t1a")
936        t2a = t2.alias(name="t2a")
937        a1 = sql_util.ColumnAdapter(
938            t1a, include_fn=lambda col: "a1" in col._annotations
939        )
940
941        s1 = select([t1a, t2a]).apply_labels().alias()
942        a2 = sql_util.ColumnAdapter(
943            s1, include_fn=lambda col: "a2" in col._annotations
944        )
945        a3 = a2.wrap(a1)
946
947        c1a1 = t1.c.col1._annotate(dict(a1=True))
948        c1a2 = t1.c.col1._annotate(dict(a2=True))
949        c1aa = t1.c.col1._annotate(dict(a1=True, a2=True))
950
951        c2a1 = t2.c.col1._annotate(dict(a1=True))
952        c2a2 = t2.c.col1._annotate(dict(a2=True))
953        c2aa = t2.c.col1._annotate(dict(a1=True, a2=True))
954
955        is_(a3.columns[c1a1], t1a.c.col1)
956        is_(a3.columns[c1a2], s1.c.t1a_col1)
957        is_(a3.columns[c1aa], s1.c.t1a_col1)
958
959        # not covered by a1, accepted by a2
960        is_(a3.columns[c2aa], s1.c.t2a_col1)
961
962        # not covered by a1, accepted by a2
963        is_(a3.columns[c2a2], s1.c.t2a_col1)
964        # not covered by a1, rejected by a2
965        is_(a3.columns[c2a1], c2a1)
966
967
968class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):
969    __dialect__ = "default"
970
971    @classmethod
972    def setup_class(cls):
973        global t1, t2
974        t1 = table("table1", column("col1"), column("col2"), column("col3"))
975        t2 = table("table2", column("col1"), column("col2"), column("col3"))
976
    def test_correlation_on_clone(self):
        """Check correlation of scalar subqueries across adapt/clone steps.

        Four scalar subqueries are built in turn, with and without an
        explicit ``correlate()``.  Each one is compiled inside an
        enclosing SELECT before and after being traversed by
        ``ClauseAdapter(t1alias)``, and in most cases once more after a
        plain ``CloningVisitor`` traversal.
        """
        t1alias = t1.alias("t1alias")
        t2alias = t2.alias("t2alias")
        vis = sql_util.ClauseAdapter(t1alias)

        # case 1: both aliases in the FROM list, no explicit correlate()
        s = select(
            [literal_column("*")], from_obj=[t1alias, t2alias]
        ).as_scalar()
        assert t2alias in s._froms
        assert t1alias in s._froms

        # t2alias is expected to be omitted from the subquery's FROM
        # when the enclosing SELECT also selects from it
        self.assert_compile(
            select([literal_column("*")], t2alias.c.col1 == s),
            "SELECT * FROM table2 AS t2alias WHERE "
            "t2alias.col1 = (SELECT * FROM table1 AS "
            "t1alias)",
        )
        s = vis.traverse(s)

        assert t2alias not in s._froms  # not present because it's been
        # cloned
        assert t1alias in s._froms  # present because the adapter placed
        # it there

        # correlate list on "s" needs to take into account the full
        # _cloned_set for each element in _froms when correlating

        self.assert_compile(
            select([literal_column("*")], t2alias.c.col1 == s),
            "SELECT * FROM table2 AS t2alias WHERE "
            "t2alias.col1 = (SELECT * FROM table1 AS "
            "t1alias)",
        )

        # case 2: same FROM list, but explicitly correlated to t2alias;
        # checked as built, after adaption, and after a plain clone
        s = (
            select([literal_column("*")], from_obj=[t1alias, t2alias])
            .correlate(t2alias)
            .as_scalar()
        )
        self.assert_compile(
            select([literal_column("*")], t2alias.c.col1 == s),
            "SELECT * FROM table2 AS t2alias WHERE "
            "t2alias.col1 = (SELECT * FROM table1 AS "
            "t1alias)",
        )
        s = vis.traverse(s)
        self.assert_compile(
            select([literal_column("*")], t2alias.c.col1 == s),
            "SELECT * FROM table2 AS t2alias WHERE "
            "t2alias.col1 = (SELECT * FROM table1 AS "
            "t1alias)",
        )
        s = CloningVisitor().traverse(s)
        self.assert_compile(
            select([literal_column("*")], t2alias.c.col1 == s),
            "SELECT * FROM table2 AS t2alias WHERE "
            "t2alias.col1 = (SELECT * FROM table1 AS "
            "t1alias)",
        )

        # case 3: subquery written against the plain tables, no explicit
        # correlate(); after adaption the enclosing SELECT uses t1alias
        s = (
            select([literal_column("*")])
            .where(t1.c.col1 == t2.c.col1)
            .as_scalar()
        )
        self.assert_compile(
            select([t1.c.col1, s]),
            "SELECT table1.col1, (SELECT * FROM table2 "
            "WHERE table1.col1 = table2.col1) AS "
            "anon_1 FROM table1",
        )
        vis = sql_util.ClauseAdapter(t1alias)
        s = vis.traverse(s)
        self.assert_compile(
            select([t1alias.c.col1, s]),
            "SELECT t1alias.col1, (SELECT * FROM "
            "table2 WHERE t1alias.col1 = table2.col1) "
            "AS anon_1 FROM table1 AS t1alias",
        )
        s = CloningVisitor().traverse(s)
        self.assert_compile(
            select([t1alias.c.col1, s]),
            "SELECT t1alias.col1, (SELECT * FROM "
            "table2 WHERE t1alias.col1 = table2.col1) "
            "AS anon_1 FROM table1 AS t1alias",
        )

        # case 4: as case 3, but explicitly correlated to the plain t1
        s = (
            select([literal_column("*")])
            .where(t1.c.col1 == t2.c.col1)
            .correlate(t1)
            .as_scalar()
        )
        self.assert_compile(
            select([t1.c.col1, s]),
            "SELECT table1.col1, (SELECT * FROM table2 "
            "WHERE table1.col1 = table2.col1) AS "
            "anon_1 FROM table1",
        )
        vis = sql_util.ClauseAdapter(t1alias)
        s = vis.traverse(s)
        self.assert_compile(
            select([t1alias.c.col1, s]),
            "SELECT t1alias.col1, (SELECT * FROM "
            "table2 WHERE t1alias.col1 = table2.col1) "
            "AS anon_1 FROM table1 AS t1alias",
        )
        s = CloningVisitor().traverse(s)
        self.assert_compile(
            select([t1alias.c.col1, s]),
            "SELECT t1alias.col1, (SELECT * FROM "
            "table2 WHERE t1alias.col1 = table2.col1) "
            "AS anon_1 FROM table1 AS t1alias",
        )
1089
    @testing.fails_on_everything_except()
    def test_joins_dont_adapt(self):
        """Adapt an already-adapted correlated SELECT against a join.

        The statement is first adapted to an anonymous alias of ``users``
        and then traversed again by a ClauseAdapter built on a join of
        ``addresses`` to that alias.
        """
        # NOTE(review): fails_on_everything_except() is given no backends
        # here, which presumably marks this test as an expected failure
        # everywhere -- confirm against sqlalchemy.testing before relying
        # on the expected SQL below.

        # adapting to a join, i.e. ClauseAdapter(t1.join(t2)), doesn't
        # make much sense. ClauseAdapter doesn't make any changes if
        # it's against a straight join.

        users = table("users", column("id"))
        addresses = table("addresses", column("id"), column("user_id"))

        ualias = users.alias()

        s = select(
            [func.count(addresses.c.id)], users.c.id == addresses.c.user_id
        ).correlate(users)
        s = sql_util.ClauseAdapter(ualias).traverse(s)

        j1 = addresses.join(ualias, addresses.c.user_id == ualias.c.id)

        self.assert_compile(
            sql_util.ClauseAdapter(j1).traverse(s),
            "SELECT count(addresses.id) AS count_1 "
            "FROM addresses WHERE users_1.id = "
            "addresses.user_id",
        )
1114
1115    def test_table_to_alias_1(self):
1116        t1alias = t1.alias("t1alias")
1117
1118        vis = sql_util.ClauseAdapter(t1alias)
1119        ff = vis.traverse(func.count(t1.c.col1).label("foo"))
1120        assert list(_from_objects(ff)) == [t1alias]
1121
1122    def test_table_to_alias_2(self):
1123        t1alias = t1.alias("t1alias")
1124        vis = sql_util.ClauseAdapter(t1alias)
1125        self.assert_compile(
1126            vis.traverse(select([literal_column("*")], from_obj=[t1])),
1127            "SELECT * FROM table1 AS t1alias",
1128        )
1129
1130    def test_table_to_alias_3(self):
1131        t1alias = t1.alias("t1alias")
1132        vis = sql_util.ClauseAdapter(t1alias)
1133        self.assert_compile(
1134            select([literal_column("*")], t1.c.col1 == t2.c.col2),
1135            "SELECT * FROM table1, table2 WHERE table1.col1 = table2.col2",
1136        )
1137
1138    def test_table_to_alias_4(self):
1139        t1alias = t1.alias("t1alias")
1140        vis = sql_util.ClauseAdapter(t1alias)
1141        self.assert_compile(
1142            vis.traverse(
1143                select([literal_column("*")], t1.c.col1 == t2.c.col2)
1144            ),
1145            "SELECT * FROM table1 AS t1alias, table2 "
1146            "WHERE t1alias.col1 = table2.col2",
1147        )
1148
1149    def test_table_to_alias_5(self):
1150        t1alias = t1.alias("t1alias")
1151        vis = sql_util.ClauseAdapter(t1alias)
1152        self.assert_compile(
1153            vis.traverse(
1154                select(
1155                    [literal_column("*")],
1156                    t1.c.col1 == t2.c.col2,
1157                    from_obj=[t1, t2],
1158                )
1159            ),
1160            "SELECT * FROM table1 AS t1alias, table2 "
1161            "WHERE t1alias.col1 = table2.col2",
1162        )
1163
1164    def test_table_to_alias_6(self):
1165        t1alias = t1.alias("t1alias")
1166        vis = sql_util.ClauseAdapter(t1alias)
1167        self.assert_compile(
1168            select([t1alias, t2]).where(
1169                t1alias.c.col1
1170                == vis.traverse(
1171                    select(
1172                        [literal_column("*")],
1173                        t1.c.col1 == t2.c.col2,
1174                        from_obj=[t1, t2],
1175                    ).correlate(t1)
1176                )
1177            ),
1178            "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
1179            "table2.col1, table2.col2, table2.col3 "
1180            "FROM table1 AS t1alias, table2 WHERE t1alias.col1 = "
1181            "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)",
1182        )
1183
1184    def test_table_to_alias_7(self):
1185        t1alias = t1.alias("t1alias")
1186        vis = sql_util.ClauseAdapter(t1alias)
1187        self.assert_compile(
1188            select([t1alias, t2]).where(
1189                t1alias.c.col1
1190                == vis.traverse(
1191                    select(
1192                        [literal_column("*")],
1193                        t1.c.col1 == t2.c.col2,
1194                        from_obj=[t1, t2],
1195                    ).correlate(t2)
1196                )
1197            ),
1198            "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
1199            "table2.col1, table2.col2, table2.col3 "
1200            "FROM table1 AS t1alias, table2 "
1201            "WHERE t1alias.col1 = "
1202            "(SELECT * FROM table1 AS t1alias "
1203            "WHERE t1alias.col1 = table2.col2)",
1204        )
1205
1206    def test_table_to_alias_8(self):
1207        t1alias = t1.alias("t1alias")
1208        vis = sql_util.ClauseAdapter(t1alias)
1209        self.assert_compile(
1210            vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)),
1211            "CASE WHEN (t1alias.col1 = :col1_1) THEN "
1212            "t1alias.col2 ELSE t1alias.col1 END",
1213        )
1214
1215    def test_table_to_alias_9(self):
1216        t1alias = t1.alias("t1alias")
1217        vis = sql_util.ClauseAdapter(t1alias)
1218        self.assert_compile(
1219            vis.traverse(
1220                case([(5, t1.c.col2)], value=t1.c.col1, else_=t1.c.col1)
1221            ),
1222            "CASE t1alias.col1 WHEN :param_1 THEN "
1223            "t1alias.col2 ELSE t1alias.col1 END",
1224        )
1225
1226    def test_table_to_alias_10(self):
1227        s = select([literal_column("*")], from_obj=[t1]).alias("foo")
1228        self.assert_compile(
1229            s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo"
1230        )
1231
1232    def test_table_to_alias_11(self):
1233        s = select([literal_column("*")], from_obj=[t1]).alias("foo")
1234        t1alias = t1.alias("t1alias")
1235        vis = sql_util.ClauseAdapter(t1alias)
1236        self.assert_compile(
1237            vis.traverse(s.select()),
1238            "SELECT foo.* FROM (SELECT * FROM table1 " "AS t1alias) AS foo",
1239        )
1240
1241    def test_table_to_alias_12(self):
1242        s = select([literal_column("*")], from_obj=[t1]).alias("foo")
1243        self.assert_compile(
1244            s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo"
1245        )
1246
1247    def test_table_to_alias_13(self):
1248        t1alias = t1.alias("t1alias")
1249        vis = sql_util.ClauseAdapter(t1alias)
1250        ff = vis.traverse(func.count(t1.c.col1).label("foo"))
1251        self.assert_compile(
1252            select([ff]),
1253            "SELECT count(t1alias.col1) AS foo FROM " "table1 AS t1alias",
1254        )
1255        assert list(_from_objects(ff)) == [t1alias]
1256
    # TODO: pending test, originally sketched as test_table_to_alias_2:
    #   self.assert_compile(
    #       vis.traverse(
    #           select([func.count(t1.c.col1).label('foo')]), clone=True
    #       ),
    #       "SELECT count(t1alias.col1) AS foo FROM table1 AS t1alias",
    #   )
1261
1262    def test_table_to_alias_14(self):
1263        t1alias = t1.alias("t1alias")
1264        vis = sql_util.ClauseAdapter(t1alias)
1265        t2alias = t2.alias("t2alias")
1266        vis.chain(sql_util.ClauseAdapter(t2alias))
1267        self.assert_compile(
1268            vis.traverse(
1269                select([literal_column("*")], t1.c.col1 == t2.c.col2)
1270            ),
1271            "SELECT * FROM table1 AS t1alias, table2 "
1272            "AS t2alias WHERE t1alias.col1 = "
1273            "t2alias.col2",
1274        )
1275
1276    def test_table_to_alias_15(self):
1277        t1alias = t1.alias("t1alias")
1278        vis = sql_util.ClauseAdapter(t1alias)
1279        t2alias = t2.alias("t2alias")
1280        vis.chain(sql_util.ClauseAdapter(t2alias))
1281        self.assert_compile(
1282            vis.traverse(
1283                select(["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2])
1284            ),
1285            "SELECT * FROM table1 AS t1alias, table2 "
1286            "AS t2alias WHERE t1alias.col1 = "
1287            "t2alias.col2",
1288        )
1289
1290    def test_table_to_alias_16(self):
1291        t1alias = t1.alias("t1alias")
1292        vis = sql_util.ClauseAdapter(t1alias)
1293        t2alias = t2.alias("t2alias")
1294        vis.chain(sql_util.ClauseAdapter(t2alias))
1295        self.assert_compile(
1296            select([t1alias, t2alias]).where(
1297                t1alias.c.col1
1298                == vis.traverse(
1299                    select(
1300                        ["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]
1301                    ).correlate(t1)
1302                )
1303            ),
1304            "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "
1305            "t2alias.col1, t2alias.col2, t2alias.col3 "
1306            "FROM table1 AS t1alias, table2 AS t2alias "
1307            "WHERE t1alias.col1 = "
1308            "(SELECT * FROM table2 AS t2alias "
1309            "WHERE t1alias.col1 = t2alias.col2)",
1310        )
1311
1312    def test_table_to_alias_17(self):
1313        t1alias = t1.alias("t1alias")
1314        vis = sql_util.ClauseAdapter(t1alias)
1315        t2alias = t2.alias("t2alias")
1316        vis.chain(sql_util.ClauseAdapter(t2alias))
1317        self.assert_compile(
1318            t2alias.select().where(
1319                t2alias.c.col2
1320                == vis.traverse(
1321                    select(
1322                        ["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]
1323                    ).correlate(t2)
1324                )
1325            ),
1326            "SELECT t2alias.col1, t2alias.col2, t2alias.col3 "
1327            "FROM table2 AS t2alias WHERE t2alias.col2 = "
1328            "(SELECT * FROM table1 AS t1alias WHERE "
1329            "t1alias.col1 = t2alias.col2)",
1330        )
1331
1332    def test_include_exclude(self):
1333        m = MetaData()
1334        a = Table(
1335            "a",
1336            m,
1337            Column("id", Integer, primary_key=True),
1338            Column(
1339                "xxx_id",
1340                Integer,
1341                ForeignKey("a.id", name="adf", use_alter=True),
1342            ),
1343        )
1344
1345        e = a.c.id == a.c.xxx_id
1346        assert str(e) == "a.id = a.xxx_id"
1347        b = a.alias()
1348
1349        e = sql_util.ClauseAdapter(
1350            b,
1351            include_fn=lambda x: x in set([a.c.id]),
1352            equivalents={a.c.id: set([a.c.id])},
1353        ).traverse(e)
1354
1355        assert str(e) == "a_1.id = a.xxx_id"
1356
1357    def test_recursive_equivalents(self):
1358        m = MetaData()
1359        a = Table("a", m, Column("x", Integer), Column("y", Integer))
1360        b = Table("b", m, Column("x", Integer), Column("y", Integer))
1361        c = Table("c", m, Column("x", Integer), Column("y", Integer))
1362
1363        # force a recursion overflow, by linking a.c.x<->c.c.x, and
1364        # asking for a nonexistent col.  corresponding_column should prevent
1365        # endless depth.
1366        adapt = sql_util.ClauseAdapter(
1367            b, equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])}
1368        )
1369        assert adapt._corresponding_column(a.c.x, False) is None
1370
1371    def test_multilevel_equivalents(self):
1372        m = MetaData()
1373        a = Table("a", m, Column("x", Integer), Column("y", Integer))
1374        b = Table("b", m, Column("x", Integer), Column("y", Integer))
1375        c = Table("c", m, Column("x", Integer), Column("y", Integer))
1376
1377        alias = select([a]).select_from(a.join(b, a.c.x == b.c.x)).alias()
1378
1379        # two levels of indirection from c.x->b.x->a.x, requires recursive
1380        # corresponding_column call
1381        adapt = sql_util.ClauseAdapter(
1382            alias, equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])}
1383        )
1384        assert adapt._corresponding_column(a.c.x, False) is alias.c.x
1385        assert adapt._corresponding_column(c.c.x, False) is alias.c.x
1386
1387    def test_join_to_alias(self):
1388        metadata = MetaData()
1389        a = Table("a", metadata, Column("id", Integer, primary_key=True))
1390        b = Table(
1391            "b",
1392            metadata,
1393            Column("id", Integer, primary_key=True),
1394            Column("aid", Integer, ForeignKey("a.id")),
1395        )
1396        c = Table(
1397            "c",
1398            metadata,
1399            Column("id", Integer, primary_key=True),
1400            Column("bid", Integer, ForeignKey("b.id")),
1401        )
1402
1403        d = Table(
1404            "d",
1405            metadata,
1406            Column("id", Integer, primary_key=True),
1407            Column("aid", Integer, ForeignKey("a.id")),
1408        )
1409
1410        j1 = a.outerjoin(b)
1411        j2 = select([j1], use_labels=True)
1412
1413        j3 = c.join(j2, j2.c.b_id == c.c.bid)
1414
1415        j4 = j3.outerjoin(d)
1416        self.assert_compile(
1417            j4,
1418            "c JOIN (SELECT a.id AS a_id, b.id AS "
1419            "b_id, b.aid AS b_aid FROM a LEFT OUTER "
1420            "JOIN b ON a.id = b.aid) ON b_id = c.bid "
1421            "LEFT OUTER JOIN d ON a_id = d.aid",
1422        )
1423        j5 = j3.alias("foo")
1424        j6 = sql_util.ClauseAdapter(j5).copy_and_process([j4])[0]
1425
1426        # this statement takes c join(a join b), wraps it inside an
1427        # aliased "select * from c join(a join b) AS foo". the outermost
1428        # right side "left outer join d" stays the same, except "d"
1429        # joins against foo.a_id instead of plain "a_id"
1430
1431        self.assert_compile(
1432            j6,
1433            "(SELECT c.id AS c_id, c.bid AS c_bid, "
1434            "a_id AS a_id, b_id AS b_id, b_aid AS "
1435            "b_aid FROM c JOIN (SELECT a.id AS a_id, "
1436            "b.id AS b_id, b.aid AS b_aid FROM a LEFT "
1437            "OUTER JOIN b ON a.id = b.aid) ON b_id = "
1438            "c.bid) AS foo LEFT OUTER JOIN d ON "
1439            "foo.a_id = d.aid",
1440        )
1441
1442    def test_derived_from(self):
1443        assert select([t1]).is_derived_from(t1)
1444        assert not select([t2]).is_derived_from(t1)
1445        assert not t1.is_derived_from(select([t1]))
1446        assert t1.alias().is_derived_from(t1)
1447
1448        s1 = select([t1, t2]).alias("foo")
1449        s2 = select([s1]).limit(5).offset(10).alias()
1450        assert s2.is_derived_from(s1)
1451        s2 = s2._clone()
1452        assert s2.is_derived_from(s1)
1453
1454    def test_aliasedselect_to_aliasedselect_straight(self):
1455
1456        # original issue from ticket #904
1457
1458        s1 = select([t1]).alias("foo")
1459        s2 = select([s1]).limit(5).offset(10).alias()
1460        self.assert_compile(
1461            sql_util.ClauseAdapter(s2).traverse(s1),
1462            "SELECT foo.col1, foo.col2, foo.col3 FROM "
1463            "(SELECT table1.col1 AS col1, table1.col2 "
1464            "AS col2, table1.col3 AS col3 FROM table1) "
1465            "AS foo LIMIT :param_1 OFFSET :param_2",
1466            {"param_1": 5, "param_2": 10},
1467        )
1468
1469    def test_aliasedselect_to_aliasedselect_join(self):
1470        s1 = select([t1]).alias("foo")
1471        s2 = select([s1]).limit(5).offset(10).alias()
1472        j = s1.outerjoin(t2, s1.c.col1 == t2.c.col1)
1473        self.assert_compile(
1474            sql_util.ClauseAdapter(s2).traverse(j).select(),
1475            "SELECT anon_1.col1, anon_1.col2, "
1476            "anon_1.col3, table2.col1, table2.col2, "
1477            "table2.col3 FROM (SELECT foo.col1 AS "
1478            "col1, foo.col2 AS col2, foo.col3 AS col3 "
1479            "FROM (SELECT table1.col1 AS col1, "
1480            "table1.col2 AS col2, table1.col3 AS col3 "
1481            "FROM table1) AS foo LIMIT :param_1 OFFSET "
1482            ":param_2) AS anon_1 LEFT OUTER JOIN "
1483            "table2 ON anon_1.col1 = table2.col1",
1484            {"param_1": 5, "param_2": 10},
1485        )
1486
1487    def test_aliasedselect_to_aliasedselect_join_nested_table(self):
1488        s1 = select([t1]).alias("foo")
1489        s2 = select([s1]).limit(5).offset(10).alias()
1490        talias = t1.alias("bar")
1491
1492        assert not s2.is_derived_from(talias)
1493        j = s1.outerjoin(talias, s1.c.col1 == talias.c.col1)
1494
1495        self.assert_compile(
1496            sql_util.ClauseAdapter(s2).traverse(j).select(),
1497            "SELECT anon_1.col1, anon_1.col2, "
1498            "anon_1.col3, bar.col1, bar.col2, bar.col3 "
1499            "FROM (SELECT foo.col1 AS col1, foo.col2 "
1500            "AS col2, foo.col3 AS col3 FROM (SELECT "
1501            "table1.col1 AS col1, table1.col2 AS col2, "
1502            "table1.col3 AS col3 FROM table1) AS foo "
1503            "LIMIT :param_1 OFFSET :param_2) AS anon_1 "
1504            "LEFT OUTER JOIN table1 AS bar ON "
1505            "anon_1.col1 = bar.col1",
1506            {"param_1": 5, "param_2": 10},
1507        )
1508
1509    def test_functions(self):
1510        self.assert_compile(
1511            sql_util.ClauseAdapter(t1.alias()).traverse(func.count(t1.c.col1)),
1512            "count(table1_1.col1)",
1513        )
1514        s = select([func.count(t1.c.col1)])
1515        self.assert_compile(
1516            sql_util.ClauseAdapter(t1.alias()).traverse(s),
1517            "SELECT count(table1_1.col1) AS count_1 "
1518            "FROM table1 AS table1_1",
1519        )
1520
1521    def test_recursive(self):
1522        metadata = MetaData()
1523        a = Table("a", metadata, Column("id", Integer, primary_key=True))
1524        b = Table(
1525            "b",
1526            metadata,
1527            Column("id", Integer, primary_key=True),
1528            Column("aid", Integer, ForeignKey("a.id")),
1529        )
1530        c = Table(
1531            "c",
1532            metadata,
1533            Column("id", Integer, primary_key=True),
1534            Column("bid", Integer, ForeignKey("b.id")),
1535        )
1536
1537        d = Table(
1538            "d",
1539            metadata,
1540            Column("id", Integer, primary_key=True),
1541            Column("aid", Integer, ForeignKey("a.id")),
1542        )
1543
1544        u = union(
1545            a.join(b).select().apply_labels(),
1546            a.join(d).select().apply_labels(),
1547        ).alias()
1548
1549        self.assert_compile(
1550            sql_util.ClauseAdapter(u).traverse(
1551                select([c.c.bid]).where(c.c.bid == u.c.b_aid)
1552            ),
1553            "SELECT c.bid "
1554            "FROM c, (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid "
1555            "FROM a JOIN b ON a.id = b.aid UNION SELECT a.id AS a_id, d.id "
1556            "AS d_id, d.aid AS d_aid "
1557            "FROM a JOIN d ON a.id = d.aid) AS anon_1 "
1558            "WHERE c.bid = anon_1.b_aid",
1559        )
1560
1561        t1 = table("table1", column("col1"), column("col2"), column("col3"))
1562        t2 = table("table2", column("col1"), column("col2"), column("col3"))
1563
1564    def test_label_anonymize_one(self):
1565        t1a = t1.alias()
1566        adapter = sql_util.ClauseAdapter(t1a, anonymize_labels=True)
1567
1568        expr = select([t1.c.col2]).where(t1.c.col3 == 5).label("expr")
1569        expr_adapted = adapter.traverse(expr)
1570
1571        stmt = select([expr, expr_adapted]).order_by(expr, expr_adapted)
1572        self.assert_compile(
1573            stmt,
1574            "SELECT "
1575            "(SELECT table1.col2 FROM table1 WHERE table1.col3 = :col3_1) "
1576            "AS expr, "
1577            "(SELECT table1_1.col2 FROM table1 AS table1_1 "
1578            "WHERE table1_1.col3 = :col3_2) AS anon_1 "
1579            "ORDER BY expr, anon_1",
1580        )
1581
1582    def test_label_anonymize_two(self):
1583        t1a = t1.alias()
1584        adapter = sql_util.ClauseAdapter(t1a, anonymize_labels=True)
1585
1586        expr = select([t1.c.col2]).where(t1.c.col3 == 5).label(None)
1587        expr_adapted = adapter.traverse(expr)
1588
1589        stmt = select([expr, expr_adapted]).order_by(expr, expr_adapted)
1590        self.assert_compile(
1591            stmt,
1592            "SELECT "
1593            "(SELECT table1.col2 FROM table1 WHERE table1.col3 = :col3_1) "
1594            "AS anon_1, "
1595            "(SELECT table1_1.col2 FROM table1 AS table1_1 "
1596            "WHERE table1_1.col3 = :col3_2) AS anon_2 "
1597            "ORDER BY anon_1, anon_2",
1598        )
1599
1600    def test_label_anonymize_three(self):
1601        t1a = t1.alias()
1602        adapter = sql_util.ColumnAdapter(
1603            t1a, anonymize_labels=True, allow_label_resolve=False
1604        )
1605
1606        expr = select([t1.c.col2]).where(t1.c.col3 == 5).label(None)
1607        l1 = expr
1608        is_(l1._order_by_label_element, l1)
1609        eq_(l1._allow_label_resolve, True)
1610
1611        expr_adapted = adapter.traverse(expr)
1612        l2 = expr_adapted
1613        is_(l2._order_by_label_element, l2)
1614        eq_(l2._allow_label_resolve, False)
1615
1616        l3 = adapter.traverse(expr)
1617        is_(l3._order_by_label_element, l3)
1618        eq_(l3._allow_label_resolve, False)
1619
1620
class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL):

    """Compile-level checks of ``sql_util.splice_joins()``."""

    __dialect__ = "default"

    @classmethod
    def setup_class(cls):
        """Create four identical three-column tables and publish them as
        the module-level globals ``table1`` .. ``table4``."""
        global table1, table2, table3, table4

        def _table(name):
            return table(name, column("col1"), column("col2"), column("col3"))

        table1, table2, table3, table4 = map(
            _table, ("table1", "table2", "table3", "table4")
        )

    def test_splice(self):
        """Splice a three-step join chain onto an aliased SELECT of its
        leftmost table."""
        left, right = table1, table2
        left_alias, right_alias = table1.alias(), table2.alias()

        chain = left.join(right, left.c.col1 == right.c.col1)
        chain = chain.join(left_alias, right.c.col1 == left_alias.c.col1)
        chain = chain.join(right_alias, right_alias.c.col1 == left.c.col1)

        filtered = select([left]).where(left.c.col2 < 5).alias()
        self.assert_compile(
            sql_util.splice_joins(filtered, chain),
            "(SELECT table1.col1 AS col1, table1.col2 "
            "AS col2, table1.col3 AS col3 FROM table1 "
            "WHERE table1.col2 < :col2_1) AS anon_1 "
            "JOIN table2 ON anon_1.col1 = table2.col1 "
            "JOIN table1 AS table1_1 ON table2.col1 = "
            "table1_1.col1 JOIN table2 AS table2_1 ON "
            "table2_1.col1 = anon_1.col1",
        )

    def test_stop_on(self):
        """Compare splicing with and without the optional third argument
        (here the inner join), which changes how much of the join chain
        is rendered."""
        inner_join = table1.join(table2, table1.c.col1 == table2.c.col1)
        outer_join = inner_join.join(
            table3, table2.c.col1 == table3.c.col1
        )
        left = select([table1]).select_from(inner_join).alias()

        # no third argument: the whole chain is spliced
        self.assert_compile(
            sql_util.splice_joins(left, outer_join),
            "(SELECT table1.col1 AS col1, table1.col2 "
            "AS col2, table1.col3 AS col3 FROM table1 "
            "JOIN table2 ON table1.col1 = table2.col1) "
            "AS anon_1 JOIN table2 ON anon_1.col1 = "
            "table2.col1 JOIN table3 ON table2.col1 = "
            "table3.col1",
        )
        # inner join passed as third argument: only the table3 join
        # is rendered after the aliased SELECT
        self.assert_compile(
            sql_util.splice_joins(left, outer_join, inner_join),
            "(SELECT table1.col1 AS col1, table1.col2 "
            "AS col2, table1.col3 AS col3 FROM table1 "
            "JOIN table2 ON table1.col1 = table2.col1) "
            "AS anon_1 JOIN table3 ON table2.col1 = "
            "table3.col1",
        )

    def test_splice_2(self):
        """Splice two independent join chains onto a plain table, each
        on its own and then one after the other."""
        t2a = table2.alias()
        t3a = table3.alias()
        first_chain = table1.join(t2a, table1.c.col1 == t2a.c.col1).join(
            t3a, t2a.c.col2 == t3a.c.col2
        )
        t4a = table4.alias()
        second_chain = table1.join(t4a, table1.c.col3 == t4a.c.col3)

        self.assert_compile(
            sql_util.splice_joins(table1, first_chain),
            "table1 JOIN table2 AS table2_1 ON "
            "table1.col1 = table2_1.col1 JOIN table3 "
            "AS table3_1 ON table2_1.col2 = "
            "table3_1.col2",
        )
        self.assert_compile(
            sql_util.splice_joins(table1, second_chain),
            "table1 JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3",
        )

        spliced_once = sql_util.splice_joins(table1, first_chain)
        self.assert_compile(
            sql_util.splice_joins(spliced_once, second_chain),
            "table1 JOIN table2 AS table2_1 ON "
            "table1.col1 = table2_1.col1 JOIN table3 "
            "AS table3_1 ON table2_1.col2 = "
            "table3_1.col2 JOIN table4 AS table4_1 ON "
            "table1.col3 = table4_1.col3",
        )
1704
1705
class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
    """Checks that Select's generative methods return a new statement.

    The statement a method was called on is inspected again afterwards
    to confirm that it was left as it was.
    """

    __dialect__ = "default"

    @classmethod
    def setup_class(cls):
        # rebind the module-level t1 / t2 names to two three-column tables
        global t1, t2
        col_names = ("col1", "col2", "col3")
        t1 = table("table1", *[column(name) for name in col_names])
        t2 = table("table2", *[column(name) for name in col_names])

    def test_columns(self):
        base_sql = "SELECT table1.col1, table1.col2, table1.col3 FROM table1"
        stmt = t1.select()
        self.assert_compile(stmt, base_sql)

        widened = stmt.column(column("yyy"))
        self.assert_compile(
            widened,
            "SELECT table1.col1, table1.col2, table1.col3, yyy FROM table1",
        )

        # none of the column collections may be shared with the copy
        assert stmt.columns is not widened.columns
        assert stmt._columns is not widened._columns
        assert stmt._raw_columns is not widened._raw_columns

        # the source statement still renders without the extra column
        self.assert_compile(stmt, base_sql)

    def test_froms(self):
        base_sql = "SELECT table1.col1, table1.col2, table1.col3 FROM table1"
        stmt = t1.select()
        self.assert_compile(stmt, base_sql)

        with_second_from = stmt.select_from(t2)
        self.assert_compile(
            with_second_from,
            "SELECT table1.col1, table1.col2, table1.col3 "
            "FROM table1, table2",
        )

        # the FROM collection belongs to the copy alone
        assert stmt._froms is not with_second_from._froms
        self.assert_compile(stmt, base_sql)

    def test_prefixes(self):
        base_sql = "SELECT table1.col1, table1.col2, table1.col3 FROM table1"
        stmt = t1.select()
        self.assert_compile(stmt, base_sql)

        prefixed = stmt.prefix_with("FOOBER")
        self.assert_compile(
            prefixed,
            "SELECT FOOBER table1.col1, table1.col2, table1.col3 FROM table1",
        )

        # no prefix shows up on the statement prefix_with() was called on
        self.assert_compile(stmt, base_sql)

    def test_execution_options(self):
        base = select().execution_options(foo="bar")
        extended = base.execution_options(bar="baz")
        overridden = base.execution_options(foo="not bar")

        # deriving new statements leaves the source options alone
        assert base._execution_options == dict(foo="bar")
        # a derived statement starts from the source's options
        assert extended._execution_options == dict(foo="bar", bar="baz")
        # giving an existing key again on the copy replaces its value
        assert overridden._execution_options == dict(foo="not bar")

    def test_invalid_options(self):
        # each of these options is expected to raise ArgumentError when
        # passed to execution_options() on a select()
        rejected = (
            dict(compiled_cache={}),
            dict(isolation_level="READ_COMMITTED"),
        )
        for option in rejected:
            assert_raises(
                exc.ArgumentError, select().execution_options, **option
            )

    # this feature not available yet
    def _NOTYET_test_execution_options_in_kwargs(self):
        base = select(execution_options=dict(foo="bar"))
        extended = base.execution_options(bar="baz")

        # options given to the constructor stay on the source statement
        assert base._execution_options == dict(foo="bar")
        # and are carried over into the derived statement
        assert extended._execution_options == dict(foo="bar", bar="baz")

    # this feature not available yet
    def _NOTYET_test_execution_options_in_text(self):
        stmt = text("select 42", execution_options=dict(foo="bar"))
        assert stmt._execution_options == dict(foo="bar")
1800
1801
1802class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):
1803
1804    """Tests the generative capability of Insert, Update"""
1805
1806    __dialect__ = "default"
1807
    # fixme: consolidate coverage from elsewhere here and expand
1809
1810    @classmethod
1811    def setup_class(cls):
1812        global t1, t2
1813        t1 = table("table1", column("col1"), column("col2"), column("col3"))
1814        t2 = table("table2", column("col1"), column("col2"), column("col3"))
1815
1816    def test_prefixes(self):
1817        i = t1.insert()
1818        self.assert_compile(
1819            i,
1820            "INSERT INTO table1 (col1, col2, col3) "
1821            "VALUES (:col1, :col2, :col3)",
1822        )
1823
1824        gen = i.prefix_with("foober")
1825        self.assert_compile(
1826            gen,
1827            "INSERT foober INTO table1 (col1, col2, col3) "
1828            "VALUES (:col1, :col2, :col3)",
1829        )
1830
1831        self.assert_compile(
1832            i,
1833            "INSERT INTO table1 (col1, col2, col3) "
1834            "VALUES (:col1, :col2, :col3)",
1835        )
1836
1837        i2 = t1.insert(prefixes=["squiznart"])
1838        self.assert_compile(
1839            i2,
1840            "INSERT squiznart INTO table1 (col1, col2, col3) "
1841            "VALUES (:col1, :col2, :col3)",
1842        )
1843
1844        gen2 = i2.prefix_with("quux")
1845        self.assert_compile(
1846            gen2,
1847            "INSERT squiznart quux INTO "
1848            "table1 (col1, col2, col3) "
1849            "VALUES (:col1, :col2, :col3)",
1850        )
1851
1852    def test_add_kwarg(self):
1853        i = t1.insert()
1854        eq_(i.parameters, None)
1855        i = i.values(col1=5)
1856        eq_(i.parameters, {"col1": 5})
1857        i = i.values(col2=7)
1858        eq_(i.parameters, {"col1": 5, "col2": 7})
1859
1860    def test_via_tuple_single(self):
1861        i = t1.insert()
1862        eq_(i.parameters, None)
1863        i = i.values((5, 6, 7))
1864        eq_(i.parameters, {"col1": 5, "col2": 6, "col3": 7})
1865
1866    def test_kw_and_dict_simultaneously_single(self):
1867        i = t1.insert()
1868        i = i.values({"col1": 5}, col2=7)
1869        eq_(i.parameters, {"col1": 5, "col2": 7})
1870
1871    def test_via_tuple_multi(self):
1872        i = t1.insert()
1873        eq_(i.parameters, None)
1874        i = i.values([(5, 6, 7), (8, 9, 10)])
1875        eq_(
1876            i.parameters,
1877            [
1878                {"col1": 5, "col2": 6, "col3": 7},
1879                {"col1": 8, "col2": 9, "col3": 10},
1880            ],
1881        )
1882
1883    def test_inline_values_single(self):
1884        i = t1.insert(values={"col1": 5})
1885        eq_(i.parameters, {"col1": 5})
1886        is_(i._has_multi_parameters, False)
1887
1888    def test_inline_values_multi(self):
1889        i = t1.insert(values=[{"col1": 5}, {"col1": 6}])
1890        eq_(i.parameters, [{"col1": 5}, {"col1": 6}])
1891        is_(i._has_multi_parameters, True)
1892
1893    def test_add_dictionary(self):
1894        i = t1.insert()
1895        eq_(i.parameters, None)
1896        i = i.values({"col1": 5})
1897        eq_(i.parameters, {"col1": 5})
1898        is_(i._has_multi_parameters, False)
1899
1900        i = i.values({"col1": 6})
1901        # note replaces
1902        eq_(i.parameters, {"col1": 6})
1903        is_(i._has_multi_parameters, False)
1904
1905        i = i.values({"col2": 7})
1906        eq_(i.parameters, {"col1": 6, "col2": 7})
1907        is_(i._has_multi_parameters, False)
1908
1909    def test_add_kwarg_disallowed_multi(self):
1910        i = t1.insert()
1911        i = i.values([{"col1": 5}, {"col1": 7}])
1912        assert_raises_message(
1913            exc.InvalidRequestError,
1914            "This construct already has multiple parameter sets.",
1915            i.values,
1916            col2=7,
1917        )
1918
1919    def test_cant_mix_single_multi_formats_dict_to_list(self):
1920        i = t1.insert().values(col1=5)
1921        assert_raises_message(
1922            exc.ArgumentError,
1923            "Can't mix single-values and multiple values "
1924            "formats in one statement",
1925            i.values,
1926            [{"col1": 6}],
1927        )
1928
1929    def test_cant_mix_single_multi_formats_list_to_dict(self):
1930        i = t1.insert().values([{"col1": 6}])
1931        assert_raises_message(
1932            exc.ArgumentError,
1933            "Can't mix single-values and multiple values "
1934            "formats in one statement",
1935            i.values,
1936            {"col1": 5},
1937        )
1938
1939    def test_erroneous_multi_args_dicts(self):
1940        i = t1.insert()
1941        assert_raises_message(
1942            exc.ArgumentError,
1943            "Only a single dictionary/tuple or list of "
1944            "dictionaries/tuples is accepted positionally.",
1945            i.values,
1946            {"col1": 5},
1947            {"col1": 7},
1948        )
1949
1950    def test_erroneous_multi_args_tuples(self):
1951        i = t1.insert()
1952        assert_raises_message(
1953            exc.ArgumentError,
1954            "Only a single dictionary/tuple or list of "
1955            "dictionaries/tuples is accepted positionally.",
1956            i.values,
1957            (5, 6, 7),
1958            (8, 9, 10),
1959        )
1960
1961    def test_erroneous_multi_args_plus_kw(self):
1962        i = t1.insert()
1963        assert_raises_message(
1964            exc.ArgumentError,
1965            "Can't pass kwargs and multiple parameter sets simultaneously",
1966            i.values,
1967            [{"col1": 5}],
1968            col2=7,
1969        )
1970
1971    def test_update_no_support_multi_values(self):
1972        u = t1.update()
1973        assert_raises_message(
1974            exc.InvalidRequestError,
1975            "This construct does not support multiple parameter sets.",
1976            u.values,
1977            [{"col1": 5}, {"col1": 7}],
1978        )
1979
1980    def test_update_no_support_multi_constructor(self):
1981        assert_raises_message(
1982            exc.InvalidRequestError,
1983            "This construct does not support multiple parameter sets.",
1984            t1.update,
1985            values=[{"col1": 5}, {"col1": 7}],
1986        )
1987