1# coding: utf-8
2import contextlib
3import datetime
4import logging
5import logging.handlers
6
7from sqlalchemy import BigInteger
8from sqlalchemy import bindparam
9from sqlalchemy import cast
10from sqlalchemy import Column
11from sqlalchemy import DateTime
12from sqlalchemy import dialects
13from sqlalchemy import event
14from sqlalchemy import exc
15from sqlalchemy import extract
16from sqlalchemy import func
17from sqlalchemy import Integer
18from sqlalchemy import literal
19from sqlalchemy import literal_column
20from sqlalchemy import MetaData
21from sqlalchemy import Numeric
22from sqlalchemy import schema
23from sqlalchemy import select
24from sqlalchemy import Sequence
25from sqlalchemy import SmallInteger
26from sqlalchemy import String
27from sqlalchemy import Table
28from sqlalchemy import testing
29from sqlalchemy import text
30from sqlalchemy import TypeDecorator
31from sqlalchemy.dialects.postgresql import base as postgresql
32from sqlalchemy.dialects.postgresql import psycopg2 as psycopg2_dialect
33from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH
34from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_DEFAULT
35from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES
36from sqlalchemy.engine import engine_from_config
37from sqlalchemy.engine import url
38from sqlalchemy.testing import engines
39from sqlalchemy.testing import expect_deprecated
40from sqlalchemy.testing import fixtures
41from sqlalchemy.testing import is_
42from sqlalchemy.testing import mock
43from sqlalchemy.testing.assertions import assert_raises
44from sqlalchemy.testing.assertions import assert_raises_message
45from sqlalchemy.testing.assertions import AssertsCompiledSQL
46from sqlalchemy.testing.assertions import AssertsExecutionResults
47from sqlalchemy.testing.assertions import eq_
48from sqlalchemy.testing.assertions import eq_regex
49from sqlalchemy.testing.assertions import ne_
50from ...engine import test_execute
51
52
53class DialectTest(fixtures.TestBase):
54    """python-side dialect tests.  """
55
56    def test_version_parsing(self):
57        def mock_conn(res):
58            return mock.Mock(
59                execute=mock.Mock(
60                    return_value=mock.Mock(scalar=mock.Mock(return_value=res))
61                )
62            )
63
64        dialect = postgresql.dialect()
65        for string, version in [
66            (
67                "PostgreSQL 8.3.8 on i686-redhat-linux-gnu, compiled by "
68                "GCC gcc (GCC) 4.1.2 20070925 (Red Hat 4.1.2-33)",
69                (8, 3, 8),
70            ),
71            (
72                "PostgreSQL 8.5devel on x86_64-unknown-linux-gnu, "
73                "compiled by GCC gcc (GCC) 4.4.2, 64-bit",
74                (8, 5),
75            ),
76            (
77                "EnterpriseDB 9.1.2.2 on x86_64-unknown-linux-gnu, "
78                "compiled by gcc (GCC) 4.1.2 20080704 (Red Hat 4.1.2-50), "
79                "64-bit",
80                (9, 1, 2),
81            ),
82            (
83                "[PostgreSQL 9.2.4 ] VMware vFabric Postgres 9.2.4.0 "
84                "release build 1080137",
85                (9, 2, 4),
86            ),
87            (
88                "PostgreSQL 10devel on x86_64-pc-linux-gnu"
89                "compiled by gcc (GCC) 6.3.1 20170306, 64-bit",
90                (10,),
91            ),
92            (
93                "PostgreSQL 10beta1 on x86_64-pc-linux-gnu, "
94                "compiled by gcc (GCC) 4.8.5 20150623 "
95                "(Red Hat 4.8.5-11), 64-bit",
96                (10,),
97            ),
98        ]:
99            eq_(dialect._get_server_version_info(mock_conn(string)), version)
100
101    def test_deprecated_dialect_name_still_loads(self):
102        dialects.registry.clear()
103        with expect_deprecated(
104            "The 'postgres' dialect name " "has been renamed to 'postgresql'"
105        ):
106            dialect = url.URL("postgres").get_dialect()
107        is_(dialect, postgresql.dialect)
108
109    @testing.requires.psycopg2_compatibility
110    def test_pg_dialect_use_native_unicode_from_config(self):
111        config = {
112            "sqlalchemy.url": testing.db.url,
113            "sqlalchemy.use_native_unicode": "false",
114        }
115
116        e = engine_from_config(config, _initialize=False)
117        eq_(e.dialect.use_native_unicode, False)
118
119        config = {
120            "sqlalchemy.url": testing.db.url,
121            "sqlalchemy.use_native_unicode": "true",
122        }
123
124        e = engine_from_config(config, _initialize=False)
125        eq_(e.dialect.use_native_unicode, True)
126
127    def test_psycopg2_empty_connection_string(self):
128        dialect = psycopg2_dialect.dialect()
129        u = url.make_url("postgresql://")
130        cargs, cparams = dialect.create_connect_args(u)
131        eq_(cargs, [""])
132        eq_(cparams, {})
133
134    def test_psycopg2_nonempty_connection_string(self):
135        dialect = psycopg2_dialect.dialect()
136        u = url.make_url("postgresql://host")
137        cargs, cparams = dialect.create_connect_args(u)
138        eq_(cargs, [])
139        eq_(cparams, {"host": "host"})
140
141    def test_psycopg2_empty_connection_string_w_query_one(self):
142        dialect = psycopg2_dialect.dialect()
143        u = url.make_url("postgresql:///?service=swh-log")
144        cargs, cparams = dialect.create_connect_args(u)
145        eq_(cargs, [])
146        eq_(cparams, {"service": "swh-log"})
147
148    def test_psycopg2_empty_connection_string_w_query_two(self):
149        dialect = psycopg2_dialect.dialect()
150        u = url.make_url("postgresql:///?any_random_thing=yes")
151        cargs, cparams = dialect.create_connect_args(u)
152        eq_(cargs, [])
153        eq_(cparams, {"any_random_thing": "yes"})
154
155    def test_psycopg2_nonempty_connection_string_w_query(self):
156        dialect = psycopg2_dialect.dialect()
157        u = url.make_url("postgresql://somehost/?any_random_thing=yes")
158        cargs, cparams = dialect.create_connect_args(u)
159        eq_(cargs, [])
160        eq_(cparams, {"host": "somehost", "any_random_thing": "yes"})
161
162    def test_psycopg2_nonempty_connection_string_w_query_two(self):
163        dialect = psycopg2_dialect.dialect()
164        url_string = "postgresql://USER:PASS@/DB?host=hostA"
165        u = url.make_url(url_string)
166        cargs, cparams = dialect.create_connect_args(u)
167        eq_(cargs, [])
168        eq_(cparams["host"], "hostA")
169
170    def test_psycopg2_nonempty_connection_string_w_query_three(self):
171        dialect = psycopg2_dialect.dialect()
172        url_string = (
173            "postgresql://USER:PASS@/DB"
174            "?host=hostA:portA&host=hostB&host=hostC"
175        )
176        u = url.make_url(url_string)
177        cargs, cparams = dialect.create_connect_args(u)
178        eq_(cargs, [])
179        eq_(cparams["host"], "hostA:portA,hostB,hostC")
180
181
182class ExecuteManyMode(object):
183    __only_on__ = "postgresql+psycopg2"
184    __backend__ = True
185
186    run_create_tables = "each"
187
188    options = None
189
190    @classmethod
191    def define_tables(cls, metadata):
192        Table(
193            "data",
194            metadata,
195            Column("id", Integer, primary_key=True),
196            Column("x", String),
197            Column("y", String),
198            Column("z", Integer, server_default="5"),
199        )
200
201    @contextlib.contextmanager
202    def expect_deprecated_opts(self):
203        yield
204
205    def setup(self):
206        super(ExecuteManyMode, self).setup()
207        with self.expect_deprecated_opts():
208            self.engine = engines.testing_engine(options=self.options)
209
210    def teardown(self):
211        self.engine.dispose()
212        super(ExecuteManyMode, self).teardown()
213
214    def test_insert(self):
215        from psycopg2 import extras
216
217        if self.engine.dialect.executemany_mode is EXECUTEMANY_BATCH:
218            meth = extras.execute_batch
219            stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
220            expected_kwargs = {}
221        else:
222            meth = extras.execute_values
223            stmt = "INSERT INTO data (x, y) VALUES %s"
224            expected_kwargs = {"template": "(%(x)s, %(y)s)"}
225
226        with mock.patch.object(
227            extras, meth.__name__, side_effect=meth
228        ) as mock_exec:
229            with self.engine.connect() as conn:
230                conn.execute(
231                    self.tables.data.insert(),
232                    [
233                        {"x": "x1", "y": "y1"},
234                        {"x": "x2", "y": "y2"},
235                        {"x": "x3", "y": "y3"},
236                    ],
237                )
238
239                eq_(
240                    conn.execute(select([self.tables.data])).fetchall(),
241                    [
242                        (1, "x1", "y1", 5),
243                        (2, "x2", "y2", 5),
244                        (3, "x3", "y3", 5),
245                    ],
246                )
247        eq_(
248            mock_exec.mock_calls,
249            [
250                mock.call(
251                    mock.ANY,
252                    stmt,
253                    (
254                        {"x": "x1", "y": "y1"},
255                        {"x": "x2", "y": "y2"},
256                        {"x": "x3", "y": "y3"},
257                    ),
258                    **expected_kwargs
259                )
260            ],
261        )
262
263    def test_insert_no_page_size(self):
264        from psycopg2 import extras
265
266        eng = self.engine
267        if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
268            meth = extras.execute_batch
269            stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
270            expected_kwargs = {}
271        else:
272            meth = extras.execute_values
273            stmt = "INSERT INTO data (x, y) VALUES %s"
274            expected_kwargs = {"template": "(%(x)s, %(y)s)"}
275
276        with mock.patch.object(
277            extras, meth.__name__, side_effect=meth
278        ) as mock_exec:
279            with eng.connect() as conn:
280                conn.execute(
281                    self.tables.data.insert(),
282                    [
283                        {"x": "x1", "y": "y1"},
284                        {"x": "x2", "y": "y2"},
285                        {"x": "x3", "y": "y3"},
286                    ],
287                )
288
289        eq_(
290            mock_exec.mock_calls,
291            [
292                mock.call(
293                    mock.ANY,
294                    stmt,
295                    (
296                        {"x": "x1", "y": "y1"},
297                        {"x": "x2", "y": "y2"},
298                        {"x": "x3", "y": "y3"},
299                    ),
300                    **expected_kwargs
301                )
302            ],
303        )
304
305    def test_insert_page_size(self):
306        from psycopg2 import extras
307
308        opts = self.options.copy()
309        opts["executemany_batch_page_size"] = 500
310        opts["executemany_values_page_size"] = 1000
311
312        with self.expect_deprecated_opts():
313            eng = engines.testing_engine(options=opts)
314
315        if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
316            meth = extras.execute_batch
317            stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
318            expected_kwargs = {"page_size": 500}
319        else:
320            meth = extras.execute_values
321            stmt = "INSERT INTO data (x, y) VALUES %s"
322            expected_kwargs = {"page_size": 1000, "template": "(%(x)s, %(y)s)"}
323
324        with mock.patch.object(
325            extras, meth.__name__, side_effect=meth
326        ) as mock_exec:
327            with eng.connect() as conn:
328                conn.execute(
329                    self.tables.data.insert(),
330                    [
331                        {"x": "x1", "y": "y1"},
332                        {"x": "x2", "y": "y2"},
333                        {"x": "x3", "y": "y3"},
334                    ],
335                )
336
337        eq_(
338            mock_exec.mock_calls,
339            [
340                mock.call(
341                    mock.ANY,
342                    stmt,
343                    (
344                        {"x": "x1", "y": "y1"},
345                        {"x": "x2", "y": "y2"},
346                        {"x": "x3", "y": "y3"},
347                    ),
348                    **expected_kwargs
349                )
350            ],
351        )
352
353    def test_update_fallback(self):
354        from psycopg2 import extras
355
356        eng = self.engine
357        meth = extras.execute_batch
358        stmt = "UPDATE data SET y=%(yval)s WHERE data.x = %(xval)s"
359        expected_kwargs = {}
360
361        with mock.patch.object(
362            extras, meth.__name__, side_effect=meth
363        ) as mock_exec:
364            with eng.connect() as conn:
365                conn.execute(
366                    self.tables.data.update()
367                    .where(self.tables.data.c.x == bindparam("xval"))
368                    .values(y=bindparam("yval")),
369                    [
370                        {"xval": "x1", "yval": "y5"},
371                        {"xval": "x3", "yval": "y6"},
372                    ],
373                )
374
375        eq_(
376            mock_exec.mock_calls,
377            [
378                mock.call(
379                    mock.ANY,
380                    stmt,
381                    (
382                        {"xval": "x1", "yval": "y5"},
383                        {"xval": "x3", "yval": "y6"},
384                    ),
385                    **expected_kwargs
386                )
387            ],
388        )
389
390    def test_not_sane_rowcount(self):
391        self.engine.connect().close()
392        assert not self.engine.dialect.supports_sane_multi_rowcount
393
394    def test_update(self):
395        with self.engine.connect() as conn:
396            conn.execute(
397                self.tables.data.insert(),
398                [
399                    {"x": "x1", "y": "y1"},
400                    {"x": "x2", "y": "y2"},
401                    {"x": "x3", "y": "y3"},
402                ],
403            )
404
405            conn.execute(
406                self.tables.data.update()
407                .where(self.tables.data.c.x == bindparam("xval"))
408                .values(y=bindparam("yval")),
409                [{"xval": "x1", "yval": "y5"}, {"xval": "x3", "yval": "y6"}],
410            )
411            eq_(
412                conn.execute(
413                    select([self.tables.data]).order_by(self.tables.data.c.id)
414                ).fetchall(),
415                [(1, "x1", "y5", 5), (2, "x2", "y2", 5), (3, "x3", "y6", 5)],
416            )
417
418
419class UseBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
420    options = {"use_batch_mode": True}
421
422    def expect_deprecated_opts(self):
423        return expect_deprecated(
424            "The psycopg2 use_batch_mode flag is superseded by "
425            "executemany_mode='batch'"
426        )
427
428
429class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
430    options = {"executemany_mode": "batch"}
431
432
433class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
434    options = {"executemany_mode": "values"}
435
436    def test_insert_w_newlines(self):
437        from psycopg2 import extras
438
439        t = self.tables.data
440
441        ins = t.insert(inline=True).values(
442            id=bindparam("id"),
443            x=select([literal_column("5")]).select_from(self.tables.data),
444            y=bindparam("y"),
445            z=bindparam("z"),
446        )
447        # compiled SQL has a newline in it
448        eq_(
449            str(ins.compile(testing.db)),
450            "INSERT INTO data (id, x, y, z) VALUES (%(id)s, "
451            "(SELECT 5 \nFROM data), %(y)s, %(z)s)",
452        )
453        meth = extras.execute_values
454        with mock.patch.object(
455            extras, "execute_values", side_effect=meth
456        ) as mock_exec:
457
458            with self.engine.connect() as conn:
459                conn.execute(
460                    ins,
461                    [
462                        {"id": 1, "y": "y1", "z": 1},
463                        {"id": 2, "y": "y2", "z": 2},
464                        {"id": 3, "y": "y3", "z": 3},
465                    ],
466                )
467
468        eq_(
469            mock_exec.mock_calls,
470            [
471                mock.call(
472                    mock.ANY,
473                    "INSERT INTO data (id, x, y, z) VALUES %s",
474                    (
475                        {"id": 1, "y": "y1", "z": 1},
476                        {"id": 2, "y": "y2", "z": 2},
477                        {"id": 3, "y": "y3", "z": 3},
478                    ),
479                    template="(%(id)s, (SELECT 5 \nFROM data), %(y)s, %(z)s)",
480                )
481            ],
482        )
483
484    def test_insert_modified_by_event(self):
485        from psycopg2 import extras
486
487        t = self.tables.data
488
489        ins = t.insert(inline=True).values(
490            id=bindparam("id"),
491            x=select([literal_column("5")]).select_from(self.tables.data),
492            y=bindparam("y"),
493            z=bindparam("z"),
494        )
495        # compiled SQL has a newline in it
496        eq_(
497            str(ins.compile(testing.db)),
498            "INSERT INTO data (id, x, y, z) VALUES (%(id)s, "
499            "(SELECT 5 \nFROM data), %(y)s, %(z)s)",
500        )
501        meth = extras.execute_batch
502        with mock.patch.object(
503            extras, "execute_values"
504        ) as mock_values, mock.patch.object(
505            extras, "execute_batch", side_effect=meth
506        ) as mock_batch:
507
508            with self.engine.connect() as conn:
509
510                # create an event hook that will change the statement to
511                # something else, meaning the dialect has to detect that
512                # insert_single_values_expr is no longer useful
513                @event.listens_for(conn, "before_cursor_execute", retval=True)
514                def before_cursor_execute(
515                    conn, cursor, statement, parameters, context, executemany
516                ):
517                    statement = (
518                        "INSERT INTO data (id, y, z) VALUES "
519                        "(%(id)s, %(y)s, %(z)s)"
520                    )
521                    return statement, parameters
522
523                conn.execute(
524                    ins,
525                    [
526                        {"id": 1, "y": "y1", "z": 1},
527                        {"id": 2, "y": "y2", "z": 2},
528                        {"id": 3, "y": "y3", "z": 3},
529                    ],
530                )
531
532        eq_(mock_values.mock_calls, [])
533        eq_(
534            mock_batch.mock_calls,
535            [
536                mock.call(
537                    mock.ANY,
538                    "INSERT INTO data (id, y, z) VALUES "
539                    "(%(id)s, %(y)s, %(z)s)",
540                    (
541                        {"id": 1, "y": "y1", "z": 1},
542                        {"id": 2, "y": "y2", "z": 2},
543                        {"id": 3, "y": "y3", "z": 3},
544                    ),
545                )
546            ],
547        )
548
549
550class ExecutemanyFlagOptionsTest(fixtures.TablesTest):
551    __only_on__ = "postgresql+psycopg2"
552    __backend__ = True
553
554    def test_executemany_correct_flag_options(self):
555        for opt, expected in [
556            (None, EXECUTEMANY_DEFAULT),
557            ("batch", EXECUTEMANY_BATCH),
558            ("values", EXECUTEMANY_VALUES),
559        ]:
560            self.engine = engines.testing_engine(
561                options={"executemany_mode": opt}
562            )
563            is_(self.engine.dialect.executemany_mode, expected)
564
565    def test_executemany_wrong_flag_options(self):
566        for opt in [1, True, "batch_insert"]:
567            assert_raises_message(
568                exc.ArgumentError,
569                "Invalid value for 'executemany_mode': %r" % opt,
570                engines.testing_engine,
571                options={"executemany_mode": opt},
572            )
573
574
575class MiscBackendTest(
576    fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
577):
578
579    __only_on__ = "postgresql"
580    __backend__ = True
581
582    @testing.provide_metadata
583    def test_date_reflection(self):
584        metadata = self.metadata
585        Table(
586            "pgdate",
587            metadata,
588            Column("date1", DateTime(timezone=True)),
589            Column("date2", DateTime(timezone=False)),
590        )
591        metadata.create_all()
592        m2 = MetaData(testing.db)
593        t2 = Table("pgdate", m2, autoload=True)
594        assert t2.c.date1.type.timezone is True
595        assert t2.c.date2.type.timezone is False
596
597    @testing.requires.psycopg2_compatibility
598    def test_psycopg2_version(self):
599        v = testing.db.dialect.psycopg2_version
600        assert testing.db.dialect.dbapi.__version__.startswith(
601            ".".join(str(x) for x in v)
602        )
603
604    @testing.requires.psycopg2_compatibility
605    def test_psycopg2_non_standard_err(self):
606        # under pypy the name here is psycopg2cffi
607        psycopg2 = testing.db.dialect.dbapi
608        TransactionRollbackError = __import__(
609            "%s.extensions" % psycopg2.__name__
610        ).extensions.TransactionRollbackError
611
612        exception = exc.DBAPIError.instance(
613            "some statement",
614            {},
615            TransactionRollbackError("foo"),
616            psycopg2.Error,
617        )
618        assert isinstance(exception, exc.OperationalError)
619
620    @testing.requires.no_coverage
621    @testing.requires.psycopg2_compatibility
622    def test_notice_logging(self):
623        log = logging.getLogger("sqlalchemy.dialects.postgresql")
624        buf = logging.handlers.BufferingHandler(100)
625        lev = log.level
626        log.addHandler(buf)
627        log.setLevel(logging.INFO)
628        try:
629            conn = testing.db.connect()
630            trans = conn.begin()
631            try:
632                conn.execute(
633                    """
634CREATE OR REPLACE FUNCTION note(message varchar) RETURNS integer AS $$
635BEGIN
636  RAISE NOTICE 'notice: %%', message;
637  RETURN NULL;
638END;
639$$ LANGUAGE plpgsql;
640"""
641                )
642                conn.execute("SELECT note('hi there')")
643                conn.execute("SELECT note('another note')")
644            finally:
645                trans.rollback()
646        finally:
647            log.removeHandler(buf)
648            log.setLevel(lev)
649        msgs = " ".join(b.msg for b in buf.buffer)
650        eq_regex(
651            msgs,
652            "NOTICE:  notice: hi there(\nCONTEXT: .*?)? "
653            "NOTICE:  notice: another note(\nCONTEXT: .*?)?",
654        )
655
656    @testing.requires.psycopg2_or_pg8000_compatibility
657    @engines.close_open_connections
658    def test_client_encoding(self):
659        c = testing.db.connect()
660        current_encoding = c.execute("show client_encoding").fetchone()[0]
661        c.close()
662
663        # attempt to use an encoding that's not
664        # already set
665        if current_encoding == "UTF8":
666            test_encoding = "LATIN1"
667        else:
668            test_encoding = "UTF8"
669
670        e = engines.testing_engine(options={"client_encoding": test_encoding})
671        c = e.connect()
672        new_encoding = c.execute("show client_encoding").fetchone()[0]
673        eq_(new_encoding, test_encoding)
674
675    @testing.requires.psycopg2_or_pg8000_compatibility
676    @engines.close_open_connections
677    def test_autocommit_isolation_level(self):
678        c = testing.db.connect().execution_options(
679            isolation_level="AUTOCOMMIT"
680        )
681        # If we're really in autocommit mode then we'll get an error saying
682        # that the prepared transaction doesn't exist. Otherwise, we'd
683        # get an error saying that the command can't be run within a
684        # transaction.
685        assert_raises_message(
686            exc.ProgrammingError,
687            'prepared transaction with identifier "gilberte" does not exist',
688            c.execute,
689            "commit prepared 'gilberte'",
690        )
691
692    @testing.fails_on(
693        "+zxjdbc",
694        "Can't infer the SQL type to use for an instance "
695        "of org.python.core.PyObjectDerived.",
696    )
697    def test_extract(self):
698        fivedaysago = testing.db.scalar(
699            select([func.now()])
700        ) - datetime.timedelta(days=5)
701        for field, exp in (
702            ("year", fivedaysago.year),
703            ("month", fivedaysago.month),
704            ("day", fivedaysago.day),
705        ):
706            r = testing.db.execute(
707                select(
708                    [extract(field, func.now() + datetime.timedelta(days=-5))]
709                )
710            ).scalar()
711            eq_(r, exp)
712
713    @testing.provide_metadata
714    def test_checksfor_sequence(self):
715        meta1 = self.metadata
716        seq = Sequence("fooseq")
717        t = Table("mytable", meta1, Column("col1", Integer, seq))
718        seq.drop()
719        testing.db.execute("CREATE SEQUENCE fooseq")
720        t.create(checkfirst=True)
721
722    @testing.provide_metadata
723    def test_schema_roundtrips(self):
724        meta = self.metadata
725        users = Table(
726            "users",
727            meta,
728            Column("id", Integer, primary_key=True),
729            Column("name", String(50)),
730            schema="test_schema",
731        )
732        users.create()
733        users.insert().execute(id=1, name="name1")
734        users.insert().execute(id=2, name="name2")
735        users.insert().execute(id=3, name="name3")
736        users.insert().execute(id=4, name="name4")
737        eq_(
738            users.select().where(users.c.name == "name2").execute().fetchall(),
739            [(2, "name2")],
740        )
741        eq_(
742            users.select(use_labels=True)
743            .where(users.c.name == "name2")
744            .execute()
745            .fetchall(),
746            [(2, "name2")],
747        )
748        users.delete().where(users.c.id == 3).execute()
749        eq_(
750            users.select().where(users.c.name == "name3").execute().fetchall(),
751            [],
752        )
753        users.update().where(users.c.name == "name4").execute(name="newname")
754        eq_(
755            users.select(use_labels=True)
756            .where(users.c.id == 4)
757            .execute()
758            .fetchall(),
759            [(4, "newname")],
760        )
761
762    def test_quoted_name_bindparam_ok(self):
763        from sqlalchemy.sql.elements import quoted_name
764
765        with testing.db.connect() as conn:
766            eq_(
767                conn.scalar(
768                    select(
769                        [
770                            cast(
771                                literal(quoted_name("some_name", False)),
772                                String,
773                            )
774                        ]
775                    )
776                ),
777                "some_name",
778            )
779
780    def test_preexecute_passivedefault(self):
781        """test that when we get a primary key column back from
782        reflecting a table which has a default value on it, we pre-
783        execute that DefaultClause upon insert."""
784
785        try:
786            meta = MetaData(testing.db)
787            testing.db.execute(
788                """
789             CREATE TABLE speedy_users
790             (
791                 speedy_user_id   SERIAL     PRIMARY KEY,
792
793                 user_name        VARCHAR    NOT NULL,
794                 user_password    VARCHAR    NOT NULL
795             );
796            """
797            )
798            t = Table("speedy_users", meta, autoload=True)
799            r = t.insert().execute(user_name="user", user_password="lala")
800            assert r.inserted_primary_key == [1]
801            result = t.select().execute().fetchall()
802            assert result == [(1, "user", "lala")]
803        finally:
804            testing.db.execute("drop table speedy_users")
805
806    @testing.fails_on("+zxjdbc", "psycopg2/pg8000 specific assertion")
807    @testing.requires.psycopg2_or_pg8000_compatibility
808    def test_numeric_raise(self):
809        stmt = text("select cast('hi' as char) as hi").columns(hi=Numeric)
810        assert_raises(exc.InvalidRequestError, testing.db.execute, stmt)
811
812    @testing.only_if(
813        "postgresql >= 8.2", "requires standard_conforming_strings"
814    )
815    def test_serial_integer(self):
816        class BITD(TypeDecorator):
817            impl = Integer
818
819            def load_dialect_impl(self, dialect):
820                if dialect.name == "postgresql":
821                    return BigInteger()
822                else:
823                    return Integer()
824
825        for version, type_, expected in [
826            (None, Integer, "SERIAL"),
827            (None, BigInteger, "BIGSERIAL"),
828            ((9, 1), SmallInteger, "SMALLINT"),
829            ((9, 2), SmallInteger, "SMALLSERIAL"),
830            (None, postgresql.INTEGER, "SERIAL"),
831            (None, postgresql.BIGINT, "BIGSERIAL"),
832            (
833                None,
834                Integer().with_variant(BigInteger(), "postgresql"),
835                "BIGSERIAL",
836            ),
837            (
838                None,
839                Integer().with_variant(postgresql.BIGINT, "postgresql"),
840                "BIGSERIAL",
841            ),
842            (
843                (9, 2),
844                Integer().with_variant(SmallInteger, "postgresql"),
845                "SMALLSERIAL",
846            ),
847            (None, BITD(), "BIGSERIAL"),
848        ]:
849            m = MetaData()
850
851            t = Table("t", m, Column("c", type_, primary_key=True))
852
853            if version:
854                dialect = postgresql.dialect()
855                dialect._get_server_version_info = mock.Mock(
856                    return_value=version
857                )
858                dialect.initialize(testing.db.connect())
859            else:
860                dialect = testing.db.dialect
861
862            ddl_compiler = dialect.ddl_compiler(dialect, schema.CreateTable(t))
863            eq_(
864                ddl_compiler.get_column_specification(t.c.c),
865                "c %s NOT NULL" % expected,
866            )
867
868    @testing.requires.psycopg2_compatibility
869    def test_initial_transaction_state(self):
870        from psycopg2.extensions import STATUS_IN_TRANSACTION
871
872        engine = engines.testing_engine()
873        with engine.connect() as conn:
874            ne_(conn.connection.status, STATUS_IN_TRANSACTION)
875
876
877class AutocommitTextTest(test_execute.AutocommitTextTest):
878    __only_on__ = "postgresql"
879
880    def test_grant(self):
881        self._test_keyword("GRANT USAGE ON SCHEMA fooschema TO foorole")
882
883    def test_import_foreign_schema(self):
884        self._test_keyword("IMPORT FOREIGN SCHEMA foob")
885
886    def test_refresh_view(self):
887        self._test_keyword("REFRESH MATERIALIZED VIEW fooview")
888
889    def test_revoke(self):
890        self._test_keyword("REVOKE USAGE ON SCHEMA fooschema FROM foorole")
891
892    def test_truncate(self):
893        self._test_keyword("TRUNCATE footable")
894