1#! coding:utf-8
2
3from sqlalchemy import bindparam
4from sqlalchemy import Column
5from sqlalchemy import column
6from sqlalchemy import exc
7from sqlalchemy import func
8from sqlalchemy import insert
9from sqlalchemy import Integer
10from sqlalchemy import MetaData
11from sqlalchemy import select
12from sqlalchemy import Sequence
13from sqlalchemy import String
14from sqlalchemy import Table
15from sqlalchemy import table
16from sqlalchemy import text
17from sqlalchemy.dialects import mysql
18from sqlalchemy.dialects import postgresql
19from sqlalchemy.engine import default
20from sqlalchemy.sql import crud
21from sqlalchemy.testing import assert_raises
22from sqlalchemy.testing import assert_raises_message
23from sqlalchemy.testing import AssertsCompiledSQL
24from sqlalchemy.testing import eq_
25from sqlalchemy.testing import expect_warnings
26from sqlalchemy.testing import fixtures
27
28
29class _InsertTestBase(object):
30    @classmethod
31    def define_tables(cls, metadata):
32        Table(
33            "mytable",
34            metadata,
35            Column("myid", Integer),
36            Column("name", String(30)),
37            Column("description", String(30)),
38        )
39        Table(
40            "myothertable",
41            metadata,
42            Column("otherid", Integer, primary_key=True),
43            Column("othername", String(30)),
44        )
45        Table(
46            "table_w_defaults",
47            metadata,
48            Column("id", Integer, primary_key=True),
49            Column("x", Integer, default=10),
50            Column("y", Integer, server_default=text("5")),
51            Column("z", Integer, default=lambda: 10),
52        )
53
54
55class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
56    __dialect__ = "default"
57
58    def test_binds_that_match_columns(self):
59        """test bind params named after column names
60        replace the normal SET/VALUES generation."""
61
62        t = table("foo", column("x"), column("y"))
63
64        i = t.insert().values(x=3 + bindparam("x"))
65        self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x))")
66        self.assert_compile(
67            i,
68            "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)",
69            params={"x": 1, "y": 2},
70        )
71
72        i = t.insert().values(x=bindparam("y"))
73        self.assert_compile(i, "INSERT INTO foo (x) VALUES (:y)")
74
75        i = t.insert().values(x=bindparam("y"), y=5)
76        assert_raises(exc.CompileError, i.compile)
77
78        i = t.insert().values(x=3 + bindparam("y"), y=5)
79        assert_raises(exc.CompileError, i.compile)
80
81        i = t.insert().values(x=3 + bindparam("x2"))
82        self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))")
83        self.assert_compile(
84            i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={}
85        )
86        self.assert_compile(
87            i,
88            "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
89            params={"x": 1, "y": 2},
90        )
91        self.assert_compile(
92            i,
93            "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
94            params={"x2": 1, "y": 2},
95        )
96
97    def test_insert_literal_binds(self):
98        table1 = self.tables.mytable
99        stmt = table1.insert().values(myid=3, name="jack")
100
101        self.assert_compile(
102            stmt,
103            "INSERT INTO mytable (myid, name) VALUES (3, 'jack')",
104            literal_binds=True,
105        )
106
107    def test_insert_literal_binds_sequence_notimplemented(self):
108        table = Table("x", MetaData(), Column("y", Integer, Sequence("y_seq")))
109        dialect = default.DefaultDialect()
110        dialect.supports_sequences = True
111
112        stmt = table.insert().values(myid=3, name="jack")
113
114        assert_raises(
115            NotImplementedError,
116            stmt.compile,
117            compile_kwargs=dict(literal_binds=True),
118            dialect=dialect,
119        )
120
121    def test_inline_defaults(self):
122        m = MetaData()
123        foo = Table("foo", m, Column("id", Integer))
124
125        t = Table(
126            "test",
127            m,
128            Column("col1", Integer, default=func.foo(1)),
129            Column(
130                "col2",
131                Integer,
132                default=select([func.coalesce(func.max(foo.c.id))]),
133            ),
134        )
135
136        self.assert_compile(
137            t.insert(inline=True, values={}),
138            "INSERT INTO test (col1, col2) VALUES (foo(:foo_1), "
139            "(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM "
140            "foo))",
141        )
142
143    def test_generic_insert_bind_params_all_columns(self):
144        table1 = self.tables.mytable
145
146        self.assert_compile(
147            insert(table1),
148            "INSERT INTO mytable (myid, name, description) "
149            "VALUES (:myid, :name, :description)",
150        )
151
152    def test_insert_with_values_dict(self):
153        table1 = self.tables.mytable
154
155        checkparams = {"myid": 3, "name": "jack"}
156
157        self.assert_compile(
158            insert(table1, dict(myid=3, name="jack")),
159            "INSERT INTO mytable (myid, name) VALUES (:myid, :name)",
160            checkparams=checkparams,
161        )
162
163    def test_unconsumed_names_kwargs(self):
164        t = table("t", column("x"), column("y"))
165        assert_raises_message(
166            exc.CompileError,
167            "Unconsumed column names: z",
168            t.insert().values(x=5, z=5).compile,
169        )
170
171    def test_bindparam_name_no_consume_error(self):
172        t = table("t", column("x"), column("y"))
173        # bindparam names don't get counted
174        i = t.insert().values(x=3 + bindparam("x2"))
175        self.assert_compile(i, "INSERT INTO t (x) VALUES ((:param_1 + :x2))")
176
177        # even if in the params list
178        i = t.insert().values(x=3 + bindparam("x2"))
179        self.assert_compile(
180            i, "INSERT INTO t (x) VALUES ((:param_1 + :x2))", params={"x2": 1}
181        )
182
183    def test_unconsumed_names_values_dict(self):
184        table1 = self.tables.mytable
185
186        checkparams = {"myid": 3, "name": "jack", "unknowncol": "oops"}
187
188        stmt = insert(table1, values=checkparams)
189        assert_raises_message(
190            exc.CompileError,
191            "Unconsumed column names: unknowncol",
192            stmt.compile,
193            dialect=postgresql.dialect(),
194        )
195
196    def test_unconsumed_names_multi_values_dict(self):
197        table1 = self.tables.mytable
198
199        checkparams = [
200            {"myid": 3, "name": "jack", "unknowncol": "oops"},
201            {"myid": 4, "name": "someone", "unknowncol": "oops"},
202        ]
203
204        stmt = insert(table1, values=checkparams)
205        assert_raises_message(
206            exc.CompileError,
207            "Unconsumed column names: unknowncol",
208            stmt.compile,
209            dialect=postgresql.dialect(),
210        )
211
212    def test_insert_with_values_tuple(self):
213        table1 = self.tables.mytable
214
215        checkparams = {
216            "myid": 3,
217            "name": "jack",
218            "description": "mydescription",
219        }
220
221        self.assert_compile(
222            insert(table1, (3, "jack", "mydescription")),
223            "INSERT INTO mytable (myid, name, description) "
224            "VALUES (:myid, :name, :description)",
225            checkparams=checkparams,
226        )
227
228    def test_insert_with_values_func(self):
229        table1 = self.tables.mytable
230
231        self.assert_compile(
232            insert(table1, values=dict(myid=func.lala())),
233            "INSERT INTO mytable (myid) VALUES (lala())",
234        )
235
236    def test_insert_with_user_supplied_bind_params(self):
237        table1 = self.tables.mytable
238
239        values = {
240            table1.c.myid: bindparam("userid"),
241            table1.c.name: bindparam("username"),
242        }
243
244        self.assert_compile(
245            insert(table1, values),
246            "INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
247        )
248
249    def test_insert_values(self):
250        table1 = self.tables.mytable
251
252        values1 = {table1.c.myid: bindparam("userid")}
253        values2 = {table1.c.name: bindparam("username")}
254
255        self.assert_compile(
256            insert(table1, values=values1).values(values2),
257            "INSERT INTO mytable (myid, name) VALUES (:userid, :username)",
258        )
259
260    def test_prefix_with(self):
261        table1 = self.tables.mytable
262
263        stmt = (
264            table1.insert()
265            .prefix_with("A", "B", dialect="mysql")
266            .prefix_with("C", "D")
267        )
268
269        self.assert_compile(
270            stmt,
271            "INSERT C D INTO mytable (myid, name, description) "
272            "VALUES (:myid, :name, :description)",
273        )
274
275        self.assert_compile(
276            stmt,
277            "INSERT A B C D INTO mytable (myid, name, description) "
278            "VALUES (%s, %s, %s)",
279            dialect=mysql.dialect(),
280        )
281
282    def test_inline_default(self):
283        metadata = MetaData()
284        table = Table(
285            "sometable",
286            metadata,
287            Column("id", Integer, primary_key=True),
288            Column("foo", Integer, default=func.foobar()),
289        )
290
291        self.assert_compile(
292            table.insert(values={}, inline=True),
293            "INSERT INTO sometable (foo) VALUES (foobar())",
294        )
295
296        self.assert_compile(
297            table.insert(inline=True),
298            "INSERT INTO sometable (foo) VALUES (foobar())",
299            params={},
300        )
301
302    def test_insert_returning_not_in_default(self):
303        table1 = self.tables.mytable
304
305        stmt = table1.insert().returning(table1.c.myid)
306        assert_raises_message(
307            exc.CompileError,
308            "RETURNING is not supported by this dialect's statement compiler.",
309            stmt.compile,
310            dialect=default.DefaultDialect(),
311        )
312
313    def test_insert_from_select_returning(self):
314        table1 = self.tables.mytable
315        sel = select([table1.c.myid, table1.c.name]).where(
316            table1.c.name == "foo"
317        )
318        ins = (
319            self.tables.myothertable.insert()
320            .from_select(("otherid", "othername"), sel)
321            .returning(self.tables.myothertable.c.otherid)
322        )
323        self.assert_compile(
324            ins,
325            "INSERT INTO myothertable (otherid, othername) "
326            "SELECT mytable.myid, mytable.name FROM mytable "
327            "WHERE mytable.name = %(name_1)s RETURNING myothertable.otherid",
328            checkparams={"name_1": "foo"},
329            dialect="postgresql",
330        )
331
332    def test_insert_from_select_select(self):
333        table1 = self.tables.mytable
334        sel = select([table1.c.myid, table1.c.name]).where(
335            table1.c.name == "foo"
336        )
337        ins = self.tables.myothertable.insert().from_select(
338            ("otherid", "othername"), sel
339        )
340        self.assert_compile(
341            ins,
342            "INSERT INTO myothertable (otherid, othername) "
343            "SELECT mytable.myid, mytable.name FROM mytable "
344            "WHERE mytable.name = :name_1",
345            checkparams={"name_1": "foo"},
346        )
347
348    def test_insert_from_select_seq(self):
349        m = MetaData()
350
351        t1 = Table(
352            "t",
353            m,
354            Column("id", Integer, Sequence("id_seq"), primary_key=True),
355            Column("data", String),
356        )
357
358        stmt = t1.insert().from_select(("data",), select([t1.c.data]))
359
360        self.assert_compile(
361            stmt,
362            "INSERT INTO t (data, id) SELECT t.data, "
363            "nextval('id_seq') AS next_value_1 FROM t",
364            dialect=postgresql.dialect(),
365        )
366
367    def test_insert_from_select_cte_one(self):
368        table1 = self.tables.mytable
369
370        cte = select([table1.c.name]).where(table1.c.name == "bar").cte()
371
372        sel = select([table1.c.myid, table1.c.name]).where(
373            table1.c.name == cte.c.name
374        )
375
376        ins = self.tables.myothertable.insert().from_select(
377            ("otherid", "othername"), sel
378        )
379        self.assert_compile(
380            ins,
381            "WITH anon_1 AS "
382            "(SELECT mytable.name AS name FROM mytable "
383            "WHERE mytable.name = :name_1) "
384            "INSERT INTO myothertable (otherid, othername) "
385            "SELECT mytable.myid, mytable.name FROM mytable, anon_1 "
386            "WHERE mytable.name = anon_1.name",
387            checkparams={"name_1": "bar"},
388        )
389
390    def test_insert_from_select_cte_follows_insert_one(self):
391        dialect = default.DefaultDialect()
392        dialect.cte_follows_insert = True
393
394        table1 = self.tables.mytable
395
396        cte = select([table1.c.name]).where(table1.c.name == "bar").cte()
397
398        sel = select([table1.c.myid, table1.c.name]).where(
399            table1.c.name == cte.c.name
400        )
401
402        ins = self.tables.myothertable.insert().from_select(
403            ("otherid", "othername"), sel
404        )
405        self.assert_compile(
406            ins,
407            "INSERT INTO myothertable (otherid, othername) "
408            "WITH anon_1 AS "
409            "(SELECT mytable.name AS name FROM mytable "
410            "WHERE mytable.name = :name_1) "
411            "SELECT mytable.myid, mytable.name FROM mytable, anon_1 "
412            "WHERE mytable.name = anon_1.name",
413            checkparams={"name_1": "bar"},
414            dialect=dialect,
415        )
416
417    def test_insert_from_select_cte_two(self):
418        table1 = self.tables.mytable
419
420        cte = table1.select().cte("c")
421        stmt = cte.select()
422        ins = table1.insert().from_select(table1.c, stmt)
423
424        self.assert_compile(
425            ins,
426            "WITH c AS (SELECT mytable.myid AS myid, mytable.name AS name, "
427            "mytable.description AS description FROM mytable) "
428            "INSERT INTO mytable (myid, name, description) "
429            "SELECT c.myid, c.name, c.description FROM c",
430        )
431
432    def test_insert_from_select_cte_follows_insert_two(self):
433        dialect = default.DefaultDialect()
434        dialect.cte_follows_insert = True
435        table1 = self.tables.mytable
436
437        cte = table1.select().cte("c")
438        stmt = cte.select()
439        ins = table1.insert().from_select(table1.c, stmt)
440
441        self.assert_compile(
442            ins,
443            "INSERT INTO mytable (myid, name, description) "
444            "WITH c AS (SELECT mytable.myid AS myid, mytable.name AS name, "
445            "mytable.description AS description FROM mytable) "
446            "SELECT c.myid, c.name, c.description FROM c",
447            dialect=dialect,
448        )
449
450    def test_insert_from_select_select_alt_ordering(self):
451        table1 = self.tables.mytable
452        sel = select([table1.c.name, table1.c.myid]).where(
453            table1.c.name == "foo"
454        )
455        ins = self.tables.myothertable.insert().from_select(
456            ("othername", "otherid"), sel
457        )
458        self.assert_compile(
459            ins,
460            "INSERT INTO myothertable (othername, otherid) "
461            "SELECT mytable.name, mytable.myid FROM mytable "
462            "WHERE mytable.name = :name_1",
463            checkparams={"name_1": "foo"},
464        )
465
466    def test_insert_from_select_no_defaults(self):
467        metadata = MetaData()
468        table = Table(
469            "sometable",
470            metadata,
471            Column("id", Integer, primary_key=True),
472            Column("foo", Integer, default=func.foobar()),
473        )
474        table1 = self.tables.mytable
475        sel = select([table1.c.myid]).where(table1.c.name == "foo")
476        ins = table.insert().from_select(["id"], sel, include_defaults=False)
477        self.assert_compile(
478            ins,
479            "INSERT INTO sometable (id) SELECT mytable.myid "
480            "FROM mytable WHERE mytable.name = :name_1",
481            checkparams={"name_1": "foo"},
482        )
483
484    def test_insert_from_select_with_sql_defaults(self):
485        metadata = MetaData()
486        table = Table(
487            "sometable",
488            metadata,
489            Column("id", Integer, primary_key=True),
490            Column("foo", Integer, default=func.foobar()),
491        )
492        table1 = self.tables.mytable
493        sel = select([table1.c.myid]).where(table1.c.name == "foo")
494        ins = table.insert().from_select(["id"], sel)
495        self.assert_compile(
496            ins,
497            "INSERT INTO sometable (id, foo) SELECT "
498            "mytable.myid, foobar() AS foobar_1 "
499            "FROM mytable WHERE mytable.name = :name_1",
500            checkparams={"name_1": "foo"},
501        )
502
503    def test_insert_from_select_with_python_defaults(self):
504        metadata = MetaData()
505        table = Table(
506            "sometable",
507            metadata,
508            Column("id", Integer, primary_key=True),
509            Column("foo", Integer, default=12),
510        )
511        table1 = self.tables.mytable
512        sel = select([table1.c.myid]).where(table1.c.name == "foo")
513        ins = table.insert().from_select(["id"], sel)
514        self.assert_compile(
515            ins,
516            "INSERT INTO sometable (id, foo) SELECT "
517            "mytable.myid, :foo AS anon_1 "
518            "FROM mytable WHERE mytable.name = :name_1",
519            # value filled in at execution time
520            checkparams={"name_1": "foo", "foo": None},
521        )
522
523    def test_insert_from_select_override_defaults(self):
524        metadata = MetaData()
525        table = Table(
526            "sometable",
527            metadata,
528            Column("id", Integer, primary_key=True),
529            Column("foo", Integer, default=12),
530        )
531        table1 = self.tables.mytable
532        sel = select([table1.c.myid, table1.c.myid.label("q")]).where(
533            table1.c.name == "foo"
534        )
535        ins = table.insert().from_select(["id", "foo"], sel)
536        self.assert_compile(
537            ins,
538            "INSERT INTO sometable (id, foo) SELECT "
539            "mytable.myid, mytable.myid AS q "
540            "FROM mytable WHERE mytable.name = :name_1",
541            checkparams={"name_1": "foo"},
542        )
543
544    def test_insert_from_select_fn_defaults(self):
545        metadata = MetaData()
546
547        def foo(ctx):
548            return 12
549
550        table = Table(
551            "sometable",
552            metadata,
553            Column("id", Integer, primary_key=True),
554            Column("foo", Integer, default=foo),
555        )
556        table1 = self.tables.mytable
557        sel = select([table1.c.myid]).where(table1.c.name == "foo")
558        ins = table.insert().from_select(["id"], sel)
559        self.assert_compile(
560            ins,
561            "INSERT INTO sometable (id, foo) SELECT "
562            "mytable.myid, :foo AS anon_1 "
563            "FROM mytable WHERE mytable.name = :name_1",
564            # value filled in at execution time
565            checkparams={"name_1": "foo", "foo": None},
566        )
567
568    def test_insert_from_select_dont_mutate_raw_columns(self):
569        # test [ticket:3603]
570        from sqlalchemy import table
571
572        table_ = table(
573            "mytable",
574            Column("foo", String),
575            Column("bar", String, default="baz"),
576        )
577
578        stmt = select([table_.c.foo])
579        insert = table_.insert().from_select(["foo"], stmt)
580
581        self.assert_compile(stmt, "SELECT mytable.foo FROM mytable")
582        self.assert_compile(
583            insert,
584            "INSERT INTO mytable (foo, bar) "
585            "SELECT mytable.foo, :bar AS anon_1 FROM mytable",
586        )
587        self.assert_compile(stmt, "SELECT mytable.foo FROM mytable")
588        self.assert_compile(
589            insert,
590            "INSERT INTO mytable (foo, bar) "
591            "SELECT mytable.foo, :bar AS anon_1 FROM mytable",
592        )
593
594    def test_insert_mix_select_values_exception(self):
595        table1 = self.tables.mytable
596        sel = select([table1.c.myid, table1.c.name]).where(
597            table1.c.name == "foo"
598        )
599        ins = self.tables.myothertable.insert().from_select(
600            ("otherid", "othername"), sel
601        )
602        assert_raises_message(
603            exc.InvalidRequestError,
604            "This construct already inserts from a SELECT",
605            ins.values,
606            othername="5",
607        )
608
609    def test_insert_mix_values_select_exception(self):
610        table1 = self.tables.mytable
611        sel = select([table1.c.myid, table1.c.name]).where(
612            table1.c.name == "foo"
613        )
614        ins = self.tables.myothertable.insert().values(othername="5")
615        assert_raises_message(
616            exc.InvalidRequestError,
617            "This construct already inserts value expressions",
618            ins.from_select,
619            ("otherid", "othername"),
620            sel,
621        )
622
623    def test_insert_from_select_table(self):
624        table1 = self.tables.mytable
625        ins = self.tables.myothertable.insert().from_select(
626            ("otherid", "othername"), table1
627        )
628        # note we aren't checking the number of columns right now
629        self.assert_compile(
630            ins,
631            "INSERT INTO myothertable (otherid, othername) "
632            "SELECT mytable.myid, mytable.name, mytable.description "
633            "FROM mytable",
634            checkparams={},
635        )
636
637    def test_insert_from_select_union(self):
638        mytable = self.tables.mytable
639
640        name = column("name")
641        description = column("desc")
642        sel = select([name, mytable.c.description]).union(
643            select([name, description])
644        )
645        ins = mytable.insert().from_select(
646            [mytable.c.name, mytable.c.description], sel
647        )
648        self.assert_compile(
649            ins,
650            "INSERT INTO mytable (name, description) "
651            "SELECT name, mytable.description FROM mytable "
652            'UNION SELECT name, "desc"',
653        )
654
655    def test_insert_from_select_col_values(self):
656        table1 = self.tables.mytable
657        table2 = self.tables.myothertable
658        sel = select([table1.c.myid, table1.c.name]).where(
659            table1.c.name == "foo"
660        )
661        ins = table2.insert().from_select(
662            (table2.c.otherid, table2.c.othername), sel
663        )
664        self.assert_compile(
665            ins,
666            "INSERT INTO myothertable (otherid, othername) "
667            "SELECT mytable.myid, mytable.name FROM mytable "
668            "WHERE mytable.name = :name_1",
669            checkparams={"name_1": "foo"},
670        )
671
672    def test_anticipate_no_pk_composite_pk(self):
673        t = Table(
674            "t",
675            MetaData(),
676            Column("x", Integer, primary_key=True),
677            Column("y", Integer, primary_key=True),
678        )
679
680        with expect_warnings(
681            "Column 't.y' is marked as a member.*"
682            "Note that as of SQLAlchemy 1.1,"
683        ):
684            self.assert_compile(
685                t.insert(), "INSERT INTO t (x) VALUES (:x)", params={"x": 5}
686            )
687
688    def test_anticipate_no_pk_composite_pk_implicit_returning(self):
689        t = Table(
690            "t",
691            MetaData(),
692            Column("x", Integer, primary_key=True),
693            Column("y", Integer, primary_key=True),
694        )
695        d = postgresql.dialect()
696        d.implicit_returning = True
697
698        with expect_warnings(
699            "Column 't.y' is marked as a member.*"
700            "Note that as of SQLAlchemy 1.1,"
701        ):
702            self.assert_compile(
703                t.insert(),
704                "INSERT INTO t (x) VALUES (%(x)s)",
705                params={"x": 5},
706                dialect=d,
707            )
708
709    def test_anticipate_no_pk_composite_pk_prefetch(self):
710        t = Table(
711            "t",
712            MetaData(),
713            Column("x", Integer, primary_key=True),
714            Column("y", Integer, primary_key=True),
715        )
716        d = postgresql.dialect()
717        d.implicit_returning = False
718        with expect_warnings(
719            "Column 't.y' is marked as a member.*"
720            "Note that as of SQLAlchemy 1.1,"
721        ):
722            self.assert_compile(
723                t.insert(),
724                "INSERT INTO t (x) VALUES (%(x)s)",
725                params={"x": 5},
726                dialect=d,
727            )
728
729    def test_anticipate_nullable_composite_pk(self):
730        t = Table(
731            "t",
732            MetaData(),
733            Column("x", Integer, primary_key=True),
734            Column("y", Integer, primary_key=True, nullable=True),
735        )
736        self.assert_compile(
737            t.insert(), "INSERT INTO t (x) VALUES (:x)", params={"x": 5}
738        )
739
740    def test_anticipate_no_pk_non_composite_pk(self):
741        t = Table(
742            "t",
743            MetaData(),
744            Column("x", Integer, primary_key=True, autoincrement=False),
745            Column("q", Integer),
746        )
747        with expect_warnings(
748            "Column 't.x' is marked as a member.*" "may not store NULL.$"
749        ):
750            self.assert_compile(
751                t.insert(), "INSERT INTO t (q) VALUES (:q)", params={"q": 5}
752            )
753
754    def test_anticipate_no_pk_non_composite_pk_implicit_returning(self):
755        t = Table(
756            "t",
757            MetaData(),
758            Column("x", Integer, primary_key=True, autoincrement=False),
759            Column("q", Integer),
760        )
761        d = postgresql.dialect()
762        d.implicit_returning = True
763        with expect_warnings(
764            "Column 't.x' is marked as a member.*" "may not store NULL.$"
765        ):
766            self.assert_compile(
767                t.insert(),
768                "INSERT INTO t (q) VALUES (%(q)s)",
769                params={"q": 5},
770                dialect=d,
771            )
772
773    def test_anticipate_no_pk_non_composite_pk_prefetch(self):
774        t = Table(
775            "t",
776            MetaData(),
777            Column("x", Integer, primary_key=True, autoincrement=False),
778            Column("q", Integer),
779        )
780        d = postgresql.dialect()
781        d.implicit_returning = False
782
783        with expect_warnings(
784            "Column 't.x' is marked as a member.*" "may not store NULL.$"
785        ):
786            self.assert_compile(
787                t.insert(),
788                "INSERT INTO t (q) VALUES (%(q)s)",
789                params={"q": 5},
790                dialect=d,
791            )
792
793    def test_anticipate_no_pk_lower_case_table(self):
794        t = table(
795            "t",
796            Column("id", Integer, primary_key=True, autoincrement=False),
797            Column("notpk", String(10), nullable=True),
798        )
799        with expect_warnings(
800            "Column 't.id' is marked as a member.*" "may not store NULL.$"
801        ):
802            self.assert_compile(
803                t.insert(), "INSERT INTO t () VALUES ()", params={}
804            )
805
806
807class InsertImplicitReturningTest(
808    _InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL
809):
810    __dialect__ = postgresql.dialect(implicit_returning=True)
811
812    def test_insert_select(self):
813        table1 = self.tables.mytable
814        sel = select([table1.c.myid, table1.c.name]).where(
815            table1.c.name == "foo"
816        )
817        ins = self.tables.myothertable.insert().from_select(
818            ("otherid", "othername"), sel
819        )
820        self.assert_compile(
821            ins,
822            "INSERT INTO myothertable (otherid, othername) "
823            "SELECT mytable.myid, mytable.name FROM mytable "
824            "WHERE mytable.name = %(name_1)s",
825            checkparams={"name_1": "foo"},
826        )
827
828    def test_insert_select_return_defaults(self):
829        table1 = self.tables.mytable
830        sel = select([table1.c.myid, table1.c.name]).where(
831            table1.c.name == "foo"
832        )
833        ins = (
834            self.tables.myothertable.insert()
835            .from_select(("otherid", "othername"), sel)
836            .return_defaults(self.tables.myothertable.c.otherid)
837        )
838        self.assert_compile(
839            ins,
840            "INSERT INTO myothertable (otherid, othername) "
841            "SELECT mytable.myid, mytable.name FROM mytable "
842            "WHERE mytable.name = %(name_1)s",
843            checkparams={"name_1": "foo"},
844        )
845
846    def test_insert_multiple_values(self):
847        ins = self.tables.myothertable.insert().values(
848            [{"othername": "foo"}, {"othername": "bar"}]
849        )
850        self.assert_compile(
851            ins,
852            "INSERT INTO myothertable (othername) "
853            "VALUES (%(othername_m0)s), "
854            "(%(othername_m1)s)",
855            checkparams={"othername_m1": "bar", "othername_m0": "foo"},
856        )
857
858    def test_insert_multiple_values_literal_binds(self):
859        ins = self.tables.myothertable.insert().values(
860            [{"othername": "foo"}, {"othername": "bar"}]
861        )
862        self.assert_compile(
863            ins,
864            "INSERT INTO myothertable (othername) VALUES ('foo'), ('bar')",
865            checkparams={},
866            literal_binds=True,
867        )
868
869    def test_insert_multiple_values_return_defaults(self):
870        # TODO: not sure if this should raise an
871        # error or what
872        ins = (
873            self.tables.myothertable.insert()
874            .values([{"othername": "foo"}, {"othername": "bar"}])
875            .return_defaults(self.tables.myothertable.c.otherid)
876        )
877        self.assert_compile(
878            ins,
879            "INSERT INTO myothertable (othername) "
880            "VALUES (%(othername_m0)s), "
881            "(%(othername_m1)s)",
882            checkparams={"othername_m1": "bar", "othername_m0": "foo"},
883        )
884
885    def test_insert_single_list_values(self):
886        ins = self.tables.myothertable.insert().values([{"othername": "foo"}])
887        self.assert_compile(
888            ins,
889            "INSERT INTO myothertable (othername) "
890            "VALUES (%(othername_m0)s)",
891            checkparams={"othername_m0": "foo"},
892        )
893
894    def test_insert_single_element_values(self):
895        ins = self.tables.myothertable.insert().values({"othername": "foo"})
896        self.assert_compile(
897            ins,
898            "INSERT INTO myothertable (othername) "
899            "VALUES (%(othername)s) RETURNING myothertable.otherid",
900            checkparams={"othername": "foo"},
901        )
902
903
904class EmptyTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
905    __dialect__ = "default"
906
907    def test_empty_insert_default(self):
908        table1 = self.tables.mytable
909
910        stmt = table1.insert().values({})  # hide from 2to3
911        self.assert_compile(stmt, "INSERT INTO mytable () VALUES ()")
912
913    def test_supports_empty_insert_true(self):
914        table1 = self.tables.mytable
915
916        dialect = default.DefaultDialect()
917        dialect.supports_empty_insert = dialect.supports_default_values = True
918
919        stmt = table1.insert().values({})  # hide from 2to3
920        self.assert_compile(
921            stmt, "INSERT INTO mytable DEFAULT VALUES", dialect=dialect
922        )
923
924    def test_supports_empty_insert_false(self):
925        table1 = self.tables.mytable
926
927        dialect = default.DefaultDialect()
928        dialect.supports_empty_insert = dialect.supports_default_values = False
929
930        stmt = table1.insert().values({})  # hide from 2to3
931        assert_raises_message(
932            exc.CompileError,
933            "The 'default' dialect with current database version "
934            "settings does not support empty inserts.",
935            stmt.compile,
936            dialect=dialect,
937        )
938
939    def _test_insert_with_empty_collection_values(self, collection):
940        table1 = self.tables.mytable
941
942        ins = table1.insert().values(collection)
943
944        self.assert_compile(
945            ins, "INSERT INTO mytable () VALUES ()", checkparams={}
946        )
947
948        # empty dict populates on next values call
949        self.assert_compile(
950            ins.values(myid=3),
951            "INSERT INTO mytable (myid) VALUES (:myid)",
952            checkparams={"myid": 3},
953        )
954
955    def test_insert_with_empty_list_values(self):
956        self._test_insert_with_empty_collection_values([])
957
958    def test_insert_with_empty_dict_values(self):
959        self._test_insert_with_empty_collection_values({})
960
961    def test_insert_with_empty_tuple_values(self):
962        self._test_insert_with_empty_collection_values(())
963
964
965class MultirowTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
966    __dialect__ = "default"
967
968    def test_not_supported(self):
969        table1 = self.tables.mytable
970
971        dialect = default.DefaultDialect()
972        stmt = table1.insert().values([{"myid": 1}, {"myid": 2}])
973        assert_raises_message(
974            exc.CompileError,
975            "The 'default' dialect with current database version settings "
976            "does not support in-place multirow inserts.",
977            stmt.compile,
978            dialect=dialect,
979        )
980
981    def test_named(self):
982        table1 = self.tables.mytable
983
984        values = [
985            {"myid": 1, "name": "a", "description": "b"},
986            {"myid": 2, "name": "c", "description": "d"},
987            {"myid": 3, "name": "e", "description": "f"},
988        ]
989
990        checkparams = {
991            "myid_m0": 1,
992            "myid_m1": 2,
993            "myid_m2": 3,
994            "name_m0": "a",
995            "name_m1": "c",
996            "name_m2": "e",
997            "description_m0": "b",
998            "description_m1": "d",
999            "description_m2": "f",
1000        }
1001
1002        dialect = default.DefaultDialect()
1003        dialect.supports_multivalues_insert = True
1004
1005        self.assert_compile(
1006            table1.insert().values(values),
1007            "INSERT INTO mytable (myid, name, description) VALUES "
1008            "(:myid_m0, :name_m0, :description_m0), "
1009            "(:myid_m1, :name_m1, :description_m1), "
1010            "(:myid_m2, :name_m2, :description_m2)",
1011            checkparams=checkparams,
1012            dialect=dialect,
1013        )
1014
1015    def test_named_with_column_objects(self):
1016        table1 = self.tables.mytable
1017
1018        values = [
1019            {table1.c.myid: 1, table1.c.name: "a", table1.c.description: "b"},
1020            {table1.c.myid: 2, table1.c.name: "c", table1.c.description: "d"},
1021            {table1.c.myid: 3, table1.c.name: "e", table1.c.description: "f"},
1022        ]
1023
1024        checkparams = {
1025            "myid_m0": 1,
1026            "myid_m1": 2,
1027            "myid_m2": 3,
1028            "name_m0": "a",
1029            "name_m1": "c",
1030            "name_m2": "e",
1031            "description_m0": "b",
1032            "description_m1": "d",
1033            "description_m2": "f",
1034        }
1035
1036        dialect = default.DefaultDialect()
1037        dialect.supports_multivalues_insert = True
1038
1039        self.assert_compile(
1040            table1.insert().values(values),
1041            "INSERT INTO mytable (myid, name, description) VALUES "
1042            "(:myid_m0, :name_m0, :description_m0), "
1043            "(:myid_m1, :name_m1, :description_m1), "
1044            "(:myid_m2, :name_m2, :description_m2)",
1045            checkparams=checkparams,
1046            dialect=dialect,
1047        )
1048
1049    def test_positional(self):
1050        table1 = self.tables.mytable
1051
1052        values = [
1053            {"myid": 1, "name": "a", "description": "b"},
1054            {"myid": 2, "name": "c", "description": "d"},
1055            {"myid": 3, "name": "e", "description": "f"},
1056        ]
1057
1058        checkpositional = (1, "a", "b", 2, "c", "d", 3, "e", "f")
1059
1060        dialect = default.DefaultDialect()
1061        dialect.supports_multivalues_insert = True
1062        dialect.paramstyle = "format"
1063        dialect.positional = True
1064
1065        self.assert_compile(
1066            table1.insert().values(values),
1067            "INSERT INTO mytable (myid, name, description) VALUES "
1068            "(%s, %s, %s), (%s, %s, %s), (%s, %s, %s)",
1069            checkpositional=checkpositional,
1070            dialect=dialect,
1071        )
1072
1073    def test_positional_w_defaults(self):
1074        table1 = self.tables.table_w_defaults
1075
1076        values = [{"id": 1}, {"id": 2}, {"id": 3}]
1077
1078        checkpositional = (1, None, None, 2, None, None, 3, None, None)
1079
1080        dialect = default.DefaultDialect()
1081        dialect.supports_multivalues_insert = True
1082        dialect.paramstyle = "format"
1083        dialect.positional = True
1084
1085        self.assert_compile(
1086            table1.insert().values(values),
1087            "INSERT INTO table_w_defaults (id, x, z) VALUES "
1088            "(%s, %s, %s), (%s, %s, %s), (%s, %s, %s)",
1089            checkpositional=checkpositional,
1090            check_prefetch=[
1091                table1.c.x,
1092                table1.c.z,
1093                crud._multiparam_column(table1.c.x, 0),
1094                crud._multiparam_column(table1.c.z, 0),
1095                crud._multiparam_column(table1.c.x, 1),
1096                crud._multiparam_column(table1.c.z, 1),
1097            ],
1098            dialect=dialect,
1099        )
1100
1101    def test_inline_default(self):
1102        metadata = MetaData()
1103        table = Table(
1104            "sometable",
1105            metadata,
1106            Column("id", Integer, primary_key=True),
1107            Column("data", String),
1108            Column("foo", Integer, default=func.foobar()),
1109        )
1110
1111        values = [
1112            {"id": 1, "data": "data1"},
1113            {"id": 2, "data": "data2", "foo": "plainfoo"},
1114            {"id": 3, "data": "data3"},
1115        ]
1116
1117        checkparams = {
1118            "id_m0": 1,
1119            "id_m1": 2,
1120            "id_m2": 3,
1121            "data_m0": "data1",
1122            "data_m1": "data2",
1123            "data_m2": "data3",
1124            "foo_m1": "plainfoo",
1125        }
1126
1127        self.assert_compile(
1128            table.insert().values(values),
1129            "INSERT INTO sometable (id, data, foo) VALUES "
1130            "(%(id_m0)s, %(data_m0)s, foobar()), "
1131            "(%(id_m1)s, %(data_m1)s, %(foo_m1)s), "
1132            "(%(id_m2)s, %(data_m2)s, foobar())",
1133            checkparams=checkparams,
1134            dialect=postgresql.dialect(),
1135        )
1136
1137    def test_python_scalar_default(self):
1138        metadata = MetaData()
1139        table = Table(
1140            "sometable",
1141            metadata,
1142            Column("id", Integer, primary_key=True),
1143            Column("data", String),
1144            Column("foo", Integer, default=10),
1145        )
1146
1147        values = [
1148            {"id": 1, "data": "data1"},
1149            {"id": 2, "data": "data2", "foo": 15},
1150            {"id": 3, "data": "data3"},
1151        ]
1152
1153        checkparams = {
1154            "id_m0": 1,
1155            "id_m1": 2,
1156            "id_m2": 3,
1157            "data_m0": "data1",
1158            "data_m1": "data2",
1159            "data_m2": "data3",
1160            "foo": None,  # evaluated later
1161            "foo_m1": 15,
1162            "foo_m2": None,  # evaluated later
1163        }
1164
1165        stmt = table.insert().values(values)
1166
1167        eq_(
1168            dict(
1169                [
1170                    (k, v.type._type_affinity)
1171                    for (k, v) in stmt.compile(
1172                        dialect=postgresql.dialect()
1173                    ).binds.items()
1174                ]
1175            ),
1176            {
1177                "foo": Integer,
1178                "data_m2": String,
1179                "id_m0": Integer,
1180                "id_m2": Integer,
1181                "foo_m1": Integer,
1182                "data_m1": String,
1183                "id_m1": Integer,
1184                "foo_m2": Integer,
1185                "data_m0": String,
1186            },
1187        )
1188
1189        self.assert_compile(
1190            stmt,
1191            "INSERT INTO sometable (id, data, foo) VALUES "
1192            "(%(id_m0)s, %(data_m0)s, %(foo)s), "
1193            "(%(id_m1)s, %(data_m1)s, %(foo_m1)s), "
1194            "(%(id_m2)s, %(data_m2)s, %(foo_m2)s)",
1195            checkparams=checkparams,
1196            dialect=postgresql.dialect(),
1197        )
1198
1199    def test_python_fn_default(self):
1200        metadata = MetaData()
1201        table = Table(
1202            "sometable",
1203            metadata,
1204            Column("id", Integer, primary_key=True),
1205            Column("data", String),
1206            Column("foo", Integer, default=lambda: 10),
1207        )
1208
1209        values = [
1210            {"id": 1, "data": "data1"},
1211            {"id": 2, "data": "data2", "foo": 15},
1212            {"id": 3, "data": "data3"},
1213        ]
1214
1215        checkparams = {
1216            "id_m0": 1,
1217            "id_m1": 2,
1218            "id_m2": 3,
1219            "data_m0": "data1",
1220            "data_m1": "data2",
1221            "data_m2": "data3",
1222            "foo": None,  # evaluated later
1223            "foo_m1": 15,
1224            "foo_m2": None,  # evaluated later
1225        }
1226
1227        stmt = table.insert().values(values)
1228        eq_(
1229            dict(
1230                [
1231                    (k, v.type._type_affinity)
1232                    for (k, v) in stmt.compile(
1233                        dialect=postgresql.dialect()
1234                    ).binds.items()
1235                ]
1236            ),
1237            {
1238                "foo": Integer,
1239                "data_m2": String,
1240                "id_m0": Integer,
1241                "id_m2": Integer,
1242                "foo_m1": Integer,
1243                "data_m1": String,
1244                "id_m1": Integer,
1245                "foo_m2": Integer,
1246                "data_m0": String,
1247            },
1248        )
1249
1250        self.assert_compile(
1251            stmt,
1252            "INSERT INTO sometable (id, data, foo) VALUES "
1253            "(%(id_m0)s, %(data_m0)s, %(foo)s), "
1254            "(%(id_m1)s, %(data_m1)s, %(foo_m1)s), "
1255            "(%(id_m2)s, %(data_m2)s, %(foo_m2)s)",
1256            checkparams=checkparams,
1257            dialect=postgresql.dialect(),
1258        )
1259
1260    def test_sql_functions(self):
1261        metadata = MetaData()
1262        table = Table(
1263            "sometable",
1264            metadata,
1265            Column("id", Integer, primary_key=True),
1266            Column("data", String),
1267            Column("foo", Integer),
1268        )
1269
1270        values = [
1271            {"id": 1, "data": "foo", "foo": func.foob()},
1272            {"id": 2, "data": "bar", "foo": func.foob()},
1273            {"id": 3, "data": "bar", "foo": func.bar()},
1274            {"id": 4, "data": "bar", "foo": 15},
1275            {"id": 5, "data": "bar", "foo": func.foob()},
1276        ]
1277        checkparams = {
1278            "id_m0": 1,
1279            "data_m0": "foo",
1280            "id_m1": 2,
1281            "data_m1": "bar",
1282            "id_m2": 3,
1283            "data_m2": "bar",
1284            "id_m3": 4,
1285            "data_m3": "bar",
1286            "foo_m3": 15,
1287            "id_m4": 5,
1288            "data_m4": "bar",
1289        }
1290
1291        self.assert_compile(
1292            table.insert().values(values),
1293            "INSERT INTO sometable (id, data, foo) VALUES "
1294            "(%(id_m0)s, %(data_m0)s, foob()), "
1295            "(%(id_m1)s, %(data_m1)s, foob()), "
1296            "(%(id_m2)s, %(data_m2)s, bar()), "
1297            "(%(id_m3)s, %(data_m3)s, %(foo_m3)s), "
1298            "(%(id_m4)s, %(data_m4)s, foob())",
1299            checkparams=checkparams,
1300            dialect=postgresql.dialect(),
1301        )
1302
1303    def test_server_default(self):
1304        metadata = MetaData()
1305        table = Table(
1306            "sometable",
1307            metadata,
1308            Column("id", Integer, primary_key=True),
1309            Column("data", String),
1310            Column("foo", Integer, server_default=func.foobar()),
1311        )
1312
1313        values = [
1314            {"id": 1, "data": "data1"},
1315            {"id": 2, "data": "data2", "foo": "plainfoo"},
1316            {"id": 3, "data": "data3"},
1317        ]
1318
1319        checkparams = {
1320            "id_m0": 1,
1321            "id_m1": 2,
1322            "id_m2": 3,
1323            "data_m0": "data1",
1324            "data_m1": "data2",
1325            "data_m2": "data3",
1326        }
1327
1328        self.assert_compile(
1329            table.insert().values(values),
1330            "INSERT INTO sometable (id, data) VALUES "
1331            "(%(id_m0)s, %(data_m0)s), "
1332            "(%(id_m1)s, %(data_m1)s), "
1333            "(%(id_m2)s, %(data_m2)s)",
1334            checkparams=checkparams,
1335            dialect=postgresql.dialect(),
1336        )
1337
1338    def test_server_default_absent_value(self):
1339        metadata = MetaData()
1340        table = Table(
1341            "sometable",
1342            metadata,
1343            Column("id", Integer, primary_key=True),
1344            Column("data", String),
1345            Column("foo", Integer, server_default=func.foobar()),
1346        )
1347
1348        values = [
1349            {"id": 1, "data": "data1", "foo": "plainfoo"},
1350            {"id": 2, "data": "data2"},
1351            {"id": 3, "data": "data3", "foo": "otherfoo"},
1352        ]
1353
1354        assert_raises_message(
1355            exc.CompileError,
1356            "INSERT value for column sometable.foo is explicitly rendered "
1357            "as a boundparameter in the VALUES clause; a Python-side value or "
1358            "SQL expression is required",
1359            table.insert().values(values).compile,
1360        )
1361