#! coding:utf-8

from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import column
from sqlalchemy import exc
from sqlalchemy import func
from sqlalchemy import insert
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import Sequence
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import table
from sqlalchemy import text
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine import default
from sqlalchemy.sql import crud
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures


class _InsertTestBase(object):
    """Mixin declaring the tables shared by the INSERT test classes."""

    @classmethod
    def define_tables(cls, metadata):
        """Register the fixture tables on ``metadata``.

        Creates ``mytable`` (no primary key), ``myothertable`` (integer
        primary key) and ``table_w_defaults`` (one column per kind of
        default).
        """
        Table(
            "mytable",
            metadata,
            Column("myid", Integer),
            Column("name", String(30)),
            Column("description", String(30)),
        )
        Table(
            "myothertable",
            metadata,
            Column("otherid", Integer, primary_key=True),
            Column("othername", String(30)),
        )
        Table(
            "table_w_defaults",
            metadata,
            Column("id", Integer, primary_key=True),
            # x: Python scalar default
            Column("x", Integer, default=10),
            # y: server-side default given as a text() expression
            Column("y", Integer, server_default=text("5")),
            # z: Python callable default
            Column("z", Integer, default=lambda: 10),
        )


class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
    """Compilation tests for single-row INSERT and INSERT..FROM SELECT."""

    __dialect__ = "default"

    def test_binds_that_match_columns(self):
        """test bind params named after column names
        replace the normal SET/VALUES generation."""

        t = table("foo", column("x"), column("y"))

        i = t.insert().values(x=3 + bindparam("x"))
        self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x))")
        self.assert_compile(
            i,
            "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)",
            params={"x": 1, "y": 2},
        )

        i = t.insert().values(x=bindparam("y"))
        self.assert_compile(i, "INSERT INTO foo (x) VALUES (:y)")

        # a bindparam named "y" combined with a value for column y
        # is expected to fail at compile time
        i = t.insert().values(x=bindparam("y"), y=5)
        assert_raises(exc.CompileError, i.compile)

        i = t.insert().values(x=3 + bindparam("y"), y=5)
        assert_raises(exc.CompileError, i.compile)

        i = t.insert().values(x=3 + bindparam("x2"))
        self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))")
        self.assert_compile(
            i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={}
        )
        self.assert_compile(
            i,
            "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
            params={"x": 1, "y": 2},
        )
        self.assert_compile(
            i,
            "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
            params={"x2": 1, "y": 2},
        )

    def test_insert_literal_binds(self):
        """literal_binds=True renders the values inline in the SQL."""
        table1 = self.tables.mytable
        stmt = table1.insert().values(myid=3, name="jack")

        self.assert_compile(
            stmt,
            "INSERT INTO mytable (myid, name) VALUES (3, 'jack')",
            literal_binds=True,
        )

    def test_insert_literal_binds_sequence_notimplemented(self):
        """literal_binds with a Sequence column raises NotImplementedError.

        Uses a DefaultDialect whose ``supports_sequences`` flag is
        switched on.
        """
        table = Table("x", MetaData(), Column("y", Integer, Sequence("y_seq")))
        dialect = default.DefaultDialect()
        dialect.supports_sequences = True

        # NOTE(review): "myid" / "name" are not columns of table "x";
        # presumably the error under test is raised before those names
        # are validated -- confirm.
        stmt = table.insert().values(myid=3, name="jack")

        assert_raises(
            NotImplementedError,
            stmt.compile,
            compile_kwargs=dict(literal_binds=True),
            dialect=dialect,
        )

    def test_inline_defaults(self):
        """SQL-expression column defaults render inside VALUES."""
        m = MetaData()
        foo = Table("foo", m, Column("id", Integer))

        t = Table(
            "test",
            m,
            Column("col1", Integer, default=func.foo(1)),
            Column(
                "col2",
                Integer,
                default=select([func.coalesce(func.max(foo.c.id))]),
            ),
        )

        self.assert_compile(
            t.insert(inline=True, values={}),
            "INSERT INTO test (col1, col2) VALUES (foo(:foo_1), "
            "(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM "
            "foo))",
        )

    def test_generic_insert_bind_params_all_columns(self):
        """insert() without values renders a bind for every column."""
        table1 = self.tables.mytable

        self.assert_compile(
            insert(table1),
            "INSERT INTO mytable (myid, name, description) "
            "VALUES (:myid, :name, :description)",
        )

    def test_insert_with_values_dict(self):
        """A values dict limits the INSERT to the keys it names."""
        table1 = self.tables.mytable

        checkparams = {"myid": 3, "name": "jack"}

        self.assert_compile(
            insert(table1, dict(myid=3, name="jack")),
            "INSERT INTO mytable (myid, name) VALUES (:myid, :name)",
            checkparams=checkparams,
        )

    def test_unconsumed_names_kwargs(self):
        """A values() keyword that is not a column raises CompileError."""
        t = table("t", column("x"), column("y"))
        assert_raises_message(
            exc.CompileError,
            "Unconsumed column names: z",
            t.insert().values(x=5, z=5).compile,
        )

    def test_bindparam_name_no_consume_error(self):
        """bindparam() names are not reported as unconsumed columns."""
        t = table("t", column("x"), column("y"))
        # bindparam names don't get counted
        i = t.insert().values(x=3 + bindparam("x2"))
        self.assert_compile(i, "INSERT INTO t (x) VALUES ((:param_1 + :x2))")

        # even if in the params list
        i = t.insert().values(x=3 + bindparam("x2"))
        self.assert_compile(
            i, "INSERT INTO t (x) VALUES ((:param_1 + :x2))", params={"x2": 1}
        )

    def test_unconsumed_names_values_dict(self):
        """An unknown key in a values dict raises CompileError."""
        table1 = self.tables.mytable

        checkparams = {"myid": 3, "name": "jack", "unknowncol": "oops"}

        stmt = insert(table1, values=checkparams)
        assert_raises_message(
            exc.CompileError,
            "Unconsumed column names: unknowncol",
            stmt.compile,
            dialect=postgresql.dialect(),
        )

    def test_unconsumed_names_multi_values_dict(self):
        """An unknown key in a list of values dicts raises CompileError."""
        table1 = self.tables.mytable

        checkparams = [
            {"myid": 3, "name": "jack", "unknowncol": "oops"},
            {"myid": 4, "name": "someone", "unknowncol": "oops"},
        ]

        stmt = insert(table1, values=checkparams)
        assert_raises_message(
            exc.CompileError,
            "Unconsumed column names: unknowncol",
            stmt.compile,
            dialect=postgresql.dialect(),
        )

    def test_insert_with_values_tuple(self):
        """A tuple of values is matched to the columns by position."""
        table1 = self.tables.mytable

        checkparams = {
            "myid": 3,
            "name": "jack",
            "description": "mydescription",
        }

        self.assert_compile(
            insert(table1, (3, "jack", "mydescription")),
            "INSERT INTO mytable (myid, name, description) "
            "VALUES (:myid, :name, :description)",
            checkparams=checkparams,
        )

    def test_insert_with_values_func(self):
        """A SQL function given as a value renders inline."""
        table1 = self.tables.mytable

        self.assert_compile(
            insert(table1, values=dict(myid=func.lala())),
            "INSERT INTO mytable (myid) VALUES (lala())",
        )

    def test_insert_with_user_supplied_bind_params(self):
        """Explicit bindparam() names replace the column-derived names."""
        table1 = self.tables.mytable

        values = {
            table1.c.myid: bindparam("userid"),
            table1.c.name: bindparam("username"),
        }

        self.assert_compile(
            insert(table1, values),
            "INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
        )

    def test_insert_values(self):
        """values() adds to the values already passed to insert()."""
        table1 = self.tables.mytable

        values1 = {table1.c.myid: bindparam("userid")}
        values2 = {table1.c.name: bindparam("username")}

        self.assert_compile(
            insert(table1, values=values1).values(values2),
            "INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
        )

    def test_prefix_with(self):
        """Dialect-qualified prefixes render only for that dialect."""
        table1 = self.tables.mytable

        stmt = (
            table1.insert()
            .prefix_with("A", "B", dialect="mysql")
            .prefix_with("C", "D")
        )

        # default dialect: only the unqualified prefixes appear
        self.assert_compile(
            stmt,
            "INSERT C D INTO mytable (myid, name, description) "
            "VALUES (:myid, :name, :description)",
        )

        # mysql dialect: both sets of prefixes appear
        self.assert_compile(
            stmt,
            "INSERT A B C D INTO mytable (myid, name, description) "
            "VALUES (%s, %s, %s)",
            dialect=mysql.dialect(),
        )

    def test_inline_default(self):
        """With inline=True only the SQL-default column is rendered."""
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", Integer, default=func.foobar()),
        )

        self.assert_compile(
            table.insert(values={}, inline=True),
            "INSERT INTO sometable (foo) VALUES (foobar())",
        )

        self.assert_compile(
            table.insert(inline=True),
            "INSERT INTO sometable (foo) VALUES (foobar())",
            params={},
        )

    def test_insert_returning_not_in_default(self):
        """returning() raises CompileError on the default dialect."""
        table1 = self.tables.mytable

        stmt = table1.insert().returning(table1.c.myid)
        assert_raises_message(
            exc.CompileError,
            "RETURNING is not supported by this dialect's statement compiler.",
            stmt.compile,
            dialect=default.DefaultDialect(),
        )

    def test_insert_from_select_returning(self):
        """INSERT..FROM SELECT with returning() on the postgresql dialect."""
        table1 = self.tables.mytable
        sel = select([table1.c.myid, table1.c.name]).where(
            table1.c.name == "foo"
        )
        ins = (
            self.tables.myothertable.insert()
            .from_select(("otherid", "othername"), sel)
            .returning(self.tables.myothertable.c.otherid)
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (otherid, othername) "
            "SELECT mytable.myid, mytable.name FROM mytable "
            "WHERE mytable.name = %(name_1)s RETURNING myothertable.otherid",
            checkparams={"name_1": "foo"},
            dialect="postgresql",
        )

    def test_insert_from_select_select(self):
        """Basic INSERT..FROM SELECT with string column names."""
        table1 = self.tables.mytable
        sel = select([table1.c.myid, table1.c.name]).where(
            table1.c.name == "foo"
        )
        ins = self.tables.myothertable.insert().from_select(
            ("otherid", "othername"), sel
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (otherid, othername) "
            "SELECT mytable.myid, mytable.name FROM mytable "
            "WHERE mytable.name = :name_1",
            checkparams={"name_1": "foo"},
        )

    def test_insert_from_select_seq(self):
        """A Sequence on an omitted column is added to the SELECT."""
        m = MetaData()

        t1 = Table(
            "t",
            m,
            Column("id", Integer, Sequence("id_seq"), primary_key=True),
            Column("data", String),
        )

        stmt = t1.insert().from_select(("data",), select([t1.c.data]))

        self.assert_compile(
            stmt,
            "INSERT INTO t (data, id) SELECT t.data, "
            "nextval('id_seq') AS next_value_1 FROM t",
            dialect=postgresql.dialect(),
        )

    def test_insert_from_select_cte_one(self):
        """A CTE used by the SELECT renders before the INSERT keyword."""
        table1 = self.tables.mytable

        cte = select([table1.c.name]).where(table1.c.name == "bar").cte()

        sel = select([table1.c.myid, table1.c.name]).where(
            table1.c.name == cte.c.name
        )

        ins = self.tables.myothertable.insert().from_select(
            ("otherid", "othername"), sel
        )
        self.assert_compile(
            ins,
            "WITH anon_1 AS "
            "(SELECT mytable.name AS name FROM mytable "
            "WHERE mytable.name = :name_1) "
            "INSERT INTO myothertable (otherid, othername) "
            "SELECT mytable.myid, mytable.name FROM mytable, anon_1 "
            "WHERE mytable.name = anon_1.name",
            checkparams={"name_1": "bar"},
        )

    def test_insert_from_select_cte_follows_insert_one(self):
        """With cte_follows_insert set, WITH renders after the column list."""
        dialect = default.DefaultDialect()
        dialect.cte_follows_insert = True

        table1 = self.tables.mytable

        cte = select([table1.c.name]).where(table1.c.name == "bar").cte()

        sel = select([table1.c.myid, table1.c.name]).where(
            table1.c.name == cte.c.name
        )

        ins = self.tables.myothertable.insert().from_select(
            ("otherid", "othername"), sel
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (otherid, othername) "
            "WITH anon_1 AS "
            "(SELECT mytable.name AS name FROM mytable "
            "WHERE mytable.name = :name_1) "
            "SELECT mytable.myid, mytable.name FROM mytable, anon_1 "
            "WHERE mytable.name = anon_1.name",
            checkparams={"name_1": "bar"},
            dialect=dialect,
        )

    def test_insert_from_select_cte_two(self):
        """INSERT from a SELECT against a named CTE of the same table."""
        table1 = self.tables.mytable

        cte = table1.select().cte("c")
        stmt = cte.select()
        ins = table1.insert().from_select(table1.c, stmt)

        self.assert_compile(
            ins,
            "WITH c AS (SELECT mytable.myid AS myid, mytable.name AS name, "
            "mytable.description AS description FROM mytable) "
            "INSERT INTO mytable (myid, name, description) "
            "SELECT c.myid, c.name, c.description FROM c",
        )

    def test_insert_from_select_cte_follows_insert_two(self):
        """Named-CTE variant with cte_follows_insert set on the dialect."""
        dialect = default.DefaultDialect()
        dialect.cte_follows_insert = True
        table1 = self.tables.mytable

        cte = table1.select().cte("c")
        stmt = cte.select()
        ins = table1.insert().from_select(table1.c, stmt)

        self.assert_compile(
            ins,
            "INSERT INTO mytable (myid, name, description) "
            "WITH c AS (SELECT mytable.myid AS myid, mytable.name AS name, "
            "mytable.description AS description FROM mytable) "
            "SELECT c.myid, c.name, c.description FROM c",
            dialect=dialect,
        )

    def test_insert_from_select_select_alt_ordering(self):
        """The column list follows the order given to from_select()."""
        table1 = self.tables.mytable
        sel = select([table1.c.name, table1.c.myid]).where(
            table1.c.name == "foo"
        )
        ins = self.tables.myothertable.insert().from_select(
            ("othername", "otherid"), sel
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (othername, otherid) "
            "SELECT mytable.name, mytable.myid FROM mytable "
            "WHERE mytable.name = :name_1",
            checkparams={"name_1": "foo"},
        )

    def test_insert_from_select_no_defaults(self):
        """include_defaults=False leaves defaulted columns out."""
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", Integer, default=func.foobar()),
        )
        table1 = self.tables.mytable
        sel = select([table1.c.myid]).where(table1.c.name == "foo")
        ins = table.insert().from_select(["id"], sel, include_defaults=False)
        self.assert_compile(
            ins,
            "INSERT INTO sometable (id) SELECT mytable.myid "
            "FROM mytable WHERE mytable.name = :name_1",
            checkparams={"name_1": "foo"},
        )

    def test_insert_from_select_with_sql_defaults(self):
        """A SQL-function default is appended to the SELECT columns."""
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", Integer, default=func.foobar()),
        )
        table1 = self.tables.mytable
        sel = select([table1.c.myid]).where(table1.c.name == "foo")
        ins = table.insert().from_select(["id"], sel)
        self.assert_compile(
            ins,
            "INSERT INTO sometable (id, foo) SELECT "
            "mytable.myid, foobar() AS foobar_1 "
            "FROM mytable WHERE mytable.name = :name_1",
            checkparams={"name_1": "foo"},
        )

    def test_insert_from_select_with_python_defaults(self):
        """A Python scalar default is appended to the SELECT as a bind."""
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", Integer, default=12),
        )
        table1 = self.tables.mytable
        sel = select([table1.c.myid]).where(table1.c.name == "foo")
        ins = table.insert().from_select(["id"], sel)
        self.assert_compile(
            ins,
            "INSERT INTO sometable (id, foo) SELECT "
            "mytable.myid, :foo AS anon_1 "
            "FROM mytable WHERE mytable.name = :name_1",
            # value filled in at execution time
            checkparams={"name_1": "foo", "foo": None},
        )

    def test_insert_from_select_override_defaults(self):
        """Naming a defaulted column in from_select() skips its default."""
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", Integer, default=12),
        )
        table1 = self.tables.mytable
        sel = select([table1.c.myid, table1.c.myid.label("q")]).where(
            table1.c.name == "foo"
        )
        ins = table.insert().from_select(["id", "foo"], sel)
        self.assert_compile(
            ins,
            "INSERT INTO sometable (id, foo) SELECT "
            "mytable.myid, mytable.myid AS q "
            "FROM mytable WHERE mytable.name = :name_1",
            checkparams={"name_1": "foo"},
        )

    def test_insert_from_select_fn_defaults(self):
        """A Python callable default is appended to the SELECT as a bind."""
        metadata = MetaData()

        def foo(ctx):
            return 12

        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("foo", Integer, default=foo),
        )
        table1 = self.tables.mytable
        sel = select([table1.c.myid]).where(table1.c.name == "foo")
        ins = table.insert().from_select(["id"], sel)
        self.assert_compile(
            ins,
            "INSERT INTO sometable (id, foo) SELECT "
            "mytable.myid, :foo AS anon_1 "
            "FROM mytable WHERE mytable.name = :name_1",
            # value filled in at execution time
            checkparams={"name_1": "foo", "foo": None},
        )

    def test_insert_from_select_dont_mutate_raw_columns(self):
        """Compiling the INSERT leaves the source SELECT unchanged."""
        # test [ticket:3603]
        from sqlalchemy import table

        table_ = table(
            "mytable",
            Column("foo", String),
            Column("bar", String, default="baz"),
        )

        stmt = select([table_.c.foo])
        insert = table_.insert().from_select(["foo"], stmt)

        # both statements are compiled twice; the output must be the
        # same each time
        self.assert_compile(stmt, "SELECT mytable.foo FROM mytable")
        self.assert_compile(
            insert,
            "INSERT INTO mytable (foo, bar) "
            "SELECT mytable.foo, :bar AS anon_1 FROM mytable",
        )
        self.assert_compile(stmt, "SELECT mytable.foo FROM mytable")
        self.assert_compile(
            insert,
            "INSERT INTO mytable (foo, bar) "
            "SELECT mytable.foo, :bar AS anon_1 FROM mytable",
        )

    def test_insert_mix_select_values_exception(self):
        """values() after from_select() raises InvalidRequestError."""
        table1 = self.tables.mytable
        sel = select([table1.c.myid, table1.c.name]).where(
            table1.c.name == "foo"
        )
        ins = self.tables.myothertable.insert().from_select(
            ("otherid", "othername"), sel
        )
        assert_raises_message(
            exc.InvalidRequestError,
            "This construct already inserts from a SELECT",
            ins.values,
            othername="5",
        )

    def test_insert_mix_values_select_exception(self):
        """from_select() after values() raises InvalidRequestError."""
        table1 = self.tables.mytable
        sel = select([table1.c.myid, table1.c.name]).where(
            table1.c.name == "foo"
        )
        ins = self.tables.myothertable.insert().values(othername="5")
        assert_raises_message(
            exc.InvalidRequestError,
            "This construct already inserts value expressions",
            ins.from_select,
            ("otherid", "othername"),
            sel,
        )

    def test_insert_from_select_table(self):
        """A Table passed to from_select() is rendered as its SELECT."""
        table1 = self.tables.mytable
        ins = self.tables.myothertable.insert().from_select(
            ("otherid", "othername"), table1
        )
        # note we aren't checking the number of columns right now
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (otherid, othername) "
            "SELECT mytable.myid, mytable.name, mytable.description "
            "FROM mytable",
            checkparams={},
        )

    def test_insert_from_select_union(self):
        """INSERT..FROM SELECT accepts a UNION as the source."""
        mytable = self.tables.mytable

        name = column("name")
        description = column("desc")
        sel = select([name, mytable.c.description]).union(
            select([name, description])
        )
        ins = mytable.insert().from_select(
            [mytable.c.name, mytable.c.description], sel
        )
        self.assert_compile(
            ins,
            "INSERT INTO mytable (name, description) "
            "SELECT name, mytable.description FROM mytable "
            'UNION SELECT name, "desc"',
        )

    def test_insert_from_select_col_values(self):
        """from_select() accepts Column objects for the target columns."""
        table1 = self.tables.mytable
        table2 = self.tables.myothertable
        sel = select([table1.c.myid, table1.c.name]).where(
            table1.c.name == "foo"
        )
        ins = table2.insert().from_select(
            (table2.c.otherid, table2.c.othername), sel
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (otherid, othername) "
            "SELECT mytable.myid, mytable.name FROM mytable "
            "WHERE mytable.name = :name_1",
            checkparams={"name_1": "foo"},
        )

    def test_anticipate_no_pk_composite_pk(self):
        """Omitting one column of a composite primary key warns."""
        t = Table(
            "t",
            MetaData(),
            Column("x", Integer, primary_key=True),
            Column("y", Integer, primary_key=True),
        )

        with expect_warnings(
            "Column 't.y' is marked as a member.*"
            "Note that as of SQLAlchemy 1.1,"
        ):
            self.assert_compile(
                t.insert(), "INSERT INTO t (x) VALUES (:x)", params={"x": 5}
            )

    def test_anticipate_no_pk_composite_pk_implicit_returning(self):
        """Composite-PK warning on postgresql with implicit_returning on."""
        t = Table(
            "t",
            MetaData(),
            Column("x", Integer, primary_key=True),
            Column("y", Integer, primary_key=True),
        )
        d = postgresql.dialect()
        d.implicit_returning = True

        with expect_warnings(
            "Column 't.y' is marked as a member.*"
            "Note that as of SQLAlchemy 1.1,"
        ):
            self.assert_compile(
                t.insert(),
                "INSERT INTO t (x) VALUES (%(x)s)",
                params={"x": 5},
                dialect=d,
            )

    def test_anticipate_no_pk_composite_pk_prefetch(self):
        """Composite-PK warning on postgresql with implicit_returning off."""
        t = Table(
            "t",
            MetaData(),
            Column("x", Integer, primary_key=True),
            Column("y", Integer, primary_key=True),
        )
        d = postgresql.dialect()
        d.implicit_returning = False
        with expect_warnings(
            "Column 't.y' is marked as a member.*"
            "Note that as of SQLAlchemy 1.1,"
        ):
            self.assert_compile(
                t.insert(),
                "INSERT INTO t (x) VALUES (%(x)s)",
                params={"x": 5},
                dialect=d,
            )

    def test_anticipate_nullable_composite_pk(self):
        """Composite PK whose omitted column is declared nullable.

        Unlike the sibling tests, this one compiles without an
        expect_warnings() guard.
        """
        t = Table(
            "t",
            MetaData(),
            Column("x", Integer, primary_key=True),
            Column("y", Integer, primary_key=True, nullable=True),
        )
        self.assert_compile(
            t.insert(), "INSERT INTO t (x) VALUES (:x)", params={"x": 5}
        )

    def test_anticipate_no_pk_non_composite_pk(self):
        """Omitting a non-autoincrement primary key warns."""
        t = Table(
            "t",
            MetaData(),
            Column("x", Integer, primary_key=True, autoincrement=False),
            Column("q", Integer),
        )
        with expect_warnings(
            "Column 't.x' is marked as a member.*" "may not store NULL.$"
        ):
            self.assert_compile(
                t.insert(), "INSERT INTO t (q) VALUES (:q)", params={"q": 5}
            )

    def test_anticipate_no_pk_non_composite_pk_implicit_returning(self):
        """Non-autoincrement PK warning with implicit_returning on."""
        t = Table(
            "t",
            MetaData(),
            Column("x", Integer, primary_key=True, autoincrement=False),
            Column("q", Integer),
        )
        d = postgresql.dialect()
        d.implicit_returning = True
        with expect_warnings(
            "Column 't.x' is marked as a member.*" "may not store NULL.$"
        ):
            self.assert_compile(
                t.insert(),
                "INSERT INTO t (q) VALUES (%(q)s)",
                params={"q": 5},
                dialect=d,
            )

    def test_anticipate_no_pk_non_composite_pk_prefetch(self):
        """Non-autoincrement PK warning with implicit_returning off."""
        t = Table(
            "t",
            MetaData(),
            Column("x", Integer, primary_key=True, autoincrement=False),
            Column("q", Integer),
        )
        d = postgresql.dialect()
        d.implicit_returning = False

        with expect_warnings(
            "Column 't.x' is marked as a member.*" "may not store NULL.$"
        ):
            self.assert_compile(
                t.insert(),
                "INSERT INTO t (q) VALUES (%(q)s)",
                params={"q": 5},
                dialect=d,
            )

    def test_anticipate_no_pk_lower_case_table(self):
        """Same warning for a lower-case table() holding Column objects."""
        t = table(
            "t",
            Column("id", Integer, primary_key=True, autoincrement=False),
            Column("notpk", String(10), nullable=True),
        )
        with expect_warnings(
            "Column 't.id' is marked as a member.*" "may not store NULL.$"
        ):
            self.assert_compile(
                t.insert(), "INSERT INTO t () VALUES ()", params={}
            )


class InsertImplicitReturningTest(
    _InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL
):
    """Compilation tests on postgresql with implicit_returning=True."""

    __dialect__ = postgresql.dialect(implicit_returning=True)

    def test_insert_select(self):
        """INSERT..FROM SELECT is expected to render without RETURNING."""
        table1 = self.tables.mytable
        sel = select([table1.c.myid, table1.c.name]).where(
            table1.c.name == "foo"
        )
        ins = self.tables.myothertable.insert().from_select(
            ("otherid", "othername"), sel
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (otherid, othername) "
            "SELECT mytable.myid, mytable.name FROM mytable "
            "WHERE mytable.name = %(name_1)s",
            checkparams={"name_1": "foo"},
        )

    def test_insert_select_return_defaults(self):
        """return_defaults() on INSERT..FROM SELECT adds no RETURNING."""
        table1 = self.tables.mytable
        sel = select([table1.c.myid, table1.c.name]).where(
            table1.c.name == "foo"
        )
        ins = (
            self.tables.myothertable.insert()
            .from_select(("otherid", "othername"), sel)
            .return_defaults(self.tables.myothertable.c.otherid)
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (otherid, othername) "
            "SELECT mytable.myid, mytable.name FROM mytable "
            "WHERE mytable.name = %(name_1)s",
            checkparams={"name_1": "foo"},
        )

    def test_insert_multiple_values(self):
        """A multi-row VALUES insert renders without RETURNING."""
        ins = self.tables.myothertable.insert().values(
            [{"othername": "foo"}, {"othername": "bar"}]
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (othername) "
            "VALUES (%(othername_m0)s), "
            "(%(othername_m1)s)",
            checkparams={"othername_m1": "bar", "othername_m0": "foo"},
        )

    def test_insert_multiple_values_literal_binds(self):
        """Multi-row VALUES with literal_binds renders inline values."""
        ins = self.tables.myothertable.insert().values(
            [{"othername": "foo"}, {"othername": "bar"}]
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (othername) VALUES ('foo'), ('bar')",
            checkparams={},
            literal_binds=True,
        )

    def test_insert_multiple_values_return_defaults(self):
        """return_defaults() on a multi-row insert adds no RETURNING."""
        # TODO: not sure if this should raise an
        # error or what
        ins = (
            self.tables.myothertable.insert()
            .values([{"othername": "foo"}, {"othername": "bar"}])
            .return_defaults(self.tables.myothertable.c.otherid)
        )
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (othername) "
            "VALUES (%(othername_m0)s), "
            "(%(othername_m1)s)",
            checkparams={"othername_m1": "bar", "othername_m0": "foo"},
        )

    def test_insert_single_list_values(self):
        """A one-element list uses "_m0" bind names and no RETURNING."""
        ins = self.tables.myothertable.insert().values([{"othername": "foo"}])
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (othername) "
            "VALUES (%(othername_m0)s)",
            checkparams={"othername_m0": "foo"},
        )

    def test_insert_single_element_values(self):
        """A plain values dict renders RETURNING for the primary key."""
        ins = self.tables.myothertable.insert().values({"othername": "foo"})
        self.assert_compile(
            ins,
            "INSERT INTO myothertable (othername) "
            "VALUES (%(othername)s) RETURNING myothertable.otherid",
            checkparams={"othername": "foo"},
        )


class EmptyTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
    """Compilation tests for INSERT statements given empty values."""

    __dialect__ = "default"

    def test_empty_insert_default(self):
        """An empty values dict renders "() VALUES ()"."""
        table1 = self.tables.mytable

        stmt = table1.insert().values({})  # hide from 2to3
        self.assert_compile(stmt, "INSERT INTO mytable () VALUES ()")

    def test_supports_empty_insert_true(self):
        """With both dialect flags on, DEFAULT VALUES is rendered."""
        table1 = self.tables.mytable

        dialect = default.DefaultDialect()
        dialect.supports_empty_insert = dialect.supports_default_values = True

        stmt = table1.insert().values({})  # hide from 2to3
        self.assert_compile(
            stmt, "INSERT INTO mytable DEFAULT VALUES", dialect=dialect
        )

    def test_supports_empty_insert_false(self):
        """With both dialect flags off, an empty insert raises."""
        table1 = self.tables.mytable

        dialect = default.DefaultDialect()
        dialect.supports_empty_insert = dialect.supports_default_values = False

        stmt = table1.insert().values({})  # hide from 2to3
        assert_raises_message(
            exc.CompileError,
            "The 'default' dialect with current database version "
            "settings does not support empty inserts.",
            stmt.compile,
            dialect=dialect,
        )

    def _test_insert_with_empty_collection_values(self, collection):
        """Check that ``collection`` passed to values() acts as empty.

        :param collection: the empty list, dict or tuple under test.
        """
        table1 = self.tables.mytable

        ins = table1.insert().values(collection)

        self.assert_compile(
            ins, "INSERT INTO mytable () VALUES ()", checkparams={}
        )

        # empty dict populates on next values call
        self.assert_compile(
            ins.values(myid=3),
            "INSERT INTO mytable (myid) VALUES (:myid)",
            checkparams={"myid": 3},
        )

    def test_insert_with_empty_list_values(self):
        """Run the empty-collection check with an empty list."""
        self._test_insert_with_empty_collection_values([])

    def test_insert_with_empty_dict_values(self):
        """Run the empty-collection check with an empty dict."""
        self._test_insert_with_empty_collection_values({})

    def test_insert_with_empty_tuple_values(self):
        """Run the empty-collection check with an empty tuple."""
        self._test_insert_with_empty_collection_values(())


class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
    """Compilation tests for multi-row VALUES inserts."""

    __dialect__ = "default"

    def test_not_supported(self):
        """A plain DefaultDialect rejects multi-row VALUES."""
        table1 = self.tables.mytable

        dialect = default.DefaultDialect()
        stmt = table1.insert().values([{"myid": 1}, {"myid": 2}])
        assert_raises_message(
            exc.CompileError,
            "The 'default' dialect with current database version settings "
            "does not support in-place multirow inserts.",
            stmt.compile,
            dialect=dialect,
        )

    def test_named(self):
        """Named binds get an "_m<row>" suffix for each row."""
        table1 = self.tables.mytable

        values = [
            {"myid": 1, "name": "a", "description": "b"},
            {"myid": 2, "name": "c", "description": "d"},
            {"myid": 3, "name": "e", "description": "f"},
        ]

        checkparams = {
            "myid_m0": 1,
            "myid_m1": 2,
            "myid_m2": 3,
            "name_m0": "a",
            "name_m1": "c",
            "name_m2": "e",
            "description_m0": "b",
            "description_m1": "d",
            "description_m2": "f",
        }

        dialect = default.DefaultDialect()
        dialect.supports_multivalues_insert = True

        self.assert_compile(
            table1.insert().values(values),
            "INSERT INTO mytable (myid, name, description) VALUES "
            "(:myid_m0, :name_m0, :description_m0), "
            "(:myid_m1, :name_m1, :description_m1), "
            "(:myid_m2, :name_m2, :description_m2)",
            checkparams=checkparams,
            dialect=dialect,
        )

    def test_named_with_column_objects(self):
        """Same as test_named, with Column objects as the dict keys."""
        table1 = self.tables.mytable

        values = [
            {table1.c.myid: 1, table1.c.name: "a", table1.c.description: "b"},
            {table1.c.myid: 2, table1.c.name: "c", table1.c.description: "d"},
            {table1.c.myid: 3, table1.c.name: "e", table1.c.description: "f"},
        ]

        checkparams = {
            "myid_m0": 1,
            "myid_m1": 2,
            "myid_m2": 3,
            "name_m0": "a",
            "name_m1": "c",
            "name_m2": "e",
            "description_m0": "b",
            "description_m1": "d",
            "description_m2": "f",
        }

        dialect = default.DefaultDialect()
        dialect.supports_multivalues_insert = True

        self.assert_compile(
            table1.insert().values(values),
            "INSERT INTO mytable (myid, name, description) VALUES "
            "(:myid_m0, :name_m0, :description_m0), "
            "(:myid_m1, :name_m1, :description_m1), "
            "(:myid_m2, :name_m2, :description_m2)",
            checkparams=checkparams,
            dialect=dialect,
        )

    def test_positional(self):
        """Positional ("format") binds are ordered row by row."""
        table1 = self.tables.mytable

        values = [
            {"myid": 1, "name": "a", "description": "b"},
            {"myid": 2, "name": "c", "description": "d"},
            {"myid": 3, "name": "e", "description": "f"},
        ]

        checkpositional = (1, "a", "b", 2, "c", "d", 3, "e", "f")

        dialect = default.DefaultDialect()
        dialect.supports_multivalues_insert = True
        dialect.paramstyle = "format"
        dialect.positional = True

        self.assert_compile(
            table1.insert().values(values),
            "INSERT INTO mytable (myid, name, description) VALUES "
            "(%s, %s, %s), (%s, %s, %s), (%s, %s, %s)",
            checkpositional=checkpositional,
            dialect=dialect,
        )

    def test_positional_w_defaults(self):
        """Python-side defaults get a positional slot in every row.

        The server-default column ``y`` does not appear in the expected
        statement.
        """
        table1 = self.tables.table_w_defaults

        values = [{"id": 1}, {"id": 2}, {"id": 3}]

        # None marks the default-driven slots at compile time
        checkpositional = (1, None, None, 2, None, None, 3, None, None)

        dialect = default.DefaultDialect()
        dialect.supports_multivalues_insert = True
        dialect.paramstyle = "format"
        dialect.positional = True

        self.assert_compile(
            table1.insert().values(values),
            "INSERT INTO table_w_defaults (id, x, z) VALUES "
            "(%s, %s, %s), (%s, %s, %s), (%s, %s, %s)",
            checkpositional=checkpositional,
            check_prefetch=[
                table1.c.x,
                table1.c.z,
                crud._multiparam_column(table1.c.x, 0),
                crud._multiparam_column(table1.c.z, 0),
                crud._multiparam_column(table1.c.x, 1),
                crud._multiparam_column(table1.c.z, 1),
            ],
            dialect=dialect,
        )

    def test_inline_default(self):
        """A SQL-function default renders inline for rows without a value."""
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String),
            Column("foo", Integer, default=func.foobar()),
        )

        values = [
            {"id": 1, "data": "data1"},
            {"id": 2, "data": "data2", "foo": "plainfoo"},
            {"id": 3, "data": "data3"},
        ]

        checkparams = {
            "id_m0": 1,
            "id_m1": 2,
            "id_m2": 3,
            "data_m0": "data1",
            "data_m1": "data2",
            "data_m2": "data3",
            "foo_m1": "plainfoo",
        }

        self.assert_compile(
            table.insert().values(values),
            "INSERT INTO sometable (id, data, foo) VALUES "
            "(%(id_m0)s, %(data_m0)s, foobar()), "
            "(%(id_m1)s, %(data_m1)s, %(foo_m1)s), "
            "(%(id_m2)s, %(data_m2)s, foobar())",
            checkparams=checkparams,
            dialect=postgresql.dialect(),
        )

    def test_python_scalar_default(self):
        """A Python scalar default becomes a bind in rows without a value.

        Also checks the type affinity of every compiled bind.
        """
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String),
            Column("foo", Integer, default=10),
        )

        values = [
            {"id": 1, "data": "data1"},
            {"id": 2, "data": "data2", "foo": 15},
            {"id": 3, "data": "data3"},
        ]

        checkparams = {
            "id_m0": 1,
            "id_m1": 2,
            "id_m2": 3,
            "data_m0": "data1",
            "data_m1": "data2",
            "data_m2": "data3",
            "foo": None,  # evaluated later
            "foo_m1": 15,
            "foo_m2": None,  # evaluated later
        }

        stmt = table.insert().values(values)

        # map each bind name to the type affinity of its bind parameter
        eq_(
            dict(
                [
                    (k, v.type._type_affinity)
                    for (k, v) in stmt.compile(
                        dialect=postgresql.dialect()
                    ).binds.items()
                ]
            ),
            {
                "foo": Integer,
                "data_m2": String,
                "id_m0": Integer,
                "id_m2": Integer,
                "foo_m1": Integer,
                "data_m1": String,
                "id_m1": Integer,
                "foo_m2": Integer,
                "data_m0": String,
            },
        )

        self.assert_compile(
            stmt,
            "INSERT INTO sometable (id, data, foo) VALUES "
            "(%(id_m0)s, %(data_m0)s, %(foo)s), "
            "(%(id_m1)s, %(data_m1)s, %(foo_m1)s), "
            "(%(id_m2)s, %(data_m2)s, %(foo_m2)s)",
            checkparams=checkparams,
            dialect=postgresql.dialect(),
        )

    def test_python_fn_default(self):
        """Same as the scalar-default test, with a callable default."""
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String),
            Column("foo", Integer, default=lambda: 10),
        )

        values = [
            {"id": 1, "data": "data1"},
            {"id": 2, "data": "data2", "foo": 15},
            {"id": 3, "data": "data3"},
        ]

        checkparams = {
            "id_m0": 1,
            "id_m1": 2,
            "id_m2": 3,
            "data_m0": "data1",
            "data_m1": "data2",
            "data_m2": "data3",
            "foo": None,  # evaluated later
            "foo_m1": 15,
            "foo_m2": None,  # evaluated later
        }

        stmt = table.insert().values(values)
        # map each bind name to the type affinity of its bind parameter
        eq_(
            dict(
                [
                    (k, v.type._type_affinity)
                    for (k, v) in stmt.compile(
                        dialect=postgresql.dialect()
                    ).binds.items()
                ]
            ),
            {
                "foo": Integer,
                "data_m2": String,
                "id_m0": Integer,
                "id_m2": Integer,
                "foo_m1": Integer,
                "data_m1": String,
                "id_m1": Integer,
                "foo_m2": Integer,
                "data_m0": String,
            },
        )

        self.assert_compile(
            stmt,
            "INSERT INTO sometable (id, data, foo) VALUES "
            "(%(id_m0)s, %(data_m0)s, %(foo)s), "
            "(%(id_m1)s, %(data_m1)s, %(foo_m1)s), "
            "(%(id_m2)s, %(data_m2)s, %(foo_m2)s)",
            checkparams=checkparams,
            dialect=postgresql.dialect(),
        )

    def test_sql_functions(self):
        """SQL function values render inline, row by row."""
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String),
            Column("foo", Integer),
        )

        values = [
            {"id": 1, "data": "foo", "foo": func.foob()},
            {"id": 2, "data": "bar", "foo": func.foob()},
            {"id": 3, "data": "bar", "foo": func.bar()},
            {"id": 4, "data": "bar", "foo": 15},
            {"id": 5, "data": "bar", "foo": func.foob()},
        ]
        # only the row with a plain value for "foo" has a bind for it
        checkparams = {
            "id_m0": 1,
            "data_m0": "foo",
            "id_m1": 2,
            "data_m1": "bar",
            "id_m2": 3,
            "data_m2": "bar",
            "id_m3": 4,
            "data_m3": "bar",
            "foo_m3": 15,
            "id_m4": 5,
            "data_m4": "bar",
        }

        self.assert_compile(
            table.insert().values(values),
            "INSERT INTO sometable (id, data, foo) VALUES "
            "(%(id_m0)s, %(data_m0)s, foob()), "
            "(%(id_m1)s, %(data_m1)s, foob()), "
            "(%(id_m2)s, %(data_m2)s, bar()), "
            "(%(id_m3)s, %(data_m3)s, %(foo_m3)s), "
            "(%(id_m4)s, %(data_m4)s, foob())",
            checkparams=checkparams,
            dialect=postgresql.dialect(),
        )

    def test_server_default(self):
        """A server-default column absent from the first row is left out.

        The expected statement omits "foo" even though the second row
        supplies a value for it.
        """
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String),
            Column("foo", Integer, server_default=func.foobar()),
        )

        values = [
            {"id": 1, "data": "data1"},
            {"id": 2, "data": "data2", "foo": "plainfoo"},
            {"id": 3, "data": "data3"},
        ]

        checkparams = {
            "id_m0": 1,
            "id_m1": 2,
            "id_m2": 3,
            "data_m0": "data1",
            "data_m1": "data2",
            "data_m2": "data3",
        }

        self.assert_compile(
            table.insert().values(values),
            "INSERT INTO sometable (id, data) VALUES "
            "(%(id_m0)s, %(data_m0)s), "
            "(%(id_m1)s, %(data_m1)s), "
            "(%(id_m2)s, %(data_m2)s)",
            checkparams=checkparams,
            dialect=postgresql.dialect(),
        )

    def test_server_default_absent_value(self):
        metadata = MetaData()
        table = Table(
            "sometable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("data", String),
            Column("foo", Integer, server_default=func.foobar()),
        )

        values = [
            {"id": 1, "data": "data1", "foo": "plainfoo"},
            {"id": 2, "data": "data2"},
            {"id": 3, "data": "data3", "foo": "otherfoo"},
        ]
1353 1354 assert_raises_message( 1355 exc.CompileError, 1356 "INSERT value for column sometable.foo is explicitly rendered " 1357 "as a boundparameter in the VALUES clause; a Python-side value or " 1358 "SQL expression is required", 1359 table.insert().values(values).compile, 1360 ) 1361