1# -*- coding: utf-8 -*-
2# © Copyright EnterpriseDB UK Limited 2011-2021
3#
4# This file is part of Barman.
5#
6# Barman is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# Barman is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with Barman.  If not, see <http://www.gnu.org/licenses/>.
18
19"""
20This module represents the interface towards a PostgreSQL server.
21"""
22
23import atexit
24import logging
25from abc import ABCMeta
26
27import psycopg2
28from psycopg2.errorcodes import DUPLICATE_OBJECT, OBJECT_IN_USE, UNDEFINED_OBJECT
29from psycopg2.extensions import STATUS_IN_TRANSACTION, STATUS_READY
30from psycopg2.extras import DictCursor, NamedTupleCursor
31
32from barman.exceptions import (
33    ConninfoException,
34    PostgresAppNameError,
35    PostgresConnectionError,
36    PostgresDuplicateReplicationSlot,
37    PostgresException,
38    PostgresInvalidReplicationSlot,
39    PostgresIsInRecovery,
40    PostgresReplicationSlotInUse,
41    PostgresReplicationSlotsFull,
42    BackupFunctionsAccessRequired,
43    PostgresSuperuserRequired,
44    PostgresUnsupportedFeature,
45)
46from barman.infofile import Tablespace
47from barman.postgres_plumbing import function_name_map
48from barman.remote_status import RemoteStatusMixin
49from barman.utils import force_str, simplify_version, with_metaclass
50from barman.xlog import DEFAULT_XLOG_SEG_SIZE
51
# The CONFIGURATION_LIMIT_EXCEEDED error code constant was added to
# psycopg2.errorcodes in psycopg2 2.5, but Barman supports psycopg2 2.4.2+.
# When the import is not available, declare the constant locally with the
# correct value, so that the rest of the module can rely on it being defined.
try:
    from psycopg2.errorcodes import CONFIGURATION_LIMIT_EXCEEDED
except ImportError:
    CONFIGURATION_LIMIT_EXCEEDED = "53400"
59
60
# Module level logger, named after this module
_logger = logging.getLogger(__name__)

# Registry of the connection objects currently open: PostgreSQL.connect()
# appends the object to this list and PostgreSQL.close() removes it.
# Whatever is still in the list at interpreter shutdown is closed
# by the _atexit() handler.
_live_connections = []
"""
List of connections to be closed at the interpreter shutdown
"""
67
68
@atexit.register
def _atexit():
    """
    Ensure that all the connections are correctly closed
    at interpreter shutdown.

    Every object still present in ``_live_connections`` is closed.
    An error raised while closing one connection is logged and does not
    prevent the remaining connections from being closed.
    """
    # Iterate over a copy of the list because conn.close() modifies it
    for conn in list(_live_connections):
        _logger.warning(
            "Forcing %s cleanup during process shut down.", conn.__class__.__name__
        )
        try:
            conn.close()
        except Exception as e:
            # Best effort cleanup: the interpreter is shutting down, so the
            # only sensible reaction is to log the problem and go on with
            # the next connection instead of leaving it open.
            _logger.warning(
                "Error closing %s during process shut down: %s",
                conn.__class__.__name__,
                e,
            )
81
82
class PostgreSQL(with_metaclass(ABCMeta, RemoteStatusMixin)):
    """
    This abstract class represents a generic interface to a PostgreSQL server.

    The underlying psycopg2 connection is opened on demand by `connect()`
    and it is verified, before being reused, by executing `CHECK_QUERY`.
    """

    # Statement executed by _check_connection() to probe the connection
    CHECK_QUERY = "SELECT 1"

    def __init__(self, conninfo):
        """
        Abstract base class constructor for PostgreSQL interface.

        :param str conninfo: Connection information (aka DSN)
        :raises ConninfoException: if conninfo cannot be parsed as a list
            of space separated ``key=value`` items
        """
        super(PostgreSQL, self).__init__()
        self.conninfo = conninfo
        self._conn = None
        self.allow_reconnect = True
        # Build a dictionary with connection info parameters
        # This is mainly used to speed up search in conninfo
        try:
            self.conn_parameters = self.parse_dsn(conninfo)
        except (ValueError, TypeError, AttributeError) as e:
            # ValueError: an item of the DSN is not in the 'key=value' form
            # TypeError / AttributeError: conninfo is not a text string
            # (e.g. None has no split() method)
            _logger.debug(e)
            raise ConninfoException(
                'Cannot connect to postgres: "%s" '
                "is not a valid connection string" % conninfo
            )

    @staticmethod
    def parse_dsn(dsn):
        """
        Parse connection parameters from 'conninfo'

        The string is split on whitespace and every item is split
        on its first '=' character.

        :param str dsn: Connection information (aka DSN)
        :rtype: dict[str,str]
        :raises ValueError: if an item does not contain a '=' character
        """
        # TODO: this might be made more robust in the future
        return dict(x.split("=", 1) for x in dsn.split())

    @staticmethod
    def encode_dsn(parameters):
        """
        Build a connection string from a dictionary of connection
        parameters

        The resulting items are sorted by parameter name.

        :param dict[str,str] parameters: Connection parameters
        :rtype: str
        """
        # TODO: this might be made more robust in the future
        return " ".join(["%s=%s" % (k, v) for k, v in sorted(parameters.items())])

    def get_connection_string(self, application_name=None):
        """
        Return the connection string, adding the application_name parameter
        if requested, unless already defined by user in the connection string

        :param str application_name: the application_name to add
        :return str: the connection string
        """
        conn_string = self.conninfo
        # check if the application name is already defined by user
        if application_name and "application_name" not in self.conn_parameters:
            # Then add it to the connection string
            conn_string += " application_name=%s" % application_name
        # adopt a secure schema-usage pattern. See:
        # https://www.postgresql.org/docs/current/libpq-connect.html
        if "options" not in self.conn_parameters:
            conn_string += " options=-csearch_path="

        return conn_string

    def connect(self):
        """
        Generic function for Postgres connection (using psycopg2)

        An already established and working connection is reused, otherwise
        a new one is opened in autocommit mode.

        :returns: the psycopg2 connection
        :raises PostgresConnectionError: if the connection cannot be
            established
        """

        if not self._check_connection():
            try:
                self._conn = psycopg2.connect(self.conninfo)
                self._conn.autocommit = True
            # If psycopg2 fails to connect to the host,
            # raise the appropriate exception
            except psycopg2.DatabaseError as e:
                raise PostgresConnectionError(force_str(e).strip())
            # Register the connection to the list of live connections
            _live_connections.append(self)
        return self._conn

    def _check_connection(self):
        """
        Return false if the connection is broken

        A broken connection is closed before returning.

        :rtype: bool
        :raises PostgresConnectionError: if the connection is broken
            and `allow_reconnect` is False
        """
        # If the connection is not present return False
        if not self._conn:
            return False

        # Check if the connection works by running the CHECK_QUERY
        cursor = None
        initial_status = None
        try:
            initial_status = self._conn.status
            cursor = self._conn.cursor()
            cursor.execute(self.CHECK_QUERY)
            # Rollback if initial status was IDLE because the CHECK QUERY
            # has started a new transaction.
            if initial_status == STATUS_READY:
                self._conn.rollback()
        except psycopg2.DatabaseError:
            # Connection is broken, so we need to reconnect
            self.close()
            # Raise an error if reconnect is not allowed
            if not self.allow_reconnect:
                raise PostgresConnectionError(
                    "Connection lost, reconnection not allowed"
                )
            return False
        finally:
            if cursor:
                cursor.close()

        return True

    def close(self):
        """
        Close the connection to PostgreSQL

        It is safe to call this method when no connection is open.
        """
        if self._conn:
            # If the connection is still alive, rollback and close it
            if not self._conn.closed:
                if self._conn.status == STATUS_IN_TRANSACTION:
                    self._conn.rollback()
                self._conn.close()
            # Remove the connection from the live connections list
            self._conn = None
            try:
                _live_connections.remove(self)
            except ValueError:
                # The object has never been registered. This happens when
                # connect() fails after having assigned self._conn but
                # before appending the object to the list: in such a case
                # there is nothing to remove.
                pass

    def _cursor(self, *args, **kwargs):
        """
        Return a cursor

        The connection is opened if needed and all the arguments are
        passed to the `cursor()` method of the psycopg2 connection.
        """
        conn = self.connect()
        return conn.cursor(*args, **kwargs)

    @property
    def server_version(self):
        """
        Version of PostgreSQL (returned by psycopg2)

        :raises PostgresConnectionError: if the connection cannot be
            established
        """
        conn = self.connect()
        return conn.server_version

    @property
    def server_txt_version(self):
        """
        Human readable version of PostgreSQL (calculated from server_version)

        The numeric version is decomposed in three two-digit fields:
        before version 10 all of them are reported (e.g. 90602 -> "9.6.2"),
        from version 10 onwards the middle one is expected to be zero and
        only the first and the last are reported (e.g. 100001 -> "10.1").

        :rtype: str|None
        :return: the version, or None if the connection fails
        """
        try:
            conn = self.connect()
            major = int(conn.server_version / 10000)
            minor = int(conn.server_version / 100 % 100)
            patch = int(conn.server_version % 100)
            if major < 10:
                return "%d.%d.%d" % (major, minor, patch)
            if minor != 0:
                _logger.warning(
                    "Unexpected non zero minor version %s in %s",
                    minor,
                    conn.server_version,
                )
            return "%d.%d" % (major, patch)
        except PostgresConnectionError as e:
            _logger.debug(
                "Error retrieving PostgreSQL version: %s", force_str(e).strip()
            )
            return None

    @property
    def server_major_version(self):
        """
        PostgreSQL major version (calculated from server_txt_version)

        :rtype: str|None
        :return: the major version, or None if server_txt_version is None
        """
        result = self.server_txt_version
        if result is not None:
            return simplify_version(result)
        return None
274
275
class StreamingConnection(PostgreSQL):
    """
    Connection to a PostgreSQL server using the streaming
    replication protocol.
    """

    CHECK_QUERY = "IDENTIFY_SYSTEM"

    def __init__(self, conninfo):
        """
        Build a streaming connection starting from a standard DSN

        :param str conninfo: Connection information (aka DSN)
        """
        super(StreamingConnection, self).__init__(conninfo)

        self.conn_parameters.update(
            [
                # The 'replication' option triggers the streaming
                # replication protocol communication
                ("replication", "true"),
                # Force the datestyle to iso, working around an
                # issue in some psycopg2 versions
                ("options", "-cdatestyle=iso"),
                # Override 'dbname' to mimic the behaviour of
                # pg_receivexlog and pg_basebackup
                ("dbname", "replication"),
            ]
        )
        # The conninfo string must reflect the modified parameters
        self.conninfo = self.encode_dsn(self.conn_parameters)

    def connect(self):
        """
        Return a working connection to the PostgreSQL server, opening
        a new one only when the existing one cannot be reused.

        :returns: the connection to the server
        """
        if not self._check_connection():
            # No usable connection: build a new one
            self._conn = super(StreamingConnection, self).connect()
        return self._conn

    def fetch_remote_status(self):
        """
        Collect the status of the streaming connection to the server.

        No exception is raised on connection errors or when the connection
        is not a streaming one: the values which cannot be retrieved are
        left to None in the returned dictionary.

        :rtype: dict[str, None|str]
        """
        status_keys = (
            "connection_error",
            "streaming_supported",
            "streaming",
            "streaming_systemid",
            "timeline",
            "xlogpos",
        )
        result = dict((key, None) for key in status_keys)
        try:
            # `self.server_version` can raise a PostgresConnectionError,
            # so it must be evaluated inside the try block.
            # Servers older than 9.2 do not support `pg_receivexlog`:
            # there is nothing else to check for them.
            if self.server_version < 90200:
                result["streaming_supported"] = False
                return result
            result["streaming_supported"] = True
            # Execute an IDENTIFY_SYSTEM to check the connection
            cursor = self._cursor()
            cursor.execute("IDENTIFY_SYSTEM")
            identity = cursor.fetchone()
            # A returned row means that barman is connected
            # to a replication backend
            if identity:
                # IDENTIFY_SYSTEM always returns at least two values
                result.update(
                    streaming=True,
                    streaming_systemid=identity[0],
                    timeline=identity[1],
                )
                # PostgreSQL 9.1+ returns also the current xlog flush location
                if len(identity) > 2:
                    result["xlogpos"] = identity[2]
        except psycopg2.ProgrammingError:
            # This is not a streaming connection
            result["streaming"] = False
        except PostgresConnectionError as e:
            error_message = force_str(e).strip()
            result["connection_error"] = error_message
            _logger.warning("Error retrieving PostgreSQL status: %s", error_message)
        return result

    def create_physical_repslot(self, slot_name):
        """
        Create a physical replication slot using the streaming connection

        :param str slot_name: Replication slot name
        :raises PostgresDuplicateReplicationSlot: if a slot with the same
            name already exists
        :raises PostgresReplicationSlotsFull: if no more slots are available
        :raises PostgresException: for any other database error
        """
        cursor = self._cursor()
        try:
            # The slot name is embedded in the CREATE_REPLICATION_SLOT
            # command as it is, without any quoting. This is a
            # characteristic of the streaming connection, otherwise it
            # will fail with a generic "syntax error"
            cursor.execute("CREATE_REPLICATION_SLOT %s PHYSICAL" % slot_name)
        except psycopg2.DatabaseError as exc:
            if exc.pgcode == DUPLICATE_OBJECT:
                # A replication slot with the same name exists
                raise PostgresDuplicateReplicationSlot()
            if exc.pgcode == CONFIGURATION_LIMIT_EXCEEDED:
                # Unable to create a new physical replication slot.
                # All slots are full.
                raise PostgresReplicationSlotsFull()
            raise PostgresException(force_str(exc).strip())
        else:
            _logger.info("Replication slot '%s' successfully created", slot_name)

    def drop_repslot(self, slot_name):
        """
        Drop a physical replication slot using the streaming connection

        :param str slot_name: Replication slot name
        :raises PostgresInvalidReplicationSlot: if the slot does not exist
        :raises PostgresReplicationSlotInUse: if the slot is still in use
        :raises PostgresException: for any other database error
        """
        cursor = self._cursor()
        try:
            # The slot name is embedded in the DROP_REPLICATION_SLOT
            # command as it is, without any quoting. This is a
            # characteristic of the streaming connection, otherwise it
            # will fail with a generic "syntax error"
            cursor.execute("DROP_REPLICATION_SLOT %s" % slot_name)
        except psycopg2.DatabaseError as exc:
            if exc.pgcode == UNDEFINED_OBJECT:
                # A replication slot with that name does not exist
                raise PostgresInvalidReplicationSlot()
            if exc.pgcode == OBJECT_IN_USE:
                # The replication slot is still in use
                raise PostgresReplicationSlotInUse()
            raise PostgresException(force_str(exc).strip())
        else:
            _logger.info("Replication slot '%s' successfully dropped", slot_name)
417
418
419class PostgreSQLConnection(PostgreSQL):
420    """
421    This class represents a standard client connection to a PostgreSQL server.
422    """
423
424    # Streaming replication client types
425    STANDBY = 1
426    WALSTREAMER = 2
427    ANY_STREAMING_CLIENT = (STANDBY, WALSTREAMER)
428
429    def __init__(
430        self,
431        conninfo,
432        immediate_checkpoint=False,
433        slot_name=None,
434        application_name="barman",
435    ):
436        """
437        PostgreSQL connection constructor.
438
439        :param str conninfo: Connection information (aka DSN)
440        :param bool immediate_checkpoint: Whether to do an immediate checkpoint
441           when start a backup
442        :param str|None slot_name: Replication slot name
443        """
444        super(PostgreSQLConnection, self).__init__(conninfo)
445        self.immediate_checkpoint = immediate_checkpoint
446        self.slot_name = slot_name
447        self.application_name = application_name
448        self.configuration_files = None
449
450    def connect(self):
451        """
452        Connect to the PostgreSQL server. It reuses an existing connection.
453        """
454        if self._check_connection():
455            return self._conn
456
457        self._conn = super(PostgreSQLConnection, self).connect()
458        server_version = self._conn.server_version
459        use_app_name = "application_name" in self.conn_parameters
460        if server_version >= 90000 and not use_app_name:
461            try:
462                cur = self._conn.cursor()
463                # Do not use parameter substitution with SET
464                cur.execute("SET application_name TO %s" % self.application_name)
465                cur.close()
466            # If psycopg2 fails to set the application name,
467            # raise the appropriate exception
468            except psycopg2.ProgrammingError as e:
469                raise PostgresAppNameError(force_str(e).strip())
470        return self._conn
471
472    @property
473    def server_txt_version(self):
474        """
475        Human readable version of PostgreSQL (returned by the server).
476
477        Note: The return value of this function is used when composing include
478        patterns which are passed to rsync when copying tablespaces. If the
479        value does not exactly match the PostgreSQL version then Barman may
480        fail to copy tablespace files during a backup.
481        """
482        try:
483            cur = self._cursor()
484            cur.execute("SELECT version()")
485            version_string = cur.fetchone()[0]
486            platform, version = version_string.split()[:2]
487            # EPAS <= 10 will return a version string which starts with
488            # EnterpriseDB followed by the PostgreSQL version with an
489            # additional version field. This additional field must be discarded
490            # so that we return the exact PostgreSQL version. Later versions of
491            # EPAS report the PostgreSQL version directly so do not need
492            # special handling.
493            if platform == "EnterpriseDB":
494                return ".".join(version.split(".")[:-1])
495            else:
496                return version
497        except (PostgresConnectionError, psycopg2.Error) as e:
498            _logger.debug(
499                "Error retrieving PostgreSQL version: %s", force_str(e).strip()
500            )
501            return None
502
503    @property
504    def has_pgespresso(self):
505        """
506        Returns true if the `pgespresso` extension is available
507        """
508        try:
509            # pg_extension is only available from Postgres 9.1+
510            if self.server_version < 90100:
511                return False
512            cur = self._cursor()
513            cur.execute(
514                "SELECT count(*) FROM pg_extension WHERE extname = 'pgespresso'"
515            )
516            q_result = cur.fetchone()[0]
517            return q_result > 0
518        except (PostgresConnectionError, psycopg2.Error) as e:
519            _logger.debug(
520                "Error retrieving pgespresso information: %s", force_str(e).strip()
521            )
522            return None
523
524    @property
525    def is_in_recovery(self):
526        """
527        Returns true if PostgreSQL server is in recovery mode (hot standby)
528        """
529        try:
530            # pg_is_in_recovery is only available from Postgres 9.0+
531            if self.server_version < 90000:
532                return False
533            cur = self._cursor()
534            cur.execute("SELECT pg_is_in_recovery()")
535            return cur.fetchone()[0]
536        except (PostgresConnectionError, psycopg2.Error) as e:
537            _logger.debug(
538                "Error calling pg_is_in_recovery() function: %s", force_str(e).strip()
539            )
540            return None
541
542    @property
543    def is_superuser(self):
544        """
545        Returns true if current user has superuser privileges
546        """
547        try:
548            cur = self._cursor()
549            cur.execute("SELECT usesuper FROM pg_user WHERE usename = CURRENT_USER")
550            return cur.fetchone()[0]
551        except (PostgresConnectionError, psycopg2.Error) as e:
552            _logger.debug(
553                "Error calling is_superuser() function: %s", force_str(e).strip()
554            )
555            return None
556
557    @property
558    def has_backup_privileges(self):
559        """
560        Returns true if current user is superuser or, for PostgreSQL 10
561        or above, is a standard user that has grants to read server
562        settings and to execute all the functions needed for
563        exclusive/concurrent backup control and WAL control.
564        """
565        # pg_monitor / pg_read_all_settings only available from v10
566        if self.server_version < 100000:
567            return self.is_superuser
568
569        backup_check_query = """
570        SELECT
571          usesuper
572          OR
573          (
574            (
575              pg_has_role(CURRENT_USER, 'pg_monitor', 'MEMBER')
576              OR
577              (
578                pg_has_role(CURRENT_USER, 'pg_read_all_settings', 'MEMBER')
579                AND pg_has_role(CURRENT_USER, 'pg_read_all_stats', 'MEMBER')
580              )
581            )
582            AND has_function_privilege(
583              CURRENT_USER, 'pg_start_backup(text,bool,bool)', 'EXECUTE')
584            AND
585            (
586              has_function_privilege(
587                CURRENT_USER, 'pg_stop_backup()', 'EXECUTE')
588              OR has_function_privilege(
589                CURRENT_USER, 'pg_stop_backup(bool,bool)', 'EXECUTE')
590            )
591            AND has_function_privilege(
592              CURRENT_USER, 'pg_switch_wal()', 'EXECUTE')
593            AND has_function_privilege(
594              CURRENT_USER, 'pg_create_restore_point(text)', 'EXECUTE')
595          )
596        FROM
597          pg_user
598        WHERE
599          usename = CURRENT_USER
600        """
601        try:
602            cur = self._cursor()
603            cur.execute(backup_check_query)
604            return cur.fetchone()[0]
605        except (PostgresConnectionError, psycopg2.Error) as e:
606            _logger.debug(
607                "Error checking privileges for functions needed for backups: %s",
608                force_str(e).strip(),
609            )
610            return None
611
612    @property
613    def current_xlog_info(self):
614        """
615        Get detailed information about the current WAL position in PostgreSQL.
616
617        This method returns a dictionary containing the following data:
618
619         * location
620         * file_name
621         * file_offset
622         * timestamp
623
624        When executed on a standby server file_name and file_offset are always
625        None
626
627        :rtype: psycopg2.extras.DictRow
628        """
629        try:
630            cur = self._cursor(cursor_factory=DictCursor)
631            if not self.is_in_recovery:
632                cur.execute(
633                    "SELECT location, "
634                    "({pg_walfile_name_offset}(location)).*, "
635                    "CURRENT_TIMESTAMP AS timestamp "
636                    "FROM {pg_current_wal_lsn}() AS location".format(**self.name_map)
637                )
638                return cur.fetchone()
639            else:
640                cur.execute(
641                    "SELECT location, "
642                    "NULL AS file_name, "
643                    "NULL AS file_offset, "
644                    "CURRENT_TIMESTAMP AS timestamp "
645                    "FROM {pg_last_wal_replay_lsn}() AS location".format(
646                        **self.name_map
647                    )
648                )
649                return cur.fetchone()
650        except (PostgresConnectionError, psycopg2.Error) as e:
651            _logger.debug(
652                "Error retrieving current xlog detailed information: %s",
653                force_str(e).strip(),
654            )
655        return None
656
657    @property
658    def current_xlog_file_name(self):
659        """
660        Get current WAL file from PostgreSQL
661
662        :return str: current WAL file in PostgreSQL
663        """
664        current_xlog_info = self.current_xlog_info
665        if current_xlog_info is not None:
666            return current_xlog_info["file_name"]
667        return None
668
669    @property
670    def xlog_segment_size(self):
671        """
672        Retrieve the size of one WAL file.
673
674        In PostgreSQL 11, users will be able to change the WAL size
675        at runtime. Up to PostgreSQL 10, included, the WAL size can be changed
676        at compile time
677
678        :return: The wal size (In bytes)
679        """
680
681        # Prior to PostgreSQL 8.4, the wal segment size was not configurable,
682        # even in compilation
683        if self.server_version < 80400:
684            return DEFAULT_XLOG_SEG_SIZE
685
686        try:
687            cur = self._cursor(cursor_factory=DictCursor)
688            # We can't use the `get_setting` method here, because it
689            # use `SHOW`, returning an human readable value such as "16MB",
690            # while we prefer a raw value such as 16777216.
691            cur.execute("SELECT setting FROM pg_settings WHERE name='wal_segment_size'")
692            result = cur.fetchone()
693            wal_segment_size = int(result[0])
694
695            # Prior to PostgreSQL 11, the wal segment size is returned in
696            # blocks
697            if self.server_version < 110000:
698                cur.execute(
699                    "SELECT setting FROM pg_settings WHERE name='wal_block_size'"
700                )
701                result = cur.fetchone()
702                wal_block_size = int(result[0])
703
704                wal_segment_size *= wal_block_size
705
706            return wal_segment_size
707        except ValueError as e:
708            _logger.error(
709                "Error retrieving current xlog segment size: %s",
710                force_str(e).strip(),
711            )
712            return None
713
714    @property
715    def current_xlog_location(self):
716        """
717        Get current WAL location from PostgreSQL
718
719        :return str: current WAL location in PostgreSQL
720        """
721        current_xlog_info = self.current_xlog_info
722        if current_xlog_info is not None:
723            return current_xlog_info["location"]
724        return None
725
726    @property
727    def current_size(self):
728        """
729        Returns the total size of the PostgreSQL server
730        (requires superuser or pg_read_all_stats)
731        """
732        if not self.has_backup_privileges:
733            return None
734
735        try:
736            cur = self._cursor()
737            cur.execute("SELECT sum(pg_tablespace_size(oid)) FROM pg_tablespace")
738            return cur.fetchone()[0]
739        except (PostgresConnectionError, psycopg2.Error) as e:
740            _logger.debug(
741                "Error retrieving PostgreSQL total size: %s", force_str(e).strip()
742            )
743            return None
744
745    @property
746    def archive_timeout(self):
747        """
748        Retrieve the archive_timeout setting in PostgreSQL
749
750        :return: The archive timeout (in seconds)
751        """
752        try:
753            cur = self._cursor(cursor_factory=DictCursor)
754            # We can't use the `get_setting` method here, because it
755            # uses `SHOW`, returning an human readable value such as "5min",
756            # while we prefer a raw value such as 300.
757            cur.execute("SELECT setting FROM pg_settings WHERE name='archive_timeout'")
758            result = cur.fetchone()
759            archive_timeout = int(result[0])
760
761            return archive_timeout
762        except ValueError as e:
763            _logger.error("Error retrieving archive_timeout: %s", force_str(e).strip())
764            return None
765
766    @property
767    def checkpoint_timeout(self):
768        """
769        Retrieve the checkpoint_timeout setting in PostgreSQL
770
771        :return: The checkpoint timeout (in seconds)
772        """
773        try:
774            cur = self._cursor(cursor_factory=DictCursor)
775            # We can't use the `get_setting` method here, because it
776            # uses `SHOW`, returning an human readable value such as "5min",
777            # while we prefer a raw value such as 300.
778            cur.execute(
779                "SELECT setting FROM pg_settings WHERE name='checkpoint_timeout'"
780            )
781            result = cur.fetchone()
782            checkpoint_timeout = int(result[0])
783
784            return checkpoint_timeout
785        except ValueError as e:
786            _logger.error(
787                "Error retrieving checkpoint_timeout: %s", force_str(e).strip()
788            )
789            return None
790
791    def get_archiver_stats(self):
792        """
793        This method gathers statistics from pg_stat_archiver.
794        Only for Postgres 9.4+ or greater. If not available, returns None.
795
796        :return dict|None: a dictionary containing Postgres statistics from
797            pg_stat_archiver or None
798        """
799        try:
800            # pg_stat_archiver is only available from Postgres 9.4+
801            if self.server_version < 90400:
802                return None
803            cur = self._cursor(cursor_factory=DictCursor)
804            # Select from pg_stat_archiver statistics view,
805            # retrieving statistics about WAL archiver process activity,
806            # also evaluating if the server is archiving without issues
807            # and the archived WALs per second rate.
808            #
809            # We are using current_settings to check for archive_mode=always.
810            # current_setting does normalise its output so we can just
811            # check for 'always' settings using a direct string
812            # comparison
813            cur.execute(
814                "SELECT *, "
815                "current_setting('archive_mode') IN ('on', 'always') "
816                "AND (last_failed_wal IS NULL "
817                "OR last_failed_wal LIKE '%.history' "
818                "AND substring(last_failed_wal from 1 for 8) "
819                "<= substring(last_archived_wal from 1 for 8) "
820                "OR last_failed_time <= last_archived_time) "
821                "AS is_archiving, "
822                "CAST (archived_count AS NUMERIC) "
823                "/ EXTRACT (EPOCH FROM age(now(), stats_reset)) "
824                "AS current_archived_wals_per_second "
825                "FROM pg_stat_archiver"
826            )
827            return cur.fetchone()
828        except (PostgresConnectionError, psycopg2.Error) as e:
829            _logger.debug(
830                "Error retrieving pg_stat_archive data: %s", force_str(e).strip()
831            )
832            return None
833
    def fetch_remote_status(self):
        """
        Get the status of the PostgreSQL server

        Connection errors and psycopg2 errors raised while collecting the
        status are caught and logged as a warning: in that case the
        dictionary filled so far is returned.

        Only the keys listed in `pg_superuser_settings` and `pg_query_keys`
        are guaranteed to be present (set to None when not retrieved).
        Every other key is added to the result only once the code reaches
        the statement that retrieves it.

        :return: the collected status; values are settings as returned by
            the server, flags, lists or None
        :rtype: dict
        """
        # PostgreSQL settings to get from the server (requiring superuser)
        pg_superuser_settings = ["data_directory"]
        # PostgreSQL settings to get from the server
        pg_settings = []
        pg_query_keys = [
            "server_txt_version",
            "is_superuser",
            "is_in_recovery",
            "current_xlog",
            "pgespresso_installed",
            "replication_slot_support",
            "replication_slot",
            "synchronous_standby_names",
            "postgres_systemid",
        ]
        # Initialise the result dictionary setting all the values to None
        # NOTE: pg_settings is still empty here, so the version dependent
        # settings appended below are not pre-initialised to None
        result = dict.fromkeys(
            pg_superuser_settings + pg_settings + pg_query_keys, None
        )
        try:
            # Retrieve wal_level, hot_standby and max_wal_senders
            # only if version is >= 9.0
            if self.server_version >= 90000:
                pg_settings.append("wal_level")
                pg_settings.append("hot_standby")
                pg_settings.append("max_wal_senders")
                # Retrieve wal_keep_segments from version 9.0 onwards, until
                # version 13.0, where it was renamed to wal_keep_size
                if self.server_version < 130000:
                    pg_settings.append("wal_keep_segments")
                else:
                    pg_settings.append("wal_keep_size")

            # data_checksums is requested from version 9.3 onwards
            if self.server_version >= 90300:
                pg_settings.append("data_checksums")

            # max_replication_slots is requested from version 9.4 onwards
            if self.server_version >= 90400:
                pg_settings.append("max_replication_slots")

            # wal_compression is requested from version 9.5 onwards
            if self.server_version >= 90500:
                pg_settings.append("wal_compression")

            # retrieves superuser settings
            # (skipped, hence left to None, without backup privileges)
            if self.has_backup_privileges:
                for name in pg_superuser_settings:
                    result[name] = self.get_setting(name)

            # retrieves standard settings
            for name in pg_settings:
                result[name] = self.get_setting(name)

            result["is_superuser"] = self.is_superuser
            result["has_backup_privileges"] = self.has_backup_privileges
            result["is_in_recovery"] = self.is_in_recovery
            result["server_txt_version"] = self.server_txt_version
            result["pgespresso_installed"] = self.has_pgespresso
            # Read current_xlog_info once and derive both the current LSN
            # and the current WAL file name from it
            current_xlog_info = self.current_xlog_info
            if current_xlog_info:
                result["current_lsn"] = current_xlog_info["location"]
                result["current_xlog"] = current_xlog_info["file_name"]
            else:
                result["current_lsn"] = None
                result["current_xlog"] = None
            result["current_size"] = self.current_size
            result["archive_timeout"] = self.archive_timeout
            result["checkpoint_timeout"] = self.checkpoint_timeout
            result["xlog_segment_size"] = self.xlog_segment_size

            # Merge the configuration file locations into the result
            result.update(self.get_configuration_files())

            # Retrieve the replication_slot status
            result["replication_slot_support"] = False
            if self.server_version >= 90400:
                result["replication_slot_support"] = True
                # The slot is looked up only when a slot name is set
                if self.slot_name is not None:
                    result["replication_slot"] = self.get_replication_slot(
                        self.slot_name
                    )

            # Retrieve the list of synchronous standby names
            result["synchronous_standby_names"] = []
            if self.server_version >= 90100:
                result[
                    "synchronous_standby_names"
                ] = self.get_synchronous_standby_names()

            # The system identifier is requested from version 9.6 onwards
            if self.server_version >= 90600:
                result["postgres_systemid"] = self.get_systemid()
        except (PostgresConnectionError, psycopg2.Error) as e:
            # Best effort: log the problem and return what has been
            # collected up to this point
            _logger.warning(
                "Error retrieving PostgreSQL status: %s", force_str(e).strip()
            )
        return result
936
937    def get_systemid(self):
938        """
939        Get a Postgres instance systemid
940        """
941        if self.server_version < 90600:
942            return
943
944        try:
945            cur = self._cursor()
946            cur.execute("SELECT system_identifier::text FROM pg_control_system()")
947            return cur.fetchone()[0]
948        except (PostgresConnectionError, psycopg2.Error) as e:
949            _logger.debug(
950                "Error retrieving PostgreSQL system Id: %s", force_str(e).strip()
951            )
952            return None
953
954    def get_setting(self, name):
955        """
956        Get a Postgres setting with a given name
957
958        :param name: a parameter name
959        """
960        try:
961            cur = self._cursor()
962            cur.execute('SHOW "%s"' % name.replace('"', '""'))
963            return cur.fetchone()[0]
964        except (PostgresConnectionError, psycopg2.Error) as e:
965            _logger.debug(
966                "Error retrieving PostgreSQL setting '%s': %s",
967                name.replace('"', '""'),
968                force_str(e).strip(),
969            )
970            return None
971
972    def get_tablespaces(self):
973        """
974        Returns a list of tablespaces or None if not present
975        """
976        try:
977            cur = self._cursor()
978            if self.server_version >= 90200:
979                cur.execute(
980                    "SELECT spcname, oid, "
981                    "pg_tablespace_location(oid) AS spclocation "
982                    "FROM pg_tablespace "
983                    "WHERE pg_tablespace_location(oid) != ''"
984                )
985            else:
986                cur.execute(
987                    "SELECT spcname, oid, spclocation "
988                    "FROM pg_tablespace WHERE spclocation != ''"
989                )
990            # Generate a list of tablespace objects
991            return [Tablespace._make(item) for item in cur.fetchall()]
992        except (PostgresConnectionError, psycopg2.Error) as e:
993            _logger.debug(
994                "Error retrieving PostgreSQL tablespaces: %s", force_str(e).strip()
995            )
996            return None
997
998    def get_configuration_files(self):
999        """
1000        Get postgres configuration files or an empty dictionary
1001        in case of error
1002
1003        :rtype: dict
1004        """
1005        if self.configuration_files:
1006            return self.configuration_files
1007        try:
1008            self.configuration_files = {}
1009            cur = self._cursor()
1010            cur.execute(
1011                "SELECT name, setting FROM pg_settings "
1012                "WHERE name IN ('config_file', 'hba_file', 'ident_file')"
1013            )
1014            for cname, cpath in cur.fetchall():
1015                self.configuration_files[cname] = cpath
1016
1017            # Retrieve additional configuration files
1018            # If PostgreSQL is older than 8.4 disable this check
1019            if self.server_version >= 80400:
1020                cur.execute(
1021                    "SELECT DISTINCT sourcefile AS included_file "
1022                    "FROM pg_settings "
1023                    "WHERE sourcefile IS NOT NULL "
1024                    "AND sourcefile NOT IN "
1025                    "(SELECT setting FROM pg_settings "
1026                    "WHERE name = 'config_file') "
1027                    "ORDER BY 1"
1028                )
1029                # Extract the values from the containing single element tuples
1030                included_files = [included_file for included_file, in cur.fetchall()]
1031                if len(included_files) > 0:
1032                    self.configuration_files["included_files"] = included_files
1033
1034        except (PostgresConnectionError, psycopg2.Error) as e:
1035            _logger.debug(
1036                "Error retrieving PostgreSQL configuration files location: %s",
1037                force_str(e).strip(),
1038            )
1039            self.configuration_files = {}
1040
1041        return self.configuration_files
1042
1043    def create_restore_point(self, target_name):
1044        """
1045        Create a restore point with the given target name
1046
1047        The method executes the pg_create_restore_point() function through
1048        a PostgreSQL connection. Only for Postgres versions >= 9.1 when not
1049        in replication.
1050
1051        If requirements are not met, the operation is skipped.
1052
1053        :param str target_name: name of the restore point
1054
1055        :returns: the restore point LSN
1056        :rtype: str|None
1057        """
1058        if self.server_version < 90100:
1059            return None
1060
1061        # Not possible if on a standby
1062        # Called inside the pg_connect context to reuse the connection
1063        if self.is_in_recovery:
1064            return None
1065
1066        try:
1067            cur = self._cursor()
1068            cur.execute("SELECT pg_create_restore_point(%s)", [target_name])
1069            _logger.info("Restore point '%s' successfully created", target_name)
1070            return cur.fetchone()[0]
1071        except (PostgresConnectionError, psycopg2.Error) as e:
1072            _logger.debug(
1073                "Error issuing pg_create_restore_point() command: %s",
1074                force_str(e).strip(),
1075            )
1076            return None
1077
1078    def start_exclusive_backup(self, label):
1079        """
1080        Calls pg_start_backup() on the PostgreSQL server
1081
1082        This method returns a dictionary containing the following data:
1083
1084         * location
1085         * file_name
1086         * file_offset
1087         * timestamp
1088
1089        :param str label: descriptive string to identify the backup
1090        :rtype: psycopg2.extras.DictRow
1091        """
1092        try:
1093            conn = self.connect()
1094
1095            # Rollback to release the transaction, as the pg_start_backup
1096            # invocation can last up to PostgreSQL's checkpoint_timeout
1097            conn.rollback()
1098
1099            # Start an exclusive backup
1100            cur = conn.cursor(cursor_factory=DictCursor)
1101            if self.server_version < 80400:
1102                cur.execute(
1103                    "SELECT location, "
1104                    "({pg_walfile_name_offset}(location)).*, "
1105                    "now() AS timestamp "
1106                    "FROM pg_start_backup(%s) AS location".format(**self.name_map),
1107                    (label,),
1108                )
1109            else:
1110                cur.execute(
1111                    "SELECT location, "
1112                    "({pg_walfile_name_offset}(location)).*, "
1113                    "now() AS timestamp "
1114                    "FROM pg_start_backup(%s,%s) AS location".format(**self.name_map),
1115                    (label, self.immediate_checkpoint),
1116                )
1117
1118            start_row = cur.fetchone()
1119
1120            # Rollback to release the transaction, as the connection
1121            # is to be retained until the end of backup
1122            conn.rollback()
1123
1124            return start_row
1125        except (PostgresConnectionError, psycopg2.Error) as e:
1126            msg = "pg_start_backup(): %s" % force_str(e).strip()
1127            _logger.debug(msg)
1128            raise PostgresException(msg)
1129
1130    def start_concurrent_backup(self, label):
1131        """
1132        Calls pg_start_backup on the PostgreSQL server using the
1133        API introduced with version 9.6
1134
1135        This method returns a dictionary containing the following data:
1136
1137         * location
1138         * timeline
1139         * timestamp
1140
1141        :param str label: descriptive string to identify the backup
1142        :rtype: psycopg2.extras.DictRow
1143        """
1144        try:
1145            conn = self.connect()
1146
1147            # Rollback to release the transaction, as the pg_start_backup
1148            # invocation can last up to PostgreSQL's checkpoint_timeout
1149            conn.rollback()
1150
1151            # Start the backup using the api introduced in postgres 9.6
1152            cur = conn.cursor(cursor_factory=DictCursor)
1153            cur.execute(
1154                "SELECT location, "
1155                "(SELECT timeline_id "
1156                "FROM pg_control_checkpoint()) AS timeline, "
1157                "now() AS timestamp "
1158                "FROM pg_start_backup(%s, %s, FALSE) AS location",
1159                (label, self.immediate_checkpoint),
1160            )
1161            start_row = cur.fetchone()
1162
1163            # Rollback to release the transaction, as the connection
1164            # is to be retained until the end of backup
1165            conn.rollback()
1166
1167            return start_row
1168        except (PostgresConnectionError, psycopg2.Error) as e:
1169            msg = "pg_start_backup command: %s" % (force_str(e).strip(),)
1170            _logger.debug(msg)
1171            raise PostgresException(msg)
1172
1173    def stop_exclusive_backup(self):
1174        """
1175        Calls pg_stop_backup() on the PostgreSQL server
1176
1177        This method returns a dictionary containing the following data:
1178
1179         * location
1180         * file_name
1181         * file_offset
1182         * timestamp
1183
1184        :rtype: psycopg2.extras.DictRow
1185        """
1186        try:
1187            conn = self.connect()
1188
1189            # Rollback to release the transaction, as the pg_stop_backup
1190            # invocation could will wait until the current WAL file is shipped
1191            conn.rollback()
1192
1193            # Stop the backup
1194            cur = conn.cursor(cursor_factory=DictCursor)
1195            cur.execute(
1196                "SELECT location, "
1197                "({pg_walfile_name_offset}(location)).*, "
1198                "now() AS timestamp "
1199                "FROM pg_stop_backup() AS location".format(**self.name_map)
1200            )
1201
1202            return cur.fetchone()
1203        except (PostgresConnectionError, psycopg2.Error) as e:
1204            msg = "Error issuing pg_stop_backup command: %s" % force_str(e).strip()
1205            _logger.debug(msg)
1206            raise PostgresException(
1207                "Cannot terminate exclusive backup. "
1208                "You might have to manually execute pg_stop_backup "
1209                "on your PostgreSQL server"
1210            )
1211
1212    def stop_concurrent_backup(self):
1213        """
1214        Calls pg_stop_backup on the PostgreSQL server using the
1215        API introduced with version 9.6
1216
1217        This method returns a dictionary containing the following data:
1218
1219         * location
1220         * timeline
1221         * backup_label
1222         * timestamp
1223
1224        :rtype: psycopg2.extras.DictRow
1225        """
1226        try:
1227            conn = self.connect()
1228
1229            # Rollback to release the transaction, as the pg_stop_backup
1230            # invocation could will wait until the current WAL file is shipped
1231            conn.rollback()
1232
1233            # Stop the backup  using the api introduced with version 9.6
1234            cur = conn.cursor(cursor_factory=DictCursor)
1235            cur.execute(
1236                "SELECT end_row.lsn AS location, "
1237                "(SELECT CASE WHEN pg_is_in_recovery() "
1238                "THEN min_recovery_end_timeline ELSE timeline_id END "
1239                "FROM pg_control_checkpoint(), pg_control_recovery()"
1240                ") AS timeline, "
1241                "end_row.labelfile AS backup_label, "
1242                "now() AS timestamp FROM pg_stop_backup(FALSE) AS end_row"
1243            )
1244
1245            return cur.fetchone()
1246        except (PostgresConnectionError, psycopg2.Error) as e:
1247            msg = "Error issuing pg_stop_backup command: %s" % force_str(e).strip()
1248            _logger.debug(msg)
1249            raise PostgresException(msg)
1250
1251    def pgespresso_start_backup(self, label):
1252        """
1253        Execute a pgespresso_start_backup
1254
1255        This method returns a dictionary containing the following data:
1256
1257         * backup_label
1258         * timestamp
1259
1260        :param str label: descriptive string to identify the backup
1261        :rtype: psycopg2.extras.DictRow
1262        """
1263        try:
1264            conn = self.connect()
1265
1266            # Rollback to release the transaction,
1267            # as the pgespresso_start_backup invocation can last
1268            # up to PostgreSQL's checkpoint_timeout
1269            conn.rollback()
1270
1271            # Start the concurrent backup using pgespresso
1272            cur = conn.cursor(cursor_factory=DictCursor)
1273            cur.execute(
1274                "SELECT pgespresso_start_backup(%s,%s) AS backup_label, "
1275                "now() AS timestamp",
1276                (label, self.immediate_checkpoint),
1277            )
1278
1279            start_row = cur.fetchone()
1280
1281            # Rollback to release the transaction, as the connection
1282            # is to be retained until the end of backup
1283            conn.rollback()
1284
1285            return start_row
1286        except (PostgresConnectionError, psycopg2.Error) as e:
1287            msg = "pgespresso_start_backup(): %s" % force_str(e).strip()
1288            _logger.debug(msg)
1289            raise PostgresException(msg)
1290
1291    def pgespresso_stop_backup(self, backup_label):
1292        """
1293        Execute a pgespresso_stop_backup
1294
1295        This method returns a dictionary containing the following data:
1296
1297         * end_wal
1298         * timestamp
1299
1300        :param str backup_label: backup label as returned
1301            by pgespress_start_backup
1302        :rtype: psycopg2.extras.DictRow
1303        """
1304        try:
1305            conn = self.connect()
1306            # Issue a rollback to release any unneeded lock
1307            conn.rollback()
1308            cur = conn.cursor(cursor_factory=DictCursor)
1309            cur.execute(
1310                "SELECT pgespresso_stop_backup(%s) AS end_wal, now() AS timestamp",
1311                (backup_label,),
1312            )
1313            return cur.fetchone()
1314        except (PostgresConnectionError, psycopg2.Error) as e:
1315            msg = "Error issuing pgespresso_stop_backup() command: %s" % (
1316                force_str(e).strip()
1317            )
1318            _logger.debug(msg)
1319            raise PostgresException(
1320                "%s\n"
1321                "HINT: You might have to manually execute "
1322                "pgespresso_abort_backup() on your PostgreSQL "
1323                "server" % msg
1324            )
1325
1326    def switch_wal(self):
1327        """
1328        Execute a pg_switch_wal()
1329
1330        To be SURE of the switch of a xlog, we collect the xlogfile name
1331        before and after the switch.
1332        The method returns the just closed xlog file name if the current xlog
1333        file has changed, it returns an empty string otherwise.
1334
1335        The method returns None if something went wrong during the execution
1336        of the pg_switch_wal command.
1337
1338        :rtype: str|None
1339        """
1340        try:
1341            conn = self.connect()
1342            if not self.has_backup_privileges:
1343                raise BackupFunctionsAccessRequired(
1344                    "Postgres user '%s' is missing required privileges "
1345                    '(see "Preliminary steps" in the Barman manual)'
1346                    % self.conn_parameters.get("user")
1347                )
1348
1349            # If this server is in recovery there is nothing to do
1350            if self.is_in_recovery:
1351                raise PostgresIsInRecovery()
1352
1353            cur = conn.cursor()
1354            # Collect the xlog file name before the switch
1355            cur.execute(
1356                "SELECT {pg_walfile_name}("
1357                "{pg_current_wal_insert_lsn}())".format(**self.name_map)
1358            )
1359            pre_switch = cur.fetchone()[0]
1360            # Switch
1361            cur.execute(
1362                "SELECT {pg_walfile_name}({pg_switch_wal}())".format(**self.name_map)
1363            )
1364            # Collect the xlog file name after the switch
1365            cur.execute(
1366                "SELECT {pg_walfile_name}("
1367                "{pg_current_wal_insert_lsn}())".format(**self.name_map)
1368            )
1369            post_switch = cur.fetchone()[0]
1370            if pre_switch < post_switch:
1371                return pre_switch
1372            else:
1373                return ""
1374        except (PostgresConnectionError, psycopg2.Error) as e:
1375            _logger.debug(
1376                "Error issuing {pg_switch_wal}() command: %s".format(**self.name_map),
1377                force_str(e).strip(),
1378            )
1379            return None
1380
1381    def checkpoint(self):
1382        """
1383        Execute a checkpoint
1384        """
1385        try:
1386            conn = self.connect()
1387
1388            # Requires superuser privilege
1389            if not self.is_superuser:
1390                raise PostgresSuperuserRequired()
1391
1392            cur = conn.cursor()
1393            cur.execute("CHECKPOINT")
1394        except (PostgresConnectionError, psycopg2.Error) as e:
1395            _logger.debug("Error issuing CHECKPOINT: %s", force_str(e).strip())
1396
1397    def get_replication_stats(self, client_type=STANDBY):
1398        """
1399        Returns streaming replication information
1400        """
1401        try:
1402            cur = self._cursor(cursor_factory=NamedTupleCursor)
1403
1404            if not self.has_backup_privileges:
1405                raise BackupFunctionsAccessRequired(
1406                    "Postgres user '%s' is missing required privileges "
1407                    '(see "Preliminary steps" in the Barman manual)'
1408                    % self.conn_parameters.get("user")
1409                )
1410
1411            # pg_stat_replication is a system view that contains one
1412            # row per WAL sender process with information about the
1413            # replication status of a standby server. It has been
1414            # introduced in PostgreSQL 9.1. Current fields are:
1415            #
1416            # - pid (procpid in 9.1)
1417            # - usesysid
1418            # - usename
1419            # - application_name
1420            # - client_addr
1421            # - client_hostname
1422            # - client_port
1423            # - backend_start
1424            # - backend_xmin (9.4+)
1425            # - state
1426            # - sent_lsn (sent_location before 10)
1427            # - write_lsn (write_location before 10)
1428            # - flush_lsn (flush_location before 10)
1429            # - replay_lsn (replay_location before 10)
1430            # - sync_priority
1431            # - sync_state
1432            #
1433
1434            if self.server_version < 90100:
1435                raise PostgresUnsupportedFeature("9.1")
1436
1437            from_repslot = ""
1438            where_clauses = []
1439            if self.server_version >= 100000:
1440                # Current implementation (10+)
1441                what = "r.*, rs.slot_name"
1442                # Look for replication slot name
1443                from_repslot = (
1444                    "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
1445                )
1446                where_clauses += ["(rs.slot_type IS NULL OR rs.slot_type = 'physical')"]
1447            elif self.server_version >= 90500:
1448                # PostgreSQL 9.5/9.6
1449                what = (
1450                    "pid, "
1451                    "usesysid, "
1452                    "usename, "
1453                    "application_name, "
1454                    "client_addr, "
1455                    "client_hostname, "
1456                    "client_port, "
1457                    "backend_start, "
1458                    "backend_xmin, "
1459                    "state, "
1460                    "sent_location AS sent_lsn, "
1461                    "write_location AS write_lsn, "
1462                    "flush_location AS flush_lsn, "
1463                    "replay_location AS replay_lsn, "
1464                    "sync_priority, "
1465                    "sync_state, "
1466                    "rs.slot_name"
1467                )
1468                # Look for replication slot name
1469                from_repslot = (
1470                    "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
1471                )
1472                where_clauses += ["(rs.slot_type IS NULL OR rs.slot_type = 'physical')"]
1473            elif self.server_version >= 90400:
1474                # PostgreSQL 9.4
1475                what = (
1476                    "pid, "
1477                    "usesysid, "
1478                    "usename, "
1479                    "application_name, "
1480                    "client_addr, "
1481                    "client_hostname, "
1482                    "client_port, "
1483                    "backend_start, "
1484                    "backend_xmin, "
1485                    "state, "
1486                    "sent_location AS sent_lsn, "
1487                    "write_location AS write_lsn, "
1488                    "flush_location AS flush_lsn, "
1489                    "replay_location AS replay_lsn, "
1490                    "sync_priority, "
1491                    "sync_state"
1492                )
1493            elif self.server_version >= 90200:
1494                # PostgreSQL 9.2/9.3
1495                what = (
1496                    "pid, "
1497                    "usesysid, "
1498                    "usename, "
1499                    "application_name, "
1500                    "client_addr, "
1501                    "client_hostname, "
1502                    "client_port, "
1503                    "backend_start, "
1504                    "CAST (NULL AS xid) AS backend_xmin, "
1505                    "state, "
1506                    "sent_location AS sent_lsn, "
1507                    "write_location AS write_lsn, "
1508                    "flush_location AS flush_lsn, "
1509                    "replay_location AS replay_lsn, "
1510                    "sync_priority, "
1511                    "sync_state"
1512                )
1513            else:
1514                # PostgreSQL 9.1
1515                what = (
1516                    "procpid AS pid, "
1517                    "usesysid, "
1518                    "usename, "
1519                    "application_name, "
1520                    "client_addr, "
1521                    "client_hostname, "
1522                    "client_port, "
1523                    "backend_start, "
1524                    "CAST (NULL AS xid) AS backend_xmin, "
1525                    "state, "
1526                    "sent_location AS sent_lsn, "
1527                    "write_location AS write_lsn, "
1528                    "flush_location AS flush_lsn, "
1529                    "replay_location AS replay_lsn, "
1530                    "sync_priority, "
1531                    "sync_state"
1532                )
1533
1534            # Streaming client
1535            if client_type == self.STANDBY:
1536                # Standby server
1537                where_clauses += ["{replay_lsn} IS NOT NULL".format(**self.name_map)]
1538            elif client_type == self.WALSTREAMER:
1539                # WAL streamer
1540                where_clauses += ["{replay_lsn} IS NULL".format(**self.name_map)]
1541
1542            if where_clauses:
1543                where = "WHERE %s " % " AND ".join(where_clauses)
1544            else:
1545                where = ""
1546
1547            # Execute the query
1548            cur.execute(
1549                "SELECT %s, "
1550                "pg_is_in_recovery() AS is_in_recovery, "
1551                "CASE WHEN pg_is_in_recovery() "
1552                "  THEN {pg_last_wal_receive_lsn}() "
1553                "  ELSE {pg_current_wal_lsn}() "
1554                "END AS current_lsn "
1555                "FROM pg_stat_replication r "
1556                "%s"
1557                "%s"
1558                "ORDER BY sync_state DESC, sync_priority".format(**self.name_map)
1559                % (what, from_repslot, where)
1560            )
1561
1562            # Generate a list of standby objects
1563            return cur.fetchall()
1564        except (PostgresConnectionError, psycopg2.Error) as e:
1565            _logger.debug(
1566                "Error retrieving status of standby servers: %s", force_str(e).strip()
1567            )
1568            return None
1569
1570    def get_replication_slot(self, slot_name):
1571        """
1572        Retrieve from the PostgreSQL server a physical replication slot
1573        with a specific slot_name.
1574
1575        This method returns a dictionary containing the following data:
1576
1577         * slot_name
1578         * active
1579         * restart_lsn
1580
1581        :param str slot_name: the replication slot name
1582        :rtype: psycopg2.extras.DictRow
1583        """
1584        if self.server_version < 90400:
1585            # Raise exception if replication slot are not supported
1586            # by PostgreSQL version
1587            raise PostgresUnsupportedFeature("9.4")
1588        else:
1589            cur = self._cursor(cursor_factory=NamedTupleCursor)
1590            try:
1591                cur.execute(
1592                    "SELECT slot_name, "
1593                    "active, "
1594                    "restart_lsn "
1595                    "FROM pg_replication_slots "
1596                    "WHERE slot_type = 'physical' "
1597                    "AND slot_name = '%s'" % slot_name
1598                )
1599                # Retrieve the replication slot information
1600                return cur.fetchone()
1601            except (PostgresConnectionError, psycopg2.Error) as e:
1602                _logger.debug(
1603                    "Error retrieving replication_slots: %s", force_str(e).strip()
1604                )
1605                raise
1606
1607    def get_synchronous_standby_names(self):
1608        """
1609        Retrieve the list of named synchronous standby servers from PostgreSQL
1610
1611        This method returns a list of names
1612
1613        :return list: synchronous standby names
1614        """
1615        if self.server_version < 90100:
1616            # Raise exception if synchronous replication is not supported
1617            raise PostgresUnsupportedFeature("9.1")
1618        else:
1619            synchronous_standby_names = self.get_setting("synchronous_standby_names")
1620            # Return empty list if not defined
1621            if synchronous_standby_names is None:
1622                return []
1623            # Normalise the list of sync standby names
1624            # On PostgreSQL 9.6 it is possible to specify the number of
1625            # required synchronous standby using this format:
1626            # n (name1, name2, ... nameN).
1627            # We only need the name list, so we discard everything else.
1628
1629            # The name list starts after the first parenthesis or at pos 0
1630            names_start = synchronous_standby_names.find("(") + 1
1631            names_end = synchronous_standby_names.rfind(")")
1632            if names_end < 0:
1633                names_end = len(synchronous_standby_names)
1634            names_list = synchronous_standby_names[names_start:names_end]
1635            # We can blindly strip double quotes because PostgreSQL enforces
1636            # the format of the synchronous_standby_names content
1637            return [x.strip().strip('"') for x in names_list.split(",")]
1638
1639    @property
1640    def name_map(self):
1641        """
1642        Return a map with function and directory names according to the current
1643        PostgreSQL version.
1644
1645        Each entry has the `current` name as key and the name for the specific
1646        version as value.
1647
1648        :rtype: dict[str]
1649        """
1650
1651        # Avoid raising an error if the connection is not available
1652        try:
1653            server_version = self.server_version
1654        except PostgresConnectionError:
1655            _logger.debug(
1656                "Impossible to detect the PostgreSQL version, "
1657                "name_map will return names from latest version"
1658            )
1659            server_version = None
1660
1661        return function_name_map(server_version)
1662