1from sqlalchemy import alias 2from sqlalchemy import Column 3from sqlalchemy import column 4from sqlalchemy import Enum 5from sqlalchemy import exc 6from sqlalchemy import ForeignKey 7from sqlalchemy import Integer 8from sqlalchemy import String 9from sqlalchemy import Table 10from sqlalchemy import testing 11from sqlalchemy import true 12from sqlalchemy.engine import default 13from sqlalchemy.sql import select 14from sqlalchemy.sql import Values 15from sqlalchemy.sql.compiler import FROM_LINTING 16from sqlalchemy.testing import AssertsCompiledSQL 17from sqlalchemy.testing import expect_raises_message 18from sqlalchemy.testing import fixtures 19from sqlalchemy.util import OrderedDict 20 21 22class ValuesTest(fixtures.TablesTest, AssertsCompiledSQL): 23 __dialect__ = default.DefaultDialect(supports_native_boolean=True) 24 25 run_setup_bind = None 26 27 run_create_tables = None 28 29 @classmethod 30 def define_tables(cls, metadata): 31 Table( 32 "people", 33 metadata, 34 Column("people_id", Integer, primary_key=True), 35 Column("age", Integer), 36 Column("name", String(30)), 37 ) 38 Table( 39 "bookcases", 40 metadata, 41 Column("bookcase_id", Integer, primary_key=True), 42 Column( 43 "bookcase_owner_id", Integer, ForeignKey("people.people_id") 44 ), 45 Column("bookcase_shelves", Integer), 46 Column("bookcase_width", Integer), 47 ) 48 Table( 49 "books", 50 metadata, 51 Column("book_id", Integer, primary_key=True), 52 Column( 53 "bookcase_id", Integer, ForeignKey("bookcases.bookcase_id") 54 ), 55 Column("book_owner_id", Integer, ForeignKey("people.people_id")), 56 Column("book_weight", Integer), 57 ) 58 59 def test_wrong_number_of_elements(self): 60 v1 = Values( 61 column("CaseSensitive", Integer), 62 column("has spaces", String), 63 name="Spaces and Cases", 64 ).data([(1, "textA", 99), (2, "textB", 88)]) 65 66 with expect_raises_message( 67 exc.ArgumentError, 68 r"Wrong number of elements for 2-tuple: \(1, 'textA', 99\)", 69 ): 70 str(v1) 71 72 def test_column_quoting(self): 73 v1 = Values( 74 column("CaseSensitive", Integer), 75 column("has spaces", String), 76 column("number", Integer), 77 name="Spaces and Cases", 78 ).data([(1, "textA", 99), (2, "textB", 88)]) 79 self.assert_compile( 80 select(v1), 81 'SELECT "Spaces and Cases"."CaseSensitive", ' 82 '"Spaces and Cases"."has spaces", "Spaces and Cases".number FROM ' 83 "(VALUES (:param_1, :param_2, :param_3), " 84 "(:param_4, :param_5, :param_6)) " 85 'AS "Spaces and Cases" ("CaseSensitive", "has spaces", number)', 86 ) 87 88 def test_values_in_cte_params(self): 89 cte1 = select( 90 Values( 91 column("col1", String), 92 column("col2", Integer), 93 name="temp_table", 94 ).data([("a", 2), ("b", 3)]) 95 ).cte("cte1") 96 97 cte2 = select(cte1.c.col1).where(cte1.c.col1 == "q").cte("cte2") 98 stmt = select(cte2.c.col1) 99 100 dialect = default.DefaultDialect() 101 dialect.positional = True 102 dialect.paramstyle = "numeric" 103 self.assert_compile( 104 stmt, 105 "WITH cte1 AS (SELECT temp_table.col1 AS col1, " 106 "temp_table.col2 AS col2 FROM (VALUES (:1, :2), (:3, :4)) AS " 107 "temp_table (col1, col2)), " 108 "cte2 AS " 109 "(SELECT cte1.col1 AS col1 FROM cte1 WHERE cte1.col1 = :5) " 110 "SELECT cte2.col1 FROM cte2", 111 checkpositional=("a", 2, "b", 3, "q"), 112 dialect=dialect, 113 ) 114 115 self.assert_compile( 116 stmt, 117 "WITH cte1 AS (SELECT temp_table.col1 AS col1, " 118 "temp_table.col2 AS col2 FROM (VALUES ('a', 2), ('b', 3)) " 119 "AS temp_table (col1, col2)), " 120 "cte2 AS " 121 "(SELECT cte1.col1 AS col1 FROM cte1 WHERE cte1.col1 = 'q') " 122 "SELECT cte2.col1 FROM cte2", 123 literal_binds=True, 124 dialect=dialect, 125 ) 126 127 def test_values_in_cte_literal_binds(self): 128 cte1 = select( 129 Values( 130 column("col1", String), 131 column("col2", Integer), 132 name="temp_table", 133 literal_binds=True, 134 ).data([("a", 2), ("b", 3)]) 135 ).cte("cte1") 136 137 cte2 = select(cte1.c.col1).where(cte1.c.col1 == "q").cte("cte2") 138 stmt = select(cte2.c.col1) 139 140 self.assert_compile( 141 stmt, 142 "WITH cte1 AS (SELECT temp_table.col1 AS col1, " 143 "temp_table.col2 AS col2 FROM (VALUES ('a', 2), ('b', 3)) " 144 "AS temp_table (col1, col2)), " 145 "cte2 AS " 146 "(SELECT cte1.col1 AS col1 FROM cte1 WHERE cte1.col1 = :col1_1) " 147 "SELECT cte2.col1 FROM cte2", 148 checkparams={"col1_1": "q"}, 149 ) 150 151 @testing.fixture 152 def literal_parameter_fixture(self): 153 def go(literal_binds, omit=None): 154 cols = [ 155 column("mykey", Integer), 156 column("mytext", String), 157 column("myint", Integer), 158 ] 159 if omit: 160 for idx in omit: 161 cols[idx] = column(cols[idx].name) 162 163 return Values( 164 *cols, name="myvalues", literal_binds=literal_binds 165 ).data([(1, "textA", 99), (2, "textB", 88)]) 166 167 return go 168 169 @testing.fixture 170 def tricky_types_parameter_fixture(self): 171 class SomeEnum(object): 172 # Implements PEP 435 in the minimal fashion needed by SQLAlchemy 173 __members__ = OrderedDict() 174 175 def __init__(self, name, value, alias=None): 176 self.name = name 177 self.value = value 178 self.__members__[name] = self 179 setattr(self.__class__, name, self) 180 if alias: 181 self.__members__[alias] = self 182 setattr(self.__class__, alias, self) 183 184 one = SomeEnum("one", 1) 185 two = SomeEnum("two", 2) 186 187 class MumPyString(str): 188 """some kind of string, can't imagine where such a thing might 189 be found 190 191 """ 192 193 class MumPyNumber(int): 194 """some kind of int, can't imagine where such a thing might 195 be found 196 197 """ 198 199 def go(literal_binds, omit=None): 200 cols = [ 201 column("mykey", Integer), 202 column("mytext", String), 203 column("myenum", Enum(SomeEnum)), 204 ] 205 if omit: 206 for idx in omit: 207 cols[idx] = column(cols[idx].name) 208 209 return Values( 210 *cols, name="myvalues", literal_binds=literal_binds 211 ).data( 212 [ 213 (MumPyNumber(1), MumPyString("textA"), one), 214 (MumPyNumber(2), MumPyString("textB"), two), 215 ] 216 ) 217 218 return go 219 220 def test_bound_parameters(self, literal_parameter_fixture): 221 literal_parameter_fixture = literal_parameter_fixture(False) 222 223 stmt = select(literal_parameter_fixture) 224 225 self.assert_compile( 226 stmt, 227 "SELECT myvalues.mykey, myvalues.mytext, myvalues.myint FROM " 228 "(VALUES (:param_1, :param_2, :param_3), " 229 "(:param_4, :param_5, :param_6)" 230 ") AS myvalues (mykey, mytext, myint)", 231 checkparams={ 232 "param_1": 1, 233 "param_2": "textA", 234 "param_3": 99, 235 "param_4": 2, 236 "param_5": "textB", 237 "param_6": 88, 238 }, 239 ) 240 241 def test_literal_parameters(self, literal_parameter_fixture): 242 literal_parameter_fixture = literal_parameter_fixture(True) 243 244 stmt = select(literal_parameter_fixture) 245 246 self.assert_compile( 247 stmt, 248 "SELECT myvalues.mykey, myvalues.mytext, myvalues.myint FROM " 249 "(VALUES (1, 'textA', 99), (2, 'textB', 88)" 250 ") AS myvalues (mykey, mytext, myint)", 251 checkparams={}, 252 ) 253 254 def test_literal_parameters_not_every_type_given( 255 self, literal_parameter_fixture 256 ): 257 literal_parameter_fixture = literal_parameter_fixture(True, omit=(1,)) 258 259 stmt = select(literal_parameter_fixture) 260 261 self.assert_compile( 262 stmt, 263 "SELECT myvalues.mykey, myvalues.mytext, myvalues.myint FROM " 264 "(VALUES (1, 'textA', 99), (2, 'textB', 88)" 265 ") AS myvalues (mykey, mytext, myint)", 266 checkparams={}, 267 ) 268 269 def test_use_cols_tricky_not_every_type_given( 270 self, tricky_types_parameter_fixture 271 ): 272 literal_parameter_fixture = tricky_types_parameter_fixture( 273 True, omit=(1,) 274 ) 275 276 stmt = select(literal_parameter_fixture) 277 278 with expect_raises_message( 279 exc.CompileError, 280 "Don't know how to render literal SQL value: 'textA'", 281 ): 282 str(stmt) 283 284 def test_use_cols_for_types(self, tricky_types_parameter_fixture): 285 literal_parameter_fixture = tricky_types_parameter_fixture(True) 286 287 stmt = select(literal_parameter_fixture) 288 289 self.assert_compile( 290 stmt, 291 "SELECT myvalues.mykey, myvalues.mytext, myvalues.myenum FROM " 292 "(VALUES (1, 'textA', 'one'), (2, 'textB', 'two')" 293 ") AS myvalues (mykey, mytext, myenum)", 294 checkparams={}, 295 ) 296 297 def test_with_join_unnamed(self): 298 people = self.tables.people 299 values = Values( 300 column("column1", Integer), 301 column("column2", Integer), 302 ).data([(1, 1), (2, 1), (3, 2), (3, 3)]) 303 stmt = select(people, values).select_from( 304 people.join(values, values.c.column2 == people.c.people_id) 305 ) 306 self.assert_compile( 307 stmt, 308 "SELECT people.people_id, people.age, people.name, column1, " 309 "column2 FROM people JOIN (VALUES (:param_1, :param_2), " 310 "(:param_3, :param_4), (:param_5, :param_6), " 311 "(:param_7, :param_8)) " 312 "ON people.people_id = column2", 313 checkparams={ 314 "param_1": 1, 315 "param_2": 1, 316 "param_3": 2, 317 "param_4": 1, 318 "param_5": 3, 319 "param_6": 2, 320 "param_7": 3, 321 "param_8": 3, 322 }, 323 ) 324 325 def test_with_join_named(self): 326 people = self.tables.people 327 values = Values( 328 column("bookcase_id", Integer), 329 column("bookcase_owner_id", Integer), 330 name="bookcases", 331 ).data([(1, 1), (2, 1), (3, 2), (3, 3)]) 332 stmt = select(people, values).select_from( 333 people.join( 334 values, values.c.bookcase_owner_id == people.c.people_id 335 ) 336 ) 337 self.assert_compile( 338 stmt, 339 "SELECT people.people_id, people.age, people.name, " 340 "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people " 341 "JOIN (VALUES (:param_1, :param_2), (:param_3, :param_4), " 342 "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases " 343 "(bookcase_id, bookcase_owner_id) " 344 "ON people.people_id = bookcases.bookcase_owner_id", 345 checkparams={ 346 "param_1": 1, 347 "param_2": 1, 348 "param_3": 2, 349 "param_4": 1, 350 "param_5": 3, 351 "param_6": 2, 352 "param_7": 3, 353 "param_8": 3, 354 }, 355 ) 356 357 def test_with_aliased_join(self): 358 people = self.tables.people 359 values = ( 360 Values( 361 column("bookcase_id", Integer), 362 column("bookcase_owner_id", Integer), 363 ) 364 .data([(1, 1), (2, 1), (3, 2), (3, 3)]) 365 .alias("bookcases") 366 ) 367 stmt = select(people, values).select_from( 368 people.join( 369 values, values.c.bookcase_owner_id == people.c.people_id 370 ) 371 ) 372 self.assert_compile( 373 stmt, 374 "SELECT people.people_id, people.age, people.name, " 375 "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people " 376 "JOIN (VALUES (:param_1, :param_2), (:param_3, :param_4), " 377 "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases " 378 "(bookcase_id, bookcase_owner_id) " 379 "ON people.people_id = bookcases.bookcase_owner_id", 380 checkparams={ 381 "param_1": 1, 382 "param_2": 1, 383 "param_3": 2, 384 "param_4": 1, 385 "param_5": 3, 386 "param_6": 2, 387 "param_7": 3, 388 "param_8": 3, 389 }, 390 ) 391 392 def test_with_standalone_aliased_join(self): 393 people = self.tables.people 394 values = Values( 395 column("bookcase_id", Integer), 396 column("bookcase_owner_id", Integer), 397 ).data([(1, 1), (2, 1), (3, 2), (3, 3)]) 398 values = alias(values, "bookcases") 399 400 stmt = select(people, values).select_from( 401 people.join( 402 values, values.c.bookcase_owner_id == people.c.people_id 403 ) 404 ) 405 self.assert_compile( 406 stmt, 407 "SELECT people.people_id, people.age, people.name, " 408 "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people " 409 "JOIN (VALUES (:param_1, :param_2), (:param_3, :param_4), " 410 "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases " 411 "(bookcase_id, bookcase_owner_id) " 412 "ON people.people_id = bookcases.bookcase_owner_id", 413 checkparams={ 414 "param_1": 1, 415 "param_2": 1, 416 "param_3": 2, 417 "param_4": 1, 418 "param_5": 3, 419 "param_6": 2, 420 "param_7": 3, 421 "param_8": 3, 422 }, 423 ) 424 425 def test_lateral(self): 426 people = self.tables.people 427 values = ( 428 Values( 429 column("bookcase_id", Integer), 430 column("bookcase_owner_id", Integer), 431 name="bookcases", 432 ) 433 .data([(1, 1), (2, 1), (3, 2), (3, 3)]) 434 .lateral() 435 ) 436 stmt = select(people, values).select_from(people.join(values, true())) 437 self.assert_compile( 438 stmt, 439 "SELECT people.people_id, people.age, people.name, " 440 "bookcases.bookcase_id, bookcases.bookcase_owner_id FROM people " 441 "JOIN LATERAL (VALUES (:param_1, :param_2), (:param_3, :param_4), " 442 "(:param_5, :param_6), (:param_7, :param_8)) AS bookcases " 443 "(bookcase_id, bookcase_owner_id) " 444 "ON true", 445 checkparams={ 446 "param_1": 1, 447 "param_2": 1, 448 "param_3": 2, 449 "param_4": 1, 450 "param_5": 3, 451 "param_6": 2, 452 "param_7": 3, 453 "param_8": 3, 454 }, 455 ) 456 457 def test_from_linting_named(self): 458 people = self.tables.people 459 values = Values( 460 column("bookcase_id", Integer), 461 column("bookcase_owner_id", Integer), 462 name="bookcases", 463 ).data([(1, 1), (2, 1), (3, 2), (3, 3)]) 464 stmt = select(people, values) 465 466 with testing.expect_warnings( 467 r"SELECT statement has a cartesian product between FROM " 468 r'element\(s\) "(?:bookcases|people)" and ' 469 r'FROM element "(?:people|bookcases)"' 470 ): 471 stmt.compile(linting=FROM_LINTING) 472 473 def test_from_linting_unnamed(self): 474 people = self.tables.people 475 values = Values( 476 column("bookcase_id", Integer), 477 column("bookcase_owner_id", Integer), 478 ).data([(1, 1), (2, 1), (3, 2), (3, 3)]) 479 stmt = select(people, values) 480 481 with testing.expect_warnings( 482 r"SELECT statement has a cartesian product between FROM " 483 r'element\(s\) "(?:\(unnamed VALUES element\)|people)" and ' 484 r'FROM element "(?:people|\(unnamed VALUES element\))"' 485 ): 486 stmt.compile(linting=FROM_LINTING) 487