1from sqlalchemy import alias
2from sqlalchemy import Column
3from sqlalchemy import column
4from sqlalchemy import Enum
5from sqlalchemy import exc
6from sqlalchemy import ForeignKey
7from sqlalchemy import Integer
8from sqlalchemy import String
9from sqlalchemy import Table
10from sqlalchemy import testing
11from sqlalchemy import true
12from sqlalchemy.engine import default
13from sqlalchemy.sql import select
14from sqlalchemy.sql import Values
15from sqlalchemy.sql.compiler import FROM_LINTING
16from sqlalchemy.testing import AssertsCompiledSQL
17from sqlalchemy.testing import expect_raises_message
18from sqlalchemy.testing import fixtures
19from sqlalchemy.util import OrderedDict
20
21
class ValuesTest(fixtures.TablesTest, AssertsCompiledSQL):
    """Compilation tests for the ``Values`` construct.

    Every test in this class only renders a statement to SQL (through
    ``assert_compile``, ``str()`` or ``compile()``) and checks the text,
    the collected parameters, or the error / warning that is produced.
    """

    __dialect__ = default.DefaultDialect(supports_native_boolean=True)

    run_setup_bind = None

    run_create_tables = None

    # Row data shared by the join, lateral and FROM-linting tests.  Kept
    # immutable here; each test hands a fresh list copy to ``.data()``.
    _SAMPLE_ROWS = ((1, 1), (2, 1), (3, 2), (3, 3))

    @staticmethod
    def _params_for_rows(rows):
        """Return the ``param_N`` dictionary expected for ``rows``.

        Values are numbered from 1 in row-major order, which is the
        numbering used by the ``:param_N`` placeholders in the expected
        SQL strings of the tests below.
        """
        flat_values = [value for row in rows for value in row]
        return dict(
            ("param_%d" % number, value)
            for number, value in enumerate(flat_values, 1)
        )

    def _assert_bookcases_join(self, bookcases):
        """Join ``people`` to ``bookcases`` and check the rendered SQL.

        ``bookcases`` is a VALUES-based selectable exposing
        ``bookcase_id`` and ``bookcase_owner_id`` that is expected to
        render as ``AS bookcases (bookcase_id, bookcase_owner_id)``.
        """
        people = self.tables.people
        stmt = select(people, bookcases).select_from(
            people.join(
                bookcases,
                bookcases.c.bookcase_owner_id == people.c.people_id,
            )
        )
        expected_sql = (
            "SELECT people.people_id, people.age, people.name, "
            "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people "
            "JOIN (VALUES (:param_1, :param_2), (:param_3, :param_4), "
            "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases "
            "(bookcase_id, bookcase_owner_id) "
            "ON people.people_id = bookcases.bookcase_owner_id"
        )
        self.assert_compile(
            stmt,
            expected_sql,
            checkparams=self._params_for_rows(self._SAMPLE_ROWS),
        )

    @classmethod
    def define_tables(cls, metadata):
        """Register the people, bookcases and books tables on ``metadata``."""
        people_columns = (
            Column("people_id", Integer, primary_key=True),
            Column("age", Integer),
            Column("name", String(30)),
        )
        Table("people", metadata, *people_columns)

        bookcase_columns = (
            Column("bookcase_id", Integer, primary_key=True),
            Column(
                "bookcase_owner_id", Integer, ForeignKey("people.people_id")
            ),
            Column("bookcase_shelves", Integer),
            Column("bookcase_width", Integer),
        )
        Table("bookcases", metadata, *bookcase_columns)

        book_columns = (
            Column("book_id", Integer, primary_key=True),
            Column(
                "bookcase_id", Integer, ForeignKey("bookcases.bookcase_id")
            ),
            Column("book_owner_id", Integer, ForeignKey("people.people_id")),
            Column("book_weight", Integer),
        )
        Table("books", metadata, *book_columns)

    def test_wrong_number_of_elements(self):
        """Rows wider than the declared columns raise when stringified."""
        two_columns = Values(
            column("CaseSensitive", Integer),
            column("has spaces", String),
            name="Spaces and Cases",
        )
        # Three elements per row against only two declared columns.
        mismatched = two_columns.data([(1, "textA", 99), (2, "textB", 88)])

        expected_message = (
            r"Wrong number of elements for 2-tuple: \(1, 'textA', 99\)"
        )
        with expect_raises_message(exc.ArgumentError, expected_message):
            str(mismatched)

    def test_column_quoting(self):
        """Names that need quoting are quoted everywhere they appear."""
        quoted = Values(
            column("CaseSensitive", Integer),
            column("has spaces", String),
            column("number", Integer),
            name="Spaces and Cases",
        ).data([(1, "textA", 99), (2, "textB", 88)])

        expected_sql = (
            'SELECT "Spaces and Cases"."CaseSensitive", '
            '"Spaces and Cases"."has spaces", "Spaces and Cases".number FROM '
            "(VALUES (:param_1, :param_2, :param_3), "
            "(:param_4, :param_5, :param_6)) "
            'AS "Spaces and Cases" ("CaseSensitive", "has spaces", number)'
        )
        self.assert_compile(select(quoted), expected_sql)

    def test_values_in_cte_params(self):
        """VALUES nested in a CTE: numeric positional and inlined forms."""
        temp_table = Values(
            column("col1", String),
            column("col2", Integer),
            name="temp_table",
        ).data([("a", 2), ("b", 3)])
        cte1 = select(temp_table).cte("cte1")
        cte2 = select(cte1.c.col1).where(cte1.c.col1 == "q").cte("cte2")
        stmt = select(cte2.c.col1)

        numeric_dialect = default.DefaultDialect()
        numeric_dialect.positional = True
        numeric_dialect.paramstyle = "numeric"

        # First pass: bound parameters rendered as numbered placeholders.
        positional_sql = (
            "WITH cte1 AS (SELECT temp_table.col1 AS col1, "
            "temp_table.col2 AS col2 FROM (VALUES (:1, :2), (:3, :4)) AS "
            "temp_table (col1, col2)), "
            "cte2 AS "
            "(SELECT cte1.col1 AS col1 FROM cte1 WHERE cte1.col1 = :5) "
            "SELECT cte2.col1 FROM cte2"
        )
        self.assert_compile(
            stmt,
            positional_sql,
            checkpositional=("a", 2, "b", 3, "q"),
            dialect=numeric_dialect,
        )

        # Second pass: same statement with every value rendered inline.
        inlined_sql = (
            "WITH cte1 AS (SELECT temp_table.col1 AS col1, "
            "temp_table.col2 AS col2 FROM (VALUES ('a', 2), ('b', 3)) "
            "AS temp_table (col1, col2)), "
            "cte2 AS "
            "(SELECT cte1.col1 AS col1 FROM cte1 WHERE cte1.col1 = 'q') "
            "SELECT cte2.col1 FROM cte2"
        )
        self.assert_compile(
            stmt,
            inlined_sql,
            literal_binds=True,
            dialect=numeric_dialect,
        )

    def test_values_in_cte_literal_binds(self):
        """``literal_binds`` on Values inlines only the VALUES rows."""
        temp_table = Values(
            column("col1", String),
            column("col2", Integer),
            name="temp_table",
            literal_binds=True,
        ).data([("a", 2), ("b", 3)])
        cte1 = select(temp_table).cte("cte1")
        cte2 = select(cte1.c.col1).where(cte1.c.col1 == "q").cte("cte2")
        stmt = select(cte2.c.col1)

        expected_sql = (
            "WITH cte1 AS (SELECT temp_table.col1 AS col1, "
            "temp_table.col2 AS col2 FROM (VALUES ('a', 2), ('b', 3)) "
            "AS temp_table (col1, col2)), "
            "cte2 AS "
            "(SELECT cte1.col1 AS col1 FROM cte1 WHERE cte1.col1 = :col1_1) "
            "SELECT cte2.col1 FROM cte2"
        )
        # The WHERE comparison is still a bound parameter.
        self.assert_compile(
            stmt, expected_sql, checkparams={"col1_1": "q"}
        )

    @testing.fixture
    def literal_parameter_fixture(self):
        """Provide a factory for a three-column ``myvalues`` construct.

        The factory takes ``literal_binds`` (forwarded to ``Values``) and
        an optional ``omit`` iterable of column indexes; each listed
        column is rebuilt from its name alone, i.e. without a type.
        """

        def build_values(literal_binds, omit=None):
            typed_columns = [
                column("mykey", Integer),
                column("mytext", String),
                column("myint", Integer),
            ]
            for position in omit or ():
                typed_columns[position] = column(
                    typed_columns[position].name
                )

            myvalues = Values(
                *typed_columns, name="myvalues", literal_binds=literal_binds
            )
            return myvalues.data([(1, "textA", 99), (2, "textB", 88)])

        return build_values

    @testing.fixture
    def tricky_types_parameter_fixture(self):
        """Provide a factory like ``literal_parameter_fixture`` with odd data.

        The rows hold ``int`` / ``str`` subclasses and members of a
        hand-written enum-like class, and the third column is typed as
        ``Enum`` of that class.  The factory accepts the same
        ``literal_binds`` and ``omit`` arguments.
        """

        class SomeEnum(object):
            # Implements PEP 435 in the minimal fashion needed by SQLAlchemy
            __members__ = OrderedDict()

            def __init__(self, name, value, alias_name=None):
                self.name = name
                self.value = value

                # Register under the primary name and, when given, the
                # alias: both as a member entry and as a class attribute.
                registered_names = [name]
                if alias_name:
                    registered_names.append(alias_name)
                for registered in registered_names:
                    self.__members__[registered] = self
                    setattr(self.__class__, registered, self)

        member_one = SomeEnum("one", 1)
        member_two = SomeEnum("two", 2)

        class MumPyString(str):
            """A ``str`` subclass standing in for third-party strings."""

        class MumPyNumber(int):
            """An ``int`` subclass standing in for third-party integers."""

        def build_values(literal_binds, omit=None):
            typed_columns = [
                column("mykey", Integer),
                column("mytext", String),
                column("myenum", Enum(SomeEnum)),
            ]
            for position in omit or ():
                typed_columns[position] = column(
                    typed_columns[position].name
                )

            myvalues = Values(
                *typed_columns, name="myvalues", literal_binds=literal_binds
            )
            tricky_rows = [
                (MumPyNumber(1), MumPyString("textA"), member_one),
                (MumPyNumber(2), MumPyString("textB"), member_two),
            ]
            return myvalues.data(tricky_rows)

        return build_values

    def test_bound_parameters(self, literal_parameter_fixture):
        """Without ``literal_binds`` every value is a bound parameter."""
        myvalues = literal_parameter_fixture(False)

        expected_sql = (
            "SELECT myvalues.mykey, myvalues.mytext, myvalues.myint FROM "
            "(VALUES (:param_1, :param_2, :param_3), "
            "(:param_4, :param_5, :param_6)"
            ") AS myvalues (mykey, mytext, myint)"
        )
        self.assert_compile(
            select(myvalues),
            expected_sql,
            checkparams=self._params_for_rows(
                [(1, "textA", 99), (2, "textB", 88)]
            ),
        )

    def test_literal_parameters(self, literal_parameter_fixture):
        """With ``literal_binds`` the values are rendered inline."""
        myvalues = literal_parameter_fixture(True)

        expected_sql = (
            "SELECT myvalues.mykey, myvalues.mytext, myvalues.myint FROM "
            "(VALUES (1, 'textA', 99), (2, 'textB', 88)"
            ") AS myvalues (mykey, mytext, myint)"
        )
        self.assert_compile(select(myvalues), expected_sql, checkparams={})

    def test_literal_parameters_not_every_type_given(
        self, literal_parameter_fixture
    ):
        """Inline rendering still works when one column has no type."""
        myvalues = literal_parameter_fixture(True, omit=(1,))

        expected_sql = (
            "SELECT myvalues.mykey, myvalues.mytext, myvalues.myint FROM "
            "(VALUES (1, 'textA', 99), (2, 'textB', 88)"
            ") AS myvalues (mykey, mytext, myint)"
        )
        self.assert_compile(select(myvalues), expected_sql, checkparams={})

    def test_use_cols_tricky_not_every_type_given(
        self, tricky_types_parameter_fixture
    ):
        """An untyped column holding a ``str`` subclass fails to inline."""
        tricky_values = tricky_types_parameter_fixture(True, omit=(1,))
        stmt = select(tricky_values)

        expected_message = (
            "Don't know how to render literal SQL value: 'textA'"
        )
        with expect_raises_message(exc.CompileError, expected_message):
            str(stmt)

    def test_use_cols_for_types(self, tricky_types_parameter_fixture):
        """Fully typed columns drive inline rendering of tricky values."""
        tricky_values = tricky_types_parameter_fixture(True)

        expected_sql = (
            "SELECT myvalues.mykey, myvalues.mytext, myvalues.myenum FROM "
            "(VALUES (1, 'textA', 'one'), (2, 'textB', 'two')"
            ") AS myvalues (mykey, mytext, myenum)"
        )
        self.assert_compile(
            select(tricky_values), expected_sql, checkparams={}
        )

    def test_with_join_unnamed(self):
        """Joining to an unnamed VALUES renders no alias clause."""
        people = self.tables.people
        unnamed = Values(
            column("column1", Integer),
            column("column2", Integer),
        ).data(list(self._SAMPLE_ROWS))
        stmt = select(people, unnamed).select_from(
            people.join(unnamed, unnamed.c.column2 == people.c.people_id)
        )

        expected_sql = (
            "SELECT people.people_id, people.age, people.name, column1, "
            "column2 FROM people JOIN (VALUES (:param_1, :param_2), "
            "(:param_3, :param_4), (:param_5, :param_6), "
            "(:param_7, :param_8)) "
            "ON people.people_id = column2"
        )
        self.assert_compile(
            stmt,
            expected_sql,
            checkparams=self._params_for_rows(self._SAMPLE_ROWS),
        )

    def test_with_join_named(self):
        """A VALUES given ``name=`` joins under that name."""
        named = Values(
            column("bookcase_id", Integer),
            column("bookcase_owner_id", Integer),
            name="bookcases",
        ).data(list(self._SAMPLE_ROWS))

        self._assert_bookcases_join(named)

    def test_with_aliased_join(self):
        """An unnamed VALUES aliased via ``.alias()`` joins by alias name."""
        aliased = (
            Values(
                column("bookcase_id", Integer),
                column("bookcase_owner_id", Integer),
            )
            .data(list(self._SAMPLE_ROWS))
            .alias("bookcases")
        )

        self._assert_bookcases_join(aliased)

    def test_with_standalone_aliased_join(self):
        """The standalone ``alias()`` function behaves like ``.alias()``."""
        unnamed = Values(
            column("bookcase_id", Integer),
            column("bookcase_owner_id", Integer),
        ).data(list(self._SAMPLE_ROWS))
        aliased = alias(unnamed, "bookcases")

        self._assert_bookcases_join(aliased)

    def test_lateral(self):
        """``.lateral()`` on a named VALUES renders ``JOIN LATERAL``."""
        people = self.tables.people
        lateral_values = (
            Values(
                column("bookcase_id", Integer),
                column("bookcase_owner_id", Integer),
                name="bookcases",
            )
            .data(list(self._SAMPLE_ROWS))
            .lateral()
        )
        stmt = select(people, lateral_values).select_from(
            people.join(lateral_values, true())
        )

        expected_sql = (
            "SELECT people.people_id, people.age, people.name, "
            "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people "
            "JOIN LATERAL (VALUES (:param_1, :param_2), (:param_3, :param_4), "
            "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases "
            "(bookcase_id, bookcase_owner_id) "
            "ON true"
        )
        self.assert_compile(
            stmt,
            expected_sql,
            checkparams=self._params_for_rows(self._SAMPLE_ROWS),
        )

    def test_from_linting_named(self):
        """FROM linting names a named VALUES in the cartesian warning."""
        people = self.tables.people
        named = Values(
            column("bookcase_id", Integer),
            column("bookcase_owner_id", Integer),
            name="bookcases",
        ).data(list(self._SAMPLE_ROWS))
        stmt = select(people, named)

        # Either FROM element may be reported first, hence the alternation.
        expected_warning = (
            r"SELECT statement has a cartesian product between FROM "
            r'element\(s\) "(?:bookcases|people)" and '
            r'FROM element "(?:people|bookcases)"'
        )
        with testing.expect_warnings(expected_warning):
            stmt.compile(linting=FROM_LINTING)

    def test_from_linting_unnamed(self):
        """FROM linting uses a placeholder label for an unnamed VALUES."""
        people = self.tables.people
        unnamed = Values(
            column("bookcase_id", Integer),
            column("bookcase_owner_id", Integer),
        ).data(list(self._SAMPLE_ROWS))
        stmt = select(people, unnamed)

        # Either FROM element may be reported first, hence the alternation.
        expected_warning = (
            r"SELECT statement has a cartesian product between FROM "
            r'element\(s\) "(?:\(unnamed VALUES element\)|people)" and '
            r'FROM element "(?:people|\(unnamed VALUES element\))"'
        )
        with testing.expect_warnings(expected_warning):
            stmt.compile(linting=FROM_LINTING)