1# coding: utf-8
2from sqlalchemy import and_
3from sqlalchemy import cast
4from sqlalchemy import Column
5from sqlalchemy import Computed
6from sqlalchemy import Date
7from sqlalchemy import delete
8from sqlalchemy import Enum
9from sqlalchemy import exc
10from sqlalchemy import func
11from sqlalchemy import Index
12from sqlalchemy import Integer
13from sqlalchemy import MetaData
14from sqlalchemy import null
15from sqlalchemy import schema
16from sqlalchemy import select
17from sqlalchemy import Sequence
18from sqlalchemy import String
19from sqlalchemy import Table
20from sqlalchemy import testing
21from sqlalchemy import Text
22from sqlalchemy import text
23from sqlalchemy import types as sqltypes
24from sqlalchemy import update
25from sqlalchemy.dialects import postgresql
26from sqlalchemy.dialects.postgresql import aggregate_order_by
27from sqlalchemy.dialects.postgresql import ARRAY as PG_ARRAY
28from sqlalchemy.dialects.postgresql import array
29from sqlalchemy.dialects.postgresql import array_agg as pg_array_agg
30from sqlalchemy.dialects.postgresql import ExcludeConstraint
31from sqlalchemy.dialects.postgresql import insert
32from sqlalchemy.dialects.postgresql import TSRANGE
33from sqlalchemy.dialects.postgresql.base import PGDialect
34from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
35from sqlalchemy.orm import aliased
36from sqlalchemy.orm import mapper
37from sqlalchemy.orm import Session
38from sqlalchemy.sql import column
39from sqlalchemy.sql import literal_column
40from sqlalchemy.sql import operators
41from sqlalchemy.sql import table
42from sqlalchemy.sql import util as sql_util
43from sqlalchemy.testing import engines
44from sqlalchemy.testing import fixtures
45from sqlalchemy.testing.assertions import assert_raises
46from sqlalchemy.testing.assertions import assert_raises_message
47from sqlalchemy.testing.assertions import AssertsCompiledSQL
48from sqlalchemy.testing.assertions import expect_warnings
49from sqlalchemy.testing.assertions import is_
50from sqlalchemy.util import OrderedDict
51from sqlalchemy.util import u
52
53
54class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
55    __prefer__ = "postgresql"
56
57    def test_format(self):
58        seq = Sequence("my_seq_no_schema")
59        dialect = postgresql.dialect()
60        assert (
61            dialect.identifier_preparer.format_sequence(seq)
62            == "my_seq_no_schema"
63        )
64        seq = Sequence("my_seq", schema="some_schema")
65        assert (
66            dialect.identifier_preparer.format_sequence(seq)
67            == "some_schema.my_seq"
68        )
69        seq = Sequence("My_Seq", schema="Some_Schema")
70        assert (
71            dialect.identifier_preparer.format_sequence(seq)
72            == '"Some_Schema"."My_Seq"'
73        )
74
75    @testing.only_on("postgresql", "foo")
76    @testing.provide_metadata
77    def test_reverse_eng_name(self):
78        metadata = self.metadata
79        engine = engines.testing_engine(options=dict(implicit_returning=False))
80        for tname, cname in [
81            ("tb1" * 30, "abc"),
82            ("tb2", "abc" * 30),
83            ("tb3" * 30, "abc" * 30),
84            ("tb4", "abc"),
85        ]:
86            t = Table(
87                tname[:57],
88                metadata,
89                Column(cname[:57], Integer, primary_key=True),
90            )
91            t.create(engine)
92            r = engine.execute(t.insert())
93            assert r.inserted_primary_key == [1]
94
95
96class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
97
98    __dialect__ = postgresql.dialect()
99
100    def test_update_returning(self):
101        dialect = postgresql.dialect()
102        table1 = table(
103            "mytable",
104            column("myid", Integer),
105            column("name", String(128)),
106            column("description", String(128)),
107        )
108        u = update(table1, values=dict(name="foo")).returning(
109            table1.c.myid, table1.c.name
110        )
111        self.assert_compile(
112            u,
113            "UPDATE mytable SET name=%(name)s "
114            "RETURNING mytable.myid, mytable.name",
115            dialect=dialect,
116        )
117        u = update(table1, values=dict(name="foo")).returning(table1)
118        self.assert_compile(
119            u,
120            "UPDATE mytable SET name=%(name)s "
121            "RETURNING mytable.myid, mytable.name, "
122            "mytable.description",
123            dialect=dialect,
124        )
125        u = update(table1, values=dict(name="foo")).returning(
126            func.length(table1.c.name)
127        )
128        self.assert_compile(
129            u,
130            "UPDATE mytable SET name=%(name)s "
131            "RETURNING length(mytable.name) AS length_1",
132            dialect=dialect,
133        )
134
135    def test_insert_returning(self):
136        dialect = postgresql.dialect()
137        table1 = table(
138            "mytable",
139            column("myid", Integer),
140            column("name", String(128)),
141            column("description", String(128)),
142        )
143
144        i = insert(table1, values=dict(name="foo")).returning(
145            table1.c.myid, table1.c.name
146        )
147        self.assert_compile(
148            i,
149            "INSERT INTO mytable (name) VALUES "
150            "(%(name)s) RETURNING mytable.myid, "
151            "mytable.name",
152            dialect=dialect,
153        )
154        i = insert(table1, values=dict(name="foo")).returning(table1)
155        self.assert_compile(
156            i,
157            "INSERT INTO mytable (name) VALUES "
158            "(%(name)s) RETURNING mytable.myid, "
159            "mytable.name, mytable.description",
160            dialect=dialect,
161        )
162        i = insert(table1, values=dict(name="foo")).returning(
163            func.length(table1.c.name)
164        )
165        self.assert_compile(
166            i,
167            "INSERT INTO mytable (name) VALUES "
168            "(%(name)s) RETURNING length(mytable.name) "
169            "AS length_1",
170            dialect=dialect,
171        )
172
173    def test_create_drop_enum(self):
174        # test escaping and unicode within CREATE TYPE for ENUM
175        typ = postgresql.ENUM(
176            "val1", "val2", "val's 3", u("méil"), name="myname"
177        )
178        self.assert_compile(
179            postgresql.CreateEnumType(typ),
180            u(
181                "CREATE TYPE myname AS "
182                "ENUM ('val1', 'val2', 'val''s 3', 'méil')"
183            ),
184        )
185
186        typ = postgresql.ENUM("val1", "val2", "val's 3", name="PleaseQuoteMe")
187        self.assert_compile(
188            postgresql.CreateEnumType(typ),
189            'CREATE TYPE "PleaseQuoteMe" AS ENUM '
190            "('val1', 'val2', 'val''s 3')",
191        )
192
193    def test_generic_enum(self):
194        e1 = Enum("x", "y", "z", name="somename")
195        e2 = Enum("x", "y", "z", name="somename", schema="someschema")
196        self.assert_compile(
197            postgresql.CreateEnumType(e1),
198            "CREATE TYPE somename AS ENUM ('x', 'y', 'z')",
199        )
200        self.assert_compile(
201            postgresql.CreateEnumType(e2),
202            "CREATE TYPE someschema.somename AS ENUM " "('x', 'y', 'z')",
203        )
204        self.assert_compile(postgresql.DropEnumType(e1), "DROP TYPE somename")
205        self.assert_compile(
206            postgresql.DropEnumType(e2), "DROP TYPE someschema.somename"
207        )
208        t1 = Table("sometable", MetaData(), Column("somecolumn", e1))
209        self.assert_compile(
210            schema.CreateTable(t1),
211            "CREATE TABLE sometable (somecolumn " "somename)",
212        )
213        t1 = Table(
214            "sometable",
215            MetaData(),
216            Column("somecolumn", Enum("x", "y", "z", native_enum=False)),
217        )
218        self.assert_compile(
219            schema.CreateTable(t1),
220            "CREATE TABLE sometable (somecolumn "
221            "VARCHAR(1), CHECK (somecolumn IN ('x', "
222            "'y', 'z')))",
223        )
224
225    def test_create_type_schema_translate(self):
226        e1 = Enum("x", "y", "z", name="somename")
227        e2 = Enum("x", "y", "z", name="somename", schema="someschema")
228        schema_translate_map = {None: "foo", "someschema": "bar"}
229
230        self.assert_compile(
231            postgresql.CreateEnumType(e1),
232            "CREATE TYPE foo.somename AS ENUM ('x', 'y', 'z')",
233            schema_translate_map=schema_translate_map,
234        )
235
236        self.assert_compile(
237            postgresql.CreateEnumType(e2),
238            "CREATE TYPE bar.somename AS ENUM ('x', 'y', 'z')",
239            schema_translate_map=schema_translate_map,
240        )
241
242    def test_create_table_with_schema_type_schema_translate(self):
243        e1 = Enum("x", "y", "z", name="somename")
244        e2 = Enum("x", "y", "z", name="somename", schema="someschema")
245        schema_translate_map = {None: "foo", "someschema": "bar"}
246
247        table = Table(
248            "some_table", MetaData(), Column("q", e1), Column("p", e2)
249        )
250        from sqlalchemy.schema import CreateTable
251
252        self.assert_compile(
253            CreateTable(table),
254            "CREATE TABLE foo.some_table (q foo.somename, p bar.somename)",
255            schema_translate_map=schema_translate_map,
256        )
257
258    def test_create_table_with_tablespace(self):
259        m = MetaData()
260        tbl = Table(
261            "atable",
262            m,
263            Column("id", Integer),
264            postgresql_tablespace="sometablespace",
265        )
266        self.assert_compile(
267            schema.CreateTable(tbl),
268            "CREATE TABLE atable (id INTEGER) TABLESPACE sometablespace",
269        )
270
271    def test_create_table_with_tablespace_quoted(self):
272        # testing quoting of tablespace name
273        m = MetaData()
274        tbl = Table(
275            "anothertable",
276            m,
277            Column("id", Integer),
278            postgresql_tablespace="table",
279        )
280        self.assert_compile(
281            schema.CreateTable(tbl),
282            'CREATE TABLE anothertable (id INTEGER) TABLESPACE "table"',
283        )
284
285    def test_create_table_inherits(self):
286        m = MetaData()
287        tbl = Table(
288            "atable", m, Column("id", Integer), postgresql_inherits="i1"
289        )
290        self.assert_compile(
291            schema.CreateTable(tbl),
292            "CREATE TABLE atable (id INTEGER) INHERITS ( i1 )",
293        )
294
295    def test_create_table_inherits_tuple(self):
296        m = MetaData()
297        tbl = Table(
298            "atable",
299            m,
300            Column("id", Integer),
301            postgresql_inherits=("i1", "i2"),
302        )
303        self.assert_compile(
304            schema.CreateTable(tbl),
305            "CREATE TABLE atable (id INTEGER) INHERITS ( i1, i2 )",
306        )
307
308    def test_create_table_inherits_quoting(self):
309        m = MetaData()
310        tbl = Table(
311            "atable",
312            m,
313            Column("id", Integer),
314            postgresql_inherits=("Quote Me", "quote Me Too"),
315        )
316        self.assert_compile(
317            schema.CreateTable(tbl),
318            "CREATE TABLE atable (id INTEGER) INHERITS "
319            '( "Quote Me", "quote Me Too" )',
320        )
321
322    def test_create_table_partition_by_list(self):
323        m = MetaData()
324        tbl = Table(
325            "atable",
326            m,
327            Column("id", Integer),
328            Column("part_column", Integer),
329            postgresql_partition_by="LIST (part_column)",
330        )
331        self.assert_compile(
332            schema.CreateTable(tbl),
333            "CREATE TABLE atable (id INTEGER, part_column INTEGER) "
334            "PARTITION BY LIST (part_column)",
335        )
336
337    def test_create_table_partition_by_range(self):
338        m = MetaData()
339        tbl = Table(
340            "atable",
341            m,
342            Column("id", Integer),
343            Column("part_column", Integer),
344            postgresql_partition_by="RANGE (part_column)",
345        )
346        self.assert_compile(
347            schema.CreateTable(tbl),
348            "CREATE TABLE atable (id INTEGER, part_column INTEGER) "
349            "PARTITION BY RANGE (part_column)",
350        )
351
352    def test_create_table_with_oids(self):
353        m = MetaData()
354        tbl = Table(
355            "atable", m, Column("id", Integer), postgresql_with_oids=True
356        )
357        self.assert_compile(
358            schema.CreateTable(tbl),
359            "CREATE TABLE atable (id INTEGER) WITH OIDS",
360        )
361
362        tbl2 = Table(
363            "anothertable",
364            m,
365            Column("id", Integer),
366            postgresql_with_oids=False,
367        )
368        self.assert_compile(
369            schema.CreateTable(tbl2),
370            "CREATE TABLE anothertable (id INTEGER) WITHOUT OIDS",
371        )
372
373    def test_create_table_with_oncommit_option(self):
374        m = MetaData()
375        tbl = Table(
376            "atable", m, Column("id", Integer), postgresql_on_commit="drop"
377        )
378        self.assert_compile(
379            schema.CreateTable(tbl),
380            "CREATE TABLE atable (id INTEGER) ON COMMIT DROP",
381        )
382
383    def test_create_table_with_multiple_options(self):
384        m = MetaData()
385        tbl = Table(
386            "atable",
387            m,
388            Column("id", Integer),
389            postgresql_tablespace="sometablespace",
390            postgresql_with_oids=False,
391            postgresql_on_commit="preserve_rows",
392        )
393        self.assert_compile(
394            schema.CreateTable(tbl),
395            "CREATE TABLE atable (id INTEGER) WITHOUT OIDS "
396            "ON COMMIT PRESERVE ROWS TABLESPACE sometablespace",
397        )
398
399    def test_create_partial_index(self):
400        m = MetaData()
401        tbl = Table("testtbl", m, Column("data", Integer))
402        idx = Index(
403            "test_idx1",
404            tbl.c.data,
405            postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10),
406        )
407        idx = Index(
408            "test_idx1",
409            tbl.c.data,
410            postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10),
411        )
412
413        # test quoting and all that
414
415        idx2 = Index(
416            "test_idx2",
417            tbl.c.data,
418            postgresql_where=and_(tbl.c.data > "a", tbl.c.data < "b's"),
419        )
420        self.assert_compile(
421            schema.CreateIndex(idx),
422            "CREATE INDEX test_idx1 ON testtbl (data) "
423            "WHERE data > 5 AND data < 10",
424            dialect=postgresql.dialect(),
425        )
426        self.assert_compile(
427            schema.CreateIndex(idx2),
428            "CREATE INDEX test_idx2 ON testtbl (data) "
429            "WHERE data > 'a' AND data < 'b''s'",
430            dialect=postgresql.dialect(),
431        )
432
433    def test_create_index_with_ops(self):
434        m = MetaData()
435        tbl = Table(
436            "testtbl",
437            m,
438            Column("data", String),
439            Column("data2", Integer, key="d2"),
440        )
441
442        idx = Index(
443            "test_idx1",
444            tbl.c.data,
445            postgresql_ops={"data": "text_pattern_ops"},
446        )
447
448        idx2 = Index(
449            "test_idx2",
450            tbl.c.data,
451            tbl.c.d2,
452            postgresql_ops={"data": "text_pattern_ops", "d2": "int4_ops"},
453        )
454
455        self.assert_compile(
456            schema.CreateIndex(idx),
457            "CREATE INDEX test_idx1 ON testtbl " "(data text_pattern_ops)",
458            dialect=postgresql.dialect(),
459        )
460        self.assert_compile(
461            schema.CreateIndex(idx2),
462            "CREATE INDEX test_idx2 ON testtbl "
463            "(data text_pattern_ops, data2 int4_ops)",
464            dialect=postgresql.dialect(),
465        )
466
467    def test_create_index_with_labeled_ops(self):
468        m = MetaData()
469        tbl = Table(
470            "testtbl",
471            m,
472            Column("data", String),
473            Column("data2", Integer, key="d2"),
474        )
475
476        idx = Index(
477            "test_idx1",
478            func.lower(tbl.c.data).label("data_lower"),
479            postgresql_ops={"data_lower": "text_pattern_ops"},
480        )
481
482        idx2 = Index(
483            "test_idx2",
484            (func.xyz(tbl.c.data) + tbl.c.d2).label("bar"),
485            tbl.c.d2.label("foo"),
486            postgresql_ops={"bar": "text_pattern_ops", "foo": "int4_ops"},
487        )
488
489        self.assert_compile(
490            schema.CreateIndex(idx),
491            "CREATE INDEX test_idx1 ON testtbl "
492            "(lower(data) text_pattern_ops)",
493            dialect=postgresql.dialect(),
494        )
495        self.assert_compile(
496            schema.CreateIndex(idx2),
497            "CREATE INDEX test_idx2 ON testtbl "
498            "((xyz(data) + data2) text_pattern_ops, "
499            "data2 int4_ops)",
500            dialect=postgresql.dialect(),
501        )
502
503    def test_create_index_with_text_or_composite(self):
504        m = MetaData()
505        tbl = Table("testtbl", m, Column("d1", String), Column("d2", Integer))
506
507        idx = Index("test_idx1", text("x"))
508        tbl.append_constraint(idx)
509
510        idx2 = Index("test_idx2", text("y"), tbl.c.d2)
511
512        idx3 = Index(
513            "test_idx2",
514            tbl.c.d1,
515            text("y"),
516            tbl.c.d2,
517            postgresql_ops={"d1": "x1", "d2": "x2"},
518        )
519
520        idx4 = Index(
521            "test_idx2",
522            tbl.c.d1,
523            tbl.c.d2 > 5,
524            text("q"),
525            postgresql_ops={"d1": "x1", "d2": "x2"},
526        )
527
528        idx5 = Index(
529            "test_idx2",
530            tbl.c.d1,
531            (tbl.c.d2 > 5).label("g"),
532            text("q"),
533            postgresql_ops={"d1": "x1", "g": "x2"},
534        )
535
536        self.assert_compile(
537            schema.CreateIndex(idx), "CREATE INDEX test_idx1 ON testtbl (x)"
538        )
539        self.assert_compile(
540            schema.CreateIndex(idx2),
541            "CREATE INDEX test_idx2 ON testtbl (y, d2)",
542        )
543        self.assert_compile(
544            schema.CreateIndex(idx3),
545            "CREATE INDEX test_idx2 ON testtbl (d1 x1, y, d2 x2)",
546        )
547
548        # note that at the moment we do not expect the 'd2' op to
549        # pick up on the "d2 > 5" expression
550        self.assert_compile(
551            schema.CreateIndex(idx4),
552            "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5), q)",
553        )
554
555        # however it does work if we label!
556        self.assert_compile(
557            schema.CreateIndex(idx5),
558            "CREATE INDEX test_idx2 ON testtbl (d1 x1, (d2 > 5) x2, q)",
559        )
560
561    def test_create_index_with_using(self):
562        m = MetaData()
563        tbl = Table("testtbl", m, Column("data", String))
564
565        idx1 = Index("test_idx1", tbl.c.data)
566        idx2 = Index("test_idx2", tbl.c.data, postgresql_using="btree")
567        idx3 = Index("test_idx3", tbl.c.data, postgresql_using="hash")
568
569        self.assert_compile(
570            schema.CreateIndex(idx1),
571            "CREATE INDEX test_idx1 ON testtbl " "(data)",
572            dialect=postgresql.dialect(),
573        )
574        self.assert_compile(
575            schema.CreateIndex(idx2),
576            "CREATE INDEX test_idx2 ON testtbl " "USING btree (data)",
577            dialect=postgresql.dialect(),
578        )
579        self.assert_compile(
580            schema.CreateIndex(idx3),
581            "CREATE INDEX test_idx3 ON testtbl " "USING hash (data)",
582            dialect=postgresql.dialect(),
583        )
584
585    def test_create_index_with_with(self):
586        m = MetaData()
587        tbl = Table("testtbl", m, Column("data", String))
588
589        idx1 = Index("test_idx1", tbl.c.data)
590        idx2 = Index(
591            "test_idx2", tbl.c.data, postgresql_with={"fillfactor": 50}
592        )
593        idx3 = Index(
594            "test_idx3",
595            tbl.c.data,
596            postgresql_using="gist",
597            postgresql_with={"buffering": "off"},
598        )
599
600        self.assert_compile(
601            schema.CreateIndex(idx1),
602            "CREATE INDEX test_idx1 ON testtbl " "(data)",
603        )
604        self.assert_compile(
605            schema.CreateIndex(idx2),
606            "CREATE INDEX test_idx2 ON testtbl "
607            "(data) "
608            "WITH (fillfactor = 50)",
609        )
610        self.assert_compile(
611            schema.CreateIndex(idx3),
612            "CREATE INDEX test_idx3 ON testtbl "
613            "USING gist (data) "
614            "WITH (buffering = off)",
615        )
616
617    def test_create_index_with_using_unusual_conditions(self):
618        m = MetaData()
619        tbl = Table("testtbl", m, Column("data", String))
620
621        self.assert_compile(
622            schema.CreateIndex(
623                Index("test_idx1", tbl.c.data, postgresql_using="GIST")
624            ),
625            "CREATE INDEX test_idx1 ON testtbl " "USING gist (data)",
626        )
627
628        self.assert_compile(
629            schema.CreateIndex(
630                Index(
631                    "test_idx1",
632                    tbl.c.data,
633                    postgresql_using="some_custom_method",
634                )
635            ),
636            "CREATE INDEX test_idx1 ON testtbl "
637            "USING some_custom_method (data)",
638        )
639
640        assert_raises_message(
641            exc.CompileError,
642            "Unexpected SQL phrase: 'gin invalid sql'",
643            schema.CreateIndex(
644                Index(
645                    "test_idx2", tbl.c.data, postgresql_using="gin invalid sql"
646                )
647            ).compile,
648            dialect=postgresql.dialect(),
649        )
650
651    def test_create_index_with_tablespace(self):
652        m = MetaData()
653        tbl = Table("testtbl", m, Column("data", String))
654
655        idx1 = Index("test_idx1", tbl.c.data)
656        idx2 = Index(
657            "test_idx2", tbl.c.data, postgresql_tablespace="sometablespace"
658        )
659        idx3 = Index(
660            "test_idx3",
661            tbl.c.data,
662            postgresql_tablespace="another table space",
663        )
664
665        self.assert_compile(
666            schema.CreateIndex(idx1),
667            "CREATE INDEX test_idx1 ON testtbl " "(data)",
668            dialect=postgresql.dialect(),
669        )
670        self.assert_compile(
671            schema.CreateIndex(idx2),
672            "CREATE INDEX test_idx2 ON testtbl "
673            "(data) "
674            "TABLESPACE sometablespace",
675            dialect=postgresql.dialect(),
676        )
677        self.assert_compile(
678            schema.CreateIndex(idx3),
679            "CREATE INDEX test_idx3 ON testtbl "
680            "(data) "
681            'TABLESPACE "another table space"',
682            dialect=postgresql.dialect(),
683        )
684
685    def test_create_index_with_multiple_options(self):
686        m = MetaData()
687        tbl = Table("testtbl", m, Column("data", String))
688
689        idx1 = Index(
690            "test_idx1",
691            tbl.c.data,
692            postgresql_using="btree",
693            postgresql_tablespace="atablespace",
694            postgresql_with={"fillfactor": 60},
695            postgresql_where=and_(tbl.c.data > 5, tbl.c.data < 10),
696        )
697
698        self.assert_compile(
699            schema.CreateIndex(idx1),
700            "CREATE INDEX test_idx1 ON testtbl "
701            "USING btree (data) "
702            "WITH (fillfactor = 60) "
703            "TABLESPACE atablespace "
704            "WHERE data > 5 AND data < 10",
705            dialect=postgresql.dialect(),
706        )
707
708    def test_create_index_expr_gets_parens(self):
709        m = MetaData()
710        tbl = Table("testtbl", m, Column("x", Integer), Column("y", Integer))
711
712        idx1 = Index("test_idx1", 5 / (tbl.c.x + tbl.c.y))
713        self.assert_compile(
714            schema.CreateIndex(idx1),
715            "CREATE INDEX test_idx1 ON testtbl ((5 / (x + y)))",
716        )
717
718    def test_create_index_literals(self):
719        m = MetaData()
720        tbl = Table("testtbl", m, Column("data", Integer))
721
722        idx1 = Index("test_idx1", tbl.c.data + 5)
723        self.assert_compile(
724            schema.CreateIndex(idx1),
725            "CREATE INDEX test_idx1 ON testtbl ((data + 5))",
726        )
727
728    def test_create_index_concurrently(self):
729        m = MetaData()
730        tbl = Table("testtbl", m, Column("data", Integer))
731
732        idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)
733        self.assert_compile(
734            schema.CreateIndex(idx1),
735            "CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)",
736        )
737
738        dialect_8_1 = postgresql.dialect()
739        dialect_8_1._supports_create_index_concurrently = False
740        self.assert_compile(
741            schema.CreateIndex(idx1),
742            "CREATE INDEX test_idx1 ON testtbl (data)",
743            dialect=dialect_8_1,
744        )
745
746    def test_drop_index_concurrently(self):
747        m = MetaData()
748        tbl = Table("testtbl", m, Column("data", Integer))
749
750        idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)
751        self.assert_compile(
752            schema.DropIndex(idx1), "DROP INDEX CONCURRENTLY test_idx1"
753        )
754
755        dialect_9_1 = postgresql.dialect()
756        dialect_9_1._supports_drop_index_concurrently = False
757        self.assert_compile(
758            schema.DropIndex(idx1), "DROP INDEX test_idx1", dialect=dialect_9_1
759        )
760
761    def test_exclude_constraint_min(self):
762        m = MetaData()
763        tbl = Table("testtbl", m, Column("room", Integer, primary_key=True))
764        cons = ExcludeConstraint(("room", "="))
765        tbl.append_constraint(cons)
766        self.assert_compile(
767            schema.AddConstraint(cons),
768            "ALTER TABLE testtbl ADD EXCLUDE USING gist " "(room WITH =)",
769            dialect=postgresql.dialect(),
770        )
771
772    @testing.combinations(
773        (True, "deferred"),
774        (False, "immediate"),
775        argnames="deferrable_value, initially_value",
776    )
777    def test_copy_exclude_constraint_adhoc_columns(
778        self, deferrable_value, initially_value
779    ):
780        meta = MetaData()
781        table = Table(
782            "mytable",
783            meta,
784            Column("myid", Integer, Sequence("foo_id_seq"), primary_key=True),
785            Column("valid_from_date", Date(), nullable=True),
786            Column("valid_thru_date", Date(), nullable=True),
787        )
788        cons = ExcludeConstraint(
789            (
790                literal_column(
791                    "daterange(valid_from_date, valid_thru_date, '[]')"
792                ),
793                "&&",
794            ),
795            where=column("valid_from_date") <= column("valid_thru_date"),
796            name="ex_mytable_valid_date_range",
797            deferrable=deferrable_value,
798            initially=initially_value,
799        )
800
801        table.append_constraint(cons)
802        expected = (
803            "ALTER TABLE mytable ADD CONSTRAINT ex_mytable_valid_date_range "
804            "EXCLUDE USING gist "
805            "(daterange(valid_from_date, valid_thru_date, '[]') WITH &&) "
806            "WHERE (valid_from_date <= valid_thru_date) "
807            "%s %s"
808            % (
809                "NOT DEFERRABLE" if not deferrable_value else "DEFERRABLE",
810                "INITIALLY %s" % initially_value,
811            )
812        )
813        self.assert_compile(
814            schema.AddConstraint(cons),
815            expected,
816            dialect=postgresql.dialect(),
817        )
818
819        meta2 = MetaData()
820        table2 = table.tometadata(meta2)
821        cons2 = [
822            c for c in table2.constraints if isinstance(c, ExcludeConstraint)
823        ][0]
824        self.assert_compile(
825            schema.AddConstraint(cons2),
826            expected,
827            dialect=postgresql.dialect(),
828        )
829
830    def test_exclude_constraint_full(self):
831        m = MetaData()
832        room = Column("room", Integer, primary_key=True)
833        tbl = Table("testtbl", m, room, Column("during", TSRANGE))
834        room = Column("room", Integer, primary_key=True)
835        cons = ExcludeConstraint(
836            (room, "="),
837            ("during", "&&"),
838            name="my_name",
839            using="gist",
840            where="room > 100",
841            deferrable=True,
842            initially="immediate",
843            ops={"room": "my_opclass"},
844        )
845        tbl.append_constraint(cons)
846        self.assert_compile(
847            schema.AddConstraint(cons),
848            "ALTER TABLE testtbl ADD CONSTRAINT my_name "
849            "EXCLUDE USING gist "
850            "(room my_opclass WITH =, during WITH "
851            "&&) WHERE "
852            "(room > 100) DEFERRABLE INITIALLY immediate",
853            dialect=postgresql.dialect(),
854        )
855
856    def test_exclude_constraint_copy(self):
857        m = MetaData()
858        cons = ExcludeConstraint(("room", "="))
859        tbl = Table(
860            "testtbl", m, Column("room", Integer, primary_key=True), cons
861        )
862        # apparently you can't copy a ColumnCollectionConstraint until
863        # after it has been bound to a table...
864        cons_copy = cons.copy()
865        tbl.append_constraint(cons_copy)
866        self.assert_compile(
867            schema.AddConstraint(cons_copy),
868            "ALTER TABLE testtbl ADD EXCLUDE USING gist " "(room WITH =)",
869        )
870
871    def test_exclude_constraint_copy_where_using(self):
872        m = MetaData()
873        tbl = Table("testtbl", m, Column("room", Integer, primary_key=True))
874        cons = ExcludeConstraint(
875            (tbl.c.room, "="), where=tbl.c.room > 5, using="foobar"
876        )
877        tbl.append_constraint(cons)
878        self.assert_compile(
879            schema.AddConstraint(cons),
880            "ALTER TABLE testtbl ADD EXCLUDE USING foobar "
881            "(room WITH =) WHERE (testtbl.room > 5)",
882        )
883
884        m2 = MetaData()
885        tbl2 = tbl.tometadata(m2)
886        self.assert_compile(
887            schema.CreateTable(tbl2),
888            "CREATE TABLE testtbl (room SERIAL NOT NULL, "
889            "PRIMARY KEY (room), "
890            "EXCLUDE USING foobar "
891            "(room WITH =) WHERE (testtbl.room > 5))",
892        )
893
894    def test_exclude_constraint_text(self):
895        m = MetaData()
896        cons = ExcludeConstraint((text("room::TEXT"), "="))
897        Table("testtbl", m, Column("room", String), cons)
898        self.assert_compile(
899            schema.AddConstraint(cons),
900            "ALTER TABLE testtbl ADD EXCLUDE USING gist "
901            "(room::TEXT WITH =)",
902        )
903
904    def test_exclude_constraint_colname_needs_quoting(self):
905        m = MetaData()
906        cons = ExcludeConstraint(("Some Column Name", "="))
907        Table("testtbl", m, Column("Some Column Name", String), cons)
908        self.assert_compile(
909            schema.AddConstraint(cons),
910            "ALTER TABLE testtbl ADD EXCLUDE USING gist "
911            '("Some Column Name" WITH =)',
912        )
913
914    def test_exclude_constraint_with_using_unusual_conditions(self):
915        m = MetaData()
916        cons = ExcludeConstraint(("q", "="), using="not a keyword")
917        Table("testtbl", m, Column("q", String), cons)
918        assert_raises_message(
919            exc.CompileError,
920            "Unexpected SQL phrase: 'not a keyword'",
921            schema.AddConstraint(cons).compile,
922            dialect=postgresql.dialect(),
923        )
924
925    def test_exclude_constraint_cast(self):
926        m = MetaData()
927        tbl = Table("testtbl", m, Column("room", String))
928        cons = ExcludeConstraint((cast(tbl.c.room, Text), "="))
929        tbl.append_constraint(cons)
930        self.assert_compile(
931            schema.AddConstraint(cons),
932            "ALTER TABLE testtbl ADD EXCLUDE USING gist "
933            "(CAST(room AS TEXT) WITH =)",
934        )
935
936    def test_exclude_constraint_cast_quote(self):
937        m = MetaData()
938        tbl = Table("testtbl", m, Column("Room", String))
939        cons = ExcludeConstraint((cast(tbl.c.Room, Text), "="))
940        tbl.append_constraint(cons)
941        self.assert_compile(
942            schema.AddConstraint(cons),
943            "ALTER TABLE testtbl ADD EXCLUDE USING gist "
944            '(CAST("Room" AS TEXT) WITH =)',
945        )
946
947    def test_exclude_constraint_when(self):
948        m = MetaData()
949        tbl = Table("testtbl", m, Column("room", String))
950        cons = ExcludeConstraint(("room", "="), where=tbl.c.room.in_(["12"]))
951        tbl.append_constraint(cons)
952        self.assert_compile(
953            schema.AddConstraint(cons),
954            "ALTER TABLE testtbl ADD EXCLUDE USING gist "
955            "(room WITH =) WHERE (testtbl.room IN ('12'))",
956            dialect=postgresql.dialect(),
957        )
958
959    def test_exclude_constraint_ops_many(self):
960        m = MetaData()
961        tbl = Table(
962            "testtbl", m, Column("room", String), Column("during", TSRANGE)
963        )
964        cons = ExcludeConstraint(
965            ("room", "="),
966            ("during", "&&"),
967            ops={"room": "first_opsclass", "during": "second_opclass"},
968        )
969        tbl.append_constraint(cons)
970        self.assert_compile(
971            schema.AddConstraint(cons),
972            "ALTER TABLE testtbl ADD EXCLUDE USING gist "
973            "(room first_opsclass WITH =, during second_opclass WITH &&)",
974            dialect=postgresql.dialect(),
975        )
976
977    def test_substring(self):
978        self.assert_compile(
979            func.substring("abc", 1, 2),
980            "SUBSTRING(%(substring_1)s FROM %(substring_2)s "
981            "FOR %(substring_3)s)",
982        )
983        self.assert_compile(
984            func.substring("abc", 1),
985            "SUBSTRING(%(substring_1)s FROM %(substring_2)s)",
986        )
987
988    def test_for_update(self):
989        table1 = table(
990            "mytable", column("myid"), column("name"), column("description")
991        )
992
993        self.assert_compile(
994            table1.select(table1.c.myid == 7).with_for_update(),
995            "SELECT mytable.myid, mytable.name, mytable.description "
996            "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE",
997        )
998
999        self.assert_compile(
1000            table1.select(table1.c.myid == 7).with_for_update(nowait=True),
1001            "SELECT mytable.myid, mytable.name, mytable.description "
1002            "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT",
1003        )
1004
1005        self.assert_compile(
1006            table1.select(table1.c.myid == 7).with_for_update(
1007                skip_locked=True
1008            ),
1009            "SELECT mytable.myid, mytable.name, mytable.description "
1010            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1011            "FOR UPDATE SKIP LOCKED",
1012        )
1013
1014        self.assert_compile(
1015            table1.select(table1.c.myid == 7).with_for_update(read=True),
1016            "SELECT mytable.myid, mytable.name, mytable.description "
1017            "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE",
1018        )
1019
1020        self.assert_compile(
1021            table1.select(table1.c.myid == 7).with_for_update(
1022                read=True, nowait=True
1023            ),
1024            "SELECT mytable.myid, mytable.name, mytable.description "
1025            "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT",
1026        )
1027
1028        self.assert_compile(
1029            table1.select(table1.c.myid == 7).with_for_update(
1030                key_share=True, nowait=True
1031            ),
1032            "SELECT mytable.myid, mytable.name, mytable.description "
1033            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1034            "FOR NO KEY UPDATE NOWAIT",
1035        )
1036
1037        self.assert_compile(
1038            table1.select(table1.c.myid == 7).with_for_update(
1039                key_share=True, read=True, nowait=True
1040            ),
1041            "SELECT mytable.myid, mytable.name, mytable.description "
1042            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1043            "FOR KEY SHARE NOWAIT",
1044        )
1045
1046        self.assert_compile(
1047            table1.select(table1.c.myid == 7).with_for_update(
1048                read=True, skip_locked=True
1049            ),
1050            "SELECT mytable.myid, mytable.name, mytable.description "
1051            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1052            "FOR SHARE SKIP LOCKED",
1053        )
1054
1055        self.assert_compile(
1056            table1.select(table1.c.myid == 7).with_for_update(
1057                of=table1.c.myid
1058            ),
1059            "SELECT mytable.myid, mytable.name, mytable.description "
1060            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1061            "FOR UPDATE OF mytable",
1062        )
1063
1064        self.assert_compile(
1065            table1.select(table1.c.myid == 7).with_for_update(
1066                read=True, nowait=True, of=table1
1067            ),
1068            "SELECT mytable.myid, mytable.name, mytable.description "
1069            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1070            "FOR SHARE OF mytable NOWAIT",
1071        )
1072
1073        self.assert_compile(
1074            table1.select(table1.c.myid == 7).with_for_update(
1075                key_share=True, read=True, nowait=True, of=table1
1076            ),
1077            "SELECT mytable.myid, mytable.name, mytable.description "
1078            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1079            "FOR KEY SHARE OF mytable NOWAIT",
1080        )
1081
1082        self.assert_compile(
1083            table1.select(table1.c.myid == 7).with_for_update(
1084                read=True, nowait=True, of=table1.c.myid
1085            ),
1086            "SELECT mytable.myid, mytable.name, mytable.description "
1087            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1088            "FOR SHARE OF mytable NOWAIT",
1089        )
1090
1091        self.assert_compile(
1092            table1.select(table1.c.myid == 7).with_for_update(
1093                read=True, nowait=True, of=[table1.c.myid, table1.c.name]
1094            ),
1095            "SELECT mytable.myid, mytable.name, mytable.description "
1096            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1097            "FOR SHARE OF mytable NOWAIT",
1098        )
1099
1100        self.assert_compile(
1101            table1.select(table1.c.myid == 7).with_for_update(
1102                read=True,
1103                skip_locked=True,
1104                of=[table1.c.myid, table1.c.name],
1105                key_share=True,
1106            ),
1107            "SELECT mytable.myid, mytable.name, mytable.description "
1108            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1109            "FOR KEY SHARE OF mytable SKIP LOCKED",
1110        )
1111
1112        self.assert_compile(
1113            table1.select(table1.c.myid == 7).with_for_update(
1114                skip_locked=True, of=[table1.c.myid, table1.c.name]
1115            ),
1116            "SELECT mytable.myid, mytable.name, mytable.description "
1117            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1118            "FOR UPDATE OF mytable SKIP LOCKED",
1119        )
1120
1121        self.assert_compile(
1122            table1.select(table1.c.myid == 7).with_for_update(
1123                read=True, skip_locked=True, of=[table1.c.myid, table1.c.name]
1124            ),
1125            "SELECT mytable.myid, mytable.name, mytable.description "
1126            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1127            "FOR SHARE OF mytable SKIP LOCKED",
1128        )
1129
1130        self.assert_compile(
1131            table1.select(table1.c.myid == 7).with_for_update(
1132                key_share=True, nowait=True, of=[table1.c.myid, table1.c.name]
1133            ),
1134            "SELECT mytable.myid, mytable.name, mytable.description "
1135            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1136            "FOR NO KEY UPDATE OF mytable NOWAIT",
1137        )
1138
1139        self.assert_compile(
1140            table1.select(table1.c.myid == 7).with_for_update(
1141                key_share=True,
1142                skip_locked=True,
1143                of=[table1.c.myid, table1.c.name],
1144            ),
1145            "SELECT mytable.myid, mytable.name, mytable.description "
1146            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1147            "FOR NO KEY UPDATE OF mytable SKIP LOCKED",
1148        )
1149
1150        self.assert_compile(
1151            table1.select(table1.c.myid == 7).with_for_update(
1152                key_share=True, of=[table1.c.myid, table1.c.name]
1153            ),
1154            "SELECT mytable.myid, mytable.name, mytable.description "
1155            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1156            "FOR NO KEY UPDATE OF mytable",
1157        )
1158
1159        self.assert_compile(
1160            table1.select(table1.c.myid == 7).with_for_update(key_share=True),
1161            "SELECT mytable.myid, mytable.name, mytable.description "
1162            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1163            "FOR NO KEY UPDATE",
1164        )
1165
1166        self.assert_compile(
1167            table1.select(table1.c.myid == 7).with_for_update(
1168                read=True, key_share=True
1169            ),
1170            "SELECT mytable.myid, mytable.name, mytable.description "
1171            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1172            "FOR KEY SHARE",
1173        )
1174
1175        self.assert_compile(
1176            table1.select(table1.c.myid == 7).with_for_update(
1177                read=True, key_share=True, of=table1
1178            ),
1179            "SELECT mytable.myid, mytable.name, mytable.description "
1180            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1181            "FOR KEY SHARE OF mytable",
1182        )
1183
1184        self.assert_compile(
1185            table1.select(table1.c.myid == 7).with_for_update(
1186                read=True, of=table1
1187            ),
1188            "SELECT mytable.myid, mytable.name, mytable.description "
1189            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1190            "FOR SHARE OF mytable",
1191        )
1192
1193        self.assert_compile(
1194            table1.select(table1.c.myid == 7).with_for_update(
1195                read=True, key_share=True, skip_locked=True
1196            ),
1197            "SELECT mytable.myid, mytable.name, mytable.description "
1198            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1199            "FOR KEY SHARE SKIP LOCKED",
1200        )
1201
1202        self.assert_compile(
1203            table1.select(table1.c.myid == 7).with_for_update(
1204                key_share=True, skip_locked=True
1205            ),
1206            "SELECT mytable.myid, mytable.name, mytable.description "
1207            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1208            "FOR NO KEY UPDATE SKIP LOCKED",
1209        )
1210
1211        ta = table1.alias()
1212        self.assert_compile(
1213            ta.select(ta.c.myid == 7).with_for_update(
1214                of=[ta.c.myid, ta.c.name]
1215            ),
1216            "SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
1217            "FROM mytable AS mytable_1 "
1218            "WHERE mytable_1.myid = %(myid_1)s FOR UPDATE OF mytable_1",
1219        )
1220
1221        table2 = table("table2", column("mytable_id"))
1222        join = table2.join(table1, table2.c.mytable_id == table1.c.myid)
1223        self.assert_compile(
1224            join.select(table2.c.mytable_id == 7).with_for_update(of=[join]),
1225            "SELECT table2.mytable_id, "
1226            "mytable.myid, mytable.name, mytable.description "
1227            "FROM table2 "
1228            "JOIN mytable ON table2.mytable_id = mytable.myid "
1229            "WHERE table2.mytable_id = %(mytable_id_1)s "
1230            "FOR UPDATE OF mytable, table2",
1231        )
1232
1233        join = table2.join(ta, table2.c.mytable_id == ta.c.myid)
1234        self.assert_compile(
1235            join.select(table2.c.mytable_id == 7).with_for_update(of=[join]),
1236            "SELECT table2.mytable_id, "
1237            "mytable_1.myid, mytable_1.name, mytable_1.description "
1238            "FROM table2 "
1239            "JOIN mytable AS mytable_1 "
1240            "ON table2.mytable_id = mytable_1.myid "
1241            "WHERE table2.mytable_id = %(mytable_id_1)s "
1242            "FOR UPDATE OF mytable_1, table2",
1243        )
1244
1245        # ensure of=text() for of works
1246        self.assert_compile(
1247            table1.select(table1.c.myid == 7).with_for_update(
1248                of=text("table1")
1249            ),
1250            "SELECT mytable.myid, mytable.name, mytable.description "
1251            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1252            "FOR UPDATE OF table1",
1253        )
1254
1255        # ensure literal_column of works
1256        self.assert_compile(
1257            table1.select(table1.c.myid == 7).with_for_update(
1258                of=literal_column("table1")
1259            ),
1260            "SELECT mytable.myid, mytable.name, mytable.description "
1261            "FROM mytable WHERE mytable.myid = %(myid_1)s "
1262            "FOR UPDATE OF table1",
1263        )
1264
1265    def test_for_update_with_schema(self):
1266        m = MetaData()
1267        table1 = Table(
1268            "mytable", m, Column("myid"), Column("name"), schema="testschema"
1269        )
1270
1271        self.assert_compile(
1272            table1.select(table1.c.myid == 7).with_for_update(of=table1),
1273            "SELECT testschema.mytable.myid, testschema.mytable.name "
1274            "FROM testschema.mytable "
1275            "WHERE testschema.mytable.myid = %(myid_1)s "
1276            "FOR UPDATE OF mytable",
1277        )
1278
1279    def test_reserved_words(self):
1280        table = Table(
1281            "pg_table",
1282            MetaData(),
1283            Column("col1", Integer),
1284            Column("variadic", Integer),
1285        )
1286        x = select([table.c.col1, table.c.variadic])
1287
1288        self.assert_compile(
1289            x, """SELECT pg_table.col1, pg_table."variadic" FROM pg_table"""
1290        )
1291
1292    def test_array(self):
1293        c = Column("x", postgresql.ARRAY(Integer))
1294
1295        self.assert_compile(
1296            cast(c, postgresql.ARRAY(Integer)), "CAST(x AS INTEGER[])"
1297        )
1298        self.assert_compile(c[5], "x[%(x_1)s]", checkparams={"x_1": 5})
1299
1300        self.assert_compile(
1301            c[5:7], "x[%(x_1)s:%(x_2)s]", checkparams={"x_2": 7, "x_1": 5}
1302        )
1303        self.assert_compile(
1304            c[5:7][2:3],
1305            "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]",
1306            checkparams={"x_2": 7, "x_1": 5, "param_1": 2, "param_2": 3},
1307        )
1308        self.assert_compile(
1309            c[5:7][3],
1310            "x[%(x_1)s:%(x_2)s][%(param_1)s]",
1311            checkparams={"x_2": 7, "x_1": 5, "param_1": 3},
1312        )
1313
1314        self.assert_compile(
1315            c.contains([1]),
1316            "x @> %(x_1)s::INTEGER[]",
1317            checkparams={"x_1": [1]},
1318            dialect=PGDialect_psycopg2(),
1319        )
1320        self.assert_compile(
1321            c.contained_by([2]),
1322            "x <@ %(x_1)s::INTEGER[]",
1323            checkparams={"x_1": [2]},
1324            dialect=PGDialect_psycopg2(),
1325        )
1326        self.assert_compile(
1327            c.contained_by([2]),
1328            "x <@ %(x_1)s",
1329            checkparams={"x_1": [2]},
1330            dialect=PGDialect(),
1331        )
1332        self.assert_compile(
1333            c.overlap([3]),
1334            "x && %(x_1)s::INTEGER[]",
1335            checkparams={"x_1": [3]},
1336            dialect=PGDialect_psycopg2(),
1337        )
1338        self.assert_compile(
1339            postgresql.Any(4, c),
1340            "%(param_1)s = ANY (x)",
1341            checkparams={"param_1": 4},
1342        )
1343
1344        self.assert_compile(
1345            c.any(5),
1346            "%(param_1)s = ANY (x)",
1347            checkparams={"param_1": 5},
1348        )
1349
1350        self.assert_compile(
1351            ~c.any(5),
1352            "NOT (%(param_1)s = ANY (x))",
1353            checkparams={"param_1": 5},
1354        )
1355
1356        self.assert_compile(
1357            c.all(5),
1358            "%(param_1)s = ALL (x)",
1359            checkparams={"param_1": 5},
1360        )
1361
1362        self.assert_compile(
1363            ~c.all(5),
1364            "NOT (%(param_1)s = ALL (x))",
1365            checkparams={"param_1": 5},
1366        )
1367
1368        self.assert_compile(
1369            c.any(5, operator=operators.ne),
1370            "%(param_1)s != ANY (x)",
1371            checkparams={"param_1": 5},
1372        )
1373        self.assert_compile(
1374            postgresql.All(6, c, operator=operators.gt),
1375            "%(param_1)s > ALL (x)",
1376            checkparams={"param_1": 6},
1377        )
1378        self.assert_compile(
1379            c.all(7, operator=operators.lt),
1380            "%(param_1)s < ALL (x)",
1381            checkparams={"param_1": 7},
1382        )
1383
1384    @testing.combinations((True,), (False,))
1385    def test_array_zero_indexes(self, zero_indexes):
1386        c = Column("x", postgresql.ARRAY(Integer, zero_indexes=zero_indexes))
1387
1388        add_one = 1 if zero_indexes else 0
1389
1390        self.assert_compile(
1391            cast(c, postgresql.ARRAY(Integer, zero_indexes=zero_indexes)),
1392            "CAST(x AS INTEGER[])",
1393        )
1394        self.assert_compile(
1395            c[5], "x[%(x_1)s]", checkparams={"x_1": 5 + add_one}
1396        )
1397
1398        self.assert_compile(
1399            c[5:7],
1400            "x[%(x_1)s:%(x_2)s]",
1401            checkparams={"x_2": 7 + add_one, "x_1": 5 + add_one},
1402        )
1403        self.assert_compile(
1404            c[5:7][2:3],
1405            "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]",
1406            checkparams={
1407                "x_2": 7 + add_one,
1408                "x_1": 5 + add_one,
1409                "param_1": 2 + add_one,
1410                "param_2": 3 + add_one,
1411            },
1412        )
1413        self.assert_compile(
1414            c[5:7][3],
1415            "x[%(x_1)s:%(x_2)s][%(param_1)s]",
1416            checkparams={
1417                "x_2": 7 + add_one,
1418                "x_1": 5 + add_one,
1419                "param_1": 3 + add_one,
1420            },
1421        )
1422
1423    def test_array_literal_type(self):
1424        isinstance(postgresql.array([1, 2]).type, postgresql.ARRAY)
1425        is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer)
1426
1427        is_(
1428            postgresql.array(
1429                [1, 2], type_=String
1430            ).type.item_type._type_affinity,
1431            String,
1432        )
1433
1434    def test_array_literal(self):
1435        self.assert_compile(
1436            func.array_dims(
1437                postgresql.array([1, 2]) + postgresql.array([3, 4, 5])
1438            ),
1439            "array_dims(ARRAY[%(param_1)s, %(param_2)s] || "
1440            "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])",
1441            checkparams={
1442                "param_5": 5,
1443                "param_4": 4,
1444                "param_1": 1,
1445                "param_3": 3,
1446                "param_2": 2,
1447            },
1448        )
1449
1450    def test_array_literal_compare(self):
1451        self.assert_compile(
1452            postgresql.array([1, 2]) == [3, 4, 5],
1453            "ARRAY[%(param_1)s, %(param_2)s] = "
1454            "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s]",
1455            checkparams={
1456                "param_5": 5,
1457                "param_4": 4,
1458                "param_1": 1,
1459                "param_3": 3,
1460                "param_2": 2,
1461            },
1462        )
1463
1464    def test_array_literal_contains(self):
1465        self.assert_compile(
1466            postgresql.array([1, 2]).contains([3, 4, 5]),
1467            "ARRAY[%(param_1)s, %(param_2)s] @> ARRAY[%(param_3)s, "
1468            "%(param_4)s, %(param_5)s]",
1469            checkparams={
1470                "param_1": 1,
1471                "param_2": 2,
1472                "param_3": 3,
1473                "param_4": 4,
1474                "param_5": 5,
1475            },
1476        )
1477
1478        self.assert_compile(
1479            postgresql.array(["a", "b"]).contains([""]),
1480            "ARRAY[%(param_1)s, %(param_2)s] @> ARRAY[%(param_3)s]",
1481            checkparams={"param_1": "a", "param_2": "b", "param_3": ""},
1482        )
1483
1484        self.assert_compile(
1485            postgresql.array(["a", "b"]).contains([]),
1486            "ARRAY[%(param_1)s, %(param_2)s] @> ARRAY[]",
1487            checkparams={"param_1": "a", "param_2": "b"},
1488        )
1489
1490        self.assert_compile(
1491            postgresql.array(["a", "b"]).contains([0]),
1492            "ARRAY[%(param_1)s, %(param_2)s] @> ARRAY[%(param_3)s]",
1493            checkparams={"param_1": "a", "param_2": "b", "param_3": 0},
1494        )
1495
1496    def test_array_literal_contained_by(self):
1497        self.assert_compile(
1498            postgresql.array(["a", "b"]).contained_by(["a", "b", "c"]),
1499            "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[%(param_3)s, "
1500            "%(param_4)s, %(param_5)s]",
1501            checkparams={
1502                "param_1": "a",
1503                "param_2": "b",
1504                "param_3": "a",
1505                "param_4": "b",
1506                "param_5": "c",
1507            },
1508        )
1509
1510        self.assert_compile(
1511            postgresql.array([1, 2]).contained_by([3, 4, 5]),
1512            "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[%(param_3)s, "
1513            "%(param_4)s, %(param_5)s]",
1514            checkparams={
1515                "param_1": 1,
1516                "param_2": 2,
1517                "param_3": 3,
1518                "param_4": 4,
1519                "param_5": 5,
1520            },
1521        )
1522
1523        self.assert_compile(
1524            postgresql.array(["a", "b"]).contained_by([""]),
1525            "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[%(param_3)s]",
1526            checkparams={"param_1": "a", "param_2": "b", "param_3": ""},
1527        )
1528
1529        self.assert_compile(
1530            postgresql.array(["a", "b"]).contained_by([]),
1531            "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[]",
1532            checkparams={"param_1": "a", "param_2": "b"},
1533        )
1534
1535        self.assert_compile(
1536            postgresql.array(["a", "b"]).contained_by([0]),
1537            "ARRAY[%(param_1)s, %(param_2)s] <@ ARRAY[%(param_3)s]",
1538            checkparams={"param_1": "a", "param_2": "b", "param_3": 0},
1539        )
1540
1541    def test_array_literal_insert(self):
1542        m = MetaData()
1543        t = Table("t", m, Column("data", postgresql.ARRAY(Integer)))
1544        self.assert_compile(
1545            t.insert().values(data=array([1, 2, 3])),
1546            "INSERT INTO t (data) VALUES (ARRAY[%(param_1)s, "
1547            "%(param_2)s, %(param_3)s])",
1548        )
1549
1550    def test_update_array(self):
1551        m = MetaData()
1552        t = Table("t", m, Column("data", postgresql.ARRAY(Integer)))
1553        self.assert_compile(
1554            t.update().values({t.c.data: [1, 3, 4]}),
1555            "UPDATE t SET data=%(data)s::INTEGER[]",
1556            checkparams={"data": [1, 3, 4]},
1557        )
1558
1559    def test_update_array_element(self):
1560        m = MetaData()
1561        t = Table("t", m, Column("data", postgresql.ARRAY(Integer)))
1562        self.assert_compile(
1563            t.update().values({t.c.data[5]: 1}),
1564            "UPDATE t SET data[%(data_1)s]=%(param_1)s",
1565            checkparams={"data_1": 5, "param_1": 1},
1566        )
1567
1568    def test_update_array_slice(self):
1569        m = MetaData()
1570        t = Table("t", m, Column("data", postgresql.ARRAY(Integer)))
1571
1572        # psycopg2-specific, has a cast
1573        self.assert_compile(
1574            t.update().values({t.c.data[2:5]: [2, 3, 4]}),
1575            "UPDATE t SET data[%(data_1)s:%(data_2)s]="
1576            "%(param_1)s::INTEGER[]",
1577            checkparams={"param_1": [2, 3, 4], "data_2": 5, "data_1": 2},
1578            dialect=PGDialect_psycopg2(),
1579        )
1580
1581        # default dialect does not, as DBAPIs may be doing this for us
1582        self.assert_compile(
1583            t.update().values({t.c.data[2:5]: [2, 3, 4]}),
1584            "UPDATE t SET data[%s:%s]=" "%s",
1585            checkparams={"param_1": [2, 3, 4], "data_2": 5, "data_1": 2},
1586            dialect=PGDialect(paramstyle="format"),
1587        )
1588
1589    def test_from_only(self):
1590        m = MetaData()
1591        tbl1 = Table("testtbl1", m, Column("id", Integer))
1592        tbl2 = Table("testtbl2", m, Column("id", Integer))
1593
1594        stmt = tbl1.select().with_hint(tbl1, "ONLY", "postgresql")
1595        expected = "SELECT testtbl1.id FROM ONLY testtbl1"
1596        self.assert_compile(stmt, expected)
1597
1598        talias1 = tbl1.alias("foo")
1599        stmt = talias1.select().with_hint(talias1, "ONLY", "postgresql")
1600        expected = "SELECT foo.id FROM ONLY testtbl1 AS foo"
1601        self.assert_compile(stmt, expected)
1602
1603        stmt = select([tbl1, tbl2]).with_hint(tbl1, "ONLY", "postgresql")
1604        expected = (
1605            "SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, " "testtbl2"
1606        )
1607        self.assert_compile(stmt, expected)
1608
1609        stmt = select([tbl1, tbl2]).with_hint(tbl2, "ONLY", "postgresql")
1610        expected = (
1611            "SELECT testtbl1.id, testtbl2.id FROM testtbl1, ONLY " "testtbl2"
1612        )
1613        self.assert_compile(stmt, expected)
1614
1615        stmt = select([tbl1, tbl2])
1616        stmt = stmt.with_hint(tbl1, "ONLY", "postgresql")
1617        stmt = stmt.with_hint(tbl2, "ONLY", "postgresql")
1618        expected = (
1619            "SELECT testtbl1.id, testtbl2.id FROM ONLY testtbl1, "
1620            "ONLY testtbl2"
1621        )
1622        self.assert_compile(stmt, expected)
1623
1624        stmt = update(tbl1, values=dict(id=1))
1625        stmt = stmt.with_hint("ONLY", dialect_name="postgresql")
1626        expected = "UPDATE ONLY testtbl1 SET id=%(id)s"
1627        self.assert_compile(stmt, expected)
1628
1629        stmt = delete(tbl1).with_hint(
1630            "ONLY", selectable=tbl1, dialect_name="postgresql"
1631        )
1632        expected = "DELETE FROM ONLY testtbl1"
1633        self.assert_compile(stmt, expected)
1634
1635        tbl3 = Table("testtbl3", m, Column("id", Integer), schema="testschema")
1636        stmt = tbl3.select().with_hint(tbl3, "ONLY", "postgresql")
1637        expected = (
1638            "SELECT testschema.testtbl3.id FROM " "ONLY testschema.testtbl3"
1639        )
1640        self.assert_compile(stmt, expected)
1641
1642        assert_raises(
1643            exc.CompileError,
1644            tbl3.select().with_hint(tbl3, "FAKE", "postgresql").compile,
1645            dialect=postgresql.dialect(),
1646        )
1647
1648    def test_aggregate_order_by_one(self):
1649        m = MetaData()
1650        table = Table("table1", m, Column("a", Integer), Column("b", Integer))
1651        expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc()))
1652        stmt = select([expr])
1653
1654        # note this tests that the object exports FROM objects
1655        # correctly
1656        self.assert_compile(
1657            stmt,
1658            "SELECT array_agg(table1.a ORDER BY table1.b DESC) "
1659            "AS array_agg_1 FROM table1",
1660        )
1661
1662    def test_aggregate_order_by_two(self):
1663        m = MetaData()
1664        table = Table("table1", m, Column("a", Integer), Column("b", Integer))
1665        expr = func.string_agg(
1666            table.c.a, aggregate_order_by(literal_column("','"), table.c.a)
1667        )
1668        stmt = select([expr])
1669
1670        self.assert_compile(
1671            stmt,
1672            "SELECT string_agg(table1.a, ',' ORDER BY table1.a) "
1673            "AS string_agg_1 FROM table1",
1674        )
1675
1676    def test_aggregate_order_by_multi_col(self):
1677        m = MetaData()
1678        table = Table("table1", m, Column("a", Integer), Column("b", Integer))
1679        expr = func.string_agg(
1680            table.c.a,
1681            aggregate_order_by(
1682                literal_column("','"), table.c.a, table.c.b.desc()
1683            ),
1684        )
1685        stmt = select([expr])
1686
1687        self.assert_compile(
1688            stmt,
1689            "SELECT string_agg(table1.a, "
1690            "',' ORDER BY table1.a, table1.b DESC) "
1691            "AS string_agg_1 FROM table1",
1692        )
1693
1694    def test_aggregate_orcer_by_no_arg(self):
1695        assert_raises_message(
1696            TypeError,
1697            "at least one ORDER BY element is required",
1698            aggregate_order_by,
1699            literal_column("','"),
1700        )
1701
1702    def test_pg_array_agg_implicit_pg_array(self):
1703
1704        expr = pg_array_agg(column("data", Integer))
1705        assert isinstance(expr.type, PG_ARRAY)
1706        is_(expr.type.item_type._type_affinity, Integer)
1707
1708    def test_pg_array_agg_uses_base_array(self):
1709
1710        expr = pg_array_agg(column("data", sqltypes.ARRAY(Integer)))
1711        assert isinstance(expr.type, sqltypes.ARRAY)
1712        assert not isinstance(expr.type, PG_ARRAY)
1713        is_(expr.type.item_type._type_affinity, Integer)
1714
1715    def test_pg_array_agg_uses_pg_array(self):
1716
1717        expr = pg_array_agg(column("data", PG_ARRAY(Integer)))
1718        assert isinstance(expr.type, PG_ARRAY)
1719        is_(expr.type.item_type._type_affinity, Integer)
1720
1721    def test_pg_array_agg_explicit_base_array(self):
1722
1723        expr = pg_array_agg(
1724            column("data", sqltypes.ARRAY(Integer)),
1725            type_=sqltypes.ARRAY(Integer),
1726        )
1727        assert isinstance(expr.type, sqltypes.ARRAY)
1728        assert not isinstance(expr.type, PG_ARRAY)
1729        is_(expr.type.item_type._type_affinity, Integer)
1730
1731    def test_pg_array_agg_explicit_pg_array(self):
1732
1733        expr = pg_array_agg(
1734            column("data", sqltypes.ARRAY(Integer)), type_=PG_ARRAY(Integer)
1735        )
1736        assert isinstance(expr.type, PG_ARRAY)
1737        is_(expr.type.item_type._type_affinity, Integer)
1738
1739    def test_aggregate_order_by_adapt(self):
1740        m = MetaData()
1741        table = Table("table1", m, Column("a", Integer), Column("b", Integer))
1742        expr = func.array_agg(aggregate_order_by(table.c.a, table.c.b.desc()))
1743        stmt = select([expr])
1744
1745        a1 = table.alias("foo")
1746        stmt2 = sql_util.ClauseAdapter(a1).traverse(stmt)
1747        self.assert_compile(
1748            stmt2,
1749            "SELECT array_agg(foo.a ORDER BY foo.b DESC) AS array_agg_1 "
1750            "FROM table1 AS foo",
1751        )
1752
1753    def test_array_agg_w_filter_subscript(self):
1754        series = func.generate_series(1, 100).alias("series")
1755        series_col = column("series")
1756        query = select(
1757            [func.array_agg(series_col).filter(series_col % 2 == 0)[3]]
1758        ).select_from(series)
1759        self.assert_compile(
1760            query,
1761            "SELECT (array_agg(series) FILTER "
1762            "(WHERE series %% %(series_1)s = %(param_1)s))[%(param_2)s] "
1763            "AS anon_1 FROM "
1764            "generate_series(%(generate_series_1)s, %(generate_series_2)s) "
1765            "AS series",
1766        )
1767
1768    def test_delete_extra_froms(self):
1769        t1 = table("t1", column("c1"))
1770        t2 = table("t2", column("c1"))
1771        q = delete(t1).where(t1.c.c1 == t2.c.c1)
1772        self.assert_compile(q, "DELETE FROM t1 USING t2 WHERE t1.c1 = t2.c1")
1773
1774    def test_delete_extra_froms_alias(self):
1775        a1 = table("t1", column("c1")).alias("a1")
1776        t2 = table("t2", column("c1"))
1777        q = delete(a1).where(a1.c.c1 == t2.c.c1)
1778        self.assert_compile(
1779            q, "DELETE FROM t1 AS a1 USING t2 WHERE a1.c1 = t2.c1"
1780        )
1781
1782    @testing.combinations(
1783        ("no_persisted", " STORED", "ignore"),
1784        ("persisted_none", " STORED", None),
1785        ("persisted_true", " STORED", True),
1786        id_="iaa",
1787    )
1788    def test_column_computed(self, text, persisted):
1789        m = MetaData()
1790        kwargs = {"persisted": persisted} if persisted != "ignore" else {}
1791        t = Table(
1792            "t",
1793            m,
1794            Column("x", Integer),
1795            Column("y", Integer, Computed("x + 2", **kwargs)),
1796        )
1797        self.assert_compile(
1798            schema.CreateTable(t),
1799            "CREATE TABLE t (x INTEGER, y INTEGER GENERATED "
1800            "ALWAYS AS (x + 2)%s)" % text,
1801        )
1802
1803    def test_column_computed_persisted_false(self):
1804        m = MetaData()
1805        t = Table(
1806            "t",
1807            m,
1808            Column("x", Integer),
1809            Column("y", Integer, Computed("x + 2", persisted=False)),
1810        )
1811        assert_raises_message(
1812            exc.CompileError,
1813            "PostrgreSQL computed columns do not support 'virtual'",
1814            schema.CreateTable(t).compile,
1815            dialect=postgresql.dialect(),
1816        )
1817
1818
1819class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL):
1820    __dialect__ = postgresql.dialect()
1821
1822    def setup(self):
1823        self.table1 = table1 = table(
1824            "mytable",
1825            column("myid", Integer),
1826            column("name", String(128)),
1827            column("description", String(128)),
1828        )
1829        md = MetaData()
1830        self.table_with_metadata = Table(
1831            "mytable",
1832            md,
1833            Column("myid", Integer, primary_key=True),
1834            Column("name", String(128)),
1835            Column("description", String(128)),
1836        )
1837        self.unique_constr = schema.UniqueConstraint(
1838            table1.c.name, name="uq_name"
1839        )
1840        self.excl_constr = ExcludeConstraint(
1841            (table1.c.name, "="),
1842            (table1.c.description, "&&"),
1843            name="excl_thing",
1844        )
1845        self.excl_constr_anon = ExcludeConstraint(
1846            (self.table_with_metadata.c.name, "="),
1847            (self.table_with_metadata.c.description, "&&"),
1848            where=self.table_with_metadata.c.description != "foo",
1849        )
1850        self.goofy_index = Index(
1851            "goofy_index", table1.c.name, postgresql_where=table1.c.name > "m"
1852        )
1853
1854    def test_do_nothing_no_target(self):
1855
1856        i = insert(
1857            self.table1, values=dict(name="foo")
1858        ).on_conflict_do_nothing()
1859        self.assert_compile(
1860            i,
1861            "INSERT INTO mytable (name) VALUES "
1862            "(%(name)s) ON CONFLICT DO NOTHING",
1863        )
1864
1865    def test_do_nothing_index_elements_target(self):
1866
1867        i = insert(
1868            self.table1, values=dict(name="foo")
1869        ).on_conflict_do_nothing(index_elements=["myid"])
1870        self.assert_compile(
1871            i,
1872            "INSERT INTO mytable (name) VALUES "
1873            "(%(name)s) ON CONFLICT (myid) DO NOTHING",
1874        )
1875
1876    def test_do_update_set_clause_none(self):
1877        i = insert(self.table_with_metadata).values(myid=1, name="foo")
1878        i = i.on_conflict_do_update(
1879            index_elements=["myid"],
1880            set_=OrderedDict([("name", "I'm a name"), ("description", None)]),
1881        )
1882        self.assert_compile(
1883            i,
1884            "INSERT INTO mytable (myid, name) VALUES "
1885            "(%(myid)s, %(name)s) ON CONFLICT (myid) "
1886            "DO UPDATE SET name = %(param_1)s, "
1887            "description = %(param_2)s",
1888            {
1889                "myid": 1,
1890                "name": "foo",
1891                "param_1": "I'm a name",
1892                "param_2": None,
1893            },
1894        )
1895
1896    def test_do_update_set_clause_literal(self):
1897        i = insert(self.table_with_metadata).values(myid=1, name="foo")
1898        i = i.on_conflict_do_update(
1899            index_elements=["myid"],
1900            set_=OrderedDict(
1901                [("name", "I'm a name"), ("description", null())]
1902            ),
1903        )
1904        self.assert_compile(
1905            i,
1906            "INSERT INTO mytable (myid, name) VALUES "
1907            "(%(myid)s, %(name)s) ON CONFLICT (myid) "
1908            "DO UPDATE SET name = %(param_1)s, "
1909            "description = NULL",
1910            {"myid": 1, "name": "foo", "param_1": "I'm a name"},
1911        )
1912
1913    def test_do_update_str_index_elements_target_one(self):
1914        i = insert(self.table_with_metadata).values(myid=1, name="foo")
1915        i = i.on_conflict_do_update(
1916            index_elements=["myid"],
1917            set_=OrderedDict(
1918                [
1919                    ("name", i.excluded.name),
1920                    ("description", i.excluded.description),
1921                ]
1922            ),
1923        )
1924        self.assert_compile(
1925            i,
1926            "INSERT INTO mytable (myid, name) VALUES "
1927            "(%(myid)s, %(name)s) ON CONFLICT (myid) "
1928            "DO UPDATE SET name = excluded.name, "
1929            "description = excluded.description",
1930        )
1931
1932    def test_do_update_str_index_elements_target_two(self):
1933        i = insert(self.table1, values=dict(name="foo"))
1934        i = i.on_conflict_do_update(
1935            index_elements=["myid"], set_=dict(name=i.excluded.name)
1936        )
1937        self.assert_compile(
1938            i,
1939            "INSERT INTO mytable (name) VALUES "
1940            "(%(name)s) ON CONFLICT (myid) "
1941            "DO UPDATE SET name = excluded.name",
1942        )
1943
1944    def test_do_update_col_index_elements_target(self):
1945        i = insert(self.table1, values=dict(name="foo"))
1946        i = i.on_conflict_do_update(
1947            index_elements=[self.table1.c.myid],
1948            set_=dict(name=i.excluded.name),
1949        )
1950        self.assert_compile(
1951            i,
1952            "INSERT INTO mytable (name) VALUES "
1953            "(%(name)s) ON CONFLICT (myid) "
1954            "DO UPDATE SET name = excluded.name",
1955        )
1956
1957    def test_do_update_unnamed_pk_constraint_target(self):
1958        i = insert(self.table_with_metadata, values=dict(myid=1, name="foo"))
1959        i = i.on_conflict_do_update(
1960            constraint=self.table_with_metadata.primary_key,
1961            set_=dict(name=i.excluded.name),
1962        )
1963        self.assert_compile(
1964            i,
1965            "INSERT INTO mytable (myid, name) VALUES "
1966            "(%(myid)s, %(name)s) ON CONFLICT (myid) "
1967            "DO UPDATE SET name = excluded.name",
1968        )
1969
1970    def test_do_update_pk_constraint_index_elements_target(self):
1971        i = insert(self.table_with_metadata, values=dict(myid=1, name="foo"))
1972        i = i.on_conflict_do_update(
1973            index_elements=self.table_with_metadata.primary_key,
1974            set_=dict(name=i.excluded.name),
1975        )
1976        self.assert_compile(
1977            i,
1978            "INSERT INTO mytable (myid, name) VALUES "
1979            "(%(myid)s, %(name)s) ON CONFLICT (myid) "
1980            "DO UPDATE SET name = excluded.name",
1981        )
1982
1983    def test_do_update_named_unique_constraint_target(self):
1984        i = insert(self.table1, values=dict(name="foo"))
1985        i = i.on_conflict_do_update(
1986            constraint=self.unique_constr, set_=dict(myid=i.excluded.myid)
1987        )
1988        self.assert_compile(
1989            i,
1990            "INSERT INTO mytable (name) VALUES "
1991            "(%(name)s) ON CONFLICT ON CONSTRAINT uq_name "
1992            "DO UPDATE SET myid = excluded.myid",
1993        )
1994
1995    def test_do_update_string_constraint_target(self):
1996        i = insert(self.table1, values=dict(name="foo"))
1997        i = i.on_conflict_do_update(
1998            constraint=self.unique_constr.name, set_=dict(myid=i.excluded.myid)
1999        )
2000        self.assert_compile(
2001            i,
2002            "INSERT INTO mytable (name) VALUES "
2003            "(%(name)s) ON CONFLICT ON CONSTRAINT uq_name "
2004            "DO UPDATE SET myid = excluded.myid",
2005        )
2006
2007    def test_do_update_index_elements_where_target(self):
2008        i = insert(self.table1, values=dict(name="foo"))
2009        i = i.on_conflict_do_update(
2010            index_elements=self.goofy_index.expressions,
2011            index_where=self.goofy_index.dialect_options["postgresql"][
2012                "where"
2013            ],
2014            set_=dict(name=i.excluded.name),
2015        )
2016        self.assert_compile(
2017            i,
2018            "INSERT INTO mytable (name) VALUES "
2019            "(%(name)s) ON CONFLICT (name) "
2020            "WHERE name > %(name_1)s "
2021            "DO UPDATE SET name = excluded.name",
2022        )
2023
2024    def test_do_update_index_elements_where_target_multivalues(self):
2025        i = insert(
2026            self.table1,
2027            values=[dict(name="foo"), dict(name="bar"), dict(name="bat")],
2028        )
2029        i = i.on_conflict_do_update(
2030            index_elements=self.goofy_index.expressions,
2031            index_where=self.goofy_index.dialect_options["postgresql"][
2032                "where"
2033            ],
2034            set_=dict(name=i.excluded.name),
2035        )
2036        self.assert_compile(
2037            i,
2038            "INSERT INTO mytable (name) "
2039            "VALUES (%(name_m0)s), (%(name_m1)s), (%(name_m2)s) "
2040            "ON CONFLICT (name) "
2041            "WHERE name > %(name_1)s "
2042            "DO UPDATE SET name = excluded.name",
2043            checkparams={
2044                "name_1": "m",
2045                "name_m0": "foo",
2046                "name_m1": "bar",
2047                "name_m2": "bat",
2048            },
2049        )
2050
2051    def test_do_update_unnamed_index_target(self):
2052        i = insert(self.table1, values=dict(name="foo"))
2053
2054        unnamed_goofy = Index(
2055            None, self.table1.c.name, postgresql_where=self.table1.c.name > "m"
2056        )
2057
2058        i = i.on_conflict_do_update(
2059            constraint=unnamed_goofy, set_=dict(name=i.excluded.name)
2060        )
2061        self.assert_compile(
2062            i,
2063            "INSERT INTO mytable (name) VALUES "
2064            "(%(name)s) ON CONFLICT (name) "
2065            "WHERE name > %(name_1)s "
2066            "DO UPDATE SET name = excluded.name",
2067        )
2068
2069    def test_do_update_unnamed_exclude_constraint_target(self):
2070        i = insert(self.table1, values=dict(name="foo"))
2071        i = i.on_conflict_do_update(
2072            constraint=self.excl_constr_anon, set_=dict(name=i.excluded.name)
2073        )
2074        self.assert_compile(
2075            i,
2076            "INSERT INTO mytable (name) VALUES "
2077            "(%(name)s) ON CONFLICT (name, description) "
2078            "WHERE description != %(description_1)s "
2079            "DO UPDATE SET name = excluded.name",
2080        )
2081
2082    def test_do_update_add_whereclause(self):
2083        i = insert(self.table1, values=dict(name="foo"))
2084        i = i.on_conflict_do_update(
2085            constraint=self.excl_constr_anon,
2086            set_=dict(name=i.excluded.name),
2087            where=(
2088                (self.table1.c.name != "brah")
2089                & (self.table1.c.description != "brah")
2090            ),
2091        )
2092        self.assert_compile(
2093            i,
2094            "INSERT INTO mytable (name) VALUES "
2095            "(%(name)s) ON CONFLICT (name, description) "
2096            "WHERE description != %(description_1)s "
2097            "DO UPDATE SET name = excluded.name "
2098            "WHERE mytable.name != %(name_1)s "
2099            "AND mytable.description != %(description_2)s",
2100        )
2101
2102    def test_do_update_add_whereclause_references_excluded(self):
2103        i = insert(self.table1, values=dict(name="foo"))
2104        i = i.on_conflict_do_update(
2105            constraint=self.excl_constr_anon,
2106            set_=dict(name=i.excluded.name),
2107            where=((self.table1.c.name != i.excluded.name)),
2108        )
2109        self.assert_compile(
2110            i,
2111            "INSERT INTO mytable (name) VALUES "
2112            "(%(name)s) ON CONFLICT (name, description) "
2113            "WHERE description != %(description_1)s "
2114            "DO UPDATE SET name = excluded.name "
2115            "WHERE mytable.name != excluded.name",
2116        )
2117
2118    def test_do_update_additional_colnames(self):
2119        i = insert(self.table1, values=dict(name="bar"))
2120        i = i.on_conflict_do_update(
2121            constraint=self.excl_constr_anon,
2122            set_=dict(name="somename", unknown="unknown"),
2123        )
2124        with expect_warnings(
2125            "Additional column names not matching any "
2126            "column keys in table 'mytable': 'unknown'"
2127        ):
2128            self.assert_compile(
2129                i,
2130                "INSERT INTO mytable (name) VALUES "
2131                "(%(name)s) ON CONFLICT (name, description) "
2132                "WHERE description != %(description_1)s "
2133                "DO UPDATE SET name = %(param_1)s, "
2134                "unknown = %(param_2)s",
2135                checkparams={
2136                    "name": "bar",
2137                    "description_1": "foo",
2138                    "param_1": "somename",
2139                    "param_2": "unknown",
2140                },
2141            )
2142
2143    def test_on_conflict_as_cte(self):
2144        i = insert(self.table1, values=dict(name="foo"))
2145        i = (
2146            i.on_conflict_do_update(
2147                constraint=self.excl_constr_anon,
2148                set_=dict(name=i.excluded.name),
2149                where=((self.table1.c.name != i.excluded.name)),
2150            )
2151            .returning(literal_column("1"))
2152            .cte("i_upsert")
2153        )
2154
2155        stmt = select([i])
2156
2157        self.assert_compile(
2158            stmt,
2159            "WITH i_upsert AS "
2160            "(INSERT INTO mytable (name) VALUES (%(name)s) "
2161            "ON CONFLICT (name, description) "
2162            "WHERE description != %(description_1)s "
2163            "DO UPDATE SET name = excluded.name "
2164            "WHERE mytable.name != excluded.name RETURNING 1) "
2165            "SELECT i_upsert.1 "
2166            "FROM i_upsert",
2167        )
2168
2169    def test_quote_raw_string_col(self):
2170        t = table("t", column("FancyName"), column("other name"))
2171
2172        stmt = (
2173            insert(t)
2174            .values(FancyName="something new")
2175            .on_conflict_do_update(
2176                index_elements=["FancyName", "other name"],
2177                set_=OrderedDict(
2178                    [
2179                        ("FancyName", "something updated"),
2180                        ("other name", "something else"),
2181                    ]
2182                ),
2183            )
2184        )
2185
2186        self.assert_compile(
2187            stmt,
2188            'INSERT INTO t ("FancyName") VALUES (%(FancyName)s) '
2189            'ON CONFLICT ("FancyName", "other name") '
2190            'DO UPDATE SET "FancyName" = %(param_1)s, '
2191            '"other name" = %(param_2)s',
2192            {
2193                "param_1": "something updated",
2194                "param_2": "something else",
2195                "FancyName": "something new",
2196            },
2197        )
2198
2199
2200class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL):
2201
2202    """Test 'DISTINCT' with SQL expression language and orm.Query with
2203    an emphasis on PG's 'DISTINCT ON' syntax.
2204
2205    """
2206
2207    __dialect__ = postgresql.dialect()
2208
2209    def setup(self):
2210        self.table = Table(
2211            "t",
2212            MetaData(),
2213            Column("id", Integer, primary_key=True),
2214            Column("a", String),
2215            Column("b", String),
2216        )
2217
2218    def test_plain_generative(self):
2219        self.assert_compile(
2220            select([self.table]).distinct(),
2221            "SELECT DISTINCT t.id, t.a, t.b FROM t",
2222        )
2223
2224    def test_on_columns_generative(self):
2225        self.assert_compile(
2226            select([self.table]).distinct(self.table.c.a),
2227            "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t",
2228        )
2229
2230    def test_on_columns_generative_multi_call(self):
2231        self.assert_compile(
2232            select([self.table])
2233            .distinct(self.table.c.a)
2234            .distinct(self.table.c.b),
2235            "SELECT DISTINCT ON (t.a, t.b) t.id, t.a, t.b FROM t",
2236        )
2237
2238    def test_plain_inline(self):
2239        self.assert_compile(
2240            select([self.table], distinct=True),
2241            "SELECT DISTINCT t.id, t.a, t.b FROM t",
2242        )
2243
2244    def test_on_columns_inline_list(self):
2245        self.assert_compile(
2246            select(
2247                [self.table], distinct=[self.table.c.a, self.table.c.b]
2248            ).order_by(self.table.c.a, self.table.c.b),
2249            "SELECT DISTINCT ON (t.a, t.b) t.id, "
2250            "t.a, t.b FROM t ORDER BY t.a, t.b",
2251        )
2252
2253    def test_on_columns_inline_scalar(self):
2254        self.assert_compile(
2255            select([self.table], distinct=self.table.c.a),
2256            "SELECT DISTINCT ON (t.a) t.id, t.a, t.b FROM t",
2257        )
2258
2259    def test_literal_binds(self):
2260        self.assert_compile(
2261            select([self.table]).distinct(self.table.c.a == 10),
2262            "SELECT DISTINCT ON (t.a = 10) t.id, t.a, t.b FROM t",
2263            literal_binds=True,
2264        )
2265
2266    def test_query_plain(self):
2267        sess = Session()
2268        self.assert_compile(
2269            sess.query(self.table).distinct(),
2270            "SELECT DISTINCT t.id AS t_id, t.a AS t_a, " "t.b AS t_b FROM t",
2271        )
2272
2273    def test_query_on_columns(self):
2274        sess = Session()
2275        self.assert_compile(
2276            sess.query(self.table).distinct(self.table.c.a),
2277            "SELECT DISTINCT ON (t.a) t.id AS t_id, t.a AS t_a, "
2278            "t.b AS t_b FROM t",
2279        )
2280
2281    def test_query_on_columns_multi_call(self):
2282        sess = Session()
2283        self.assert_compile(
2284            sess.query(self.table)
2285            .distinct(self.table.c.a)
2286            .distinct(self.table.c.b),
2287            "SELECT DISTINCT ON (t.a, t.b) t.id AS t_id, t.a AS t_a, "
2288            "t.b AS t_b FROM t",
2289        )
2290
2291    def test_query_on_columns_subquery(self):
2292        sess = Session()
2293
2294        class Foo(object):
2295            pass
2296
2297        mapper(Foo, self.table)
2298        sess = Session()
2299        self.assert_compile(
2300            sess.query(Foo).from_self().distinct(Foo.a, Foo.b),
2301            "SELECT DISTINCT ON (anon_1.t_a, anon_1.t_b) anon_1.t_id "
2302            "AS anon_1_t_id, anon_1.t_a AS anon_1_t_a, anon_1.t_b "
2303            "AS anon_1_t_b FROM (SELECT t.id AS t_id, t.a AS t_a, "
2304            "t.b AS t_b FROM t) AS anon_1",
2305        )
2306
2307    def test_query_distinct_on_aliased(self):
2308        class Foo(object):
2309            pass
2310
2311        mapper(Foo, self.table)
2312        a1 = aliased(Foo)
2313        sess = Session()
2314        self.assert_compile(
2315            sess.query(a1).distinct(a1.a),
2316            "SELECT DISTINCT ON (t_1.a) t_1.id AS t_1_id, "
2317            "t_1.a AS t_1_a, t_1.b AS t_1_b FROM t AS t_1",
2318        )
2319
2320    def test_distinct_on_subquery_anon(self):
2321
2322        sq = select([self.table]).alias()
2323        q = (
2324            select([self.table.c.id, sq.c.id])
2325            .distinct(sq.c.id)
2326            .where(self.table.c.id == sq.c.id)
2327        )
2328
2329        self.assert_compile(
2330            q,
2331            "SELECT DISTINCT ON (anon_1.id) t.id, anon_1.id "
2332            "FROM t, (SELECT t.id AS id, t.a AS a, t.b "
2333            "AS b FROM t) AS anon_1 WHERE t.id = anon_1.id",
2334        )
2335
2336    def test_distinct_on_subquery_named(self):
2337        sq = select([self.table]).alias("sq")
2338        q = (
2339            select([self.table.c.id, sq.c.id])
2340            .distinct(sq.c.id)
2341            .where(self.table.c.id == sq.c.id)
2342        )
2343        self.assert_compile(
2344            q,
2345            "SELECT DISTINCT ON (sq.id) t.id, sq.id "
2346            "FROM t, (SELECT t.id AS id, t.a AS a, "
2347            "t.b AS b FROM t) AS sq WHERE t.id = sq.id",
2348        )
2349
2350
2351class FullTextSearchTest(fixtures.TestBase, AssertsCompiledSQL):
2352
2353    """Tests for full text searching"""
2354
2355    __dialect__ = postgresql.dialect()
2356
2357    def setup(self):
2358        self.table = Table(
2359            "t",
2360            MetaData(),
2361            Column("id", Integer, primary_key=True),
2362            Column("title", String),
2363            Column("body", String),
2364        )
2365        self.table_alt = table(
2366            "mytable",
2367            column("id", Integer),
2368            column("title", String(128)),
2369            column("body", String(128)),
2370        )
2371
2372    def _raise_query(self, q):
2373        """
2374        useful for debugging. just do...
2375        self._raise_query(q)
2376        """
2377        c = q.compile(dialect=postgresql.dialect())
2378        raise ValueError(c)
2379
2380    def test_match_basic(self):
2381        s = select([self.table_alt.c.id]).where(
2382            self.table_alt.c.title.match("somestring")
2383        )
2384        self.assert_compile(
2385            s,
2386            "SELECT mytable.id "
2387            "FROM mytable "
2388            "WHERE mytable.title @@ to_tsquery(%(title_1)s)",
2389        )
2390
2391    def test_match_regconfig(self):
2392        s = select([self.table_alt.c.id]).where(
2393            self.table_alt.c.title.match(
2394                "somestring", postgresql_regconfig="english"
2395            )
2396        )
2397        self.assert_compile(
2398            s,
2399            "SELECT mytable.id "
2400            "FROM mytable "
2401            """WHERE mytable.title @@ to_tsquery('english', %(title_1)s)""",
2402        )
2403
2404    def test_match_tsvector(self):
2405        s = select([self.table_alt.c.id]).where(
2406            func.to_tsvector(self.table_alt.c.title).match("somestring")
2407        )
2408        self.assert_compile(
2409            s,
2410            "SELECT mytable.id "
2411            "FROM mytable "
2412            "WHERE to_tsvector(mytable.title) "
2413            "@@ to_tsquery(%(to_tsvector_1)s)",
2414        )
2415
2416    def test_match_tsvectorconfig(self):
2417        s = select([self.table_alt.c.id]).where(
2418            func.to_tsvector("english", self.table_alt.c.title).match(
2419                "somestring"
2420            )
2421        )
2422        self.assert_compile(
2423            s,
2424            "SELECT mytable.id "
2425            "FROM mytable "
2426            "WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ "
2427            "to_tsquery(%(to_tsvector_2)s)",
2428        )
2429
2430    def test_match_tsvectorconfig_regconfig(self):
2431        s = select([self.table_alt.c.id]).where(
2432            func.to_tsvector("english", self.table_alt.c.title).match(
2433                "somestring", postgresql_regconfig="english"
2434            )
2435        )
2436        self.assert_compile(
2437            s,
2438            "SELECT mytable.id "
2439            "FROM mytable "
2440            "WHERE to_tsvector(%(to_tsvector_1)s, mytable.title) @@ "
2441            """to_tsquery('english', %(to_tsvector_2)s)""",
2442        )
2443