1# Copyright (C) 2016-present the asyncpg authors and contributors 2# <see AUTHORS file> 3# 4# This module is part of asyncpg and is released under 5# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0 6 7 8import asyncio 9import asyncpg 10import collections 11import collections.abc 12import functools 13import itertools 14import inspect 15import os 16import sys 17import time 18import traceback 19import typing 20import warnings 21import weakref 22 23from . import compat 24from . import connect_utils 25from . import cursor 26from . import exceptions 27from . import introspection 28from . import prepared_stmt 29from . import protocol 30from . import serverversion 31from . import transaction 32from . import utils 33 34 35class ConnectionMeta(type): 36 37 def __instancecheck__(cls, instance): 38 mro = type(instance).__mro__ 39 return Connection in mro or _ConnectionProxy in mro 40 41 42class Connection(metaclass=ConnectionMeta): 43 """A representation of a database session. 44 45 Connections are created by calling :func:`~asyncpg.connection.connect`. 46 """ 47 48 __slots__ = ('_protocol', '_transport', '_loop', 49 '_top_xact', '_aborted', 50 '_pool_release_ctr', '_stmt_cache', '_stmts_to_close', 51 '_listeners', '_server_version', '_server_caps', 52 '_intro_query', '_reset_query', '_proxy', 53 '_stmt_exclusive_section', '_config', '_params', '_addr', 54 '_log_listeners', '_termination_listeners', '_cancellations', 55 '_source_traceback', '__weakref__') 56 57 def __init__(self, protocol, transport, loop, 58 addr, 59 config: connect_utils._ClientConfiguration, 60 params: connect_utils._ConnectionParameters): 61 self._protocol = protocol 62 self._transport = transport 63 self._loop = loop 64 self._top_xact = None 65 self._aborted = False 66 # Incremented every time the connection is released back to a pool. 67 # Used to catch invalid references to connection-related resources 68 # post-release (e.g. explicit prepared statements). 69 self._pool_release_ctr = 0 70 71 self._addr = addr 72 self._config = config 73 self._params = params 74 75 self._stmt_cache = _StatementCache( 76 loop=loop, 77 max_size=config.statement_cache_size, 78 on_remove=functools.partial( 79 _weak_maybe_gc_stmt, weakref.ref(self)), 80 max_lifetime=config.max_cached_statement_lifetime) 81 82 self._stmts_to_close = set() 83 84 self._listeners = {} 85 self._log_listeners = set() 86 self._cancellations = set() 87 self._termination_listeners = set() 88 89 settings = self._protocol.get_settings() 90 ver_string = settings.server_version 91 self._server_version = \ 92 serverversion.split_server_version_string(ver_string) 93 94 self._server_caps = _detect_server_capabilities( 95 self._server_version, settings) 96 97 if self._server_version < (14, 0): 98 self._intro_query = introspection.INTRO_LOOKUP_TYPES_13 99 else: 100 self._intro_query = introspection.INTRO_LOOKUP_TYPES 101 102 self._reset_query = None 103 self._proxy = None 104 105 # Used to serialize operations that might involve anonymous 106 # statements. Specifically, we want to make the following 107 # operation atomic: 108 # ("prepare an anonymous statement", "use the statement") 109 # 110 # Used for `con.fetchval()`, `con.fetch()`, `con.fetchrow()`, 111 # `con.execute()`, and `con.executemany()`. 112 self._stmt_exclusive_section = _Atomic() 113 114 if loop.get_debug(): 115 self._source_traceback = _extract_stack() 116 else: 117 self._source_traceback = None 118 119 def __del__(self): 120 if not self.is_closed() and self._protocol is not None: 121 if self._source_traceback: 122 msg = "unclosed connection {!r}; created at:\n {}".format( 123 self, self._source_traceback) 124 else: 125 msg = ( 126 "unclosed connection {!r}; run in asyncio debug " 127 "mode to show the traceback of connection " 128 "origin".format(self) 129 ) 130 131 warnings.warn(msg, ResourceWarning) 132 if not self._loop.is_closed(): 133 self.terminate() 134 135 async def add_listener(self, channel, callback): 136 """Add a listener for Postgres notifications. 137 138 :param str channel: Channel to listen on. 139 140 :param callable callback: 141 A callable or a coroutine function receiving the following 142 arguments: 143 **connection**: a Connection the callback is registered with; 144 **pid**: PID of the Postgres server that sent the notification; 145 **channel**: name of the channel the notification was sent to; 146 **payload**: the payload. 147 148 .. versionchanged:: 0.24.0 149 The ``callback`` argument may be a coroutine function. 150 """ 151 self._check_open() 152 if channel not in self._listeners: 153 await self.fetch('LISTEN {}'.format(utils._quote_ident(channel))) 154 self._listeners[channel] = set() 155 self._listeners[channel].add(_Callback.from_callable(callback)) 156 157 async def remove_listener(self, channel, callback): 158 """Remove a listening callback on the specified channel.""" 159 if self.is_closed(): 160 return 161 if channel not in self._listeners: 162 return 163 cb = _Callback.from_callable(callback) 164 if cb not in self._listeners[channel]: 165 return 166 self._listeners[channel].remove(cb) 167 if not self._listeners[channel]: 168 del self._listeners[channel] 169 await self.fetch('UNLISTEN {}'.format(utils._quote_ident(channel))) 170 171 def add_log_listener(self, callback): 172 """Add a listener for Postgres log messages. 173 174 It will be called when asyncronous NoticeResponse is received 175 from the connection. Possible message types are: WARNING, NOTICE, 176 DEBUG, INFO, or LOG. 177 178 :param callable callback: 179 A callable or a coroutine function receiving the following 180 arguments: 181 **connection**: a Connection the callback is registered with; 182 **message**: the `exceptions.PostgresLogMessage` message. 183 184 .. versionadded:: 0.12.0 185 186 .. versionchanged:: 0.24.0 187 The ``callback`` argument may be a coroutine function. 188 """ 189 if self.is_closed(): 190 raise exceptions.InterfaceError('connection is closed') 191 self._log_listeners.add(_Callback.from_callable(callback)) 192 193 def remove_log_listener(self, callback): 194 """Remove a listening callback for log messages. 195 196 .. versionadded:: 0.12.0 197 """ 198 self._log_listeners.discard(_Callback.from_callable(callback)) 199 200 def add_termination_listener(self, callback): 201 """Add a listener that will be called when the connection is closed. 202 203 :param callable callback: 204 A callable or a coroutine function receiving one argument: 205 **connection**: a Connection the callback is registered with. 206 207 .. versionadded:: 0.21.0 208 209 .. versionchanged:: 0.24.0 210 The ``callback`` argument may be a coroutine function. 211 """ 212 self._termination_listeners.add(_Callback.from_callable(callback)) 213 214 def remove_termination_listener(self, callback): 215 """Remove a listening callback for connection termination. 216 217 :param callable callback: 218 The callable or coroutine function that was passed to 219 :meth:`Connection.add_termination_listener`. 220 221 .. versionadded:: 0.21.0 222 """ 223 self._termination_listeners.discard(_Callback.from_callable(callback)) 224 225 def get_server_pid(self): 226 """Return the PID of the Postgres server the connection is bound to.""" 227 return self._protocol.get_server_pid() 228 229 def get_server_version(self): 230 """Return the version of the connected PostgreSQL server. 231 232 The returned value is a named tuple similar to that in 233 ``sys.version_info``: 234 235 .. code-block:: pycon 236 237 >>> con.get_server_version() 238 ServerVersion(major=9, minor=6, micro=1, 239 releaselevel='final', serial=0) 240 241 .. versionadded:: 0.8.0 242 """ 243 return self._server_version 244 245 def get_settings(self): 246 """Return connection settings. 247 248 :return: :class:`~asyncpg.ConnectionSettings`. 249 """ 250 return self._protocol.get_settings() 251 252 def transaction(self, *, isolation=None, readonly=False, 253 deferrable=False): 254 """Create a :class:`~transaction.Transaction` object. 255 256 Refer to `PostgreSQL documentation`_ on the meaning of transaction 257 parameters. 258 259 :param isolation: Transaction isolation mode, can be one of: 260 `'serializable'`, `'repeatable_read'`, 261 `'read_committed'`. If not specified, the behavior 262 is up to the server and session, which is usually 263 ``read_committed``. 264 265 :param readonly: Specifies whether or not this transaction is 266 read-only. 267 268 :param deferrable: Specifies whether or not this transaction is 269 deferrable. 270 271 .. _`PostgreSQL documentation`: 272 https://www.postgresql.org/docs/ 273 current/static/sql-set-transaction.html 274 """ 275 self._check_open() 276 return transaction.Transaction(self, isolation, readonly, deferrable) 277 278 def is_in_transaction(self): 279 """Return True if Connection is currently inside a transaction. 280 281 :return bool: True if inside transaction, False otherwise. 282 283 .. versionadded:: 0.16.0 284 """ 285 return self._protocol.is_in_transaction() 286 287 async def execute(self, query: str, *args, timeout: float=None) -> str: 288 """Execute an SQL command (or commands). 289 290 This method can execute many SQL commands at once, when no arguments 291 are provided. 292 293 Example: 294 295 .. code-block:: pycon 296 297 >>> await con.execute(''' 298 ... CREATE TABLE mytab (a int); 299 ... INSERT INTO mytab (a) VALUES (100), (200), (300); 300 ... ''') 301 INSERT 0 3 302 303 >>> await con.execute(''' 304 ... INSERT INTO mytab (a) VALUES ($1), ($2) 305 ... ''', 10, 20) 306 INSERT 0 2 307 308 :param args: Query arguments. 309 :param float timeout: Optional timeout value in seconds. 310 :return str: Status of the last SQL command. 311 312 .. versionchanged:: 0.5.4 313 Made it possible to pass query arguments. 314 """ 315 self._check_open() 316 317 if not args: 318 return await self._protocol.query(query, timeout) 319 320 _, status, _ = await self._execute( 321 query, 322 args, 323 0, 324 timeout, 325 return_status=True, 326 ) 327 return status.decode() 328 329 async def executemany(self, command: str, args, *, timeout: float=None): 330 """Execute an SQL *command* for each sequence of arguments in *args*. 331 332 Example: 333 334 .. code-block:: pycon 335 336 >>> await con.executemany(''' 337 ... INSERT INTO mytab (a) VALUES ($1, $2, $3); 338 ... ''', [(1, 2, 3), (4, 5, 6)]) 339 340 :param command: Command to execute. 341 :param args: An iterable containing sequences of arguments. 342 :param float timeout: Optional timeout value in seconds. 343 :return None: This method discards the results of the operations. 344 345 .. versionadded:: 0.7.0 346 347 .. versionchanged:: 0.11.0 348 `timeout` became a keyword-only parameter. 349 350 .. versionchanged:: 0.22.0 351 ``executemany()`` is now an atomic operation, which means that 352 either all executions succeed, or none at all. This is in contrast 353 to prior versions, where the effect of already-processed iterations 354 would remain in place when an error has occurred, unless 355 ``executemany()`` was called in a transaction. 356 """ 357 self._check_open() 358 return await self._executemany(command, args, timeout) 359 360 async def _get_statement( 361 self, 362 query, 363 timeout, 364 *, 365 named=False, 366 use_cache=True, 367 ignore_custom_codec=False, 368 record_class=None 369 ): 370 if record_class is None: 371 record_class = self._protocol.get_record_class() 372 else: 373 _check_record_class(record_class) 374 375 if use_cache: 376 statement = self._stmt_cache.get( 377 (query, record_class, ignore_custom_codec) 378 ) 379 if statement is not None: 380 return statement 381 382 # Only use the cache when: 383 # * `statement_cache_size` is greater than 0; 384 # * query size is less than `max_cacheable_statement_size`. 385 use_cache = self._stmt_cache.get_max_size() > 0 386 if (use_cache and 387 self._config.max_cacheable_statement_size and 388 len(query) > self._config.max_cacheable_statement_size): 389 use_cache = False 390 391 if isinstance(named, str): 392 stmt_name = named 393 elif use_cache or named: 394 stmt_name = self._get_unique_id('stmt') 395 else: 396 stmt_name = '' 397 398 statement = await self._protocol.prepare( 399 stmt_name, 400 query, 401 timeout, 402 record_class=record_class, 403 ignore_custom_codec=ignore_custom_codec, 404 ) 405 need_reprepare = False 406 types_with_missing_codecs = statement._init_types() 407 tries = 0 408 while types_with_missing_codecs: 409 settings = self._protocol.get_settings() 410 411 # Introspect newly seen types and populate the 412 # codec cache. 413 types, intro_stmt = await self._introspect_types( 414 types_with_missing_codecs, timeout) 415 416 settings.register_data_types(types) 417 418 # The introspection query has used an anonymous statement, 419 # which has blown away the anonymous statement we've prepared 420 # for the query, so we need to re-prepare it. 421 need_reprepare = not intro_stmt.name and not statement.name 422 types_with_missing_codecs = statement._init_types() 423 tries += 1 424 if tries > 5: 425 # In the vast majority of cases there will be only 426 # one iteration. In rare cases, there might be a race 427 # with reload_schema_state(), which would cause a 428 # second try. More than five is clearly a bug. 429 raise exceptions.InternalClientError( 430 'could not resolve query result and/or argument types ' 431 'in {} attempts'.format(tries) 432 ) 433 434 # Now that types have been resolved, populate the codec pipeline 435 # for the statement. 436 statement._init_codecs() 437 438 if need_reprepare: 439 await self._protocol.prepare( 440 stmt_name, 441 query, 442 timeout, 443 state=statement, 444 record_class=record_class, 445 ) 446 447 if use_cache: 448 self._stmt_cache.put( 449 (query, record_class, ignore_custom_codec), statement) 450 451 # If we've just created a new statement object, check if there 452 # are any statements for GC. 453 if self._stmts_to_close: 454 await self._cleanup_stmts() 455 456 return statement 457 458 async def _introspect_types(self, typeoids, timeout): 459 return await self.__execute( 460 self._intro_query, 461 (list(typeoids),), 462 0, 463 timeout, 464 ignore_custom_codec=True, 465 ) 466 467 async def _introspect_type(self, typename, schema): 468 if ( 469 schema == 'pg_catalog' 470 and typename.lower() in protocol.BUILTIN_TYPE_NAME_MAP 471 ): 472 typeoid = protocol.BUILTIN_TYPE_NAME_MAP[typename.lower()] 473 rows = await self._execute( 474 introspection.TYPE_BY_OID, 475 [typeoid], 476 limit=0, 477 timeout=None, 478 ignore_custom_codec=True, 479 ) 480 else: 481 rows = await self._execute( 482 introspection.TYPE_BY_NAME, 483 [typename, schema], 484 limit=1, 485 timeout=None, 486 ignore_custom_codec=True, 487 ) 488 489 if not rows: 490 raise ValueError( 491 'unknown type: {}.{}'.format(schema, typename)) 492 493 return rows[0] 494 495 def cursor( 496 self, 497 query, 498 *args, 499 prefetch=None, 500 timeout=None, 501 record_class=None 502 ): 503 """Return a *cursor factory* for the specified query. 504 505 :param args: 506 Query arguments. 507 :param int prefetch: 508 The number of rows the *cursor iterator* 509 will prefetch (defaults to ``50``.) 510 :param float timeout: 511 Optional timeout in seconds. 512 :param type record_class: 513 If specified, the class to use for records returned by this cursor. 514 Must be a subclass of :class:`~asyncpg.Record`. If not specified, 515 a per-connection *record_class* is used. 516 517 :return: 518 A :class:`~cursor.CursorFactory` object. 519 520 .. versionchanged:: 0.22.0 521 Added the *record_class* parameter. 522 """ 523 self._check_open() 524 return cursor.CursorFactory( 525 self, 526 query, 527 None, 528 args, 529 prefetch, 530 timeout, 531 record_class, 532 ) 533 534 async def prepare( 535 self, 536 query, 537 *, 538 name=None, 539 timeout=None, 540 record_class=None, 541 ): 542 """Create a *prepared statement* for the specified query. 543 544 :param str query: 545 Text of the query to create a prepared statement for. 546 :param str name: 547 Optional name of the returned prepared statement. If not 548 specified, the name is auto-generated. 549 :param float timeout: 550 Optional timeout value in seconds. 551 :param type record_class: 552 If specified, the class to use for records returned by the 553 prepared statement. Must be a subclass of 554 :class:`~asyncpg.Record`. If not specified, a per-connection 555 *record_class* is used. 556 557 :return: 558 A :class:`~prepared_stmt.PreparedStatement` instance. 559 560 .. versionchanged:: 0.22.0 561 Added the *record_class* parameter. 562 563 .. versionchanged:: 0.25.0 564 Added the *name* parameter. 565 """ 566 return await self._prepare( 567 query, 568 name=name, 569 timeout=timeout, 570 use_cache=False, 571 record_class=record_class, 572 ) 573 574 async def _prepare( 575 self, 576 query, 577 *, 578 name=None, 579 timeout=None, 580 use_cache: bool=False, 581 record_class=None 582 ): 583 self._check_open() 584 stmt = await self._get_statement( 585 query, 586 timeout, 587 named=True if name is None else name, 588 use_cache=use_cache, 589 record_class=record_class, 590 ) 591 return prepared_stmt.PreparedStatement(self, query, stmt) 592 593 async def fetch( 594 self, 595 query, 596 *args, 597 timeout=None, 598 record_class=None 599 ) -> list: 600 """Run a query and return the results as a list of :class:`Record`. 601 602 :param str query: 603 Query text. 604 :param args: 605 Query arguments. 606 :param float timeout: 607 Optional timeout value in seconds. 608 :param type record_class: 609 If specified, the class to use for records returned by this method. 610 Must be a subclass of :class:`~asyncpg.Record`. If not specified, 611 a per-connection *record_class* is used. 612 613 :return list: 614 A list of :class:`~asyncpg.Record` instances. If specified, the 615 actual type of list elements would be *record_class*. 616 617 .. versionchanged:: 0.22.0 618 Added the *record_class* parameter. 619 """ 620 self._check_open() 621 return await self._execute( 622 query, 623 args, 624 0, 625 timeout, 626 record_class=record_class, 627 ) 628 629 async def fetchval(self, query, *args, column=0, timeout=None): 630 """Run a query and return a value in the first row. 631 632 :param str query: Query text. 633 :param args: Query arguments. 634 :param int column: Numeric index within the record of the value to 635 return (defaults to 0). 636 :param float timeout: Optional timeout value in seconds. 637 If not specified, defaults to the value of 638 ``command_timeout`` argument to the ``Connection`` 639 instance constructor. 640 641 :return: The value of the specified column of the first record, or 642 None if no records were returned by the query. 643 """ 644 self._check_open() 645 data = await self._execute(query, args, 1, timeout) 646 if not data: 647 return None 648 return data[0][column] 649 650 async def fetchrow( 651 self, 652 query, 653 *args, 654 timeout=None, 655 record_class=None 656 ): 657 """Run a query and return the first row. 658 659 :param str query: 660 Query text 661 :param args: 662 Query arguments 663 :param float timeout: 664 Optional timeout value in seconds. 665 :param type record_class: 666 If specified, the class to use for the value returned by this 667 method. Must be a subclass of :class:`~asyncpg.Record`. 668 If not specified, a per-connection *record_class* is used. 669 670 :return: 671 The first row as a :class:`~asyncpg.Record` instance, or None if 672 no records were returned by the query. If specified, 673 *record_class* is used as the type for the result value. 674 675 .. versionchanged:: 0.22.0 676 Added the *record_class* parameter. 677 """ 678 self._check_open() 679 data = await self._execute( 680 query, 681 args, 682 1, 683 timeout, 684 record_class=record_class, 685 ) 686 if not data: 687 return None 688 return data[0] 689 690 async def copy_from_table(self, table_name, *, output, 691 columns=None, schema_name=None, timeout=None, 692 format=None, oids=None, delimiter=None, 693 null=None, header=None, quote=None, 694 escape=None, force_quote=None, encoding=None): 695 """Copy table contents to a file or file-like object. 696 697 :param str table_name: 698 The name of the table to copy data from. 699 700 :param output: 701 A :term:`path-like object <python:path-like object>`, 702 or a :term:`file-like object <python:file-like object>`, or 703 a :term:`coroutine function <python:coroutine function>` 704 that takes a ``bytes`` instance as a sole argument. 705 706 :param list columns: 707 An optional list of column names to copy. 708 709 :param str schema_name: 710 An optional schema name to qualify the table. 711 712 :param float timeout: 713 Optional timeout value in seconds. 714 715 The remaining keyword arguments are ``COPY`` statement options, 716 see `COPY statement documentation`_ for details. 717 718 :return: The status string of the COPY command. 719 720 Example: 721 722 .. code-block:: pycon 723 724 >>> import asyncpg 725 >>> import asyncio 726 >>> async def run(): 727 ... con = await asyncpg.connect(user='postgres') 728 ... result = await con.copy_from_table( 729 ... 'mytable', columns=('foo', 'bar'), 730 ... output='file.csv', format='csv') 731 ... print(result) 732 ... 733 >>> asyncio.get_event_loop().run_until_complete(run()) 734 'COPY 100' 735 736 .. _`COPY statement documentation`: 737 https://www.postgresql.org/docs/current/static/sql-copy.html 738 739 .. versionadded:: 0.11.0 740 """ 741 tabname = utils._quote_ident(table_name) 742 if schema_name: 743 tabname = utils._quote_ident(schema_name) + '.' + tabname 744 745 if columns: 746 cols = '({})'.format( 747 ', '.join(utils._quote_ident(c) for c in columns)) 748 else: 749 cols = '' 750 751 opts = self._format_copy_opts( 752 format=format, oids=oids, delimiter=delimiter, 753 null=null, header=header, quote=quote, escape=escape, 754 force_quote=force_quote, encoding=encoding 755 ) 756 757 copy_stmt = 'COPY {tab}{cols} TO STDOUT {opts}'.format( 758 tab=tabname, cols=cols, opts=opts) 759 760 return await self._copy_out(copy_stmt, output, timeout) 761 762 async def copy_from_query(self, query, *args, output, 763 timeout=None, format=None, oids=None, 764 delimiter=None, null=None, header=None, 765 quote=None, escape=None, force_quote=None, 766 encoding=None): 767 """Copy the results of a query to a file or file-like object. 768 769 :param str query: 770 The query to copy the results of. 771 772 :param args: 773 Query arguments. 774 775 :param output: 776 A :term:`path-like object <python:path-like object>`, 777 or a :term:`file-like object <python:file-like object>`, or 778 a :term:`coroutine function <python:coroutine function>` 779 that takes a ``bytes`` instance as a sole argument. 780 781 :param float timeout: 782 Optional timeout value in seconds. 783 784 The remaining keyword arguments are ``COPY`` statement options, 785 see `COPY statement documentation`_ for details. 786 787 :return: The status string of the COPY command. 788 789 Example: 790 791 .. code-block:: pycon 792 793 >>> import asyncpg 794 >>> import asyncio 795 >>> async def run(): 796 ... con = await asyncpg.connect(user='postgres') 797 ... result = await con.copy_from_query( 798 ... 'SELECT foo, bar FROM mytable WHERE foo > $1', 10, 799 ... output='file.csv', format='csv') 800 ... print(result) 801 ... 802 >>> asyncio.get_event_loop().run_until_complete(run()) 803 'COPY 10' 804 805 .. _`COPY statement documentation`: 806 https://www.postgresql.org/docs/current/static/sql-copy.html 807 808 .. versionadded:: 0.11.0 809 """ 810 opts = self._format_copy_opts( 811 format=format, oids=oids, delimiter=delimiter, 812 null=null, header=header, quote=quote, escape=escape, 813 force_quote=force_quote, encoding=encoding 814 ) 815 816 if args: 817 query = await utils._mogrify(self, query, args) 818 819 copy_stmt = 'COPY ({query}) TO STDOUT {opts}'.format( 820 query=query, opts=opts) 821 822 return await self._copy_out(copy_stmt, output, timeout) 823 824 async def copy_to_table(self, table_name, *, source, 825 columns=None, schema_name=None, timeout=None, 826 format=None, oids=None, freeze=None, 827 delimiter=None, null=None, header=None, 828 quote=None, escape=None, force_quote=None, 829 force_not_null=None, force_null=None, 830 encoding=None): 831 """Copy data to the specified table. 832 833 :param str table_name: 834 The name of the table to copy data to. 835 836 :param source: 837 A :term:`path-like object <python:path-like object>`, 838 or a :term:`file-like object <python:file-like object>`, or 839 an :term:`asynchronous iterable <python:asynchronous iterable>` 840 that returns ``bytes``, or an object supporting the 841 :ref:`buffer protocol <python:bufferobjects>`. 842 843 :param list columns: 844 An optional list of column names to copy. 845 846 :param str schema_name: 847 An optional schema name to qualify the table. 848 849 :param float timeout: 850 Optional timeout value in seconds. 851 852 The remaining keyword arguments are ``COPY`` statement options, 853 see `COPY statement documentation`_ for details. 854 855 :return: The status string of the COPY command. 856 857 Example: 858 859 .. code-block:: pycon 860 861 >>> import asyncpg 862 >>> import asyncio 863 >>> async def run(): 864 ... con = await asyncpg.connect(user='postgres') 865 ... result = await con.copy_to_table( 866 ... 'mytable', source='datafile.tbl') 867 ... print(result) 868 ... 869 >>> asyncio.get_event_loop().run_until_complete(run()) 870 'COPY 140000' 871 872 .. _`COPY statement documentation`: 873 https://www.postgresql.org/docs/current/static/sql-copy.html 874 875 .. versionadded:: 0.11.0 876 """ 877 tabname = utils._quote_ident(table_name) 878 if schema_name: 879 tabname = utils._quote_ident(schema_name) + '.' + tabname 880 881 if columns: 882 cols = '({})'.format( 883 ', '.join(utils._quote_ident(c) for c in columns)) 884 else: 885 cols = '' 886 887 opts = self._format_copy_opts( 888 format=format, oids=oids, freeze=freeze, delimiter=delimiter, 889 null=null, header=header, quote=quote, escape=escape, 890 force_not_null=force_not_null, force_null=force_null, 891 encoding=encoding 892 ) 893 894 copy_stmt = 'COPY {tab}{cols} FROM STDIN {opts}'.format( 895 tab=tabname, cols=cols, opts=opts) 896 897 return await self._copy_in(copy_stmt, source, timeout) 898 899 async def copy_records_to_table(self, table_name, *, records, 900 columns=None, schema_name=None, 901 timeout=None): 902 """Copy a list of records to the specified table using binary COPY. 903 904 :param str table_name: 905 The name of the table to copy data to. 906 907 :param records: 908 An iterable returning row tuples to copy into the table. 909 :term:`Asynchronous iterables <python:asynchronous iterable>` 910 are also supported. 911 912 :param list columns: 913 An optional list of column names to copy. 914 915 :param str schema_name: 916 An optional schema name to qualify the table. 917 918 :param float timeout: 919 Optional timeout value in seconds. 920 921 :return: The status string of the COPY command. 922 923 Example: 924 925 .. code-block:: pycon 926 927 >>> import asyncpg 928 >>> import asyncio 929 >>> async def run(): 930 ... con = await asyncpg.connect(user='postgres') 931 ... result = await con.copy_records_to_table( 932 ... 'mytable', records=[ 933 ... (1, 'foo', 'bar'), 934 ... (2, 'ham', 'spam')]) 935 ... print(result) 936 ... 937 >>> asyncio.get_event_loop().run_until_complete(run()) 938 'COPY 2' 939 940 Asynchronous record iterables are also supported: 941 942 .. code-block:: pycon 943 944 >>> import asyncpg 945 >>> import asyncio 946 >>> async def run(): 947 ... con = await asyncpg.connect(user='postgres') 948 ... async def record_gen(size): 949 ... for i in range(size): 950 ... yield (i,) 951 ... result = await con.copy_records_to_table( 952 ... 'mytable', records=record_gen(100)) 953 ... print(result) 954 ... 955 >>> asyncio.get_event_loop().run_until_complete(run()) 956 'COPY 100' 957 958 .. versionadded:: 0.11.0 959 960 .. versionchanged:: 0.24.0 961 The ``records`` argument may be an asynchronous iterable. 962 """ 963 tabname = utils._quote_ident(table_name) 964 if schema_name: 965 tabname = utils._quote_ident(schema_name) + '.' + tabname 966 967 if columns: 968 col_list = ', '.join(utils._quote_ident(c) for c in columns) 969 cols = '({})'.format(col_list) 970 else: 971 col_list = '*' 972 cols = '' 973 974 intro_query = 'SELECT {cols} FROM {tab} LIMIT 1'.format( 975 tab=tabname, cols=col_list) 976 977 intro_ps = await self._prepare(intro_query, use_cache=True) 978 979 opts = '(FORMAT binary)' 980 981 copy_stmt = 'COPY {tab}{cols} FROM STDIN {opts}'.format( 982 tab=tabname, cols=cols, opts=opts) 983 984 return await self._protocol.copy_in( 985 copy_stmt, None, None, records, intro_ps._state, timeout) 986 987 def _format_copy_opts(self, *, format=None, oids=None, freeze=None, 988 delimiter=None, null=None, header=None, quote=None, 989 escape=None, force_quote=None, force_not_null=None, 990 force_null=None, encoding=None): 991 kwargs = dict(locals()) 992 kwargs.pop('self') 993 opts = [] 994 995 if force_quote is not None and isinstance(force_quote, bool): 996 kwargs.pop('force_quote') 997 if force_quote: 998 opts.append('FORCE_QUOTE *') 999 1000 for k, v in kwargs.items(): 1001 if v is not None: 1002 if k in ('force_not_null', 'force_null', 'force_quote'): 1003 v = '(' + ', '.join(utils._quote_ident(c) for c in v) + ')' 1004 elif k in ('oids', 'freeze', 'header'): 1005 v = str(v) 1006 else: 1007 v = utils._quote_literal(v) 1008 1009 opts.append('{} {}'.format(k.upper(), v)) 1010 1011 if opts: 1012 return '(' + ', '.join(opts) + ')' 1013 else: 1014 return '' 1015 1016 async def _copy_out(self, copy_stmt, output, timeout): 1017 try: 1018 path = os.fspath(output) 1019 except TypeError: 1020 # output is not a path-like object 1021 path = None 1022 1023 writer = None 1024 opened_by_us = False 1025 run_in_executor = self._loop.run_in_executor 1026 1027 if path is not None: 1028 # a path 1029 f = await run_in_executor(None, open, path, 'wb') 1030 opened_by_us = True 1031 elif hasattr(output, 'write'): 1032 # file-like 1033 f = output 1034 elif callable(output): 1035 # assuming calling output returns an awaitable. 1036 writer = output 1037 else: 1038 raise TypeError( 1039 'output is expected to be a file-like object, ' 1040 'a path-like object or a coroutine function, ' 1041 'not {}'.format(type(output).__name__) 1042 ) 1043 1044 if writer is None: 1045 async def _writer(data): 1046 await run_in_executor(None, f.write, data) 1047 writer = _writer 1048 1049 try: 1050 return await self._protocol.copy_out(copy_stmt, writer, timeout) 1051 finally: 1052 if opened_by_us: 1053 f.close() 1054 1055 async def _copy_in(self, copy_stmt, source, timeout): 1056 try: 1057 path = os.fspath(source) 1058 except TypeError: 1059 # source is not a path-like object 1060 path = None 1061 1062 f = None 1063 reader = None 1064 data = None 1065 opened_by_us = False 1066 run_in_executor = self._loop.run_in_executor 1067 1068 if path is not None: 1069 # a path 1070 f = await run_in_executor(None, open, path, 'rb') 1071 opened_by_us = True 1072 elif hasattr(source, 'read'): 1073 # file-like 1074 f = source 1075 elif isinstance(source, collections.abc.AsyncIterable): 1076 # assuming calling output returns an awaitable. 1077 # copy_in() is designed to handle very large amounts of data, and 1078 # the source async iterable is allowed to return an arbitrary 1079 # amount of data on every iteration. 1080 reader = source 1081 else: 1082 # assuming source is an instance supporting the buffer protocol. 1083 data = source 1084 1085 if f is not None: 1086 # Copying from a file-like object. 1087 class _Reader: 1088 def __aiter__(self): 1089 return self 1090 1091 async def __anext__(self): 1092 data = await run_in_executor(None, f.read, 524288) 1093 if len(data) == 0: 1094 raise StopAsyncIteration 1095 else: 1096 return data 1097 1098 reader = _Reader() 1099 1100 try: 1101 return await self._protocol.copy_in( 1102 copy_stmt, reader, data, None, None, timeout) 1103 finally: 1104 if opened_by_us: 1105 await run_in_executor(None, f.close) 1106 1107 async def set_type_codec(self, typename, *, 1108 schema='public', encoder, decoder, 1109 format='text'): 1110 """Set an encoder/decoder pair for the specified data type. 1111 1112 :param typename: 1113 Name of the data type the codec is for. 1114 1115 :param schema: 1116 Schema name of the data type the codec is for 1117 (defaults to ``'public'``) 1118 1119 :param format: 1120 The type of the argument received by the *decoder* callback, 1121 and the type of the *encoder* callback return value. 1122 1123 If *format* is ``'text'`` (the default), the exchange datum is a 1124 ``str`` instance containing valid text representation of the 1125 data type. 1126 1127 If *format* is ``'binary'``, the exchange datum is a ``bytes`` 1128 instance containing valid _binary_ representation of the 1129 data type. 1130 1131 If *format* is ``'tuple'``, the exchange datum is a type-specific 1132 ``tuple`` of values. The table below lists supported data 1133 types and their format for this mode. 1134 1135 +-----------------+---------------------------------------------+ 1136 | Type | Tuple layout | 1137 +=================+=============================================+ 1138 | ``interval`` | (``months``, ``days``, ``microseconds``) | 1139 +-----------------+---------------------------------------------+ 1140 | ``date`` | (``date ordinal relative to Jan 1 2000``,) | 1141 | | ``-2^31`` for negative infinity timestamp | 1142 | | ``2^31-1`` for positive infinity timestamp. | 1143 +-----------------+---------------------------------------------+ 1144 | ``timestamp`` | (``microseconds relative to Jan 1 2000``,) | 1145 | | ``-2^63`` for negative infinity timestamp | 1146 | | ``2^63-1`` for positive infinity timestamp. | 1147 +-----------------+---------------------------------------------+ 1148 | ``timestamp | (``microseconds relative to Jan 1 2000 | 1149 | with time zone``| UTC``,) | 1150 | | ``-2^63`` for negative infinity timestamp | 1151 | | ``2^63-1`` for positive infinity timestamp. | 1152 +-----------------+---------------------------------------------+ 1153 | ``time`` | (``microseconds``,) | 1154 +-----------------+---------------------------------------------+ 1155 | ``time with | (``microseconds``, | 1156 | time zone`` | ``time zone offset in seconds``) | 1157 +-----------------+---------------------------------------------+ 1158 1159 :param encoder: 1160 Callable accepting a Python object as a single argument and 1161 returning a value encoded according to *format*. 1162 1163 :param decoder: 1164 Callable accepting a single argument encoded according to *format* 1165 and returning a decoded Python object. 1166 1167 Example: 1168 1169 .. code-block:: pycon 1170 1171 >>> import asyncpg 1172 >>> import asyncio 1173 >>> import datetime 1174 >>> from dateutil.relativedelta import relativedelta 1175 >>> async def run(): 1176 ... con = await asyncpg.connect(user='postgres') 1177 ... def encoder(delta): 1178 ... ndelta = delta.normalized() 1179 ... return (ndelta.years * 12 + ndelta.months, 1180 ... ndelta.days, 1181 ... ((ndelta.hours * 3600 + 1182 ... ndelta.minutes * 60 + 1183 ... ndelta.seconds) * 1000000 + 1184 ... ndelta.microseconds)) 1185 ... def decoder(tup): 1186 ... return relativedelta(months=tup[0], days=tup[1], 1187 ... microseconds=tup[2]) 1188 ... await con.set_type_codec( 1189 ... 'interval', schema='pg_catalog', encoder=encoder, 1190 ... decoder=decoder, format='tuple') 1191 ... result = await con.fetchval( 1192 ... "SELECT '2 years 3 mons 1 day'::interval") 1193 ... print(result) 1194 ... print(datetime.datetime(2002, 1, 1) + result) 1195 ... 1196 >>> asyncio.get_event_loop().run_until_complete(run()) 1197 relativedelta(years=+2, months=+3, days=+1) 1198 2004-04-02 00:00:00 1199 1200 .. versionadded:: 0.12.0 1201 Added the ``format`` keyword argument and support for 'tuple' 1202 format. 1203 1204 .. versionchanged:: 0.12.0 1205 The ``binary`` keyword argument is deprecated in favor of 1206 ``format``. 1207 1208 .. versionchanged:: 0.13.0 1209 The ``binary`` keyword argument was removed in favor of 1210 ``format``. 1211 1212 .. note:: 1213 1214 It is recommended to use the ``'binary'`` or ``'tuple'`` *format* 1215 whenever possible and if the underlying type supports it. Asyncpg 1216 currently does not support text I/O for composite and range types, 1217 and some other functionality, such as 1218 :meth:`Connection.copy_to_table`, does not support types with text 1219 codecs. 1220 """ 1221 self._check_open() 1222 typeinfo = await self._introspect_type(typename, schema) 1223 if not introspection.is_scalar_type(typeinfo): 1224 raise exceptions.InterfaceError( 1225 'cannot use custom codec on non-scalar type {}.{}'.format( 1226 schema, typename)) 1227 if introspection.is_domain_type(typeinfo): 1228 raise exceptions.UnsupportedClientFeatureError( 1229 'custom codecs on domain types are not supported', 1230 hint='Set the codec on the base type.', 1231 detail=( 1232 'PostgreSQL does not distinguish domains from ' 1233 'their base types in query results at the protocol level.' 1234 ) 1235 ) 1236 1237 oid = typeinfo['oid'] 1238 self._protocol.get_settings().add_python_codec( 1239 oid, typename, schema, 'scalar', 1240 encoder, decoder, format) 1241 1242 # Statement cache is no longer valid due to codec changes. 1243 self._drop_local_statement_cache() 1244 1245 async def reset_type_codec(self, typename, *, schema='public'): 1246 """Reset *typename* codec to the default implementation. 1247 1248 :param typename: 1249 Name of the data type the codec is for. 1250 1251 :param schema: 1252 Schema name of the data type the codec is for 1253 (defaults to ``'public'``) 1254 1255 .. versionadded:: 0.12.0 1256 """ 1257 1258 typeinfo = await self._introspect_type(typename, schema) 1259 self._protocol.get_settings().remove_python_codec( 1260 typeinfo['oid'], typename, schema) 1261 1262 # Statement cache is no longer valid due to codec changes. 1263 self._drop_local_statement_cache() 1264 1265 async def set_builtin_type_codec(self, typename, *, 1266 schema='public', codec_name, 1267 format=None): 1268 """Set a builtin codec for the specified scalar data type. 1269 1270 This method has two uses. The first is to register a builtin 1271 codec for an extension type without a stable OID, such as 'hstore'. 1272 The second use is to declare that an extension type or a 1273 user-defined type is wire-compatible with a certain builtin 1274 data type and should be exchanged as such. 1275 1276 :param typename: 1277 Name of the data type the codec is for. 1278 1279 :param schema: 1280 Schema name of the data type the codec is for 1281 (defaults to ``'public'``). 1282 1283 :param codec_name: 1284 The name of the builtin codec to use for the type. 1285 This should be either the name of a known core type 1286 (such as ``"int"``), or the name of a supported extension 1287 type. Currently, the only supported extension type is 1288 ``"pg_contrib.hstore"``. 1289 1290 :param format: 1291 If *format* is ``None`` (the default), all formats supported 1292 by the target codec are declared to be supported for *typename*. 1293 If *format* is ``'text'`` or ``'binary'``, then only the 1294 specified format is declared to be supported for *typename*. 1295 1296 .. versionchanged:: 0.18.0 1297 The *codec_name* argument can be the name of any known 1298 core data type. Added the *format* keyword argument. 1299 """ 1300 self._check_open() 1301 typeinfo = await self._introspect_type(typename, schema) 1302 if not introspection.is_scalar_type(typeinfo): 1303 raise exceptions.InterfaceError( 1304 'cannot alias non-scalar type {}.{}'.format( 1305 schema, typename)) 1306 1307 oid = typeinfo['oid'] 1308 1309 self._protocol.get_settings().set_builtin_type_codec( 1310 oid, typename, schema, 'scalar', codec_name, format) 1311 1312 # Statement cache is no longer valid due to codec changes. 1313 self._drop_local_statement_cache() 1314 1315 def is_closed(self): 1316 """Return ``True`` if the connection is closed, ``False`` otherwise. 1317 1318 :return bool: ``True`` if the connection is closed, ``False`` 1319 otherwise. 1320 """ 1321 return self._aborted or not self._protocol.is_connected() 1322 1323 async def close(self, *, timeout=None): 1324 """Close the connection gracefully. 1325 1326 :param float timeout: 1327 Optional timeout value in seconds. 1328 1329 .. versionchanged:: 0.14.0 1330 Added the *timeout* parameter. 1331 """ 1332 try: 1333 if not self.is_closed(): 1334 await self._protocol.close(timeout) 1335 except (Exception, asyncio.CancelledError): 1336 # If we fail to close gracefully, abort the connection. 1337 self._abort() 1338 raise 1339 finally: 1340 self._cleanup() 1341 1342 def terminate(self): 1343 """Terminate the connection without waiting for pending data.""" 1344 if not self.is_closed(): 1345 self._abort() 1346 self._cleanup() 1347 1348 async def reset(self, *, timeout=None): 1349 self._check_open() 1350 self._listeners.clear() 1351 self._log_listeners.clear() 1352 reset_query = self._get_reset_query() 1353 1354 if self._protocol.is_in_transaction() or self._top_xact is not None: 1355 if self._top_xact is None or not self._top_xact._managed: 1356 # Managed transactions are guaranteed to __aexit__ 1357 # correctly. 1358 self._loop.call_exception_handler({ 1359 'message': 'Resetting connection with an ' 1360 'active transaction {!r}'.format(self) 1361 }) 1362 1363 self._top_xact = None 1364 reset_query = 'ROLLBACK;\n' + reset_query 1365 1366 if reset_query: 1367 await self.execute(reset_query, timeout=timeout) 1368 1369 def _abort(self): 1370 # Put the connection into the aborted state. 1371 self._aborted = True 1372 self._protocol.abort() 1373 self._protocol = None 1374 1375 def _cleanup(self): 1376 self._call_termination_listeners() 1377 # Free the resources associated with this connection. 1378 # This must be called when a connection is terminated. 1379 1380 if self._proxy is not None: 1381 # Connection is a member of a pool, so let the pool 1382 # know that this connection is dead. 1383 self._proxy._holder._release_on_close() 1384 1385 self._mark_stmts_as_closed() 1386 self._listeners.clear() 1387 self._log_listeners.clear() 1388 self._clean_tasks() 1389 1390 def _clean_tasks(self): 1391 # Wrap-up any remaining tasks associated with this connection. 1392 if self._cancellations: 1393 for fut in self._cancellations: 1394 if not fut.done(): 1395 fut.cancel() 1396 self._cancellations.clear() 1397 1398 def _check_open(self): 1399 if self.is_closed(): 1400 raise exceptions.InterfaceError('connection is closed') 1401 1402 def _get_unique_id(self, prefix): 1403 global _uid 1404 _uid += 1 1405 return '__asyncpg_{}_{:x}__'.format(prefix, _uid) 1406 1407 def _mark_stmts_as_closed(self): 1408 for stmt in self._stmt_cache.iter_statements(): 1409 stmt.mark_closed() 1410 1411 for stmt in self._stmts_to_close: 1412 stmt.mark_closed() 1413 1414 self._stmt_cache.clear() 1415 self._stmts_to_close.clear() 1416 1417 def _maybe_gc_stmt(self, stmt): 1418 if ( 1419 stmt.refs == 0 1420 and not self._stmt_cache.has( 1421 (stmt.query, stmt.record_class, stmt.ignore_custom_codec) 1422 ) 1423 ): 1424 # If low-level `stmt` isn't referenced from any high-level 1425 # `PreparedStatement` object and is not in the `_stmt_cache`: 1426 # 1427 # * mark it as closed, which will make it non-usable 1428 # for any `PreparedStatement` or for methods like 1429 # `Connection.fetch()`. 1430 # 1431 # * schedule it to be formally closed on the server. 1432 stmt.mark_closed() 1433 self._stmts_to_close.add(stmt) 1434 1435 async def _cleanup_stmts(self): 1436 # Called whenever we create a new prepared statement in 1437 # `Connection._get_statement()` and `_stmts_to_close` is 1438 # not empty. 1439 to_close = self._stmts_to_close 1440 self._stmts_to_close = set() 1441 for stmt in to_close: 1442 # It is imperative that statements are cleaned properly, 1443 # so we ignore the timeout. 1444 await self._protocol.close_statement(stmt, protocol.NO_TIMEOUT) 1445 1446 async def _cancel(self, waiter): 1447 try: 1448 # Open new connection to the server 1449 await connect_utils._cancel( 1450 loop=self._loop, addr=self._addr, params=self._params, 1451 backend_pid=self._protocol.backend_pid, 1452 backend_secret=self._protocol.backend_secret) 1453 except ConnectionResetError as ex: 1454 # On some systems Postgres will reset the connection 1455 # after processing the cancellation command. 1456 if not waiter.done(): 1457 waiter.set_exception(ex) 1458 except asyncio.CancelledError: 1459 # There are two scenarios in which the cancellation 1460 # itself will be cancelled: 1) the connection is being closed, 1461 # 2) the event loop is being shut down. 1462 # In either case we do not care about the propagation of 1463 # the CancelledError, and don't want the loop to warn about 1464 # an unretrieved exception. 1465 pass 1466 except (Exception, asyncio.CancelledError) as ex: 1467 if not waiter.done(): 1468 waiter.set_exception(ex) 1469 finally: 1470 self._cancellations.discard( 1471 compat.current_asyncio_task(self._loop)) 1472 if not waiter.done(): 1473 waiter.set_result(None) 1474 1475 def _cancel_current_command(self, waiter): 1476 self._cancellations.add(self._loop.create_task(self._cancel(waiter))) 1477 1478 def _process_log_message(self, fields, last_query): 1479 if not self._log_listeners: 1480 return 1481 1482 message = exceptions.PostgresLogMessage.new(fields, query=last_query) 1483 1484 con_ref = self._unwrap() 1485 for cb in self._log_listeners: 1486 if cb.is_async: 1487 self._loop.create_task(cb.cb(con_ref, message)) 1488 else: 1489 self._loop.call_soon(cb.cb, con_ref, message) 1490 1491 def _call_termination_listeners(self): 1492 if not self._termination_listeners: 1493 return 1494 1495 con_ref = self._unwrap() 1496 for cb in self._termination_listeners: 1497 if cb.is_async: 1498 self._loop.create_task(cb.cb(con_ref)) 1499 else: 1500 self._loop.call_soon(cb.cb, con_ref) 1501 1502 self._termination_listeners.clear() 1503 1504 def _process_notification(self, pid, channel, payload): 1505 if channel not in self._listeners: 1506 return 1507 1508 con_ref = self._unwrap() 1509 for cb in self._listeners[channel]: 1510 if cb.is_async: 1511 self._loop.create_task(cb.cb(con_ref, pid, channel, payload)) 1512 else: 1513 self._loop.call_soon(cb.cb, con_ref, pid, channel, payload) 1514 1515 def _unwrap(self): 1516 if self._proxy is None: 1517 con_ref = self 1518 else: 1519 # `_proxy` is not None when the connection is a member 1520 # of a connection pool. Which means that the user is working 1521 # with a `PoolConnectionProxy` instance, and expects to see it 1522 # (and not the actual Connection) in their event callbacks. 1523 con_ref = self._proxy 1524 return con_ref 1525 1526 def _get_reset_query(self): 1527 if self._reset_query is not None: 1528 return self._reset_query 1529 1530 caps = self._server_caps 1531 1532 _reset_query = [] 1533 if caps.advisory_locks: 1534 _reset_query.append('SELECT pg_advisory_unlock_all();') 1535 if caps.sql_close_all: 1536 _reset_query.append('CLOSE ALL;') 1537 if caps.notifications and caps.plpgsql: 1538 _reset_query.append('UNLISTEN *;') 1539 if caps.sql_reset: 1540 _reset_query.append('RESET ALL;') 1541 1542 _reset_query = '\n'.join(_reset_query) 1543 self._reset_query = _reset_query 1544 1545 return _reset_query 1546 1547 def _set_proxy(self, proxy): 1548 if self._proxy is not None and proxy is not None: 1549 # Should not happen unless there is a bug in `Pool`. 1550 raise exceptions.InterfaceError( 1551 'internal asyncpg error: connection is already proxied') 1552 1553 self._proxy = proxy 1554 1555 def _check_listeners(self, listeners, listener_type): 1556 if listeners: 1557 count = len(listeners) 1558 1559 w = exceptions.InterfaceWarning( 1560 '{conn!r} is being released to the pool but has {c} active ' 1561 '{type} listener{s}'.format( 1562 conn=self, c=count, type=listener_type, 1563 s='s' if count > 1 else '')) 1564 1565 warnings.warn(w) 1566 1567 def _on_release(self, stacklevel=1): 1568 # Invalidate external references to the connection. 1569 self._pool_release_ctr += 1 1570 # Called when the connection is about to be released to the pool. 1571 # Let's check that the user has not left any listeners on it. 1572 self._check_listeners( 1573 list(itertools.chain.from_iterable(self._listeners.values())), 1574 'notification') 1575 self._check_listeners( 1576 self._log_listeners, 'log') 1577 1578 def _drop_local_statement_cache(self): 1579 self._stmt_cache.clear() 1580 1581 def _drop_global_statement_cache(self): 1582 if self._proxy is not None: 1583 # This connection is a member of a pool, so we delegate 1584 # the cache drop to the pool. 1585 pool = self._proxy._holder._pool 1586 pool._drop_statement_cache() 1587 else: 1588 self._drop_local_statement_cache() 1589 1590 def _drop_local_type_cache(self): 1591 self._protocol.get_settings().clear_type_cache() 1592 1593 def _drop_global_type_cache(self): 1594 if self._proxy is not None: 1595 # This connection is a member of a pool, so we delegate 1596 # the cache drop to the pool. 1597 pool = self._proxy._holder._pool 1598 pool._drop_type_cache() 1599 else: 1600 self._drop_local_type_cache() 1601 1602 async def reload_schema_state(self): 1603 """Indicate that the database schema information must be reloaded. 1604 1605 For performance reasons, asyncpg caches certain aspects of the 1606 database schema, such as the layout of composite types. Consequently, 1607 when the database schema changes, and asyncpg is not able to 1608 gracefully recover from an error caused by outdated schema 1609 assumptions, an :exc:`~asyncpg.exceptions.OutdatedSchemaCacheError` 1610 is raised. To prevent the exception, this method may be used to inform 1611 asyncpg that the database schema has changed. 1612 1613 Example: 1614 1615 .. code-block:: pycon 1616 1617 >>> import asyncpg 1618 >>> import asyncio 1619 >>> async def change_type(con): 1620 ... result = await con.fetch('SELECT id, info FROM tbl') 1621 ... # Change composite's attribute type "int"=>"text" 1622 ... await con.execute('ALTER TYPE custom DROP ATTRIBUTE y') 1623 ... await con.execute('ALTER TYPE custom ADD ATTRIBUTE y text') 1624 ... await con.reload_schema_state() 1625 ... for id_, info in result: 1626 ... new = (info['x'], str(info['y'])) 1627 ... await con.execute( 1628 ... 'UPDATE tbl SET info=$2 WHERE id=$1', id_, new) 1629 ... 1630 >>> async def run(): 1631 ... # Initial schema: 1632 ... # CREATE TYPE custom AS (x int, y int); 1633 ... # CREATE TABLE tbl(id int, info custom); 1634 ... con = await asyncpg.connect(user='postgres') 1635 ... async with con.transaction(): 1636 ... # Prevent concurrent changes in the table 1637 ... await con.execute('LOCK TABLE tbl') 1638 ... await change_type(con) 1639 ... 1640 >>> asyncio.get_event_loop().run_until_complete(run()) 1641 1642 .. versionadded:: 0.14.0 1643 """ 1644 self._drop_global_type_cache() 1645 self._drop_global_statement_cache() 1646 1647 async def _execute( 1648 self, 1649 query, 1650 args, 1651 limit, 1652 timeout, 1653 *, 1654 return_status=False, 1655 ignore_custom_codec=False, 1656 record_class=None 1657 ): 1658 with self._stmt_exclusive_section: 1659 result, _ = await self.__execute( 1660 query, 1661 args, 1662 limit, 1663 timeout, 1664 return_status=return_status, 1665 record_class=record_class, 1666 ignore_custom_codec=ignore_custom_codec, 1667 ) 1668 return result 1669 1670 async def __execute( 1671 self, 1672 query, 1673 args, 1674 limit, 1675 timeout, 1676 *, 1677 return_status=False, 1678 ignore_custom_codec=False, 1679 record_class=None 1680 ): 1681 executor = lambda stmt, timeout: self._protocol.bind_execute( 1682 stmt, args, '', limit, return_status, timeout) 1683 timeout = self._protocol._get_timeout(timeout) 1684 return await self._do_execute( 1685 query, 1686 executor, 1687 timeout, 1688 record_class=record_class, 1689 ignore_custom_codec=ignore_custom_codec, 1690 ) 1691 1692 async def _executemany(self, query, args, timeout): 1693 executor = lambda stmt, timeout: self._protocol.bind_execute_many( 1694 stmt, args, '', timeout) 1695 timeout = self._protocol._get_timeout(timeout) 1696 with self._stmt_exclusive_section: 1697 result, _ = await self._do_execute(query, executor, timeout) 1698 return result 1699 1700 async def _do_execute( 1701 self, 1702 query, 1703 executor, 1704 timeout, 1705 retry=True, 1706 *, 1707 ignore_custom_codec=False, 1708 record_class=None 1709 ): 1710 if timeout is None: 1711 stmt = await self._get_statement( 1712 query, 1713 None, 1714 record_class=record_class, 1715 ignore_custom_codec=ignore_custom_codec, 1716 ) 1717 else: 1718 before = time.monotonic() 1719 stmt = await self._get_statement( 1720 query, 1721 timeout, 1722 record_class=record_class, 1723 ignore_custom_codec=ignore_custom_codec, 1724 ) 1725 after = time.monotonic() 1726 timeout -= after - before 1727 before = after 1728 1729 try: 1730 if timeout is None: 1731 result = await executor(stmt, None) 1732 else: 1733 try: 1734 result = await executor(stmt, timeout) 1735 finally: 1736 after = time.monotonic() 1737 timeout -= after - before 1738 1739 except exceptions.OutdatedSchemaCacheError: 1740 # This exception is raised when we detect a difference between 1741 # cached type's info and incoming tuple from the DB (when a type is 1742 # changed by the ALTER TYPE). 1743 # It is not possible to recover (the statement is already done at 1744 # the server's side), the only way is to drop our caches and 1745 # reraise the exception to the caller. 1746 await self.reload_schema_state() 1747 raise 1748 except exceptions.InvalidCachedStatementError: 1749 # PostgreSQL will raise an exception when it detects 1750 # that the result type of the query has changed from 1751 # when the statement was prepared. This may happen, 1752 # for example, after an ALTER TABLE or SET search_path. 1753 # 1754 # When this happens, and there is no transaction running, 1755 # we can simply re-prepare the statement and try once 1756 # again. We deliberately retry only once as this is 1757 # supposed to be a rare occurrence. 1758 # 1759 # If the transaction _is_ running, this error will put it 1760 # into an error state, and we have no choice but to 1761 # re-raise the exception. 1762 # 1763 # In either case we clear the statement cache for this 1764 # connection and all other connections of the pool this 1765 # connection belongs to (if any). 1766 # 1767 # See https://github.com/MagicStack/asyncpg/issues/72 1768 # and https://github.com/MagicStack/asyncpg/issues/76 1769 # for discussion. 1770 # 1771 self._drop_global_statement_cache() 1772 if self._protocol.is_in_transaction() or not retry: 1773 raise 1774 else: 1775 return await self._do_execute( 1776 query, executor, timeout, retry=False) 1777 1778 return result, stmt 1779 1780 1781async def connect(dsn=None, *, 1782 host=None, port=None, 1783 user=None, password=None, passfile=None, 1784 database=None, 1785 loop=None, 1786 timeout=60, 1787 statement_cache_size=100, 1788 max_cached_statement_lifetime=300, 1789 max_cacheable_statement_size=1024 * 15, 1790 command_timeout=None, 1791 ssl=None, 1792 connection_class=Connection, 1793 record_class=protocol.Record, 1794 server_settings=None): 1795 r"""A coroutine to establish a connection to a PostgreSQL server. 1796 1797 The connection parameters may be specified either as a connection 1798 URI in *dsn*, or as specific keyword arguments, or both. 1799 If both *dsn* and keyword arguments are specified, the latter 1800 override the corresponding values parsed from the connection URI. 1801 The default values for the majority of arguments can be specified 1802 using `environment variables <postgres envvars_>`_. 1803 1804 Returns a new :class:`~asyncpg.connection.Connection` object. 1805 1806 :param dsn: 1807 Connection arguments specified using as a single string in the 1808 `libpq connection URI format`_: 1809 ``postgres://user:password@host:port/database?option=value``. 1810 The following options are recognized by asyncpg: ``host``, 1811 ``port``, ``user``, ``database`` (or ``dbname``), ``password``, 1812 ``passfile``, ``sslmode``, ``sslcert``, ``sslkey``, ``sslrootcert``, 1813 and ``sslcrl``. Unlike libpq, asyncpg will treat unrecognized 1814 options as `server settings`_ to be used for the connection. 1815 1816 .. note:: 1817 1818 The URI must be *valid*, which means that all components must 1819 be properly quoted with :py:func:`urllib.parse.quote`, and 1820 any literal IPv6 addresses must be enclosed in square brackets. 1821 For example: 1822 1823 .. code-block:: text 1824 1825 postgres://dbuser@[fe80::1ff:fe23:4567:890a%25eth0]/dbname 1826 1827 :param host: 1828 Database host address as one of the following: 1829 1830 - an IP address or a domain name; 1831 - an absolute path to the directory containing the database 1832 server Unix-domain socket (not supported on Windows); 1833 - a sequence of any of the above, in which case the addresses 1834 will be tried in order, and the first successful connection 1835 will be returned. 1836 1837 If not specified, asyncpg will try the following, in order: 1838 1839 - host address(es) parsed from the *dsn* argument, 1840 - the value of the ``PGHOST`` environment variable, 1841 - on Unix, common directories used for PostgreSQL Unix-domain 1842 sockets: ``"/run/postgresql"``, ``"/var/run/postgresl"``, 1843 ``"/var/pgsql_socket"``, ``"/private/tmp"``, and ``"/tmp"``, 1844 - ``"localhost"``. 1845 1846 :param port: 1847 Port number to connect to at the server host 1848 (or Unix-domain socket file extension). If multiple host 1849 addresses were specified, this parameter may specify a 1850 sequence of port numbers of the same length as the host sequence, 1851 or it may specify a single port number to be used for all host 1852 addresses. 1853 1854 If not specified, the value parsed from the *dsn* argument is used, 1855 or the value of the ``PGPORT`` environment variable, or ``5432`` if 1856 neither is specified. 1857 1858 :param user: 1859 The name of the database role used for authentication. 1860 1861 If not specified, the value parsed from the *dsn* argument is used, 1862 or the value of the ``PGUSER`` environment variable, or the 1863 operating system name of the user running the application. 1864 1865 :param database: 1866 The name of the database to connect to. 1867 1868 If not specified, the value parsed from the *dsn* argument is used, 1869 or the value of the ``PGDATABASE`` environment variable, or the 1870 computed value of the *user* argument. 1871 1872 :param password: 1873 Password to be used for authentication, if the server requires 1874 one. If not specified, the value parsed from the *dsn* argument 1875 is used, or the value of the ``PGPASSWORD`` environment variable. 1876 Note that the use of the environment variable is discouraged as 1877 other users and applications may be able to read it without needing 1878 specific privileges. It is recommended to use *passfile* instead. 1879 1880 Password may be either a string, or a callable that returns a string. 1881 If a callable is provided, it will be called each time a new connection 1882 is established. 1883 1884 :param passfile: 1885 The name of the file used to store passwords 1886 (defaults to ``~/.pgpass``, or ``%APPDATA%\postgresql\pgpass.conf`` 1887 on Windows). 1888 1889 :param loop: 1890 An asyncio event loop instance. If ``None``, the default 1891 event loop will be used. 1892 1893 :param float timeout: 1894 Connection timeout in seconds. 1895 1896 :param int statement_cache_size: 1897 The size of prepared statement LRU cache. Pass ``0`` to 1898 disable the cache. 1899 1900 :param int max_cached_statement_lifetime: 1901 The maximum time in seconds a prepared statement will stay 1902 in the cache. Pass ``0`` to allow statements be cached 1903 indefinitely. 1904 1905 :param int max_cacheable_statement_size: 1906 The maximum size of a statement that can be cached (15KiB by 1907 default). Pass ``0`` to allow all statements to be cached 1908 regardless of their size. 1909 1910 :param float command_timeout: 1911 The default timeout for operations on this connection 1912 (the default is ``None``: no timeout). 1913 1914 :param ssl: 1915 Pass ``True`` or an `ssl.SSLContext <SSLContext_>`_ instance to 1916 require an SSL connection. If ``True``, a default SSL context 1917 returned by `ssl.create_default_context() <create_default_context_>`_ 1918 will be used. The value can also be one of the following strings: 1919 1920 - ``'disable'`` - SSL is disabled (equivalent to ``False``) 1921 - ``'prefer'`` - try SSL first, fallback to non-SSL connection 1922 if SSL connection fails 1923 - ``'allow'`` - try without SSL first, then retry with SSL if the first 1924 attempt fails. 1925 - ``'require'`` - only try an SSL connection. Certificate 1926 verification errors are ignored 1927 - ``'verify-ca'`` - only try an SSL connection, and verify 1928 that the server certificate is issued by a trusted certificate 1929 authority (CA) 1930 - ``'verify-full'`` - only try an SSL connection, verify 1931 that the server certificate is issued by a trusted CA and 1932 that the requested server host name matches that in the 1933 certificate. 1934 1935 The default is ``'prefer'``: try an SSL connection and fallback to 1936 non-SSL connection if that fails. 1937 1938 .. note:: 1939 1940 *ssl* is ignored for Unix domain socket communication. 1941 1942 Example of programmatic SSL context configuration that is equivalent 1943 to ``sslmode=verify-full&sslcert=..&sslkey=..&sslrootcert=..``: 1944 1945 .. code-block:: pycon 1946 1947 >>> import asyncpg 1948 >>> import asyncio 1949 >>> import ssl 1950 >>> async def main(): 1951 ... # Load CA bundle for server certificate verification, 1952 ... # equivalent to sslrootcert= in DSN. 1953 ... sslctx = ssl.create_default_context( 1954 ... ssl.Purpose.SERVER_AUTH, 1955 ... cafile="path/to/ca_bundle.pem") 1956 ... # If True, equivalent to sslmode=verify-full, if False: 1957 ... # sslmode=verify-ca. 1958 ... sslctx.check_hostname = True 1959 ... # Load client certificate and private key for client 1960 ... # authentication, equivalent to sslcert= and sslkey= in 1961 ... # DSN. 1962 ... sslctx.load_cert_chain( 1963 ... "path/to/client.cert", 1964 ... keyfile="path/to/client.key", 1965 ... ) 1966 ... con = await asyncpg.connect(user='postgres', ssl=sslctx) 1967 ... await con.close() 1968 >>> asyncio.run(run()) 1969 1970 Example of programmatic SSL context configuration that is equivalent 1971 to ``sslmode=require`` (no server certificate or host verification): 1972 1973 .. code-block:: pycon 1974 1975 >>> import asyncpg 1976 >>> import asyncio 1977 >>> import ssl 1978 >>> async def main(): 1979 ... sslctx = ssl.create_default_context( 1980 ... ssl.Purpose.SERVER_AUTH) 1981 ... sslctx.check_hostname = False 1982 ... sslctx.verify_mode = ssl.CERT_NONE 1983 ... con = await asyncpg.connect(user='postgres', ssl=sslctx) 1984 ... await con.close() 1985 >>> asyncio.run(run()) 1986 1987 :param dict server_settings: 1988 An optional dict of server runtime parameters. Refer to 1989 PostgreSQL documentation for 1990 a `list of supported options <server settings_>`_. 1991 1992 :param type connection_class: 1993 Class of the returned connection object. Must be a subclass of 1994 :class:`~asyncpg.connection.Connection`. 1995 1996 :param type record_class: 1997 If specified, the class to use for records returned by queries on 1998 this connection object. Must be a subclass of 1999 :class:`~asyncpg.Record`. 2000 2001 :return: A :class:`~asyncpg.connection.Connection` instance. 2002 2003 Example: 2004 2005 .. code-block:: pycon 2006 2007 >>> import asyncpg 2008 >>> import asyncio 2009 >>> async def run(): 2010 ... con = await asyncpg.connect(user='postgres') 2011 ... types = await con.fetch('SELECT * FROM pg_type') 2012 ... print(types) 2013 ... 2014 >>> asyncio.get_event_loop().run_until_complete(run()) 2015 [<Record typname='bool' typnamespace=11 ... 2016 2017 .. versionadded:: 0.10.0 2018 Added ``max_cached_statement_use_count`` parameter. 2019 2020 .. versionchanged:: 0.11.0 2021 Removed ability to pass arbitrary keyword arguments to set 2022 server settings. Added a dedicated parameter ``server_settings`` 2023 for that. 2024 2025 .. versionadded:: 0.11.0 2026 Added ``connection_class`` parameter. 2027 2028 .. versionadded:: 0.16.0 2029 Added ``passfile`` parameter 2030 (and support for password files in general). 2031 2032 .. versionadded:: 0.18.0 2033 Added ability to specify multiple hosts in the *dsn* 2034 and *host* arguments. 2035 2036 .. versionchanged:: 0.21.0 2037 The *password* argument now accepts a callable or an async function. 2038 2039 .. versionchanged:: 0.22.0 2040 Added the *record_class* parameter. 2041 2042 .. versionchanged:: 0.22.0 2043 The *ssl* argument now defaults to ``'prefer'``. 2044 2045 .. versionchanged:: 0.24.0 2046 The ``sslcert``, ``sslkey``, ``sslrootcert``, and ``sslcrl`` options 2047 are supported in the *dsn* argument. 2048 2049 .. versionchanged:: 0.25.0 2050 The ``sslpassword``, ``ssl_min_protocol_version``, 2051 and ``ssl_max_protocol_version`` options are supported in the *dsn* 2052 argument. 2053 2054 .. versionchanged:: 0.25.0 2055 Default system root CA certificates won't be loaded when specifying a 2056 particular sslmode, following the same behavior in libpq. 2057 2058 .. versionchanged:: 0.25.0 2059 The ``sslcert``, ``sslkey``, ``sslrootcert``, and ``sslcrl`` options 2060 in the *dsn* argument now have consistent default values of files under 2061 ``~/.postgresql/`` as libpq. 2062 2063 .. _SSLContext: https://docs.python.org/3/library/ssl.html#ssl.SSLContext 2064 .. _create_default_context: 2065 https://docs.python.org/3/library/ssl.html#ssl.create_default_context 2066 .. _server settings: 2067 https://www.postgresql.org/docs/current/static/runtime-config.html 2068 .. _postgres envvars: 2069 https://www.postgresql.org/docs/current/static/libpq-envars.html 2070 .. _libpq connection URI format: 2071 https://www.postgresql.org/docs/current/static/ 2072 libpq-connect.html#LIBPQ-CONNSTRING 2073 """ 2074 if not issubclass(connection_class, Connection): 2075 raise exceptions.InterfaceError( 2076 'connection_class is expected to be a subclass of ' 2077 'asyncpg.Connection, got {!r}'.format(connection_class)) 2078 2079 if record_class is not protocol.Record: 2080 _check_record_class(record_class) 2081 2082 if loop is None: 2083 loop = asyncio.get_event_loop() 2084 2085 return await connect_utils._connect( 2086 loop=loop, 2087 timeout=timeout, 2088 connection_class=connection_class, 2089 record_class=record_class, 2090 dsn=dsn, 2091 host=host, 2092 port=port, 2093 user=user, 2094 password=password, 2095 passfile=passfile, 2096 ssl=ssl, 2097 database=database, 2098 server_settings=server_settings, 2099 command_timeout=command_timeout, 2100 statement_cache_size=statement_cache_size, 2101 max_cached_statement_lifetime=max_cached_statement_lifetime, 2102 max_cacheable_statement_size=max_cacheable_statement_size, 2103 ) 2104 2105 2106class _StatementCacheEntry: 2107 2108 __slots__ = ('_query', '_statement', '_cache', '_cleanup_cb') 2109 2110 def __init__(self, cache, query, statement): 2111 self._cache = cache 2112 self._query = query 2113 self._statement = statement 2114 self._cleanup_cb = None 2115 2116 2117class _StatementCache: 2118 2119 __slots__ = ('_loop', '_entries', '_max_size', '_on_remove', 2120 '_max_lifetime') 2121 2122 def __init__(self, *, loop, max_size, on_remove, max_lifetime): 2123 self._loop = loop 2124 self._max_size = max_size 2125 self._on_remove = on_remove 2126 self._max_lifetime = max_lifetime 2127 2128 # We use an OrderedDict for LRU implementation. Operations: 2129 # 2130 # * We use a simple `__setitem__` to push a new entry: 2131 # `entries[key] = new_entry` 2132 # That will push `new_entry` to the *end* of the entries dict. 2133 # 2134 # * When we have a cache hit, we call 2135 # `entries.move_to_end(key, last=True)` 2136 # to move the entry to the *end* of the entries dict. 2137 # 2138 # * When we need to remove entries to maintain `max_size`, we call 2139 # `entries.popitem(last=False)` 2140 # to remove an entry from the *beginning* of the entries dict. 2141 # 2142 # So new entries and hits are always promoted to the end of the 2143 # entries dict, whereas the unused one will group in the 2144 # beginning of it. 2145 self._entries = collections.OrderedDict() 2146 2147 def __len__(self): 2148 return len(self._entries) 2149 2150 def get_max_size(self): 2151 return self._max_size 2152 2153 def set_max_size(self, new_size): 2154 assert new_size >= 0 2155 self._max_size = new_size 2156 self._maybe_cleanup() 2157 2158 def get_max_lifetime(self): 2159 return self._max_lifetime 2160 2161 def set_max_lifetime(self, new_lifetime): 2162 assert new_lifetime >= 0 2163 self._max_lifetime = new_lifetime 2164 for entry in self._entries.values(): 2165 # For every entry cancel the existing callback 2166 # and setup a new one if necessary. 2167 self._set_entry_timeout(entry) 2168 2169 def get(self, query, *, promote=True): 2170 if not self._max_size: 2171 # The cache is disabled. 2172 return 2173 2174 entry = self._entries.get(query) # type: _StatementCacheEntry 2175 if entry is None: 2176 return 2177 2178 if entry._statement.closed: 2179 # Happens in unittests when we call `stmt._state.mark_closed()` 2180 # manually or when a prepared statement closes itself on type 2181 # cache error. 2182 self._entries.pop(query) 2183 self._clear_entry_callback(entry) 2184 return 2185 2186 if promote: 2187 # `promote` is `False` when `get()` is called by `has()`. 2188 self._entries.move_to_end(query, last=True) 2189 2190 return entry._statement 2191 2192 def has(self, query): 2193 return self.get(query, promote=False) is not None 2194 2195 def put(self, query, statement): 2196 if not self._max_size: 2197 # The cache is disabled. 2198 return 2199 2200 self._entries[query] = self._new_entry(query, statement) 2201 2202 # Check if the cache is bigger than max_size and trim it 2203 # if necessary. 2204 self._maybe_cleanup() 2205 2206 def iter_statements(self): 2207 return (e._statement for e in self._entries.values()) 2208 2209 def clear(self): 2210 # Store entries for later. 2211 entries = tuple(self._entries.values()) 2212 2213 # Clear the entries dict. 2214 self._entries.clear() 2215 2216 # Make sure that we cancel all scheduled callbacks 2217 # and call on_remove callback for each entry. 2218 for entry in entries: 2219 self._clear_entry_callback(entry) 2220 self._on_remove(entry._statement) 2221 2222 def _set_entry_timeout(self, entry): 2223 # Clear the existing timeout. 2224 self._clear_entry_callback(entry) 2225 2226 # Set the new timeout if it's not 0. 2227 if self._max_lifetime: 2228 entry._cleanup_cb = self._loop.call_later( 2229 self._max_lifetime, self._on_entry_expired, entry) 2230 2231 def _new_entry(self, query, statement): 2232 entry = _StatementCacheEntry(self, query, statement) 2233 self._set_entry_timeout(entry) 2234 return entry 2235 2236 def _on_entry_expired(self, entry): 2237 # `call_later` callback, called when an entry stayed longer 2238 # than `self._max_lifetime`. 2239 if self._entries.get(entry._query) is entry: 2240 self._entries.pop(entry._query) 2241 self._on_remove(entry._statement) 2242 2243 def _clear_entry_callback(self, entry): 2244 if entry._cleanup_cb is not None: 2245 entry._cleanup_cb.cancel() 2246 2247 def _maybe_cleanup(self): 2248 # Delete cache entries until the size of the cache is `max_size`. 2249 while len(self._entries) > self._max_size: 2250 old_query, old_entry = self._entries.popitem(last=False) 2251 self._clear_entry_callback(old_entry) 2252 2253 # Let the connection know that the statement was removed 2254 # from the cache. 2255 self._on_remove(old_entry._statement) 2256 2257 2258class _Callback(typing.NamedTuple): 2259 2260 cb: typing.Callable[..., None] 2261 is_async: bool 2262 2263 @classmethod 2264 def from_callable(cls, cb: typing.Callable[..., None]) -> '_Callback': 2265 if inspect.iscoroutinefunction(cb): 2266 is_async = True 2267 elif callable(cb): 2268 is_async = False 2269 else: 2270 raise exceptions.InterfaceError( 2271 'expected a callable or an `async def` function,' 2272 'got {!r}'.format(cb) 2273 ) 2274 2275 return cls(cb, is_async) 2276 2277 2278class _Atomic: 2279 __slots__ = ('_acquired',) 2280 2281 def __init__(self): 2282 self._acquired = 0 2283 2284 def __enter__(self): 2285 if self._acquired: 2286 raise exceptions.InterfaceError( 2287 'cannot perform operation: another operation is in progress') 2288 self._acquired = 1 2289 2290 def __exit__(self, t, e, tb): 2291 self._acquired = 0 2292 2293 2294class _ConnectionProxy: 2295 # Base class to enable `isinstance(Connection)` check. 2296 __slots__ = () 2297 2298 2299ServerCapabilities = collections.namedtuple( 2300 'ServerCapabilities', 2301 ['advisory_locks', 'notifications', 'plpgsql', 'sql_reset', 2302 'sql_close_all']) 2303ServerCapabilities.__doc__ = 'PostgreSQL server capabilities.' 2304 2305 2306def _detect_server_capabilities(server_version, connection_settings): 2307 if hasattr(connection_settings, 'padb_revision'): 2308 # Amazon Redshift detected. 2309 advisory_locks = False 2310 notifications = False 2311 plpgsql = False 2312 sql_reset = True 2313 sql_close_all = False 2314 elif hasattr(connection_settings, 'crdb_version'): 2315 # CockroachDB detected. 2316 advisory_locks = False 2317 notifications = False 2318 plpgsql = False 2319 sql_reset = False 2320 sql_close_all = False 2321 elif hasattr(connection_settings, 'crate_version'): 2322 # CrateDB detected. 2323 advisory_locks = False 2324 notifications = False 2325 plpgsql = False 2326 sql_reset = False 2327 sql_close_all = False 2328 else: 2329 # Standard PostgreSQL server assumed. 2330 advisory_locks = True 2331 notifications = True 2332 plpgsql = True 2333 sql_reset = True 2334 sql_close_all = True 2335 2336 return ServerCapabilities( 2337 advisory_locks=advisory_locks, 2338 notifications=notifications, 2339 plpgsql=plpgsql, 2340 sql_reset=sql_reset, 2341 sql_close_all=sql_close_all 2342 ) 2343 2344 2345def _extract_stack(limit=10): 2346 """Replacement for traceback.extract_stack() that only does the 2347 necessary work for asyncio debug mode. 2348 """ 2349 frame = sys._getframe().f_back 2350 try: 2351 stack = traceback.StackSummary.extract( 2352 traceback.walk_stack(frame), lookup_lines=False) 2353 finally: 2354 del frame 2355 2356 apg_path = asyncpg.__path__[0] 2357 i = 0 2358 while i < len(stack) and stack[i][0].startswith(apg_path): 2359 i += 1 2360 stack = stack[i:i + limit] 2361 2362 stack.reverse() 2363 return ''.join(traceback.format_list(stack)) 2364 2365 2366def _check_record_class(record_class): 2367 if record_class is protocol.Record: 2368 pass 2369 elif ( 2370 isinstance(record_class, type) 2371 and issubclass(record_class, protocol.Record) 2372 ): 2373 if ( 2374 record_class.__new__ is not object.__new__ 2375 or record_class.__init__ is not object.__init__ 2376 ): 2377 raise exceptions.InterfaceError( 2378 'record_class must not redefine __new__ or __init__' 2379 ) 2380 else: 2381 raise exceptions.InterfaceError( 2382 'record_class is expected to be a subclass of ' 2383 'asyncpg.Record, got {!r}'.format(record_class) 2384 ) 2385 2386 2387def _weak_maybe_gc_stmt(weak_ref, stmt): 2388 self = weak_ref() 2389 if self is not None: 2390 self._maybe_gc_stmt(stmt) 2391 2392 2393_uid = 0 2394