1# Copyright (C) 2016-present the asyncpg authors and contributors
2# <see AUTHORS file>
3#
4# This module is part of asyncpg and is released under
5# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
6
7
8import asyncio
9import asyncpg
10import collections
11import collections.abc
12import functools
13import itertools
14import inspect
15import os
16import sys
17import time
18import traceback
19import typing
20import warnings
21import weakref
22
23from . import compat
24from . import connect_utils
25from . import cursor
26from . import exceptions
27from . import introspection
28from . import prepared_stmt
29from . import protocol
30from . import serverversion
31from . import transaction
32from . import utils
33
34
35class ConnectionMeta(type):
36
37    def __instancecheck__(cls, instance):
38        mro = type(instance).__mro__
39        return Connection in mro or _ConnectionProxy in mro
40
41
42class Connection(metaclass=ConnectionMeta):
43    """A representation of a database session.
44
45    Connections are created by calling :func:`~asyncpg.connection.connect`.
46    """
47
48    __slots__ = ('_protocol', '_transport', '_loop',
49                 '_top_xact', '_aborted',
50                 '_pool_release_ctr', '_stmt_cache', '_stmts_to_close',
51                 '_listeners', '_server_version', '_server_caps',
52                 '_intro_query', '_reset_query', '_proxy',
53                 '_stmt_exclusive_section', '_config', '_params', '_addr',
54                 '_log_listeners', '_termination_listeners', '_cancellations',
55                 '_source_traceback', '__weakref__')
56
57    def __init__(self, protocol, transport, loop,
58                 addr,
59                 config: connect_utils._ClientConfiguration,
60                 params: connect_utils._ConnectionParameters):
61        self._protocol = protocol
62        self._transport = transport
63        self._loop = loop
64        self._top_xact = None
65        self._aborted = False
66        # Incremented every time the connection is released back to a pool.
67        # Used to catch invalid references to connection-related resources
68        # post-release (e.g. explicit prepared statements).
69        self._pool_release_ctr = 0
70
71        self._addr = addr
72        self._config = config
73        self._params = params
74
75        self._stmt_cache = _StatementCache(
76            loop=loop,
77            max_size=config.statement_cache_size,
78            on_remove=functools.partial(
79                _weak_maybe_gc_stmt, weakref.ref(self)),
80            max_lifetime=config.max_cached_statement_lifetime)
81
82        self._stmts_to_close = set()
83
84        self._listeners = {}
85        self._log_listeners = set()
86        self._cancellations = set()
87        self._termination_listeners = set()
88
89        settings = self._protocol.get_settings()
90        ver_string = settings.server_version
91        self._server_version = \
92            serverversion.split_server_version_string(ver_string)
93
94        self._server_caps = _detect_server_capabilities(
95            self._server_version, settings)
96
97        if self._server_version < (14, 0):
98            self._intro_query = introspection.INTRO_LOOKUP_TYPES_13
99        else:
100            self._intro_query = introspection.INTRO_LOOKUP_TYPES
101
102        self._reset_query = None
103        self._proxy = None
104
105        # Used to serialize operations that might involve anonymous
106        # statements.  Specifically, we want to make the following
107        # operation atomic:
108        #    ("prepare an anonymous statement", "use the statement")
109        #
110        # Used for `con.fetchval()`, `con.fetch()`, `con.fetchrow()`,
111        # `con.execute()`, and `con.executemany()`.
112        self._stmt_exclusive_section = _Atomic()
113
114        if loop.get_debug():
115            self._source_traceback = _extract_stack()
116        else:
117            self._source_traceback = None
118
119    def __del__(self):
120        if not self.is_closed() and self._protocol is not None:
121            if self._source_traceback:
122                msg = "unclosed connection {!r}; created at:\n {}".format(
123                    self, self._source_traceback)
124            else:
125                msg = (
126                    "unclosed connection {!r}; run in asyncio debug "
127                    "mode to show the traceback of connection "
128                    "origin".format(self)
129                )
130
131            warnings.warn(msg, ResourceWarning)
132            if not self._loop.is_closed():
133                self.terminate()
134
135    async def add_listener(self, channel, callback):
136        """Add a listener for Postgres notifications.
137
138        :param str channel: Channel to listen on.
139
140        :param callable callback:
141            A callable or a coroutine function receiving the following
142            arguments:
143            **connection**: a Connection the callback is registered with;
144            **pid**: PID of the Postgres server that sent the notification;
145            **channel**: name of the channel the notification was sent to;
146            **payload**: the payload.
147
148        .. versionchanged:: 0.24.0
149            The ``callback`` argument may be a coroutine function.
150        """
151        self._check_open()
152        if channel not in self._listeners:
153            await self.fetch('LISTEN {}'.format(utils._quote_ident(channel)))
154            self._listeners[channel] = set()
155        self._listeners[channel].add(_Callback.from_callable(callback))
156
157    async def remove_listener(self, channel, callback):
158        """Remove a listening callback on the specified channel."""
159        if self.is_closed():
160            return
161        if channel not in self._listeners:
162            return
163        cb = _Callback.from_callable(callback)
164        if cb not in self._listeners[channel]:
165            return
166        self._listeners[channel].remove(cb)
167        if not self._listeners[channel]:
168            del self._listeners[channel]
169            await self.fetch('UNLISTEN {}'.format(utils._quote_ident(channel)))
170
171    def add_log_listener(self, callback):
172        """Add a listener for Postgres log messages.
173
174        It will be called when asyncronous NoticeResponse is received
175        from the connection.  Possible message types are: WARNING, NOTICE,
176        DEBUG, INFO, or LOG.
177
178        :param callable callback:
179            A callable or a coroutine function receiving the following
180            arguments:
181            **connection**: a Connection the callback is registered with;
182            **message**: the `exceptions.PostgresLogMessage` message.
183
184        .. versionadded:: 0.12.0
185
186        .. versionchanged:: 0.24.0
187            The ``callback`` argument may be a coroutine function.
188        """
189        if self.is_closed():
190            raise exceptions.InterfaceError('connection is closed')
191        self._log_listeners.add(_Callback.from_callable(callback))
192
193    def remove_log_listener(self, callback):
194        """Remove a listening callback for log messages.
195
196        .. versionadded:: 0.12.0
197        """
198        self._log_listeners.discard(_Callback.from_callable(callback))
199
200    def add_termination_listener(self, callback):
201        """Add a listener that will be called when the connection is closed.
202
203        :param callable callback:
204            A callable or a coroutine function receiving one argument:
205            **connection**: a Connection the callback is registered with.
206
207        .. versionadded:: 0.21.0
208
209        .. versionchanged:: 0.24.0
210            The ``callback`` argument may be a coroutine function.
211        """
212        self._termination_listeners.add(_Callback.from_callable(callback))
213
214    def remove_termination_listener(self, callback):
215        """Remove a listening callback for connection termination.
216
217        :param callable callback:
218            The callable or coroutine function that was passed to
219            :meth:`Connection.add_termination_listener`.
220
221        .. versionadded:: 0.21.0
222        """
223        self._termination_listeners.discard(_Callback.from_callable(callback))
224
225    def get_server_pid(self):
226        """Return the PID of the Postgres server the connection is bound to."""
227        return self._protocol.get_server_pid()
228
229    def get_server_version(self):
230        """Return the version of the connected PostgreSQL server.
231
232        The returned value is a named tuple similar to that in
233        ``sys.version_info``:
234
235        .. code-block:: pycon
236
237            >>> con.get_server_version()
238            ServerVersion(major=9, minor=6, micro=1,
239                          releaselevel='final', serial=0)
240
241        .. versionadded:: 0.8.0
242        """
243        return self._server_version
244
245    def get_settings(self):
246        """Return connection settings.
247
248        :return: :class:`~asyncpg.ConnectionSettings`.
249        """
250        return self._protocol.get_settings()
251
252    def transaction(self, *, isolation=None, readonly=False,
253                    deferrable=False):
254        """Create a :class:`~transaction.Transaction` object.
255
256        Refer to `PostgreSQL documentation`_ on the meaning of transaction
257        parameters.
258
259        :param isolation: Transaction isolation mode, can be one of:
260                          `'serializable'`, `'repeatable_read'`,
261                          `'read_committed'`. If not specified, the behavior
262                          is up to the server and session, which is usually
263                          ``read_committed``.
264
265        :param readonly: Specifies whether or not this transaction is
266                         read-only.
267
268        :param deferrable: Specifies whether or not this transaction is
269                           deferrable.
270
271        .. _`PostgreSQL documentation`:
272                https://www.postgresql.org/docs/
273                current/static/sql-set-transaction.html
274        """
275        self._check_open()
276        return transaction.Transaction(self, isolation, readonly, deferrable)
277
278    def is_in_transaction(self):
279        """Return True if Connection is currently inside a transaction.
280
281        :return bool: True if inside transaction, False otherwise.
282
283        .. versionadded:: 0.16.0
284        """
285        return self._protocol.is_in_transaction()
286
287    async def execute(self, query: str, *args, timeout: float=None) -> str:
288        """Execute an SQL command (or commands).
289
290        This method can execute many SQL commands at once, when no arguments
291        are provided.
292
293        Example:
294
295        .. code-block:: pycon
296
297            >>> await con.execute('''
298            ...     CREATE TABLE mytab (a int);
299            ...     INSERT INTO mytab (a) VALUES (100), (200), (300);
300            ... ''')
301            INSERT 0 3
302
303            >>> await con.execute('''
304            ...     INSERT INTO mytab (a) VALUES ($1), ($2)
305            ... ''', 10, 20)
306            INSERT 0 2
307
308        :param args: Query arguments.
309        :param float timeout: Optional timeout value in seconds.
310        :return str: Status of the last SQL command.
311
312        .. versionchanged:: 0.5.4
313           Made it possible to pass query arguments.
314        """
315        self._check_open()
316
317        if not args:
318            return await self._protocol.query(query, timeout)
319
320        _, status, _ = await self._execute(
321            query,
322            args,
323            0,
324            timeout,
325            return_status=True,
326        )
327        return status.decode()
328
329    async def executemany(self, command: str, args, *, timeout: float=None):
330        """Execute an SQL *command* for each sequence of arguments in *args*.
331
332        Example:
333
334        .. code-block:: pycon
335
336            >>> await con.executemany('''
337            ...     INSERT INTO mytab (a) VALUES ($1, $2, $3);
338            ... ''', [(1, 2, 3), (4, 5, 6)])
339
340        :param command: Command to execute.
341        :param args: An iterable containing sequences of arguments.
342        :param float timeout: Optional timeout value in seconds.
343        :return None: This method discards the results of the operations.
344
345        .. versionadded:: 0.7.0
346
347        .. versionchanged:: 0.11.0
348           `timeout` became a keyword-only parameter.
349
350        .. versionchanged:: 0.22.0
351           ``executemany()`` is now an atomic operation, which means that
352           either all executions succeed, or none at all.  This is in contrast
353           to prior versions, where the effect of already-processed iterations
354           would remain in place when an error has occurred, unless
355           ``executemany()`` was called in a transaction.
356        """
357        self._check_open()
358        return await self._executemany(command, args, timeout)
359
360    async def _get_statement(
361        self,
362        query,
363        timeout,
364        *,
365        named=False,
366        use_cache=True,
367        ignore_custom_codec=False,
368        record_class=None
369    ):
370        if record_class is None:
371            record_class = self._protocol.get_record_class()
372        else:
373            _check_record_class(record_class)
374
375        if use_cache:
376            statement = self._stmt_cache.get(
377                (query, record_class, ignore_custom_codec)
378            )
379            if statement is not None:
380                return statement
381
382            # Only use the cache when:
383            #  * `statement_cache_size` is greater than 0;
384            #  * query size is less than `max_cacheable_statement_size`.
385            use_cache = self._stmt_cache.get_max_size() > 0
386            if (use_cache and
387                    self._config.max_cacheable_statement_size and
388                    len(query) > self._config.max_cacheable_statement_size):
389                use_cache = False
390
391        if isinstance(named, str):
392            stmt_name = named
393        elif use_cache or named:
394            stmt_name = self._get_unique_id('stmt')
395        else:
396            stmt_name = ''
397
398        statement = await self._protocol.prepare(
399            stmt_name,
400            query,
401            timeout,
402            record_class=record_class,
403            ignore_custom_codec=ignore_custom_codec,
404        )
405        need_reprepare = False
406        types_with_missing_codecs = statement._init_types()
407        tries = 0
408        while types_with_missing_codecs:
409            settings = self._protocol.get_settings()
410
411            # Introspect newly seen types and populate the
412            # codec cache.
413            types, intro_stmt = await self._introspect_types(
414                types_with_missing_codecs, timeout)
415
416            settings.register_data_types(types)
417
418            # The introspection query has used an anonymous statement,
419            # which has blown away the anonymous statement we've prepared
420            # for the query, so we need to re-prepare it.
421            need_reprepare = not intro_stmt.name and not statement.name
422            types_with_missing_codecs = statement._init_types()
423            tries += 1
424            if tries > 5:
425                # In the vast majority of cases there will be only
426                # one iteration.  In rare cases, there might be a race
427                # with reload_schema_state(), which would cause a
428                # second try.  More than five is clearly a bug.
429                raise exceptions.InternalClientError(
430                    'could not resolve query result and/or argument types '
431                    'in {} attempts'.format(tries)
432                )
433
434        # Now that types have been resolved, populate the codec pipeline
435        # for the statement.
436        statement._init_codecs()
437
438        if need_reprepare:
439            await self._protocol.prepare(
440                stmt_name,
441                query,
442                timeout,
443                state=statement,
444                record_class=record_class,
445            )
446
447        if use_cache:
448            self._stmt_cache.put(
449                (query, record_class, ignore_custom_codec), statement)
450
451        # If we've just created a new statement object, check if there
452        # are any statements for GC.
453        if self._stmts_to_close:
454            await self._cleanup_stmts()
455
456        return statement
457
458    async def _introspect_types(self, typeoids, timeout):
459        return await self.__execute(
460            self._intro_query,
461            (list(typeoids),),
462            0,
463            timeout,
464            ignore_custom_codec=True,
465        )
466
467    async def _introspect_type(self, typename, schema):
468        if (
469            schema == 'pg_catalog'
470            and typename.lower() in protocol.BUILTIN_TYPE_NAME_MAP
471        ):
472            typeoid = protocol.BUILTIN_TYPE_NAME_MAP[typename.lower()]
473            rows = await self._execute(
474                introspection.TYPE_BY_OID,
475                [typeoid],
476                limit=0,
477                timeout=None,
478                ignore_custom_codec=True,
479            )
480        else:
481            rows = await self._execute(
482                introspection.TYPE_BY_NAME,
483                [typename, schema],
484                limit=1,
485                timeout=None,
486                ignore_custom_codec=True,
487            )
488
489        if not rows:
490            raise ValueError(
491                'unknown type: {}.{}'.format(schema, typename))
492
493        return rows[0]
494
495    def cursor(
496        self,
497        query,
498        *args,
499        prefetch=None,
500        timeout=None,
501        record_class=None
502    ):
503        """Return a *cursor factory* for the specified query.
504
505        :param args:
506            Query arguments.
507        :param int prefetch:
508            The number of rows the *cursor iterator*
509            will prefetch (defaults to ``50``.)
510        :param float timeout:
511            Optional timeout in seconds.
512        :param type record_class:
513            If specified, the class to use for records returned by this cursor.
514            Must be a subclass of :class:`~asyncpg.Record`.  If not specified,
515            a per-connection *record_class* is used.
516
517        :return:
518            A :class:`~cursor.CursorFactory` object.
519
520        .. versionchanged:: 0.22.0
521            Added the *record_class* parameter.
522        """
523        self._check_open()
524        return cursor.CursorFactory(
525            self,
526            query,
527            None,
528            args,
529            prefetch,
530            timeout,
531            record_class,
532        )
533
534    async def prepare(
535        self,
536        query,
537        *,
538        name=None,
539        timeout=None,
540        record_class=None,
541    ):
542        """Create a *prepared statement* for the specified query.
543
544        :param str query:
545            Text of the query to create a prepared statement for.
546        :param str name:
547            Optional name of the returned prepared statement.  If not
548            specified, the name is auto-generated.
549        :param float timeout:
550            Optional timeout value in seconds.
551        :param type record_class:
552            If specified, the class to use for records returned by the
553            prepared statement.  Must be a subclass of
554            :class:`~asyncpg.Record`.  If not specified, a per-connection
555            *record_class* is used.
556
557        :return:
558            A :class:`~prepared_stmt.PreparedStatement` instance.
559
560        .. versionchanged:: 0.22.0
561            Added the *record_class* parameter.
562
563        .. versionchanged:: 0.25.0
564            Added the *name* parameter.
565        """
566        return await self._prepare(
567            query,
568            name=name,
569            timeout=timeout,
570            use_cache=False,
571            record_class=record_class,
572        )
573
574    async def _prepare(
575        self,
576        query,
577        *,
578        name=None,
579        timeout=None,
580        use_cache: bool=False,
581        record_class=None
582    ):
583        self._check_open()
584        stmt = await self._get_statement(
585            query,
586            timeout,
587            named=True if name is None else name,
588            use_cache=use_cache,
589            record_class=record_class,
590        )
591        return prepared_stmt.PreparedStatement(self, query, stmt)
592
593    async def fetch(
594        self,
595        query,
596        *args,
597        timeout=None,
598        record_class=None
599    ) -> list:
600        """Run a query and return the results as a list of :class:`Record`.
601
602        :param str query:
603            Query text.
604        :param args:
605            Query arguments.
606        :param float timeout:
607            Optional timeout value in seconds.
608        :param type record_class:
609            If specified, the class to use for records returned by this method.
610            Must be a subclass of :class:`~asyncpg.Record`.  If not specified,
611            a per-connection *record_class* is used.
612
613        :return list:
614            A list of :class:`~asyncpg.Record` instances.  If specified, the
615            actual type of list elements would be *record_class*.
616
617        .. versionchanged:: 0.22.0
618            Added the *record_class* parameter.
619        """
620        self._check_open()
621        return await self._execute(
622            query,
623            args,
624            0,
625            timeout,
626            record_class=record_class,
627        )
628
629    async def fetchval(self, query, *args, column=0, timeout=None):
630        """Run a query and return a value in the first row.
631
632        :param str query: Query text.
633        :param args: Query arguments.
634        :param int column: Numeric index within the record of the value to
635                           return (defaults to 0).
636        :param float timeout: Optional timeout value in seconds.
637                            If not specified, defaults to the value of
638                            ``command_timeout`` argument to the ``Connection``
639                            instance constructor.
640
641        :return: The value of the specified column of the first record, or
642                 None if no records were returned by the query.
643        """
644        self._check_open()
645        data = await self._execute(query, args, 1, timeout)
646        if not data:
647            return None
648        return data[0][column]
649
650    async def fetchrow(
651        self,
652        query,
653        *args,
654        timeout=None,
655        record_class=None
656    ):
657        """Run a query and return the first row.
658
659        :param str query:
660            Query text
661        :param args:
662            Query arguments
663        :param float timeout:
664            Optional timeout value in seconds.
665        :param type record_class:
666            If specified, the class to use for the value returned by this
667            method.  Must be a subclass of :class:`~asyncpg.Record`.
668            If not specified, a per-connection *record_class* is used.
669
670        :return:
671            The first row as a :class:`~asyncpg.Record` instance, or None if
672            no records were returned by the query.  If specified,
673            *record_class* is used as the type for the result value.
674
675        .. versionchanged:: 0.22.0
676            Added the *record_class* parameter.
677        """
678        self._check_open()
679        data = await self._execute(
680            query,
681            args,
682            1,
683            timeout,
684            record_class=record_class,
685        )
686        if not data:
687            return None
688        return data[0]
689
690    async def copy_from_table(self, table_name, *, output,
691                              columns=None, schema_name=None, timeout=None,
692                              format=None, oids=None, delimiter=None,
693                              null=None, header=None, quote=None,
694                              escape=None, force_quote=None, encoding=None):
695        """Copy table contents to a file or file-like object.
696
697        :param str table_name:
698            The name of the table to copy data from.
699
700        :param output:
701            A :term:`path-like object <python:path-like object>`,
702            or a :term:`file-like object <python:file-like object>`, or
703            a :term:`coroutine function <python:coroutine function>`
704            that takes a ``bytes`` instance as a sole argument.
705
706        :param list columns:
707            An optional list of column names to copy.
708
709        :param str schema_name:
710            An optional schema name to qualify the table.
711
712        :param float timeout:
713            Optional timeout value in seconds.
714
715        The remaining keyword arguments are ``COPY`` statement options,
716        see `COPY statement documentation`_ for details.
717
718        :return: The status string of the COPY command.
719
720        Example:
721
722        .. code-block:: pycon
723
724            >>> import asyncpg
725            >>> import asyncio
726            >>> async def run():
727            ...     con = await asyncpg.connect(user='postgres')
728            ...     result = await con.copy_from_table(
729            ...         'mytable', columns=('foo', 'bar'),
730            ...         output='file.csv', format='csv')
731            ...     print(result)
732            ...
733            >>> asyncio.get_event_loop().run_until_complete(run())
734            'COPY 100'
735
736        .. _`COPY statement documentation`:
737            https://www.postgresql.org/docs/current/static/sql-copy.html
738
739        .. versionadded:: 0.11.0
740        """
741        tabname = utils._quote_ident(table_name)
742        if schema_name:
743            tabname = utils._quote_ident(schema_name) + '.' + tabname
744
745        if columns:
746            cols = '({})'.format(
747                ', '.join(utils._quote_ident(c) for c in columns))
748        else:
749            cols = ''
750
751        opts = self._format_copy_opts(
752            format=format, oids=oids, delimiter=delimiter,
753            null=null, header=header, quote=quote, escape=escape,
754            force_quote=force_quote, encoding=encoding
755        )
756
757        copy_stmt = 'COPY {tab}{cols} TO STDOUT {opts}'.format(
758            tab=tabname, cols=cols, opts=opts)
759
760        return await self._copy_out(copy_stmt, output, timeout)
761
762    async def copy_from_query(self, query, *args, output,
763                              timeout=None, format=None, oids=None,
764                              delimiter=None, null=None, header=None,
765                              quote=None, escape=None, force_quote=None,
766                              encoding=None):
767        """Copy the results of a query to a file or file-like object.
768
769        :param str query:
770            The query to copy the results of.
771
772        :param args:
773            Query arguments.
774
775        :param output:
776            A :term:`path-like object <python:path-like object>`,
777            or a :term:`file-like object <python:file-like object>`, or
778            a :term:`coroutine function <python:coroutine function>`
779            that takes a ``bytes`` instance as a sole argument.
780
781        :param float timeout:
782            Optional timeout value in seconds.
783
784        The remaining keyword arguments are ``COPY`` statement options,
785        see `COPY statement documentation`_ for details.
786
787        :return: The status string of the COPY command.
788
789        Example:
790
791        .. code-block:: pycon
792
793            >>> import asyncpg
794            >>> import asyncio
795            >>> async def run():
796            ...     con = await asyncpg.connect(user='postgres')
797            ...     result = await con.copy_from_query(
798            ...         'SELECT foo, bar FROM mytable WHERE foo > $1', 10,
799            ...         output='file.csv', format='csv')
800            ...     print(result)
801            ...
802            >>> asyncio.get_event_loop().run_until_complete(run())
803            'COPY 10'
804
805        .. _`COPY statement documentation`:
806            https://www.postgresql.org/docs/current/static/sql-copy.html
807
808        .. versionadded:: 0.11.0
809        """
810        opts = self._format_copy_opts(
811            format=format, oids=oids, delimiter=delimiter,
812            null=null, header=header, quote=quote, escape=escape,
813            force_quote=force_quote, encoding=encoding
814        )
815
816        if args:
817            query = await utils._mogrify(self, query, args)
818
819        copy_stmt = 'COPY ({query}) TO STDOUT {opts}'.format(
820            query=query, opts=opts)
821
822        return await self._copy_out(copy_stmt, output, timeout)
823
824    async def copy_to_table(self, table_name, *, source,
825                            columns=None, schema_name=None, timeout=None,
826                            format=None, oids=None, freeze=None,
827                            delimiter=None, null=None, header=None,
828                            quote=None, escape=None, force_quote=None,
829                            force_not_null=None, force_null=None,
830                            encoding=None):
831        """Copy data to the specified table.
832
833        :param str table_name:
834            The name of the table to copy data to.
835
836        :param source:
837            A :term:`path-like object <python:path-like object>`,
838            or a :term:`file-like object <python:file-like object>`, or
839            an :term:`asynchronous iterable <python:asynchronous iterable>`
840            that returns ``bytes``, or an object supporting the
841            :ref:`buffer protocol <python:bufferobjects>`.
842
843        :param list columns:
844            An optional list of column names to copy.
845
846        :param str schema_name:
847            An optional schema name to qualify the table.
848
849        :param float timeout:
850            Optional timeout value in seconds.
851
852        The remaining keyword arguments are ``COPY`` statement options,
853        see `COPY statement documentation`_ for details.
854
855        :return: The status string of the COPY command.
856
857        Example:
858
859        .. code-block:: pycon
860
861            >>> import asyncpg
862            >>> import asyncio
863            >>> async def run():
864            ...     con = await asyncpg.connect(user='postgres')
865            ...     result = await con.copy_to_table(
866            ...         'mytable', source='datafile.tbl')
867            ...     print(result)
868            ...
869            >>> asyncio.get_event_loop().run_until_complete(run())
870            'COPY 140000'
871
872        .. _`COPY statement documentation`:
873            https://www.postgresql.org/docs/current/static/sql-copy.html
874
875        .. versionadded:: 0.11.0
876        """
877        tabname = utils._quote_ident(table_name)
878        if schema_name:
879            tabname = utils._quote_ident(schema_name) + '.' + tabname
880
881        if columns:
882            cols = '({})'.format(
883                ', '.join(utils._quote_ident(c) for c in columns))
884        else:
885            cols = ''
886
887        opts = self._format_copy_opts(
888            format=format, oids=oids, freeze=freeze, delimiter=delimiter,
889            null=null, header=header, quote=quote, escape=escape,
890            force_not_null=force_not_null, force_null=force_null,
891            encoding=encoding
892        )
893
894        copy_stmt = 'COPY {tab}{cols} FROM STDIN {opts}'.format(
895            tab=tabname, cols=cols, opts=opts)
896
897        return await self._copy_in(copy_stmt, source, timeout)
898
899    async def copy_records_to_table(self, table_name, *, records,
900                                    columns=None, schema_name=None,
901                                    timeout=None):
902        """Copy a list of records to the specified table using binary COPY.
903
904        :param str table_name:
905            The name of the table to copy data to.
906
907        :param records:
908            An iterable returning row tuples to copy into the table.
909            :term:`Asynchronous iterables <python:asynchronous iterable>`
910            are also supported.
911
912        :param list columns:
913            An optional list of column names to copy.
914
915        :param str schema_name:
916            An optional schema name to qualify the table.
917
918        :param float timeout:
919            Optional timeout value in seconds.
920
921        :return: The status string of the COPY command.
922
923        Example:
924
925        .. code-block:: pycon
926
927            >>> import asyncpg
928            >>> import asyncio
929            >>> async def run():
930            ...     con = await asyncpg.connect(user='postgres')
931            ...     result = await con.copy_records_to_table(
932            ...         'mytable', records=[
933            ...             (1, 'foo', 'bar'),
934            ...             (2, 'ham', 'spam')])
935            ...     print(result)
936            ...
937            >>> asyncio.get_event_loop().run_until_complete(run())
938            'COPY 2'
939
940        Asynchronous record iterables are also supported:
941
942        .. code-block:: pycon
943
944            >>> import asyncpg
945            >>> import asyncio
946            >>> async def run():
947            ...     con = await asyncpg.connect(user='postgres')
948            ...     async def record_gen(size):
949            ...         for i in range(size):
950            ...             yield (i,)
951            ...     result = await con.copy_records_to_table(
952            ...         'mytable', records=record_gen(100))
953            ...     print(result)
954            ...
955            >>> asyncio.get_event_loop().run_until_complete(run())
956            'COPY 100'
957
958        .. versionadded:: 0.11.0
959
960        .. versionchanged:: 0.24.0
961            The ``records`` argument may be an asynchronous iterable.
962        """
963        tabname = utils._quote_ident(table_name)
964        if schema_name:
965            tabname = utils._quote_ident(schema_name) + '.' + tabname
966
967        if columns:
968            col_list = ', '.join(utils._quote_ident(c) for c in columns)
969            cols = '({})'.format(col_list)
970        else:
971            col_list = '*'
972            cols = ''
973
974        intro_query = 'SELECT {cols} FROM {tab} LIMIT 1'.format(
975            tab=tabname, cols=col_list)
976
977        intro_ps = await self._prepare(intro_query, use_cache=True)
978
979        opts = '(FORMAT binary)'
980
981        copy_stmt = 'COPY {tab}{cols} FROM STDIN {opts}'.format(
982            tab=tabname, cols=cols, opts=opts)
983
984        return await self._protocol.copy_in(
985            copy_stmt, None, None, records, intro_ps._state, timeout)
986
987    def _format_copy_opts(self, *, format=None, oids=None, freeze=None,
988                          delimiter=None, null=None, header=None, quote=None,
989                          escape=None, force_quote=None, force_not_null=None,
990                          force_null=None, encoding=None):
991        kwargs = dict(locals())
992        kwargs.pop('self')
993        opts = []
994
995        if force_quote is not None and isinstance(force_quote, bool):
996            kwargs.pop('force_quote')
997            if force_quote:
998                opts.append('FORCE_QUOTE *')
999
1000        for k, v in kwargs.items():
1001            if v is not None:
1002                if k in ('force_not_null', 'force_null', 'force_quote'):
1003                    v = '(' + ', '.join(utils._quote_ident(c) for c in v) + ')'
1004                elif k in ('oids', 'freeze', 'header'):
1005                    v = str(v)
1006                else:
1007                    v = utils._quote_literal(v)
1008
1009                opts.append('{} {}'.format(k.upper(), v))
1010
1011        if opts:
1012            return '(' + ', '.join(opts) + ')'
1013        else:
1014            return ''
1015
1016    async def _copy_out(self, copy_stmt, output, timeout):
1017        try:
1018            path = os.fspath(output)
1019        except TypeError:
1020            # output is not a path-like object
1021            path = None
1022
1023        writer = None
1024        opened_by_us = False
1025        run_in_executor = self._loop.run_in_executor
1026
1027        if path is not None:
1028            # a path
1029            f = await run_in_executor(None, open, path, 'wb')
1030            opened_by_us = True
1031        elif hasattr(output, 'write'):
1032            # file-like
1033            f = output
1034        elif callable(output):
1035            # assuming calling output returns an awaitable.
1036            writer = output
1037        else:
1038            raise TypeError(
1039                'output is expected to be a file-like object, '
1040                'a path-like object or a coroutine function, '
1041                'not {}'.format(type(output).__name__)
1042            )
1043
1044        if writer is None:
1045            async def _writer(data):
1046                await run_in_executor(None, f.write, data)
1047            writer = _writer
1048
1049        try:
1050            return await self._protocol.copy_out(copy_stmt, writer, timeout)
1051        finally:
1052            if opened_by_us:
1053                f.close()
1054
1055    async def _copy_in(self, copy_stmt, source, timeout):
1056        try:
1057            path = os.fspath(source)
1058        except TypeError:
1059            # source is not a path-like object
1060            path = None
1061
1062        f = None
1063        reader = None
1064        data = None
1065        opened_by_us = False
1066        run_in_executor = self._loop.run_in_executor
1067
1068        if path is not None:
1069            # a path
1070            f = await run_in_executor(None, open, path, 'rb')
1071            opened_by_us = True
1072        elif hasattr(source, 'read'):
1073            # file-like
1074            f = source
1075        elif isinstance(source, collections.abc.AsyncIterable):
1076            # assuming calling output returns an awaitable.
1077            # copy_in() is designed to handle very large amounts of data, and
1078            # the source async iterable is allowed to return an arbitrary
1079            # amount of data on every iteration.
1080            reader = source
1081        else:
1082            # assuming source is an instance supporting the buffer protocol.
1083            data = source
1084
1085        if f is not None:
1086            # Copying from a file-like object.
1087            class _Reader:
1088                def __aiter__(self):
1089                    return self
1090
1091                async def __anext__(self):
1092                    data = await run_in_executor(None, f.read, 524288)
1093                    if len(data) == 0:
1094                        raise StopAsyncIteration
1095                    else:
1096                        return data
1097
1098            reader = _Reader()
1099
1100        try:
1101            return await self._protocol.copy_in(
1102                copy_stmt, reader, data, None, None, timeout)
1103        finally:
1104            if opened_by_us:
1105                await run_in_executor(None, f.close)
1106
1107    async def set_type_codec(self, typename, *,
1108                             schema='public', encoder, decoder,
1109                             format='text'):
1110        """Set an encoder/decoder pair for the specified data type.
1111
1112        :param typename:
1113            Name of the data type the codec is for.
1114
1115        :param schema:
1116            Schema name of the data type the codec is for
1117            (defaults to ``'public'``)
1118
1119        :param format:
1120            The type of the argument received by the *decoder* callback,
1121            and the type of the *encoder* callback return value.
1122
1123            If *format* is ``'text'`` (the default), the exchange datum is a
1124            ``str`` instance containing valid text representation of the
1125            data type.
1126
1127            If *format* is ``'binary'``, the exchange datum is a ``bytes``
1128            instance containing valid _binary_ representation of the
1129            data type.
1130
1131            If *format* is ``'tuple'``, the exchange datum is a type-specific
1132            ``tuple`` of values.  The table below lists supported data
1133            types and their format for this mode.
1134
1135            +-----------------+---------------------------------------------+
1136            |  Type           |                Tuple layout                 |
1137            +=================+=============================================+
1138            | ``interval``    | (``months``, ``days``, ``microseconds``)    |
1139            +-----------------+---------------------------------------------+
1140            | ``date``        | (``date ordinal relative to Jan 1 2000``,)  |
1141            |                 | ``-2^31`` for negative infinity timestamp   |
1142            |                 | ``2^31-1`` for positive infinity timestamp. |
1143            +-----------------+---------------------------------------------+
1144            | ``timestamp``   | (``microseconds relative to Jan 1 2000``,)  |
1145            |                 | ``-2^63`` for negative infinity timestamp   |
1146            |                 | ``2^63-1`` for positive infinity timestamp. |
1147            +-----------------+---------------------------------------------+
1148            | ``timestamp     | (``microseconds relative to Jan 1 2000      |
1149            | with time zone``| UTC``,)                                     |
1150            |                 | ``-2^63`` for negative infinity timestamp   |
1151            |                 | ``2^63-1`` for positive infinity timestamp. |
1152            +-----------------+---------------------------------------------+
1153            | ``time``        | (``microseconds``,)                         |
1154            +-----------------+---------------------------------------------+
1155            | ``time with     | (``microseconds``,                          |
1156            | time zone``     | ``time zone offset in seconds``)            |
1157            +-----------------+---------------------------------------------+
1158
1159        :param encoder:
1160            Callable accepting a Python object as a single argument and
1161            returning a value encoded according to *format*.
1162
1163        :param decoder:
1164            Callable accepting a single argument encoded according to *format*
1165            and returning a decoded Python object.
1166
1167        Example:
1168
1169        .. code-block:: pycon
1170
1171            >>> import asyncpg
1172            >>> import asyncio
1173            >>> import datetime
1174            >>> from dateutil.relativedelta import relativedelta
1175            >>> async def run():
1176            ...     con = await asyncpg.connect(user='postgres')
1177            ...     def encoder(delta):
1178            ...         ndelta = delta.normalized()
1179            ...         return (ndelta.years * 12 + ndelta.months,
1180            ...                 ndelta.days,
1181            ...                 ((ndelta.hours * 3600 +
1182            ...                    ndelta.minutes * 60 +
1183            ...                    ndelta.seconds) * 1000000 +
1184            ...                  ndelta.microseconds))
1185            ...     def decoder(tup):
1186            ...         return relativedelta(months=tup[0], days=tup[1],
1187            ...                              microseconds=tup[2])
1188            ...     await con.set_type_codec(
1189            ...         'interval', schema='pg_catalog', encoder=encoder,
1190            ...         decoder=decoder, format='tuple')
1191            ...     result = await con.fetchval(
1192            ...         "SELECT '2 years 3 mons 1 day'::interval")
1193            ...     print(result)
1194            ...     print(datetime.datetime(2002, 1, 1) + result)
1195            ...
1196            >>> asyncio.get_event_loop().run_until_complete(run())
1197            relativedelta(years=+2, months=+3, days=+1)
1198            2004-04-02 00:00:00
1199
1200        .. versionadded:: 0.12.0
1201            Added the ``format`` keyword argument and support for 'tuple'
1202            format.
1203
1204        .. versionchanged:: 0.12.0
1205            The ``binary`` keyword argument is deprecated in favor of
1206            ``format``.
1207
1208        .. versionchanged:: 0.13.0
1209            The ``binary`` keyword argument was removed in favor of
1210            ``format``.
1211
1212        .. note::
1213
1214           It is recommended to use the ``'binary'`` or ``'tuple'`` *format*
1215           whenever possible and if the underlying type supports it. Asyncpg
1216           currently does not support text I/O for composite and range types,
1217           and some other functionality, such as
1218           :meth:`Connection.copy_to_table`, does not support types with text
1219           codecs.
1220        """
1221        self._check_open()
1222        typeinfo = await self._introspect_type(typename, schema)
1223        if not introspection.is_scalar_type(typeinfo):
1224            raise exceptions.InterfaceError(
1225                'cannot use custom codec on non-scalar type {}.{}'.format(
1226                    schema, typename))
1227        if introspection.is_domain_type(typeinfo):
1228            raise exceptions.UnsupportedClientFeatureError(
1229                'custom codecs on domain types are not supported',
1230                hint='Set the codec on the base type.',
1231                detail=(
1232                    'PostgreSQL does not distinguish domains from '
1233                    'their base types in query results at the protocol level.'
1234                )
1235            )
1236
1237        oid = typeinfo['oid']
1238        self._protocol.get_settings().add_python_codec(
1239            oid, typename, schema, 'scalar',
1240            encoder, decoder, format)
1241
1242        # Statement cache is no longer valid due to codec changes.
1243        self._drop_local_statement_cache()
1244
1245    async def reset_type_codec(self, typename, *, schema='public'):
1246        """Reset *typename* codec to the default implementation.
1247
1248        :param typename:
1249            Name of the data type the codec is for.
1250
1251        :param schema:
1252            Schema name of the data type the codec is for
1253            (defaults to ``'public'``)
1254
1255        .. versionadded:: 0.12.0
1256        """
1257
1258        typeinfo = await self._introspect_type(typename, schema)
1259        self._protocol.get_settings().remove_python_codec(
1260            typeinfo['oid'], typename, schema)
1261
1262        # Statement cache is no longer valid due to codec changes.
1263        self._drop_local_statement_cache()
1264
1265    async def set_builtin_type_codec(self, typename, *,
1266                                     schema='public', codec_name,
1267                                     format=None):
1268        """Set a builtin codec for the specified scalar data type.
1269
1270        This method has two uses.  The first is to register a builtin
1271        codec for an extension type without a stable OID, such as 'hstore'.
1272        The second use is to declare that an extension type or a
1273        user-defined type is wire-compatible with a certain builtin
1274        data type and should be exchanged as such.
1275
1276        :param typename:
1277            Name of the data type the codec is for.
1278
1279        :param schema:
1280            Schema name of the data type the codec is for
1281            (defaults to ``'public'``).
1282
1283        :param codec_name:
1284            The name of the builtin codec to use for the type.
1285            This should be either the name of a known core type
1286            (such as ``"int"``), or the name of a supported extension
1287            type.  Currently, the only supported extension type is
1288            ``"pg_contrib.hstore"``.
1289
1290        :param format:
1291            If *format* is ``None`` (the default), all formats supported
1292            by the target codec are declared to be supported for *typename*.
1293            If *format* is ``'text'`` or ``'binary'``, then only the
1294            specified format is declared to be supported for *typename*.
1295
1296        .. versionchanged:: 0.18.0
1297            The *codec_name* argument can be the name of any known
1298            core data type.  Added the *format* keyword argument.
1299        """
1300        self._check_open()
1301        typeinfo = await self._introspect_type(typename, schema)
1302        if not introspection.is_scalar_type(typeinfo):
1303            raise exceptions.InterfaceError(
1304                'cannot alias non-scalar type {}.{}'.format(
1305                    schema, typename))
1306
1307        oid = typeinfo['oid']
1308
1309        self._protocol.get_settings().set_builtin_type_codec(
1310            oid, typename, schema, 'scalar', codec_name, format)
1311
1312        # Statement cache is no longer valid due to codec changes.
1313        self._drop_local_statement_cache()
1314
1315    def is_closed(self):
1316        """Return ``True`` if the connection is closed, ``False`` otherwise.
1317
1318        :return bool: ``True`` if the connection is closed, ``False``
1319                      otherwise.
1320        """
1321        return self._aborted or not self._protocol.is_connected()
1322
1323    async def close(self, *, timeout=None):
1324        """Close the connection gracefully.
1325
1326        :param float timeout:
1327            Optional timeout value in seconds.
1328
1329        .. versionchanged:: 0.14.0
1330           Added the *timeout* parameter.
1331        """
1332        try:
1333            if not self.is_closed():
1334                await self._protocol.close(timeout)
1335        except (Exception, asyncio.CancelledError):
1336            # If we fail to close gracefully, abort the connection.
1337            self._abort()
1338            raise
1339        finally:
1340            self._cleanup()
1341
1342    def terminate(self):
1343        """Terminate the connection without waiting for pending data."""
1344        if not self.is_closed():
1345            self._abort()
1346        self._cleanup()
1347
1348    async def reset(self, *, timeout=None):
1349        self._check_open()
1350        self._listeners.clear()
1351        self._log_listeners.clear()
1352        reset_query = self._get_reset_query()
1353
1354        if self._protocol.is_in_transaction() or self._top_xact is not None:
1355            if self._top_xact is None or not self._top_xact._managed:
1356                # Managed transactions are guaranteed to __aexit__
1357                # correctly.
1358                self._loop.call_exception_handler({
1359                    'message': 'Resetting connection with an '
1360                               'active transaction {!r}'.format(self)
1361                })
1362
1363            self._top_xact = None
1364            reset_query = 'ROLLBACK;\n' + reset_query
1365
1366        if reset_query:
1367            await self.execute(reset_query, timeout=timeout)
1368
1369    def _abort(self):
1370        # Put the connection into the aborted state.
1371        self._aborted = True
1372        self._protocol.abort()
1373        self._protocol = None
1374
1375    def _cleanup(self):
1376        self._call_termination_listeners()
1377        # Free the resources associated with this connection.
1378        # This must be called when a connection is terminated.
1379
1380        if self._proxy is not None:
1381            # Connection is a member of a pool, so let the pool
1382            # know that this connection is dead.
1383            self._proxy._holder._release_on_close()
1384
1385        self._mark_stmts_as_closed()
1386        self._listeners.clear()
1387        self._log_listeners.clear()
1388        self._clean_tasks()
1389
1390    def _clean_tasks(self):
1391        # Wrap-up any remaining tasks associated with this connection.
1392        if self._cancellations:
1393            for fut in self._cancellations:
1394                if not fut.done():
1395                    fut.cancel()
1396            self._cancellations.clear()
1397
1398    def _check_open(self):
1399        if self.is_closed():
1400            raise exceptions.InterfaceError('connection is closed')
1401
1402    def _get_unique_id(self, prefix):
1403        global _uid
1404        _uid += 1
1405        return '__asyncpg_{}_{:x}__'.format(prefix, _uid)
1406
1407    def _mark_stmts_as_closed(self):
1408        for stmt in self._stmt_cache.iter_statements():
1409            stmt.mark_closed()
1410
1411        for stmt in self._stmts_to_close:
1412            stmt.mark_closed()
1413
1414        self._stmt_cache.clear()
1415        self._stmts_to_close.clear()
1416
1417    def _maybe_gc_stmt(self, stmt):
1418        if (
1419            stmt.refs == 0
1420            and not self._stmt_cache.has(
1421                (stmt.query, stmt.record_class, stmt.ignore_custom_codec)
1422            )
1423        ):
1424            # If low-level `stmt` isn't referenced from any high-level
1425            # `PreparedStatement` object and is not in the `_stmt_cache`:
1426            #
1427            #  * mark it as closed, which will make it non-usable
1428            #    for any `PreparedStatement` or for methods like
1429            #    `Connection.fetch()`.
1430            #
1431            # * schedule it to be formally closed on the server.
1432            stmt.mark_closed()
1433            self._stmts_to_close.add(stmt)
1434
1435    async def _cleanup_stmts(self):
1436        # Called whenever we create a new prepared statement in
1437        # `Connection._get_statement()` and `_stmts_to_close` is
1438        # not empty.
1439        to_close = self._stmts_to_close
1440        self._stmts_to_close = set()
1441        for stmt in to_close:
1442            # It is imperative that statements are cleaned properly,
1443            # so we ignore the timeout.
1444            await self._protocol.close_statement(stmt, protocol.NO_TIMEOUT)
1445
1446    async def _cancel(self, waiter):
1447        try:
1448            # Open new connection to the server
1449            await connect_utils._cancel(
1450                loop=self._loop, addr=self._addr, params=self._params,
1451                backend_pid=self._protocol.backend_pid,
1452                backend_secret=self._protocol.backend_secret)
1453        except ConnectionResetError as ex:
1454            # On some systems Postgres will reset the connection
1455            # after processing the cancellation command.
1456            if not waiter.done():
1457                waiter.set_exception(ex)
1458        except asyncio.CancelledError:
1459            # There are two scenarios in which the cancellation
1460            # itself will be cancelled: 1) the connection is being closed,
1461            # 2) the event loop is being shut down.
1462            # In either case we do not care about the propagation of
1463            # the CancelledError, and don't want the loop to warn about
1464            # an unretrieved exception.
1465            pass
1466        except (Exception, asyncio.CancelledError) as ex:
1467            if not waiter.done():
1468                waiter.set_exception(ex)
1469        finally:
1470            self._cancellations.discard(
1471                compat.current_asyncio_task(self._loop))
1472            if not waiter.done():
1473                waiter.set_result(None)
1474
1475    def _cancel_current_command(self, waiter):
1476        self._cancellations.add(self._loop.create_task(self._cancel(waiter)))
1477
1478    def _process_log_message(self, fields, last_query):
1479        if not self._log_listeners:
1480            return
1481
1482        message = exceptions.PostgresLogMessage.new(fields, query=last_query)
1483
1484        con_ref = self._unwrap()
1485        for cb in self._log_listeners:
1486            if cb.is_async:
1487                self._loop.create_task(cb.cb(con_ref, message))
1488            else:
1489                self._loop.call_soon(cb.cb, con_ref, message)
1490
1491    def _call_termination_listeners(self):
1492        if not self._termination_listeners:
1493            return
1494
1495        con_ref = self._unwrap()
1496        for cb in self._termination_listeners:
1497            if cb.is_async:
1498                self._loop.create_task(cb.cb(con_ref))
1499            else:
1500                self._loop.call_soon(cb.cb, con_ref)
1501
1502        self._termination_listeners.clear()
1503
1504    def _process_notification(self, pid, channel, payload):
1505        if channel not in self._listeners:
1506            return
1507
1508        con_ref = self._unwrap()
1509        for cb in self._listeners[channel]:
1510            if cb.is_async:
1511                self._loop.create_task(cb.cb(con_ref, pid, channel, payload))
1512            else:
1513                self._loop.call_soon(cb.cb, con_ref, pid, channel, payload)
1514
1515    def _unwrap(self):
1516        if self._proxy is None:
1517            con_ref = self
1518        else:
1519            # `_proxy` is not None when the connection is a member
1520            # of a connection pool.  Which means that the user is working
1521            # with a `PoolConnectionProxy` instance, and expects to see it
1522            # (and not the actual Connection) in their event callbacks.
1523            con_ref = self._proxy
1524        return con_ref
1525
1526    def _get_reset_query(self):
1527        if self._reset_query is not None:
1528            return self._reset_query
1529
1530        caps = self._server_caps
1531
1532        _reset_query = []
1533        if caps.advisory_locks:
1534            _reset_query.append('SELECT pg_advisory_unlock_all();')
1535        if caps.sql_close_all:
1536            _reset_query.append('CLOSE ALL;')
1537        if caps.notifications and caps.plpgsql:
1538            _reset_query.append('UNLISTEN *;')
1539        if caps.sql_reset:
1540            _reset_query.append('RESET ALL;')
1541
1542        _reset_query = '\n'.join(_reset_query)
1543        self._reset_query = _reset_query
1544
1545        return _reset_query
1546
1547    def _set_proxy(self, proxy):
1548        if self._proxy is not None and proxy is not None:
1549            # Should not happen unless there is a bug in `Pool`.
1550            raise exceptions.InterfaceError(
1551                'internal asyncpg error: connection is already proxied')
1552
1553        self._proxy = proxy
1554
1555    def _check_listeners(self, listeners, listener_type):
1556        if listeners:
1557            count = len(listeners)
1558
1559            w = exceptions.InterfaceWarning(
1560                '{conn!r} is being released to the pool but has {c} active '
1561                '{type} listener{s}'.format(
1562                    conn=self, c=count, type=listener_type,
1563                    s='s' if count > 1 else ''))
1564
1565            warnings.warn(w)
1566
1567    def _on_release(self, stacklevel=1):
1568        # Invalidate external references to the connection.
1569        self._pool_release_ctr += 1
1570        # Called when the connection is about to be released to the pool.
1571        # Let's check that the user has not left any listeners on it.
1572        self._check_listeners(
1573            list(itertools.chain.from_iterable(self._listeners.values())),
1574            'notification')
1575        self._check_listeners(
1576            self._log_listeners, 'log')
1577
1578    def _drop_local_statement_cache(self):
1579        self._stmt_cache.clear()
1580
1581    def _drop_global_statement_cache(self):
1582        if self._proxy is not None:
1583            # This connection is a member of a pool, so we delegate
1584            # the cache drop to the pool.
1585            pool = self._proxy._holder._pool
1586            pool._drop_statement_cache()
1587        else:
1588            self._drop_local_statement_cache()
1589
1590    def _drop_local_type_cache(self):
1591        self._protocol.get_settings().clear_type_cache()
1592
1593    def _drop_global_type_cache(self):
1594        if self._proxy is not None:
1595            # This connection is a member of a pool, so we delegate
1596            # the cache drop to the pool.
1597            pool = self._proxy._holder._pool
1598            pool._drop_type_cache()
1599        else:
1600            self._drop_local_type_cache()
1601
1602    async def reload_schema_state(self):
1603        """Indicate that the database schema information must be reloaded.
1604
1605        For performance reasons, asyncpg caches certain aspects of the
1606        database schema, such as the layout of composite types.  Consequently,
1607        when the database schema changes, and asyncpg is not able to
1608        gracefully recover from an error caused by outdated schema
1609        assumptions, an :exc:`~asyncpg.exceptions.OutdatedSchemaCacheError`
1610        is raised.  To prevent the exception, this method may be used to inform
1611        asyncpg that the database schema has changed.
1612
1613        Example:
1614
1615        .. code-block:: pycon
1616
1617            >>> import asyncpg
1618            >>> import asyncio
1619            >>> async def change_type(con):
1620            ...     result = await con.fetch('SELECT id, info FROM tbl')
1621            ...     # Change composite's attribute type "int"=>"text"
1622            ...     await con.execute('ALTER TYPE custom DROP ATTRIBUTE y')
1623            ...     await con.execute('ALTER TYPE custom ADD ATTRIBUTE y text')
1624            ...     await con.reload_schema_state()
1625            ...     for id_, info in result:
1626            ...         new = (info['x'], str(info['y']))
1627            ...         await con.execute(
1628            ...             'UPDATE tbl SET info=$2 WHERE id=$1', id_, new)
1629            ...
1630            >>> async def run():
1631            ...     # Initial schema:
1632            ...     # CREATE TYPE custom AS (x int, y int);
1633            ...     # CREATE TABLE tbl(id int, info custom);
1634            ...     con = await asyncpg.connect(user='postgres')
1635            ...     async with con.transaction():
1636            ...         # Prevent concurrent changes in the table
1637            ...         await con.execute('LOCK TABLE tbl')
1638            ...         await change_type(con)
1639            ...
1640            >>> asyncio.get_event_loop().run_until_complete(run())
1641
1642        .. versionadded:: 0.14.0
1643        """
1644        self._drop_global_type_cache()
1645        self._drop_global_statement_cache()
1646
1647    async def _execute(
1648        self,
1649        query,
1650        args,
1651        limit,
1652        timeout,
1653        *,
1654        return_status=False,
1655        ignore_custom_codec=False,
1656        record_class=None
1657    ):
1658        with self._stmt_exclusive_section:
1659            result, _ = await self.__execute(
1660                query,
1661                args,
1662                limit,
1663                timeout,
1664                return_status=return_status,
1665                record_class=record_class,
1666                ignore_custom_codec=ignore_custom_codec,
1667            )
1668        return result
1669
1670    async def __execute(
1671        self,
1672        query,
1673        args,
1674        limit,
1675        timeout,
1676        *,
1677        return_status=False,
1678        ignore_custom_codec=False,
1679        record_class=None
1680    ):
1681        executor = lambda stmt, timeout: self._protocol.bind_execute(
1682            stmt, args, '', limit, return_status, timeout)
1683        timeout = self._protocol._get_timeout(timeout)
1684        return await self._do_execute(
1685            query,
1686            executor,
1687            timeout,
1688            record_class=record_class,
1689            ignore_custom_codec=ignore_custom_codec,
1690        )
1691
1692    async def _executemany(self, query, args, timeout):
1693        executor = lambda stmt, timeout: self._protocol.bind_execute_many(
1694            stmt, args, '', timeout)
1695        timeout = self._protocol._get_timeout(timeout)
1696        with self._stmt_exclusive_section:
1697            result, _ = await self._do_execute(query, executor, timeout)
1698        return result
1699
1700    async def _do_execute(
1701        self,
1702        query,
1703        executor,
1704        timeout,
1705        retry=True,
1706        *,
1707        ignore_custom_codec=False,
1708        record_class=None
1709    ):
1710        if timeout is None:
1711            stmt = await self._get_statement(
1712                query,
1713                None,
1714                record_class=record_class,
1715                ignore_custom_codec=ignore_custom_codec,
1716            )
1717        else:
1718            before = time.monotonic()
1719            stmt = await self._get_statement(
1720                query,
1721                timeout,
1722                record_class=record_class,
1723                ignore_custom_codec=ignore_custom_codec,
1724            )
1725            after = time.monotonic()
1726            timeout -= after - before
1727            before = after
1728
1729        try:
1730            if timeout is None:
1731                result = await executor(stmt, None)
1732            else:
1733                try:
1734                    result = await executor(stmt, timeout)
1735                finally:
1736                    after = time.monotonic()
1737                    timeout -= after - before
1738
1739        except exceptions.OutdatedSchemaCacheError:
1740            # This exception is raised when we detect a difference between
1741            # cached type's info and incoming tuple from the DB (when a type is
1742            # changed by the ALTER TYPE).
1743            # It is not possible to recover (the statement is already done at
1744            # the server's side), the only way is to drop our caches and
1745            # reraise the exception to the caller.
1746            await self.reload_schema_state()
1747            raise
1748        except exceptions.InvalidCachedStatementError:
1749            # PostgreSQL will raise an exception when it detects
1750            # that the result type of the query has changed from
1751            # when the statement was prepared.  This may happen,
1752            # for example, after an ALTER TABLE or SET search_path.
1753            #
1754            # When this happens, and there is no transaction running,
1755            # we can simply re-prepare the statement and try once
1756            # again.  We deliberately retry only once as this is
1757            # supposed to be a rare occurrence.
1758            #
1759            # If the transaction _is_ running, this error will put it
1760            # into an error state, and we have no choice but to
1761            # re-raise the exception.
1762            #
1763            # In either case we clear the statement cache for this
1764            # connection and all other connections of the pool this
1765            # connection belongs to (if any).
1766            #
1767            # See https://github.com/MagicStack/asyncpg/issues/72
1768            # and https://github.com/MagicStack/asyncpg/issues/76
1769            # for discussion.
1770            #
1771            self._drop_global_statement_cache()
1772            if self._protocol.is_in_transaction() or not retry:
1773                raise
1774            else:
1775                return await self._do_execute(
1776                    query, executor, timeout, retry=False)
1777
1778        return result, stmt
1779
1780
1781async def connect(dsn=None, *,
1782                  host=None, port=None,
1783                  user=None, password=None, passfile=None,
1784                  database=None,
1785                  loop=None,
1786                  timeout=60,
1787                  statement_cache_size=100,
1788                  max_cached_statement_lifetime=300,
1789                  max_cacheable_statement_size=1024 * 15,
1790                  command_timeout=None,
1791                  ssl=None,
1792                  connection_class=Connection,
1793                  record_class=protocol.Record,
1794                  server_settings=None):
1795    r"""A coroutine to establish a connection to a PostgreSQL server.
1796
1797    The connection parameters may be specified either as a connection
1798    URI in *dsn*, or as specific keyword arguments, or both.
1799    If both *dsn* and keyword arguments are specified, the latter
1800    override the corresponding values parsed from the connection URI.
1801    The default values for the majority of arguments can be specified
1802    using `environment variables <postgres envvars_>`_.
1803
1804    Returns a new :class:`~asyncpg.connection.Connection` object.
1805
1806    :param dsn:
1807        Connection arguments specified using as a single string in the
1808        `libpq connection URI format`_:
1809        ``postgres://user:password@host:port/database?option=value``.
1810        The following options are recognized by asyncpg: ``host``,
1811        ``port``, ``user``, ``database`` (or ``dbname``), ``password``,
1812        ``passfile``, ``sslmode``, ``sslcert``, ``sslkey``, ``sslrootcert``,
1813        and ``sslcrl``.  Unlike libpq, asyncpg will treat unrecognized
1814        options as `server settings`_ to be used for the connection.
1815
1816        .. note::
1817
1818           The URI must be *valid*, which means that all components must
1819           be properly quoted with :py:func:`urllib.parse.quote`, and
1820           any literal IPv6 addresses must be enclosed in square brackets.
1821           For example:
1822
1823           .. code-block:: text
1824
1825              postgres://dbuser@[fe80::1ff:fe23:4567:890a%25eth0]/dbname
1826
1827    :param host:
1828        Database host address as one of the following:
1829
1830        - an IP address or a domain name;
1831        - an absolute path to the directory containing the database
1832          server Unix-domain socket (not supported on Windows);
1833        - a sequence of any of the above, in which case the addresses
1834          will be tried in order, and the first successful connection
1835          will be returned.
1836
1837        If not specified, asyncpg will try the following, in order:
1838
1839        - host address(es) parsed from the *dsn* argument,
1840        - the value of the ``PGHOST`` environment variable,
1841        - on Unix, common directories used for PostgreSQL Unix-domain
1842          sockets: ``"/run/postgresql"``, ``"/var/run/postgresl"``,
1843          ``"/var/pgsql_socket"``, ``"/private/tmp"``, and ``"/tmp"``,
1844        - ``"localhost"``.
1845
1846    :param port:
1847        Port number to connect to at the server host
1848        (or Unix-domain socket file extension).  If multiple host
1849        addresses were specified, this parameter may specify a
1850        sequence of port numbers of the same length as the host sequence,
1851        or it may specify a single port number to be used for all host
1852        addresses.
1853
1854        If not specified, the value parsed from the *dsn* argument is used,
1855        or the value of the ``PGPORT`` environment variable, or ``5432`` if
1856        neither is specified.
1857
1858    :param user:
1859        The name of the database role used for authentication.
1860
1861        If not specified, the value parsed from the *dsn* argument is used,
1862        or the value of the ``PGUSER`` environment variable, or the
1863        operating system name of the user running the application.
1864
1865    :param database:
1866        The name of the database to connect to.
1867
1868        If not specified, the value parsed from the *dsn* argument is used,
1869        or the value of the ``PGDATABASE`` environment variable, or the
1870        computed value of the *user* argument.
1871
1872    :param password:
1873        Password to be used for authentication, if the server requires
1874        one.  If not specified, the value parsed from the *dsn* argument
1875        is used, or the value of the ``PGPASSWORD`` environment variable.
1876        Note that the use of the environment variable is discouraged as
1877        other users and applications may be able to read it without needing
1878        specific privileges.  It is recommended to use *passfile* instead.
1879
1880        Password may be either a string, or a callable that returns a string.
1881        If a callable is provided, it will be called each time a new connection
1882        is established.
1883
1884    :param passfile:
1885        The name of the file used to store passwords
1886        (defaults to ``~/.pgpass``, or ``%APPDATA%\postgresql\pgpass.conf``
1887        on Windows).
1888
1889    :param loop:
1890        An asyncio event loop instance.  If ``None``, the default
1891        event loop will be used.
1892
1893    :param float timeout:
1894        Connection timeout in seconds.
1895
1896    :param int statement_cache_size:
1897        The size of prepared statement LRU cache.  Pass ``0`` to
1898        disable the cache.
1899
1900    :param int max_cached_statement_lifetime:
1901        The maximum time in seconds a prepared statement will stay
1902        in the cache.  Pass ``0`` to allow statements be cached
1903        indefinitely.
1904
1905    :param int max_cacheable_statement_size:
1906        The maximum size of a statement that can be cached (15KiB by
1907        default).  Pass ``0`` to allow all statements to be cached
1908        regardless of their size.
1909
1910    :param float command_timeout:
1911        The default timeout for operations on this connection
1912        (the default is ``None``: no timeout).
1913
1914    :param ssl:
1915        Pass ``True`` or an `ssl.SSLContext <SSLContext_>`_ instance to
1916        require an SSL connection.  If ``True``, a default SSL context
1917        returned by `ssl.create_default_context() <create_default_context_>`_
1918        will be used.  The value can also be one of the following strings:
1919
1920        - ``'disable'`` - SSL is disabled (equivalent to ``False``)
1921        - ``'prefer'`` - try SSL first, fallback to non-SSL connection
1922          if SSL connection fails
1923        - ``'allow'`` - try without SSL first, then retry with SSL if the first
1924          attempt fails.
1925        - ``'require'`` - only try an SSL connection.  Certificate
1926          verification errors are ignored
1927        - ``'verify-ca'`` - only try an SSL connection, and verify
1928          that the server certificate is issued by a trusted certificate
1929          authority (CA)
1930        - ``'verify-full'`` - only try an SSL connection, verify
1931          that the server certificate is issued by a trusted CA and
1932          that the requested server host name matches that in the
1933          certificate.
1934
1935        The default is ``'prefer'``: try an SSL connection and fallback to
1936        non-SSL connection if that fails.
1937
1938        .. note::
1939
1940           *ssl* is ignored for Unix domain socket communication.
1941
1942        Example of programmatic SSL context configuration that is equivalent
1943        to ``sslmode=verify-full&sslcert=..&sslkey=..&sslrootcert=..``:
1944
1945        .. code-block:: pycon
1946
1947            >>> import asyncpg
1948            >>> import asyncio
1949            >>> import ssl
1950            >>> async def main():
1951            ...     # Load CA bundle for server certificate verification,
1952            ...     # equivalent to sslrootcert= in DSN.
1953            ...     sslctx = ssl.create_default_context(
1954            ...         ssl.Purpose.SERVER_AUTH,
1955            ...         cafile="path/to/ca_bundle.pem")
1956            ...     # If True, equivalent to sslmode=verify-full, if False:
1957            ...     # sslmode=verify-ca.
1958            ...     sslctx.check_hostname = True
1959            ...     # Load client certificate and private key for client
1960            ...     # authentication, equivalent to sslcert= and sslkey= in
1961            ...     # DSN.
1962            ...     sslctx.load_cert_chain(
1963            ...         "path/to/client.cert",
1964            ...         keyfile="path/to/client.key",
1965            ...     )
1966            ...     con = await asyncpg.connect(user='postgres', ssl=sslctx)
1967            ...     await con.close()
1968            >>> asyncio.run(run())
1969
1970        Example of programmatic SSL context configuration that is equivalent
1971        to ``sslmode=require`` (no server certificate or host verification):
1972
1973        .. code-block:: pycon
1974
1975            >>> import asyncpg
1976            >>> import asyncio
1977            >>> import ssl
1978            >>> async def main():
1979            ...     sslctx = ssl.create_default_context(
1980            ...         ssl.Purpose.SERVER_AUTH)
1981            ...     sslctx.check_hostname = False
1982            ...     sslctx.verify_mode = ssl.CERT_NONE
1983            ...     con = await asyncpg.connect(user='postgres', ssl=sslctx)
1984            ...     await con.close()
1985            >>> asyncio.run(run())
1986
1987    :param dict server_settings:
1988        An optional dict of server runtime parameters.  Refer to
1989        PostgreSQL documentation for
1990        a `list of supported options <server settings_>`_.
1991
1992    :param type connection_class:
1993        Class of the returned connection object.  Must be a subclass of
1994        :class:`~asyncpg.connection.Connection`.
1995
1996    :param type record_class:
1997        If specified, the class to use for records returned by queries on
1998        this connection object.  Must be a subclass of
1999        :class:`~asyncpg.Record`.
2000
2001    :return: A :class:`~asyncpg.connection.Connection` instance.
2002
2003    Example:
2004
2005    .. code-block:: pycon
2006
2007        >>> import asyncpg
2008        >>> import asyncio
2009        >>> async def run():
2010        ...     con = await asyncpg.connect(user='postgres')
2011        ...     types = await con.fetch('SELECT * FROM pg_type')
2012        ...     print(types)
2013        ...
2014        >>> asyncio.get_event_loop().run_until_complete(run())
2015        [<Record typname='bool' typnamespace=11 ...
2016
2017    .. versionadded:: 0.10.0
2018       Added ``max_cached_statement_use_count`` parameter.
2019
2020    .. versionchanged:: 0.11.0
2021       Removed ability to pass arbitrary keyword arguments to set
2022       server settings.  Added a dedicated parameter ``server_settings``
2023       for that.
2024
2025    .. versionadded:: 0.11.0
2026       Added ``connection_class`` parameter.
2027
2028    .. versionadded:: 0.16.0
2029       Added ``passfile`` parameter
2030       (and support for password files in general).
2031
2032    .. versionadded:: 0.18.0
2033       Added ability to specify multiple hosts in the *dsn*
2034       and *host* arguments.
2035
2036    .. versionchanged:: 0.21.0
2037       The *password* argument now accepts a callable or an async function.
2038
2039    .. versionchanged:: 0.22.0
2040       Added the *record_class* parameter.
2041
2042    .. versionchanged:: 0.22.0
2043       The *ssl* argument now defaults to ``'prefer'``.
2044
2045    .. versionchanged:: 0.24.0
2046       The ``sslcert``, ``sslkey``, ``sslrootcert``, and ``sslcrl`` options
2047       are supported in the *dsn* argument.
2048
2049    .. versionchanged:: 0.25.0
2050       The ``sslpassword``, ``ssl_min_protocol_version``,
2051       and ``ssl_max_protocol_version`` options are supported in the *dsn*
2052       argument.
2053
2054    .. versionchanged:: 0.25.0
2055       Default system root CA certificates won't be loaded when specifying a
2056       particular sslmode, following the same behavior in libpq.
2057
2058    .. versionchanged:: 0.25.0
2059       The ``sslcert``, ``sslkey``, ``sslrootcert``, and ``sslcrl`` options
2060       in the *dsn* argument now have consistent default values of files under
2061       ``~/.postgresql/`` as libpq.
2062
2063    .. _SSLContext: https://docs.python.org/3/library/ssl.html#ssl.SSLContext
2064    .. _create_default_context:
2065        https://docs.python.org/3/library/ssl.html#ssl.create_default_context
2066    .. _server settings:
2067        https://www.postgresql.org/docs/current/static/runtime-config.html
2068    .. _postgres envvars:
2069        https://www.postgresql.org/docs/current/static/libpq-envars.html
2070    .. _libpq connection URI format:
2071        https://www.postgresql.org/docs/current/static/
2072        libpq-connect.html#LIBPQ-CONNSTRING
2073    """
2074    if not issubclass(connection_class, Connection):
2075        raise exceptions.InterfaceError(
2076            'connection_class is expected to be a subclass of '
2077            'asyncpg.Connection, got {!r}'.format(connection_class))
2078
2079    if record_class is not protocol.Record:
2080        _check_record_class(record_class)
2081
2082    if loop is None:
2083        loop = asyncio.get_event_loop()
2084
2085    return await connect_utils._connect(
2086        loop=loop,
2087        timeout=timeout,
2088        connection_class=connection_class,
2089        record_class=record_class,
2090        dsn=dsn,
2091        host=host,
2092        port=port,
2093        user=user,
2094        password=password,
2095        passfile=passfile,
2096        ssl=ssl,
2097        database=database,
2098        server_settings=server_settings,
2099        command_timeout=command_timeout,
2100        statement_cache_size=statement_cache_size,
2101        max_cached_statement_lifetime=max_cached_statement_lifetime,
2102        max_cacheable_statement_size=max_cacheable_statement_size,
2103    )
2104
2105
2106class _StatementCacheEntry:
2107
2108    __slots__ = ('_query', '_statement', '_cache', '_cleanup_cb')
2109
2110    def __init__(self, cache, query, statement):
2111        self._cache = cache
2112        self._query = query
2113        self._statement = statement
2114        self._cleanup_cb = None
2115
2116
2117class _StatementCache:
2118
2119    __slots__ = ('_loop', '_entries', '_max_size', '_on_remove',
2120                 '_max_lifetime')
2121
2122    def __init__(self, *, loop, max_size, on_remove, max_lifetime):
2123        self._loop = loop
2124        self._max_size = max_size
2125        self._on_remove = on_remove
2126        self._max_lifetime = max_lifetime
2127
2128        # We use an OrderedDict for LRU implementation.  Operations:
2129        #
2130        # * We use a simple `__setitem__` to push a new entry:
2131        #       `entries[key] = new_entry`
2132        #   That will push `new_entry` to the *end* of the entries dict.
2133        #
2134        # * When we have a cache hit, we call
2135        #       `entries.move_to_end(key, last=True)`
2136        #   to move the entry to the *end* of the entries dict.
2137        #
2138        # * When we need to remove entries to maintain `max_size`, we call
2139        #       `entries.popitem(last=False)`
2140        #   to remove an entry from the *beginning* of the entries dict.
2141        #
2142        # So new entries and hits are always promoted to the end of the
2143        # entries dict, whereas the unused one will group in the
2144        # beginning of it.
2145        self._entries = collections.OrderedDict()
2146
2147    def __len__(self):
2148        return len(self._entries)
2149
2150    def get_max_size(self):
2151        return self._max_size
2152
2153    def set_max_size(self, new_size):
2154        assert new_size >= 0
2155        self._max_size = new_size
2156        self._maybe_cleanup()
2157
2158    def get_max_lifetime(self):
2159        return self._max_lifetime
2160
2161    def set_max_lifetime(self, new_lifetime):
2162        assert new_lifetime >= 0
2163        self._max_lifetime = new_lifetime
2164        for entry in self._entries.values():
2165            # For every entry cancel the existing callback
2166            # and setup a new one if necessary.
2167            self._set_entry_timeout(entry)
2168
2169    def get(self, query, *, promote=True):
2170        if not self._max_size:
2171            # The cache is disabled.
2172            return
2173
2174        entry = self._entries.get(query)  # type: _StatementCacheEntry
2175        if entry is None:
2176            return
2177
2178        if entry._statement.closed:
2179            # Happens in unittests when we call `stmt._state.mark_closed()`
2180            # manually or when a prepared statement closes itself on type
2181            # cache error.
2182            self._entries.pop(query)
2183            self._clear_entry_callback(entry)
2184            return
2185
2186        if promote:
2187            # `promote` is `False` when `get()` is called by `has()`.
2188            self._entries.move_to_end(query, last=True)
2189
2190        return entry._statement
2191
2192    def has(self, query):
2193        return self.get(query, promote=False) is not None
2194
2195    def put(self, query, statement):
2196        if not self._max_size:
2197            # The cache is disabled.
2198            return
2199
2200        self._entries[query] = self._new_entry(query, statement)
2201
2202        # Check if the cache is bigger than max_size and trim it
2203        # if necessary.
2204        self._maybe_cleanup()
2205
2206    def iter_statements(self):
2207        return (e._statement for e in self._entries.values())
2208
2209    def clear(self):
2210        # Store entries for later.
2211        entries = tuple(self._entries.values())
2212
2213        # Clear the entries dict.
2214        self._entries.clear()
2215
2216        # Make sure that we cancel all scheduled callbacks
2217        # and call on_remove callback for each entry.
2218        for entry in entries:
2219            self._clear_entry_callback(entry)
2220            self._on_remove(entry._statement)
2221
2222    def _set_entry_timeout(self, entry):
2223        # Clear the existing timeout.
2224        self._clear_entry_callback(entry)
2225
2226        # Set the new timeout if it's not 0.
2227        if self._max_lifetime:
2228            entry._cleanup_cb = self._loop.call_later(
2229                self._max_lifetime, self._on_entry_expired, entry)
2230
2231    def _new_entry(self, query, statement):
2232        entry = _StatementCacheEntry(self, query, statement)
2233        self._set_entry_timeout(entry)
2234        return entry
2235
2236    def _on_entry_expired(self, entry):
2237        # `call_later` callback, called when an entry stayed longer
2238        # than `self._max_lifetime`.
2239        if self._entries.get(entry._query) is entry:
2240            self._entries.pop(entry._query)
2241            self._on_remove(entry._statement)
2242
2243    def _clear_entry_callback(self, entry):
2244        if entry._cleanup_cb is not None:
2245            entry._cleanup_cb.cancel()
2246
2247    def _maybe_cleanup(self):
2248        # Delete cache entries until the size of the cache is `max_size`.
2249        while len(self._entries) > self._max_size:
2250            old_query, old_entry = self._entries.popitem(last=False)
2251            self._clear_entry_callback(old_entry)
2252
2253            # Let the connection know that the statement was removed
2254            # from the cache.
2255            self._on_remove(old_entry._statement)
2256
2257
2258class _Callback(typing.NamedTuple):
2259
2260    cb: typing.Callable[..., None]
2261    is_async: bool
2262
2263    @classmethod
2264    def from_callable(cls, cb: typing.Callable[..., None]) -> '_Callback':
2265        if inspect.iscoroutinefunction(cb):
2266            is_async = True
2267        elif callable(cb):
2268            is_async = False
2269        else:
2270            raise exceptions.InterfaceError(
2271                'expected a callable or an `async def` function,'
2272                'got {!r}'.format(cb)
2273            )
2274
2275        return cls(cb, is_async)
2276
2277
2278class _Atomic:
2279    __slots__ = ('_acquired',)
2280
2281    def __init__(self):
2282        self._acquired = 0
2283
2284    def __enter__(self):
2285        if self._acquired:
2286            raise exceptions.InterfaceError(
2287                'cannot perform operation: another operation is in progress')
2288        self._acquired = 1
2289
2290    def __exit__(self, t, e, tb):
2291        self._acquired = 0
2292
2293
2294class _ConnectionProxy:
2295    # Base class to enable `isinstance(Connection)` check.
2296    __slots__ = ()
2297
2298
2299ServerCapabilities = collections.namedtuple(
2300    'ServerCapabilities',
2301    ['advisory_locks', 'notifications', 'plpgsql', 'sql_reset',
2302     'sql_close_all'])
2303ServerCapabilities.__doc__ = 'PostgreSQL server capabilities.'
2304
2305
2306def _detect_server_capabilities(server_version, connection_settings):
2307    if hasattr(connection_settings, 'padb_revision'):
2308        # Amazon Redshift detected.
2309        advisory_locks = False
2310        notifications = False
2311        plpgsql = False
2312        sql_reset = True
2313        sql_close_all = False
2314    elif hasattr(connection_settings, 'crdb_version'):
2315        # CockroachDB detected.
2316        advisory_locks = False
2317        notifications = False
2318        plpgsql = False
2319        sql_reset = False
2320        sql_close_all = False
2321    elif hasattr(connection_settings, 'crate_version'):
2322        # CrateDB detected.
2323        advisory_locks = False
2324        notifications = False
2325        plpgsql = False
2326        sql_reset = False
2327        sql_close_all = False
2328    else:
2329        # Standard PostgreSQL server assumed.
2330        advisory_locks = True
2331        notifications = True
2332        plpgsql = True
2333        sql_reset = True
2334        sql_close_all = True
2335
2336    return ServerCapabilities(
2337        advisory_locks=advisory_locks,
2338        notifications=notifications,
2339        plpgsql=plpgsql,
2340        sql_reset=sql_reset,
2341        sql_close_all=sql_close_all
2342    )
2343
2344
2345def _extract_stack(limit=10):
2346    """Replacement for traceback.extract_stack() that only does the
2347    necessary work for asyncio debug mode.
2348    """
2349    frame = sys._getframe().f_back
2350    try:
2351        stack = traceback.StackSummary.extract(
2352            traceback.walk_stack(frame), lookup_lines=False)
2353    finally:
2354        del frame
2355
2356    apg_path = asyncpg.__path__[0]
2357    i = 0
2358    while i < len(stack) and stack[i][0].startswith(apg_path):
2359        i += 1
2360    stack = stack[i:i + limit]
2361
2362    stack.reverse()
2363    return ''.join(traceback.format_list(stack))
2364
2365
2366def _check_record_class(record_class):
2367    if record_class is protocol.Record:
2368        pass
2369    elif (
2370        isinstance(record_class, type)
2371        and issubclass(record_class, protocol.Record)
2372    ):
2373        if (
2374            record_class.__new__ is not object.__new__
2375            or record_class.__init__ is not object.__init__
2376        ):
2377            raise exceptions.InterfaceError(
2378                'record_class must not redefine __new__ or __init__'
2379            )
2380    else:
2381        raise exceptions.InterfaceError(
2382            'record_class is expected to be a subclass of '
2383            'asyncpg.Record, got {!r}'.format(record_class)
2384        )
2385
2386
2387def _weak_maybe_gc_stmt(weak_ref, stmt):
2388    self = weak_ref()
2389    if self is not None:
2390        self._maybe_gc_stmt(stmt)
2391
2392
2393_uid = 0
2394