1# -*- coding: utf-8 -*- 2# © Copyright EnterpriseDB UK Limited 2011-2021 3# 4# This file is part of Barman. 5# 6# Barman is free software: you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation, either version 3 of the License, or 9# (at your option) any later version. 10# 11# Barman is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with Barman. If not, see <http://www.gnu.org/licenses/>. 18 19""" 20This module represents the interface towards a PostgreSQL server. 21""" 22 23import atexit 24import logging 25from abc import ABCMeta 26 27import psycopg2 28from psycopg2.errorcodes import DUPLICATE_OBJECT, OBJECT_IN_USE, UNDEFINED_OBJECT 29from psycopg2.extensions import STATUS_IN_TRANSACTION, STATUS_READY 30from psycopg2.extras import DictCursor, NamedTupleCursor 31 32from barman.exceptions import ( 33 ConninfoException, 34 PostgresAppNameError, 35 PostgresConnectionError, 36 PostgresDuplicateReplicationSlot, 37 PostgresException, 38 PostgresInvalidReplicationSlot, 39 PostgresIsInRecovery, 40 PostgresReplicationSlotInUse, 41 PostgresReplicationSlotsFull, 42 BackupFunctionsAccessRequired, 43 PostgresSuperuserRequired, 44 PostgresUnsupportedFeature, 45) 46from barman.infofile import Tablespace 47from barman.postgres_plumbing import function_name_map 48from barman.remote_status import RemoteStatusMixin 49from barman.utils import force_str, simplify_version, with_metaclass 50from barman.xlog import DEFAULT_XLOG_SEG_SIZE 51 52# This is necessary because the CONFIGURATION_LIMIT_EXCEEDED constant 53# has been added in psycopg2 2.5, but Barman supports version 2.4.2+ so 54# in case of import error we declare a constant providing the correct value. 55try: 56 from psycopg2.errorcodes import CONFIGURATION_LIMIT_EXCEEDED 57except ImportError: 58 CONFIGURATION_LIMIT_EXCEEDED = "53400" 59 60 61_logger = logging.getLogger(__name__) 62 63_live_connections = [] 64""" 65List of connections to be closed at the interpreter shutdown 66""" 67 68 69@atexit.register 70def _atexit(): 71 """ 72 Ensure that all the connections are correctly closed 73 at interpreter shutdown 74 """ 75 # Take a copy of the list because the conn.close() method modify it 76 for conn in list(_live_connections): 77 _logger.warning( 78 "Forcing %s cleanup during process shut down.", conn.__class__.__name__ 79 ) 80 conn.close() 81 82 83class PostgreSQL(with_metaclass(ABCMeta, RemoteStatusMixin)): 84 """ 85 This abstract class represents a generic interface to a PostgreSQL server. 86 """ 87 88 CHECK_QUERY = "SELECT 1" 89 90 def __init__(self, conninfo): 91 """ 92 Abstract base class constructor for PostgreSQL interface. 93 94 :param str conninfo: Connection information (aka DSN) 95 """ 96 super(PostgreSQL, self).__init__() 97 self.conninfo = conninfo 98 self._conn = None 99 self.allow_reconnect = True 100 # Build a dictionary with connection info parameters 101 # This is mainly used to speed up search in conninfo 102 try: 103 self.conn_parameters = self.parse_dsn(conninfo) 104 except (ValueError, TypeError) as e: 105 _logger.debug(e) 106 raise ConninfoException( 107 'Cannot connect to postgres: "%s" ' 108 "is not a valid connection string" % conninfo 109 ) 110 111 @staticmethod 112 def parse_dsn(dsn): 113 """ 114 Parse connection parameters from 'conninfo' 115 116 :param str dsn: Connection information (aka DSN) 117 :rtype: dict[str,str] 118 """ 119 # TODO: this might be made more robust in the future 120 return dict(x.split("=", 1) for x in dsn.split()) 121 122 @staticmethod 123 def encode_dsn(parameters): 124 """ 125 Build a connection string from a dictionary of connection 126 parameters 127 128 :param dict[str,str] parameters: Connection parameters 129 :rtype: str 130 """ 131 # TODO: this might be made more robust in the future 132 return " ".join(["%s=%s" % (k, v) for k, v in sorted(parameters.items())]) 133 134 def get_connection_string(self, application_name=None): 135 """ 136 Return the connection string, adding the application_name parameter 137 if requested, unless already defined by user in the connection string 138 139 :param str application_name: the application_name to add 140 :return str: the connection string 141 """ 142 conn_string = self.conninfo 143 # check if the application name is already defined by user 144 if application_name and "application_name" not in self.conn_parameters: 145 # Then add the it to the connection string 146 conn_string += " application_name=%s" % application_name 147 # adopt a secure schema-usage pattern. See: 148 # https://www.postgresql.org/docs/current/libpq-connect.html 149 if "options" not in self.conn_parameters: 150 conn_string += " options=-csearch_path=" 151 152 return conn_string 153 154 def connect(self): 155 """ 156 Generic function for Postgres connection (using psycopg2) 157 """ 158 159 if not self._check_connection(): 160 try: 161 self._conn = psycopg2.connect(self.conninfo) 162 self._conn.autocommit = True 163 # If psycopg2 fails to connect to the host, 164 # raise the appropriate exception 165 except psycopg2.DatabaseError as e: 166 raise PostgresConnectionError(force_str(e).strip()) 167 # Register the connection to the list of live connections 168 _live_connections.append(self) 169 return self._conn 170 171 def _check_connection(self): 172 """ 173 Return false if the connection is broken 174 175 :rtype: bool 176 """ 177 # If the connection is not present return False 178 if not self._conn: 179 return False 180 181 # Check if the connection works by running 'SELECT 1' 182 cursor = None 183 initial_status = None 184 try: 185 initial_status = self._conn.status 186 cursor = self._conn.cursor() 187 cursor.execute(self.CHECK_QUERY) 188 # Rollback if initial status was IDLE because the CHECK QUERY 189 # has started a new transaction. 190 if initial_status == STATUS_READY: 191 self._conn.rollback() 192 except psycopg2.DatabaseError: 193 # Connection is broken, so we need to reconnect 194 self.close() 195 # Raise an error if reconnect is not allowed 196 if not self.allow_reconnect: 197 raise PostgresConnectionError( 198 "Connection lost, reconnection not allowed" 199 ) 200 return False 201 finally: 202 if cursor: 203 cursor.close() 204 205 return True 206 207 def close(self): 208 """ 209 Close the connection to PostgreSQL 210 """ 211 if self._conn: 212 # If the connection is still alive, rollback and close it 213 if not self._conn.closed: 214 if self._conn.status == STATUS_IN_TRANSACTION: 215 self._conn.rollback() 216 self._conn.close() 217 # Remove the connection from the live connections list 218 self._conn = None 219 _live_connections.remove(self) 220 221 def _cursor(self, *args, **kwargs): 222 """ 223 Return a cursor 224 """ 225 conn = self.connect() 226 return conn.cursor(*args, **kwargs) 227 228 @property 229 def server_version(self): 230 """ 231 Version of PostgreSQL (returned by psycopg2) 232 """ 233 conn = self.connect() 234 return conn.server_version 235 236 @property 237 def server_txt_version(self): 238 """ 239 Human readable version of PostgreSQL (calculated from server_version) 240 241 :rtype: str|None 242 """ 243 try: 244 conn = self.connect() 245 major = int(conn.server_version / 10000) 246 minor = int(conn.server_version / 100 % 100) 247 patch = int(conn.server_version % 100) 248 if major < 10: 249 return "%d.%d.%d" % (major, minor, patch) 250 if minor != 0: 251 _logger.warning( 252 "Unexpected non zero minor version %s in %s", 253 minor, 254 conn.server_version, 255 ) 256 return "%d.%d" % (major, patch) 257 except PostgresConnectionError as e: 258 _logger.debug( 259 "Error retrieving PostgreSQL version: %s", force_str(e).strip() 260 ) 261 return None 262 263 @property 264 def server_major_version(self): 265 """ 266 PostgreSQL major version (calculated from server_txt_version) 267 268 :rtype: str|None 269 """ 270 result = self.server_txt_version 271 if result is not None: 272 return simplify_version(result) 273 return None 274 275 276class StreamingConnection(PostgreSQL): 277 """ 278 This class represents a streaming connection to a PostgreSQL server. 279 """ 280 281 CHECK_QUERY = "IDENTIFY_SYSTEM" 282 283 def __init__(self, conninfo): 284 """ 285 Streaming connection constructor 286 287 :param str conninfo: Connection information (aka DSN) 288 """ 289 super(StreamingConnection, self).__init__(conninfo) 290 291 # Make sure we connect using the 'replication' option which 292 # triggers streaming replication protocol communication 293 self.conn_parameters["replication"] = "true" 294 # ensure that the datestyle is set to iso, working around an 295 # issue in some psycopg2 versions 296 self.conn_parameters["options"] = "-cdatestyle=iso" 297 # Override 'dbname' parameter. This operation is required to mimic 298 # the behaviour of pg_receivexlog and pg_basebackup 299 self.conn_parameters["dbname"] = "replication" 300 # Rebuild the conninfo string from the modified parameter lists 301 self.conninfo = self.encode_dsn(self.conn_parameters) 302 303 def connect(self): 304 """ 305 Connect to the PostgreSQL server. It reuses an existing connection. 306 307 :returns: the connection to the server 308 """ 309 if self._check_connection(): 310 return self._conn 311 312 # Build a connection 313 self._conn = super(StreamingConnection, self).connect() 314 return self._conn 315 316 def fetch_remote_status(self): 317 """ 318 Returns the status of the connection to the PostgreSQL server. 319 320 This method does not raise any exception in case of errors, 321 but set the missing values to None in the resulting dictionary. 322 323 :rtype: dict[str, None|str] 324 """ 325 result = dict.fromkeys( 326 ( 327 "connection_error", 328 "streaming_supported", 329 "streaming", 330 "streaming_systemid", 331 "timeline", 332 "xlogpos", 333 ), 334 None, 335 ) 336 try: 337 # If the server is too old to support `pg_receivexlog`, 338 # exit immediately. 339 # This needs to be protected by the try/except because 340 # `self.server_version` can raise a PostgresConnectionError 341 if self.server_version < 90200: 342 result["streaming_supported"] = False 343 return result 344 result["streaming_supported"] = True 345 # Execute a IDENTIFY_SYSYEM to check the connection 346 cursor = self._cursor() 347 cursor.execute("IDENTIFY_SYSTEM") 348 row = cursor.fetchone() 349 # If something has been returned, barman is connected 350 # to a replication backend 351 if row: 352 result["streaming"] = True 353 # IDENTIFY_SYSTEM always returns at least two values 354 result["streaming_systemid"] = row[0] 355 result["timeline"] = row[1] 356 # PostgreSQL 9.1+ returns also the current xlog flush location 357 if len(row) > 2: 358 result["xlogpos"] = row[2] 359 except psycopg2.ProgrammingError: 360 # This is not a streaming connection 361 result["streaming"] = False 362 except PostgresConnectionError as e: 363 result["connection_error"] = force_str(e).strip() 364 _logger.warning( 365 "Error retrieving PostgreSQL status: %s", force_str(e).strip() 366 ) 367 return result 368 369 def create_physical_repslot(self, slot_name): 370 """ 371 Create a physical replication slot using the streaming connection 372 :param str slot_name: Replication slot name 373 """ 374 cursor = self._cursor() 375 try: 376 # In the following query, the slot name is directly passed 377 # to the CREATE_REPLICATION_SLOT command, without any 378 # quoting. This is a characteristic of the streaming 379 # connection, otherwise if will fail with a generic 380 # "syntax error" 381 cursor.execute("CREATE_REPLICATION_SLOT %s PHYSICAL" % slot_name) 382 _logger.info("Replication slot '%s' successfully created", slot_name) 383 except psycopg2.DatabaseError as exc: 384 if exc.pgcode == DUPLICATE_OBJECT: 385 # A replication slot with the same name exists 386 raise PostgresDuplicateReplicationSlot() 387 elif exc.pgcode == CONFIGURATION_LIMIT_EXCEEDED: 388 # Unable to create a new physical replication slot. 389 # All slots are full. 390 raise PostgresReplicationSlotsFull() 391 else: 392 raise PostgresException(force_str(exc).strip()) 393 394 def drop_repslot(self, slot_name): 395 """ 396 Drop a physical replication slot using the streaming connection 397 :param str slot_name: Replication slot name 398 """ 399 cursor = self._cursor() 400 try: 401 # In the following query, the slot name is directly passed 402 # to the DROP_REPLICATION_SLOT command, without any 403 # quoting. This is a characteristic of the streaming 404 # connection, otherwise if will fail with a generic 405 # "syntax error" 406 cursor.execute("DROP_REPLICATION_SLOT %s" % slot_name) 407 _logger.info("Replication slot '%s' successfully dropped", slot_name) 408 except psycopg2.DatabaseError as exc: 409 if exc.pgcode == UNDEFINED_OBJECT: 410 # A replication slot with the that name does not exist 411 raise PostgresInvalidReplicationSlot() 412 if exc.pgcode == OBJECT_IN_USE: 413 # The replication slot is still in use 414 raise PostgresReplicationSlotInUse() 415 else: 416 raise PostgresException(force_str(exc).strip()) 417 418 419class PostgreSQLConnection(PostgreSQL): 420 """ 421 This class represents a standard client connection to a PostgreSQL server. 422 """ 423 424 # Streaming replication client types 425 STANDBY = 1 426 WALSTREAMER = 2 427 ANY_STREAMING_CLIENT = (STANDBY, WALSTREAMER) 428 429 def __init__( 430 self, 431 conninfo, 432 immediate_checkpoint=False, 433 slot_name=None, 434 application_name="barman", 435 ): 436 """ 437 PostgreSQL connection constructor. 438 439 :param str conninfo: Connection information (aka DSN) 440 :param bool immediate_checkpoint: Whether to do an immediate checkpoint 441 when start a backup 442 :param str|None slot_name: Replication slot name 443 """ 444 super(PostgreSQLConnection, self).__init__(conninfo) 445 self.immediate_checkpoint = immediate_checkpoint 446 self.slot_name = slot_name 447 self.application_name = application_name 448 self.configuration_files = None 449 450 def connect(self): 451 """ 452 Connect to the PostgreSQL server. It reuses an existing connection. 453 """ 454 if self._check_connection(): 455 return self._conn 456 457 self._conn = super(PostgreSQLConnection, self).connect() 458 server_version = self._conn.server_version 459 use_app_name = "application_name" in self.conn_parameters 460 if server_version >= 90000 and not use_app_name: 461 try: 462 cur = self._conn.cursor() 463 # Do not use parameter substitution with SET 464 cur.execute("SET application_name TO %s" % self.application_name) 465 cur.close() 466 # If psycopg2 fails to set the application name, 467 # raise the appropriate exception 468 except psycopg2.ProgrammingError as e: 469 raise PostgresAppNameError(force_str(e).strip()) 470 return self._conn 471 472 @property 473 def server_txt_version(self): 474 """ 475 Human readable version of PostgreSQL (returned by the server). 476 477 Note: The return value of this function is used when composing include 478 patterns which are passed to rsync when copying tablespaces. If the 479 value does not exactly match the PostgreSQL version then Barman may 480 fail to copy tablespace files during a backup. 481 """ 482 try: 483 cur = self._cursor() 484 cur.execute("SELECT version()") 485 version_string = cur.fetchone()[0] 486 platform, version = version_string.split()[:2] 487 # EPAS <= 10 will return a version string which starts with 488 # EnterpriseDB followed by the PostgreSQL version with an 489 # additional version field. This additional field must be discarded 490 # so that we return the exact PostgreSQL version. Later versions of 491 # EPAS report the PostgreSQL version directly so do not need 492 # special handling. 493 if platform == "EnterpriseDB": 494 return ".".join(version.split(".")[:-1]) 495 else: 496 return version 497 except (PostgresConnectionError, psycopg2.Error) as e: 498 _logger.debug( 499 "Error retrieving PostgreSQL version: %s", force_str(e).strip() 500 ) 501 return None 502 503 @property 504 def has_pgespresso(self): 505 """ 506 Returns true if the `pgespresso` extension is available 507 """ 508 try: 509 # pg_extension is only available from Postgres 9.1+ 510 if self.server_version < 90100: 511 return False 512 cur = self._cursor() 513 cur.execute( 514 "SELECT count(*) FROM pg_extension WHERE extname = 'pgespresso'" 515 ) 516 q_result = cur.fetchone()[0] 517 return q_result > 0 518 except (PostgresConnectionError, psycopg2.Error) as e: 519 _logger.debug( 520 "Error retrieving pgespresso information: %s", force_str(e).strip() 521 ) 522 return None 523 524 @property 525 def is_in_recovery(self): 526 """ 527 Returns true if PostgreSQL server is in recovery mode (hot standby) 528 """ 529 try: 530 # pg_is_in_recovery is only available from Postgres 9.0+ 531 if self.server_version < 90000: 532 return False 533 cur = self._cursor() 534 cur.execute("SELECT pg_is_in_recovery()") 535 return cur.fetchone()[0] 536 except (PostgresConnectionError, psycopg2.Error) as e: 537 _logger.debug( 538 "Error calling pg_is_in_recovery() function: %s", force_str(e).strip() 539 ) 540 return None 541 542 @property 543 def is_superuser(self): 544 """ 545 Returns true if current user has superuser privileges 546 """ 547 try: 548 cur = self._cursor() 549 cur.execute("SELECT usesuper FROM pg_user WHERE usename = CURRENT_USER") 550 return cur.fetchone()[0] 551 except (PostgresConnectionError, psycopg2.Error) as e: 552 _logger.debug( 553 "Error calling is_superuser() function: %s", force_str(e).strip() 554 ) 555 return None 556 557 @property 558 def has_backup_privileges(self): 559 """ 560 Returns true if current user is superuser or, for PostgreSQL 10 561 or above, is a standard user that has grants to read server 562 settings and to execute all the functions needed for 563 exclusive/concurrent backup control and WAL control. 564 """ 565 # pg_monitor / pg_read_all_settings only available from v10 566 if self.server_version < 100000: 567 return self.is_superuser 568 569 backup_check_query = """ 570 SELECT 571 usesuper 572 OR 573 ( 574 ( 575 pg_has_role(CURRENT_USER, 'pg_monitor', 'MEMBER') 576 OR 577 ( 578 pg_has_role(CURRENT_USER, 'pg_read_all_settings', 'MEMBER') 579 AND pg_has_role(CURRENT_USER, 'pg_read_all_stats', 'MEMBER') 580 ) 581 ) 582 AND has_function_privilege( 583 CURRENT_USER, 'pg_start_backup(text,bool,bool)', 'EXECUTE') 584 AND 585 ( 586 has_function_privilege( 587 CURRENT_USER, 'pg_stop_backup()', 'EXECUTE') 588 OR has_function_privilege( 589 CURRENT_USER, 'pg_stop_backup(bool,bool)', 'EXECUTE') 590 ) 591 AND has_function_privilege( 592 CURRENT_USER, 'pg_switch_wal()', 'EXECUTE') 593 AND has_function_privilege( 594 CURRENT_USER, 'pg_create_restore_point(text)', 'EXECUTE') 595 ) 596 FROM 597 pg_user 598 WHERE 599 usename = CURRENT_USER 600 """ 601 try: 602 cur = self._cursor() 603 cur.execute(backup_check_query) 604 return cur.fetchone()[0] 605 except (PostgresConnectionError, psycopg2.Error) as e: 606 _logger.debug( 607 "Error checking privileges for functions needed for backups: %s", 608 force_str(e).strip(), 609 ) 610 return None 611 612 @property 613 def current_xlog_info(self): 614 """ 615 Get detailed information about the current WAL position in PostgreSQL. 616 617 This method returns a dictionary containing the following data: 618 619 * location 620 * file_name 621 * file_offset 622 * timestamp 623 624 When executed on a standby server file_name and file_offset are always 625 None 626 627 :rtype: psycopg2.extras.DictRow 628 """ 629 try: 630 cur = self._cursor(cursor_factory=DictCursor) 631 if not self.is_in_recovery: 632 cur.execute( 633 "SELECT location, " 634 "({pg_walfile_name_offset}(location)).*, " 635 "CURRENT_TIMESTAMP AS timestamp " 636 "FROM {pg_current_wal_lsn}() AS location".format(**self.name_map) 637 ) 638 return cur.fetchone() 639 else: 640 cur.execute( 641 "SELECT location, " 642 "NULL AS file_name, " 643 "NULL AS file_offset, " 644 "CURRENT_TIMESTAMP AS timestamp " 645 "FROM {pg_last_wal_replay_lsn}() AS location".format( 646 **self.name_map 647 ) 648 ) 649 return cur.fetchone() 650 except (PostgresConnectionError, psycopg2.Error) as e: 651 _logger.debug( 652 "Error retrieving current xlog detailed information: %s", 653 force_str(e).strip(), 654 ) 655 return None 656 657 @property 658 def current_xlog_file_name(self): 659 """ 660 Get current WAL file from PostgreSQL 661 662 :return str: current WAL file in PostgreSQL 663 """ 664 current_xlog_info = self.current_xlog_info 665 if current_xlog_info is not None: 666 return current_xlog_info["file_name"] 667 return None 668 669 @property 670 def xlog_segment_size(self): 671 """ 672 Retrieve the size of one WAL file. 673 674 In PostgreSQL 11, users will be able to change the WAL size 675 at runtime. Up to PostgreSQL 10, included, the WAL size can be changed 676 at compile time 677 678 :return: The wal size (In bytes) 679 """ 680 681 # Prior to PostgreSQL 8.4, the wal segment size was not configurable, 682 # even in compilation 683 if self.server_version < 80400: 684 return DEFAULT_XLOG_SEG_SIZE 685 686 try: 687 cur = self._cursor(cursor_factory=DictCursor) 688 # We can't use the `get_setting` method here, because it 689 # use `SHOW`, returning an human readable value such as "16MB", 690 # while we prefer a raw value such as 16777216. 691 cur.execute("SELECT setting FROM pg_settings WHERE name='wal_segment_size'") 692 result = cur.fetchone() 693 wal_segment_size = int(result[0]) 694 695 # Prior to PostgreSQL 11, the wal segment size is returned in 696 # blocks 697 if self.server_version < 110000: 698 cur.execute( 699 "SELECT setting FROM pg_settings WHERE name='wal_block_size'" 700 ) 701 result = cur.fetchone() 702 wal_block_size = int(result[0]) 703 704 wal_segment_size *= wal_block_size 705 706 return wal_segment_size 707 except ValueError as e: 708 _logger.error( 709 "Error retrieving current xlog segment size: %s", 710 force_str(e).strip(), 711 ) 712 return None 713 714 @property 715 def current_xlog_location(self): 716 """ 717 Get current WAL location from PostgreSQL 718 719 :return str: current WAL location in PostgreSQL 720 """ 721 current_xlog_info = self.current_xlog_info 722 if current_xlog_info is not None: 723 return current_xlog_info["location"] 724 return None 725 726 @property 727 def current_size(self): 728 """ 729 Returns the total size of the PostgreSQL server 730 (requires superuser or pg_read_all_stats) 731 """ 732 if not self.has_backup_privileges: 733 return None 734 735 try: 736 cur = self._cursor() 737 cur.execute("SELECT sum(pg_tablespace_size(oid)) FROM pg_tablespace") 738 return cur.fetchone()[0] 739 except (PostgresConnectionError, psycopg2.Error) as e: 740 _logger.debug( 741 "Error retrieving PostgreSQL total size: %s", force_str(e).strip() 742 ) 743 return None 744 745 @property 746 def archive_timeout(self): 747 """ 748 Retrieve the archive_timeout setting in PostgreSQL 749 750 :return: The archive timeout (in seconds) 751 """ 752 try: 753 cur = self._cursor(cursor_factory=DictCursor) 754 # We can't use the `get_setting` method here, because it 755 # uses `SHOW`, returning an human readable value such as "5min", 756 # while we prefer a raw value such as 300. 757 cur.execute("SELECT setting FROM pg_settings WHERE name='archive_timeout'") 758 result = cur.fetchone() 759 archive_timeout = int(result[0]) 760 761 return archive_timeout 762 except ValueError as e: 763 _logger.error("Error retrieving archive_timeout: %s", force_str(e).strip()) 764 return None 765 766 @property 767 def checkpoint_timeout(self): 768 """ 769 Retrieve the checkpoint_timeout setting in PostgreSQL 770 771 :return: The checkpoint timeout (in seconds) 772 """ 773 try: 774 cur = self._cursor(cursor_factory=DictCursor) 775 # We can't use the `get_setting` method here, because it 776 # uses `SHOW`, returning an human readable value such as "5min", 777 # while we prefer a raw value such as 300. 778 cur.execute( 779 "SELECT setting FROM pg_settings WHERE name='checkpoint_timeout'" 780 ) 781 result = cur.fetchone() 782 checkpoint_timeout = int(result[0]) 783 784 return checkpoint_timeout 785 except ValueError as e: 786 _logger.error( 787 "Error retrieving checkpoint_timeout: %s", force_str(e).strip() 788 ) 789 return None 790 791 def get_archiver_stats(self): 792 """ 793 This method gathers statistics from pg_stat_archiver. 794 Only for Postgres 9.4+ or greater. If not available, returns None. 795 796 :return dict|None: a dictionary containing Postgres statistics from 797 pg_stat_archiver or None 798 """ 799 try: 800 # pg_stat_archiver is only available from Postgres 9.4+ 801 if self.server_version < 90400: 802 return None 803 cur = self._cursor(cursor_factory=DictCursor) 804 # Select from pg_stat_archiver statistics view, 805 # retrieving statistics about WAL archiver process activity, 806 # also evaluating if the server is archiving without issues 807 # and the archived WALs per second rate. 808 # 809 # We are using current_settings to check for archive_mode=always. 810 # current_setting does normalise its output so we can just 811 # check for 'always' settings using a direct string 812 # comparison 813 cur.execute( 814 "SELECT *, " 815 "current_setting('archive_mode') IN ('on', 'always') " 816 "AND (last_failed_wal IS NULL " 817 "OR last_failed_wal LIKE '%.history' " 818 "AND substring(last_failed_wal from 1 for 8) " 819 "<= substring(last_archived_wal from 1 for 8) " 820 "OR last_failed_time <= last_archived_time) " 821 "AS is_archiving, " 822 "CAST (archived_count AS NUMERIC) " 823 "/ EXTRACT (EPOCH FROM age(now(), stats_reset)) " 824 "AS current_archived_wals_per_second " 825 "FROM pg_stat_archiver" 826 ) 827 return cur.fetchone() 828 except (PostgresConnectionError, psycopg2.Error) as e: 829 _logger.debug( 830 "Error retrieving pg_stat_archive data: %s", force_str(e).strip() 831 ) 832 return None 833 834 def fetch_remote_status(self): 835 """ 836 Get the status of the PostgreSQL server 837 838 This method does not raise any exception in case of errors, 839 but set the missing values to None in the resulting dictionary. 840 841 :rtype: dict[str, None|str] 842 """ 843 # PostgreSQL settings to get from the server (requiring superuser) 844 pg_superuser_settings = ["data_directory"] 845 # PostgreSQL settings to get from the server 846 pg_settings = [] 847 pg_query_keys = [ 848 "server_txt_version", 849 "is_superuser", 850 "is_in_recovery", 851 "current_xlog", 852 "pgespresso_installed", 853 "replication_slot_support", 854 "replication_slot", 855 "synchronous_standby_names", 856 "postgres_systemid", 857 ] 858 # Initialise the result dictionary setting all the values to None 859 result = dict.fromkeys( 860 pg_superuser_settings + pg_settings + pg_query_keys, None 861 ) 862 try: 863 # Retrieve wal_level, hot_standby and max_wal_senders 864 # only if version is >= 9.0 865 if self.server_version >= 90000: 866 pg_settings.append("wal_level") 867 pg_settings.append("hot_standby") 868 pg_settings.append("max_wal_senders") 869 # Retrieve wal_keep_segments from version 9.0 onwards, until 870 # version 13.0, where it was renamed to wal_keep_size 871 if self.server_version < 130000: 872 pg_settings.append("wal_keep_segments") 873 else: 874 pg_settings.append("wal_keep_size") 875 876 if self.server_version >= 90300: 877 pg_settings.append("data_checksums") 878 879 if self.server_version >= 90400: 880 pg_settings.append("max_replication_slots") 881 882 if self.server_version >= 90500: 883 pg_settings.append("wal_compression") 884 885 # retrieves superuser settings 886 if self.has_backup_privileges: 887 for name in pg_superuser_settings: 888 result[name] = self.get_setting(name) 889 890 # retrieves standard settings 891 for name in pg_settings: 892 result[name] = self.get_setting(name) 893 894 result["is_superuser"] = self.is_superuser 895 result["has_backup_privileges"] = self.has_backup_privileges 896 result["is_in_recovery"] = self.is_in_recovery 897 result["server_txt_version"] = self.server_txt_version 898 result["pgespresso_installed"] = self.has_pgespresso 899 current_xlog_info = self.current_xlog_info 900 if current_xlog_info: 901 result["current_lsn"] = current_xlog_info["location"] 902 result["current_xlog"] = current_xlog_info["file_name"] 903 else: 904 result["current_lsn"] = None 905 result["current_xlog"] = None 906 result["current_size"] = self.current_size 907 result["archive_timeout"] = self.archive_timeout 908 result["checkpoint_timeout"] = self.checkpoint_timeout 909 result["xlog_segment_size"] = self.xlog_segment_size 910 911 result.update(self.get_configuration_files()) 912 913 # Retrieve the replication_slot status 914 result["replication_slot_support"] = False 915 if self.server_version >= 90400: 916 result["replication_slot_support"] = True 917 if self.slot_name is not None: 918 result["replication_slot"] = self.get_replication_slot( 919 self.slot_name 920 ) 921 922 # Retrieve the list of synchronous standby names 923 result["synchronous_standby_names"] = [] 924 if self.server_version >= 90100: 925 result[ 926 "synchronous_standby_names" 927 ] = self.get_synchronous_standby_names() 928 929 if self.server_version >= 90600: 930 result["postgres_systemid"] = self.get_systemid() 931 except (PostgresConnectionError, psycopg2.Error) as e: 932 _logger.warning( 933 "Error retrieving PostgreSQL status: %s", force_str(e).strip() 934 ) 935 return result 936 937 def get_systemid(self): 938 """ 939 Get a Postgres instance systemid 940 """ 941 if self.server_version < 90600: 942 return 943 944 try: 945 cur = self._cursor() 946 cur.execute("SELECT system_identifier::text FROM pg_control_system()") 947 return cur.fetchone()[0] 948 except (PostgresConnectionError, psycopg2.Error) as e: 949 _logger.debug( 950 "Error retrieving PostgreSQL system Id: %s", force_str(e).strip() 951 ) 952 return None 953 954 def get_setting(self, name): 955 """ 956 Get a Postgres setting with a given name 957 958 :param name: a parameter name 959 """ 960 try: 961 cur = self._cursor() 962 cur.execute('SHOW "%s"' % name.replace('"', '""')) 963 return cur.fetchone()[0] 964 except (PostgresConnectionError, psycopg2.Error) as e: 965 _logger.debug( 966 "Error retrieving PostgreSQL setting '%s': %s", 967 name.replace('"', '""'), 968 force_str(e).strip(), 969 ) 970 return None 971 972 def get_tablespaces(self): 973 """ 974 Returns a list of tablespaces or None if not present 975 """ 976 try: 977 cur = self._cursor() 978 if self.server_version >= 90200: 979 cur.execute( 980 "SELECT spcname, oid, " 981 "pg_tablespace_location(oid) AS spclocation " 982 "FROM pg_tablespace " 983 "WHERE pg_tablespace_location(oid) != ''" 984 ) 985 else: 986 cur.execute( 987 "SELECT spcname, oid, spclocation " 988 "FROM pg_tablespace WHERE spclocation != ''" 989 ) 990 # Generate a list of tablespace objects 991 return [Tablespace._make(item) for item in cur.fetchall()] 992 except (PostgresConnectionError, psycopg2.Error) as e: 993 _logger.debug( 994 "Error retrieving PostgreSQL tablespaces: %s", force_str(e).strip() 995 ) 996 return None 997 998 def get_configuration_files(self): 999 """ 1000 Get postgres configuration files or an empty dictionary 1001 in case of error 1002 1003 :rtype: dict 1004 """ 1005 if self.configuration_files: 1006 return self.configuration_files 1007 try: 1008 self.configuration_files = {} 1009 cur = self._cursor() 1010 cur.execute( 1011 "SELECT name, setting FROM pg_settings " 1012 "WHERE name IN ('config_file', 'hba_file', 'ident_file')" 1013 ) 1014 for cname, cpath in cur.fetchall(): 1015 self.configuration_files[cname] = cpath 1016 1017 # Retrieve additional configuration files 1018 # If PostgreSQL is older than 8.4 disable this check 1019 if self.server_version >= 80400: 1020 cur.execute( 1021 "SELECT DISTINCT sourcefile AS included_file " 1022 "FROM pg_settings " 1023 "WHERE sourcefile IS NOT NULL " 1024 "AND sourcefile NOT IN " 1025 "(SELECT setting FROM pg_settings " 1026 "WHERE name = 'config_file') " 1027 "ORDER BY 1" 1028 ) 1029 # Extract the values from the containing single element tuples 1030 included_files = [included_file for included_file, in cur.fetchall()] 1031 if len(included_files) > 0: 1032 self.configuration_files["included_files"] = included_files 1033 1034 except (PostgresConnectionError, psycopg2.Error) as e: 1035 _logger.debug( 1036 "Error retrieving PostgreSQL configuration files location: %s", 1037 force_str(e).strip(), 1038 ) 1039 self.configuration_files = {} 1040 1041 return self.configuration_files 1042 1043 def create_restore_point(self, target_name): 1044 """ 1045 Create a restore point with the given target name 1046 1047 The method executes the pg_create_restore_point() function through 1048 a PostgreSQL connection. Only for Postgres versions >= 9.1 when not 1049 in replication. 1050 1051 If requirements are not met, the operation is skipped. 1052 1053 :param str target_name: name of the restore point 1054 1055 :returns: the restore point LSN 1056 :rtype: str|None 1057 """ 1058 if self.server_version < 90100: 1059 return None 1060 1061 # Not possible if on a standby 1062 # Called inside the pg_connect context to reuse the connection 1063 if self.is_in_recovery: 1064 return None 1065 1066 try: 1067 cur = self._cursor() 1068 cur.execute("SELECT pg_create_restore_point(%s)", [target_name]) 1069 _logger.info("Restore point '%s' successfully created", target_name) 1070 return cur.fetchone()[0] 1071 except (PostgresConnectionError, psycopg2.Error) as e: 1072 _logger.debug( 1073 "Error issuing pg_create_restore_point() command: %s", 1074 force_str(e).strip(), 1075 ) 1076 return None 1077 1078 def start_exclusive_backup(self, label): 1079 """ 1080 Calls pg_start_backup() on the PostgreSQL server 1081 1082 This method returns a dictionary containing the following data: 1083 1084 * location 1085 * file_name 1086 * file_offset 1087 * timestamp 1088 1089 :param str label: descriptive string to identify the backup 1090 :rtype: psycopg2.extras.DictRow 1091 """ 1092 try: 1093 conn = self.connect() 1094 1095 # Rollback to release the transaction, as the pg_start_backup 1096 # invocation can last up to PostgreSQL's checkpoint_timeout 1097 conn.rollback() 1098 1099 # Start an exclusive backup 1100 cur = conn.cursor(cursor_factory=DictCursor) 1101 if self.server_version < 80400: 1102 cur.execute( 1103 "SELECT location, " 1104 "({pg_walfile_name_offset}(location)).*, " 1105 "now() AS timestamp " 1106 "FROM pg_start_backup(%s) AS location".format(**self.name_map), 1107 (label,), 1108 ) 1109 else: 1110 cur.execute( 1111 "SELECT location, " 1112 "({pg_walfile_name_offset}(location)).*, " 1113 "now() AS timestamp " 1114 "FROM pg_start_backup(%s,%s) AS location".format(**self.name_map), 1115 (label, self.immediate_checkpoint), 1116 ) 1117 1118 start_row = cur.fetchone() 1119 1120 # Rollback to release the transaction, as the connection 1121 # is to be retained until the end of backup 1122 conn.rollback() 1123 1124 return start_row 1125 except (PostgresConnectionError, psycopg2.Error) as e: 1126 msg = "pg_start_backup(): %s" % force_str(e).strip() 1127 _logger.debug(msg) 1128 raise PostgresException(msg) 1129 1130 def start_concurrent_backup(self, label): 1131 """ 1132 Calls pg_start_backup on the PostgreSQL server using the 1133 API introduced with version 9.6 1134 1135 This method returns a dictionary containing the following data: 1136 1137 * location 1138 * timeline 1139 * timestamp 1140 1141 :param str label: descriptive string to identify the backup 1142 :rtype: psycopg2.extras.DictRow 1143 """ 1144 try: 1145 conn = self.connect() 1146 1147 # Rollback to release the transaction, as the pg_start_backup 1148 # invocation can last up to PostgreSQL's checkpoint_timeout 1149 conn.rollback() 1150 1151 # Start the backup using the api introduced in postgres 9.6 1152 cur = conn.cursor(cursor_factory=DictCursor) 1153 cur.execute( 1154 "SELECT location, " 1155 "(SELECT timeline_id " 1156 "FROM pg_control_checkpoint()) AS timeline, " 1157 "now() AS timestamp " 1158 "FROM pg_start_backup(%s, %s, FALSE) AS location", 1159 (label, self.immediate_checkpoint), 1160 ) 1161 start_row = cur.fetchone() 1162 1163 # Rollback to release the transaction, as the connection 1164 # is to be retained until the end of backup 1165 conn.rollback() 1166 1167 return start_row 1168 except (PostgresConnectionError, psycopg2.Error) as e: 1169 msg = "pg_start_backup command: %s" % (force_str(e).strip(),) 1170 _logger.debug(msg) 1171 raise PostgresException(msg) 1172 1173 def stop_exclusive_backup(self): 1174 """ 1175 Calls pg_stop_backup() on the PostgreSQL server 1176 1177 This method returns a dictionary containing the following data: 1178 1179 * location 1180 * file_name 1181 * file_offset 1182 * timestamp 1183 1184 :rtype: psycopg2.extras.DictRow 1185 """ 1186 try: 1187 conn = self.connect() 1188 1189 # Rollback to release the transaction, as the pg_stop_backup 1190 # invocation could will wait until the current WAL file is shipped 1191 conn.rollback() 1192 1193 # Stop the backup 1194 cur = conn.cursor(cursor_factory=DictCursor) 1195 cur.execute( 1196 "SELECT location, " 1197 "({pg_walfile_name_offset}(location)).*, " 1198 "now() AS timestamp " 1199 "FROM pg_stop_backup() AS location".format(**self.name_map) 1200 ) 1201 1202 return cur.fetchone() 1203 except (PostgresConnectionError, psycopg2.Error) as e: 1204 msg = "Error issuing pg_stop_backup command: %s" % force_str(e).strip() 1205 _logger.debug(msg) 1206 raise PostgresException( 1207 "Cannot terminate exclusive backup. " 1208 "You might have to manually execute pg_stop_backup " 1209 "on your PostgreSQL server" 1210 ) 1211 1212 def stop_concurrent_backup(self): 1213 """ 1214 Calls pg_stop_backup on the PostgreSQL server using the 1215 API introduced with version 9.6 1216 1217 This method returns a dictionary containing the following data: 1218 1219 * location 1220 * timeline 1221 * backup_label 1222 * timestamp 1223 1224 :rtype: psycopg2.extras.DictRow 1225 """ 1226 try: 1227 conn = self.connect() 1228 1229 # Rollback to release the transaction, as the pg_stop_backup 1230 # invocation could will wait until the current WAL file is shipped 1231 conn.rollback() 1232 1233 # Stop the backup using the api introduced with version 9.6 1234 cur = conn.cursor(cursor_factory=DictCursor) 1235 cur.execute( 1236 "SELECT end_row.lsn AS location, " 1237 "(SELECT CASE WHEN pg_is_in_recovery() " 1238 "THEN min_recovery_end_timeline ELSE timeline_id END " 1239 "FROM pg_control_checkpoint(), pg_control_recovery()" 1240 ") AS timeline, " 1241 "end_row.labelfile AS backup_label, " 1242 "now() AS timestamp FROM pg_stop_backup(FALSE) AS end_row" 1243 ) 1244 1245 return cur.fetchone() 1246 except (PostgresConnectionError, psycopg2.Error) as e: 1247 msg = "Error issuing pg_stop_backup command: %s" % force_str(e).strip() 1248 _logger.debug(msg) 1249 raise PostgresException(msg) 1250 1251 def pgespresso_start_backup(self, label): 1252 """ 1253 Execute a pgespresso_start_backup 1254 1255 This method returns a dictionary containing the following data: 1256 1257 * backup_label 1258 * timestamp 1259 1260 :param str label: descriptive string to identify the backup 1261 :rtype: psycopg2.extras.DictRow 1262 """ 1263 try: 1264 conn = self.connect() 1265 1266 # Rollback to release the transaction, 1267 # as the pgespresso_start_backup invocation can last 1268 # up to PostgreSQL's checkpoint_timeout 1269 conn.rollback() 1270 1271 # Start the concurrent backup using pgespresso 1272 cur = conn.cursor(cursor_factory=DictCursor) 1273 cur.execute( 1274 "SELECT pgespresso_start_backup(%s,%s) AS backup_label, " 1275 "now() AS timestamp", 1276 (label, self.immediate_checkpoint), 1277 ) 1278 1279 start_row = cur.fetchone() 1280 1281 # Rollback to release the transaction, as the connection 1282 # is to be retained until the end of backup 1283 conn.rollback() 1284 1285 return start_row 1286 except (PostgresConnectionError, psycopg2.Error) as e: 1287 msg = "pgespresso_start_backup(): %s" % force_str(e).strip() 1288 _logger.debug(msg) 1289 raise PostgresException(msg) 1290 1291 def pgespresso_stop_backup(self, backup_label): 1292 """ 1293 Execute a pgespresso_stop_backup 1294 1295 This method returns a dictionary containing the following data: 1296 1297 * end_wal 1298 * timestamp 1299 1300 :param str backup_label: backup label as returned 1301 by pgespress_start_backup 1302 :rtype: psycopg2.extras.DictRow 1303 """ 1304 try: 1305 conn = self.connect() 1306 # Issue a rollback to release any unneeded lock 1307 conn.rollback() 1308 cur = conn.cursor(cursor_factory=DictCursor) 1309 cur.execute( 1310 "SELECT pgespresso_stop_backup(%s) AS end_wal, now() AS timestamp", 1311 (backup_label,), 1312 ) 1313 return cur.fetchone() 1314 except (PostgresConnectionError, psycopg2.Error) as e: 1315 msg = "Error issuing pgespresso_stop_backup() command: %s" % ( 1316 force_str(e).strip() 1317 ) 1318 _logger.debug(msg) 1319 raise PostgresException( 1320 "%s\n" 1321 "HINT: You might have to manually execute " 1322 "pgespresso_abort_backup() on your PostgreSQL " 1323 "server" % msg 1324 ) 1325 1326 def switch_wal(self): 1327 """ 1328 Execute a pg_switch_wal() 1329 1330 To be SURE of the switch of a xlog, we collect the xlogfile name 1331 before and after the switch. 1332 The method returns the just closed xlog file name if the current xlog 1333 file has changed, it returns an empty string otherwise. 1334 1335 The method returns None if something went wrong during the execution 1336 of the pg_switch_wal command. 1337 1338 :rtype: str|None 1339 """ 1340 try: 1341 conn = self.connect() 1342 if not self.has_backup_privileges: 1343 raise BackupFunctionsAccessRequired( 1344 "Postgres user '%s' is missing required privileges " 1345 '(see "Preliminary steps" in the Barman manual)' 1346 % self.conn_parameters.get("user") 1347 ) 1348 1349 # If this server is in recovery there is nothing to do 1350 if self.is_in_recovery: 1351 raise PostgresIsInRecovery() 1352 1353 cur = conn.cursor() 1354 # Collect the xlog file name before the switch 1355 cur.execute( 1356 "SELECT {pg_walfile_name}(" 1357 "{pg_current_wal_insert_lsn}())".format(**self.name_map) 1358 ) 1359 pre_switch = cur.fetchone()[0] 1360 # Switch 1361 cur.execute( 1362 "SELECT {pg_walfile_name}({pg_switch_wal}())".format(**self.name_map) 1363 ) 1364 # Collect the xlog file name after the switch 1365 cur.execute( 1366 "SELECT {pg_walfile_name}(" 1367 "{pg_current_wal_insert_lsn}())".format(**self.name_map) 1368 ) 1369 post_switch = cur.fetchone()[0] 1370 if pre_switch < post_switch: 1371 return pre_switch 1372 else: 1373 return "" 1374 except (PostgresConnectionError, psycopg2.Error) as e: 1375 _logger.debug( 1376 "Error issuing {pg_switch_wal}() command: %s".format(**self.name_map), 1377 force_str(e).strip(), 1378 ) 1379 return None 1380 1381 def checkpoint(self): 1382 """ 1383 Execute a checkpoint 1384 """ 1385 try: 1386 conn = self.connect() 1387 1388 # Requires superuser privilege 1389 if not self.is_superuser: 1390 raise PostgresSuperuserRequired() 1391 1392 cur = conn.cursor() 1393 cur.execute("CHECKPOINT") 1394 except (PostgresConnectionError, psycopg2.Error) as e: 1395 _logger.debug("Error issuing CHECKPOINT: %s", force_str(e).strip()) 1396 1397 def get_replication_stats(self, client_type=STANDBY): 1398 """ 1399 Returns streaming replication information 1400 """ 1401 try: 1402 cur = self._cursor(cursor_factory=NamedTupleCursor) 1403 1404 if not self.has_backup_privileges: 1405 raise BackupFunctionsAccessRequired( 1406 "Postgres user '%s' is missing required privileges " 1407 '(see "Preliminary steps" in the Barman manual)' 1408 % self.conn_parameters.get("user") 1409 ) 1410 1411 # pg_stat_replication is a system view that contains one 1412 # row per WAL sender process with information about the 1413 # replication status of a standby server. It has been 1414 # introduced in PostgreSQL 9.1. Current fields are: 1415 # 1416 # - pid (procpid in 9.1) 1417 # - usesysid 1418 # - usename 1419 # - application_name 1420 # - client_addr 1421 # - client_hostname 1422 # - client_port 1423 # - backend_start 1424 # - backend_xmin (9.4+) 1425 # - state 1426 # - sent_lsn (sent_location before 10) 1427 # - write_lsn (write_location before 10) 1428 # - flush_lsn (flush_location before 10) 1429 # - replay_lsn (replay_location before 10) 1430 # - sync_priority 1431 # - sync_state 1432 # 1433 1434 if self.server_version < 90100: 1435 raise PostgresUnsupportedFeature("9.1") 1436 1437 from_repslot = "" 1438 where_clauses = [] 1439 if self.server_version >= 100000: 1440 # Current implementation (10+) 1441 what = "r.*, rs.slot_name" 1442 # Look for replication slot name 1443 from_repslot = ( 1444 "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) " 1445 ) 1446 where_clauses += ["(rs.slot_type IS NULL OR rs.slot_type = 'physical')"] 1447 elif self.server_version >= 90500: 1448 # PostgreSQL 9.5/9.6 1449 what = ( 1450 "pid, " 1451 "usesysid, " 1452 "usename, " 1453 "application_name, " 1454 "client_addr, " 1455 "client_hostname, " 1456 "client_port, " 1457 "backend_start, " 1458 "backend_xmin, " 1459 "state, " 1460 "sent_location AS sent_lsn, " 1461 "write_location AS write_lsn, " 1462 "flush_location AS flush_lsn, " 1463 "replay_location AS replay_lsn, " 1464 "sync_priority, " 1465 "sync_state, " 1466 "rs.slot_name" 1467 ) 1468 # Look for replication slot name 1469 from_repslot = ( 1470 "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) " 1471 ) 1472 where_clauses += ["(rs.slot_type IS NULL OR rs.slot_type = 'physical')"] 1473 elif self.server_version >= 90400: 1474 # PostgreSQL 9.4 1475 what = ( 1476 "pid, " 1477 "usesysid, " 1478 "usename, " 1479 "application_name, " 1480 "client_addr, " 1481 "client_hostname, " 1482 "client_port, " 1483 "backend_start, " 1484 "backend_xmin, " 1485 "state, " 1486 "sent_location AS sent_lsn, " 1487 "write_location AS write_lsn, " 1488 "flush_location AS flush_lsn, " 1489 "replay_location AS replay_lsn, " 1490 "sync_priority, " 1491 "sync_state" 1492 ) 1493 elif self.server_version >= 90200: 1494 # PostgreSQL 9.2/9.3 1495 what = ( 1496 "pid, " 1497 "usesysid, " 1498 "usename, " 1499 "application_name, " 1500 "client_addr, " 1501 "client_hostname, " 1502 "client_port, " 1503 "backend_start, " 1504 "CAST (NULL AS xid) AS backend_xmin, " 1505 "state, " 1506 "sent_location AS sent_lsn, " 1507 "write_location AS write_lsn, " 1508 "flush_location AS flush_lsn, " 1509 "replay_location AS replay_lsn, " 1510 "sync_priority, " 1511 "sync_state" 1512 ) 1513 else: 1514 # PostgreSQL 9.1 1515 what = ( 1516 "procpid AS pid, " 1517 "usesysid, " 1518 "usename, " 1519 "application_name, " 1520 "client_addr, " 1521 "client_hostname, " 1522 "client_port, " 1523 "backend_start, " 1524 "CAST (NULL AS xid) AS backend_xmin, " 1525 "state, " 1526 "sent_location AS sent_lsn, " 1527 "write_location AS write_lsn, " 1528 "flush_location AS flush_lsn, " 1529 "replay_location AS replay_lsn, " 1530 "sync_priority, " 1531 "sync_state" 1532 ) 1533 1534 # Streaming client 1535 if client_type == self.STANDBY: 1536 # Standby server 1537 where_clauses += ["{replay_lsn} IS NOT NULL".format(**self.name_map)] 1538 elif client_type == self.WALSTREAMER: 1539 # WAL streamer 1540 where_clauses += ["{replay_lsn} IS NULL".format(**self.name_map)] 1541 1542 if where_clauses: 1543 where = "WHERE %s " % " AND ".join(where_clauses) 1544 else: 1545 where = "" 1546 1547 # Execute the query 1548 cur.execute( 1549 "SELECT %s, " 1550 "pg_is_in_recovery() AS is_in_recovery, " 1551 "CASE WHEN pg_is_in_recovery() " 1552 " THEN {pg_last_wal_receive_lsn}() " 1553 " ELSE {pg_current_wal_lsn}() " 1554 "END AS current_lsn " 1555 "FROM pg_stat_replication r " 1556 "%s" 1557 "%s" 1558 "ORDER BY sync_state DESC, sync_priority".format(**self.name_map) 1559 % (what, from_repslot, where) 1560 ) 1561 1562 # Generate a list of standby objects 1563 return cur.fetchall() 1564 except (PostgresConnectionError, psycopg2.Error) as e: 1565 _logger.debug( 1566 "Error retrieving status of standby servers: %s", force_str(e).strip() 1567 ) 1568 return None 1569 1570 def get_replication_slot(self, slot_name): 1571 """ 1572 Retrieve from the PostgreSQL server a physical replication slot 1573 with a specific slot_name. 1574 1575 This method returns a dictionary containing the following data: 1576 1577 * slot_name 1578 * active 1579 * restart_lsn 1580 1581 :param str slot_name: the replication slot name 1582 :rtype: psycopg2.extras.DictRow 1583 """ 1584 if self.server_version < 90400: 1585 # Raise exception if replication slot are not supported 1586 # by PostgreSQL version 1587 raise PostgresUnsupportedFeature("9.4") 1588 else: 1589 cur = self._cursor(cursor_factory=NamedTupleCursor) 1590 try: 1591 cur.execute( 1592 "SELECT slot_name, " 1593 "active, " 1594 "restart_lsn " 1595 "FROM pg_replication_slots " 1596 "WHERE slot_type = 'physical' " 1597 "AND slot_name = '%s'" % slot_name 1598 ) 1599 # Retrieve the replication slot information 1600 return cur.fetchone() 1601 except (PostgresConnectionError, psycopg2.Error) as e: 1602 _logger.debug( 1603 "Error retrieving replication_slots: %s", force_str(e).strip() 1604 ) 1605 raise 1606 1607 def get_synchronous_standby_names(self): 1608 """ 1609 Retrieve the list of named synchronous standby servers from PostgreSQL 1610 1611 This method returns a list of names 1612 1613 :return list: synchronous standby names 1614 """ 1615 if self.server_version < 90100: 1616 # Raise exception if synchronous replication is not supported 1617 raise PostgresUnsupportedFeature("9.1") 1618 else: 1619 synchronous_standby_names = self.get_setting("synchronous_standby_names") 1620 # Return empty list if not defined 1621 if synchronous_standby_names is None: 1622 return [] 1623 # Normalise the list of sync standby names 1624 # On PostgreSQL 9.6 it is possible to specify the number of 1625 # required synchronous standby using this format: 1626 # n (name1, name2, ... nameN). 1627 # We only need the name list, so we discard everything else. 1628 1629 # The name list starts after the first parenthesis or at pos 0 1630 names_start = synchronous_standby_names.find("(") + 1 1631 names_end = synchronous_standby_names.rfind(")") 1632 if names_end < 0: 1633 names_end = len(synchronous_standby_names) 1634 names_list = synchronous_standby_names[names_start:names_end] 1635 # We can blindly strip double quotes because PostgreSQL enforces 1636 # the format of the synchronous_standby_names content 1637 return [x.strip().strip('"') for x in names_list.split(",")] 1638 1639 @property 1640 def name_map(self): 1641 """ 1642 Return a map with function and directory names according to the current 1643 PostgreSQL version. 1644 1645 Each entry has the `current` name as key and the name for the specific 1646 version as value. 1647 1648 :rtype: dict[str] 1649 """ 1650 1651 # Avoid raising an error if the connection is not available 1652 try: 1653 server_version = self.server_version 1654 except PostgresConnectionError: 1655 _logger.debug( 1656 "Impossible to detect the PostgreSQL version, " 1657 "name_map will return names from latest version" 1658 ) 1659 server_version = None 1660 1661 return function_name_map(server_version) 1662