1# -*- coding: utf-8 -*-
2# © Copyright EnterpriseDB UK Limited 2013-2021
3#
4# This file is part of Barman.
5#
6# Barman is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# Barman is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with Barman.  If not, see <http://www.gnu.org/licenses/>.
18
19import datetime
20
21import psycopg2
22import pytest
23from mock import PropertyMock, call, patch
24from psycopg2.errorcodes import DUPLICATE_OBJECT, UNDEFINED_OBJECT
25
26from barman.exceptions import (
27    PostgresConnectionError,
28    PostgresDuplicateReplicationSlot,
29    PostgresException,
30    PostgresInvalidReplicationSlot,
31    PostgresIsInRecovery,
32    BackupFunctionsAccessRequired,
33    PostgresSuperuserRequired,
34    PostgresUnsupportedFeature,
35)
36from barman.postgres import PostgreSQLConnection
37from barman.xlog import DEFAULT_XLOG_SEG_SIZE
38from testing_helpers import build_real_server
39
40
41class MockProgrammingError(psycopg2.ProgrammingError):
42    """
43    Mock class for psycopg2 ProgrammingError
44    """
45
46    def __init__(self, pgcode=None, pgerror=None):
47        # pgcode and pgerror are read only attributes and the ProgrammingError
48        # class is written in native code. The only way to set these attribute
49        # is to use the private method '__setstate__', which is also native
50        self.__setstate__({"pgcode": pgcode, "pgerror": pgerror})
51
52
53# noinspection PyMethodMayBeStatic
54class TestPostgres(object):
55    def test_connection_error(self):
56        """
57        simple test for missing conninfo
58        """
59        # Test with wrong configuration
60        server = build_real_server(main_conf={"conninfo": ""})
61        assert server.config.msg_list
62        assert (
63            "PostgreSQL connection: Missing 'conninfo' parameter "
64            "for server 'main'" in server.config.msg_list
65        )
66
67    @patch("barman.postgres.psycopg2.connect")
68    def test_connect_and_close(self, pg_connect_mock):
69        """
70        Check pg_connect method beaviour on error
71        """
72        # Setup server
73        server = build_real_server()
74        server.postgres.conninfo = "valid conninfo"
75        conn_mock = pg_connect_mock.return_value
76        conn_mock.server_version = 90401
77        conn_mock.closed = False
78        cursor_mock = conn_mock.cursor.return_value
79
80        # Connection failure
81        pg_connect_mock.side_effect = psycopg2.DatabaseError
82        with pytest.raises(PostgresConnectionError):
83            server.postgres.connect()
84
85        # Good connection but error setting the application name
86        pg_connect_mock.side_effect = None
87        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
88        with pytest.raises(PostgresConnectionError):
89            server.postgres.connect()
90
91        # Good connection
92        cursor_mock.execute.side_effect = None
93        conn = server.postgres.connect()
94        pg_connect_mock.assert_called_with("valid conninfo")
95        assert conn is conn_mock
96
97        # call again and make sure it returns the cached connection
98        pg_connect_mock.reset_mock()
99
100        new_conn = server.postgres.connect()
101
102        assert new_conn is conn_mock
103        assert not pg_connect_mock.called
104
105        # call again with a broken connection
106        pg_connect_mock.reset_mock()
107        conn_mock.cursor.side_effect = [psycopg2.DatabaseError, cursor_mock]
108
109        new_conn = server.postgres.connect()
110
111        assert new_conn is conn_mock
112        pg_connect_mock.assert_called_with("valid conninfo")
113
114        # close it
115        pg_connect_mock.reset_mock()
116        conn_mock.cursor.side_effect = None
117        conn_mock.closed = False
118
119        server.postgres.close()
120
121        assert conn_mock.close.called
122
123        # close it with an already closed connection
124        pg_connect_mock.reset_mock()
125        conn_mock.closed = True
126
127        server.postgres.close()
128
129        assert not conn_mock.close.called
130
131        # open again and verify that it is a new object
132        pg_connect_mock.reset_mock()
133        conn_mock.closed = False
134
135        server.postgres.connect()
136
137        pg_connect_mock.assert_called_with("valid conninfo")
138
139        server.postgres.close()
140
141        assert conn_mock.close.called
142
143    @patch("barman.postgres.psycopg2.connect")
144    def test_connect_error(self, connect_mock):
145        """
146        Check pg_connect method beaviour on error
147        """
148        # Setup temp dir and server
149        server = build_real_server()
150        server.postgres.conninfo = "not valid conninfo"
151        connect_mock.side_effect = psycopg2.DatabaseError
152        # expect pg_connect to raise a PostgresConnectionError
153        with pytest.raises(PostgresConnectionError):
154            server.postgres.connect()
155        connect_mock.assert_called_with("not valid conninfo")
156
157    @patch("barman.postgres.PostgreSQLConnection.connect")
158    def test_server_txt_version(self, conn_mock):
159        """
160        simple test for the server_txt_version property
161        """
162        # Build a server
163        server = build_real_server()
164        cursor_mock = conn_mock.return_value.cursor.return_value
165
166        # Connection error
167        conn_mock.side_effect = PostgresConnectionError
168        assert server.postgres.server_txt_version is None
169
170        # Communication error
171        conn_mock.side_effect = None
172        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
173        assert server.postgres.server_txt_version is None
174
175        # Good connection
176        cursor_mock.execute.side_effect = None
177        cursor_mock.fetchone.return_value = (
178            "PostgreSQL 9.4.5 on x86_64-apple-darwin15.0.0, compiled by "
179            "Apple LLVM version 7.0.0 (clang-700.1.76), 64-bit",
180        )
181
182        assert server.postgres.server_txt_version == "9.4.5"
183        cursor_mock.execute.assert_called_with("SELECT version()")
184
185    @patch("barman.postgres.PostgreSQLConnection.connect")
186    def test_server_txt_version_epas(self, conn_mock):
187        """
188        Verify server_txt_version returns the correct Postgres version
189        against EPAS 9.6 and 10, which both return "EnterpriseDB" in the
190        response to `SELECT version();`.
191
192        Versions 11 and above return the Postgres version string with the
193        EnterpriseDB details appended so do not require special handling.
194        """
195        # Build a server
196        server = build_real_server()
197        cursor_mock = conn_mock.return_value.cursor.return_value
198
199        # EPAS 9.6 returns an extra version field which must be discarded
200        cursor_mock.fetchone.return_value = (
201            "EnterpriseDB 9.6.23.31 on x86_64-pc-linux-gnu, compiled by "
202            "gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-44), 64-bit",
203        )
204        assert server.postgres.server_txt_version == "9.6.23"
205
206        # EPAS 10 also returns an extra field relative to the corresponding
207        # PostgreSQL version and it must be discarded
208        cursor_mock.fetchone.return_value = (
209            "EnterpriseDB 10.18.28 on x86_64-pc-linux-gnu, compiled by "
210            "gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-36), 64-bit",
211        )
212        assert server.postgres.server_txt_version == "10.18"
213
214    @patch("barman.postgres.PostgreSQLConnection.connect")
215    @patch(
216        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
217    )
218    def test_create_restore_point(self, is_in_recovery_mock, conn_mock):
219        """
220        Basic test for the _restore_point method
221        """
222        # Simulate a master connection
223        is_in_recovery_mock.return_value = False
224
225        server = build_real_server()
226        # Test 1: Postgres 9.0 expect None as result
227        conn_mock.return_value.server_version = 90000
228
229        restore_point = server.postgres.create_restore_point("Test_20151026T092241")
230        assert restore_point is None
231
232        # Simulate a master connection
233        is_in_recovery_mock.return_value = True
234
235        # Test 2: Postgres 9.1 in recovery (standby) expect None as result
236        conn_mock.return_value.server_version = 90100
237
238        restore_point = server.postgres.create_restore_point("Test_20151026T092241")
239        assert restore_point is None
240
241        # Test 3: Postgres 9.1 check mock calls
242        is_in_recovery_mock.return_value = False
243
244        assert server.postgres.create_restore_point("Test_20151026T092241")
245
246        cursor_mock = conn_mock.return_value.cursor.return_value
247        cursor_mock.execute.assert_called_with(
248            "SELECT pg_create_restore_point(%s)", ["Test_20151026T092241"]
249        )
250        assert cursor_mock.fetchone.called
251
252        # Test error management
253        cursor_mock.execute.side_effect = psycopg2.Error
254        assert server.postgres.create_restore_point("Test_20151026T092241") is None
255
256    @patch("barman.postgres.PostgreSQLConnection.connect")
257    def test_stop_exclusive_backup(self, conn_mock):
258        """
259        Basic test for the stop_exclusive_backup method
260
261        :param conn_mock: a mock that imitates a connection to PostgreSQL
262        """
263        # Build a server
264        server = build_real_server()
265
266        # Test call on master, PostgreSQL older than 10
267        conn_mock.return_value.server_version = 90300
268        # Expect no errors on normal call
269        assert server.postgres.stop_exclusive_backup()
270        # check the correct invocation of the execute method
271        cursor_mock = conn_mock.return_value.cursor.return_value
272        cursor_mock.execute.assert_called_once_with(
273            "SELECT location, "
274            "(pg_xlogfile_name_offset(location)).*, "
275            "now() AS timestamp "
276            "FROM pg_stop_backup() AS location"
277        )
278
279        # Test call on master, PostgreSQL 10
280        conn_mock.reset_mock()
281        conn_mock.return_value.server_version = 100000
282        # Expect no errors on normal call
283        assert server.postgres.stop_exclusive_backup()
284        # check the correct invocation of the execute method
285        cursor_mock = conn_mock.return_value.cursor.return_value
286        cursor_mock.execute.assert_called_once_with(
287            "SELECT location, "
288            "(pg_walfile_name_offset(location)).*, "
289            "now() AS timestamp "
290            "FROM pg_stop_backup() AS location"
291        )
292
293        # Test Error: Setup the mock to trigger an exception
294        # expect the method to raise a PostgresException
295        conn_mock.reset_mock()
296        cursor_mock.execute.side_effect = psycopg2.Error
297        # Check that the method raises a PostgresException
298        with pytest.raises(PostgresException):
299            server.postgres.stop_exclusive_backup()
300
301    @patch("barman.postgres.PostgreSQLConnection.connect")
302    def test_stop_concurrent_backup(self, conn):
303        """
304        Basic test for the stop_concurrent_backup method
305
306        :param conn: a mock that imitates a connection to PostgreSQL
307        """
308        # Build a server
309        server = build_real_server()
310
311        # Expect no errors on normal call
312        assert server.postgres.stop_concurrent_backup()
313
314        # check the correct invocation of the execute method
315        cursor_mock = conn.return_value.cursor.return_value
316        cursor_mock.execute.assert_called_once_with(
317            "SELECT end_row.lsn AS location, "
318            "(SELECT CASE WHEN pg_is_in_recovery() "
319            "THEN min_recovery_end_timeline "
320            "ELSE timeline_id END "
321            "FROM pg_control_checkpoint(), pg_control_recovery()"
322            ") AS timeline, "
323            "end_row.labelfile AS backup_label, "
324            "now() AS timestamp "
325            "FROM pg_stop_backup(FALSE) AS end_row"
326        )
327
328        # Test 2: Setup the mock to trigger an exception
329        # expect the method to raise a PostgresException
330        conn.reset_mock()
331        cursor_mock.execute.side_effect = psycopg2.Error
332        # Check that the method raises a PostgresException
333        with pytest.raises(PostgresException):
334            server.postgres.stop_concurrent_backup()
335
336    @patch("barman.postgres.PostgreSQLConnection.connect")
337    def test_pgespresso_stop_backup(self, conn):
338        """
339        Basic test for pgespresso_stop_backup method
340        """
341        # Build a server
342        server = build_real_server()
343
344        # Test 1: Expect no error and the correct call sequence
345        assert server.postgres.pgespresso_stop_backup("test_label")
346
347        cursor_mock = conn.return_value.cursor.return_value
348        cursor_mock.execute.assert_called_once_with(
349            "SELECT pgespresso_stop_backup(%s) AS end_wal, now() AS timestamp",
350            ("test_label",),
351        )
352
353        # Test 2: Setup the mock to trigger an exception
354        # expect the method to raise PostgresException
355        conn.reset_mock()
356        cursor_mock.execute.side_effect = psycopg2.Error
357        # Check that the method raises a PostgresException
358        with pytest.raises(PostgresException):
359            server.postgres.pgespresso_stop_backup("test_label")
360
361    @patch("barman.postgres.PostgreSQLConnection.connect")
362    def test_start_exclusive_backup(self, conn):
363        """
364        Simple test for start_exclusive_backup method of
365        the RsyncBackupExecutor class
366        """
367        # Build a server
368        server = build_real_server()
369        backup_label = "test label"
370
371        # Expect no errors
372        conn.return_value.server_version = 90300
373        assert server.postgres.start_exclusive_backup(backup_label)
374
375        # check for the correct call on the execute method
376        cursor_mock = conn.return_value.cursor.return_value
377        cursor_mock.execute.assert_called_once_with(
378            "SELECT location, "
379            "(pg_xlogfile_name_offset(location)).*, "
380            "now() AS timestamp "
381            "FROM pg_start_backup(%s,%s) AS location",
382            ("test label", False),
383        )
384        conn.return_value.rollback.assert_has_calls([call(), call()])
385        # reset the mock for the next test
386        conn.reset_mock()
387
388        # 8.3 test
389        conn.return_value.server_version = 80300
390        assert server.postgres.start_exclusive_backup(backup_label)
391        # check for the correct call on the execute method
392        cursor_mock.execute.assert_called_once_with(
393            "SELECT location, "
394            "(pg_xlogfile_name_offset(location)).*, "
395            "now() AS timestamp "
396            "FROM pg_start_backup(%s) AS location",
397            ("test label",),
398        )
399        conn.return_value.rollback.assert_has_calls([call(), call()])
400        # reset the mock for the next test
401        conn.reset_mock()
402
403        # 10 test
404        conn.return_value.server_version = 100000
405        assert server.postgres.start_exclusive_backup(backup_label)
406        # check for the correct call on the execute method
407        cursor_mock.execute.assert_called_once_with(
408            "SELECT location, "
409            "(pg_walfile_name_offset(location)).*, "
410            "now() AS timestamp "
411            "FROM pg_start_backup(%s,%s) AS location",
412            ("test label", False),
413        )
414        conn.return_value.rollback.assert_has_calls([call(), call()])
415        # reset the mock for the next test
416        conn.reset_mock()
417
418        # test error management
419        cursor_mock.execute.side_effect = psycopg2.Error
420        with pytest.raises(PostgresException):
421            server.postgres.start_exclusive_backup(backup_label)
422        conn.return_value.rollback.assert_called_once_with()
423
424    @patch("barman.postgres.PostgreSQLConnection.connect")
425    def test_start_concurrent_backup(self, conn):
426        """
427        Simple test for start_exclusive_backup method of
428        the RsyncBackupExecutor class
429        """
430        # Build a server
431        server = build_real_server()
432        label = "test label"
433
434        # Expect no errors
435        assert server.postgres.start_concurrent_backup(label)
436
437        # check for the correct call on the execute method
438        cursor_mock = conn.return_value.cursor.return_value
439        cursor_mock.execute.assert_called_once_with(
440            "SELECT location, "
441            "(SELECT timeline_id "
442            "FROM pg_control_checkpoint()) AS timeline, "
443            "now() AS timestamp "
444            "FROM pg_start_backup(%s, %s, FALSE) AS location",
445            ("test label", False),
446        )
447        conn.return_value.rollback.assert_has_calls([call(), call()])
448        # reset the mock for the next test
449        conn.reset_mock()
450
451        # test error management
452        cursor_mock.execute.side_effect = psycopg2.Error
453        with pytest.raises(PostgresException):
454            server.postgres.start_concurrent_backup(label)
455        conn.return_value.rollback.assert_called_once_with()
456
457    @patch("barman.postgres.PostgreSQLConnection.connect")
458    def test_pgespresso_start_backup(self, conn):
459        """
460        Simple test for _pgespresso_start_backup method
461        of the RsyncBackupExecutor class
462        """
463        # Build and configure a server
464        server = build_real_server()
465        backup_label = "test label"
466
467        # expect no errors
468        assert server.postgres.pgespresso_start_backup(backup_label)
469
470        cursor_mock = conn.return_value.cursor.return_value
471        cursor_mock.execute.assert_called_once_with(
472            "SELECT pgespresso_start_backup(%s,%s) AS backup_label, "
473            "now() AS timestamp",
474            (backup_label, server.config.immediate_checkpoint),
475        )
476        conn.return_value.rollback.assert_has_calls([call(), call()])
477        # reset the mock for the next test
478        conn.reset_mock()
479
480        # Test 2: Setup the mock to trigger an exception
481        # expect the method to return None
482        cursor_mock.execute.side_effect = psycopg2.Error
483        # Check that the method returns None as result
484        with pytest.raises(Exception):
485            server.postgres.pgespresso_start_backup("test_label")
486        conn.return_value.rollback.assert_called_once_with()
487
488    @patch("barman.postgres.PostgreSQLConnection.connect")
489    def test_get_setting(self, conn):
490        """
491        Simple test for retrieving settings from the database
492        """
493        # Build and configure a server
494        server = build_real_server()
495
496        # expect no errors
497        server.postgres.get_setting("test_setting")
498        cursor_mock = conn.return_value.cursor.return_value
499        cursor_mock.execute.assert_called_once_with(
500            'SHOW "%s"' % "test_setting".replace('"', '""')
501        )
502        # reset the mock for the second test
503        conn.reset_mock()
504
505        # Test 2: Setup the mock to trigger an exception
506        # expect the method to return None
507        cursor_mock.execute.side_effect = psycopg2.Error
508        # Check that the method returns None as result
509        assert server.postgres.get_setting("test_setting") is None
510
511    @patch("barman.postgres.PostgreSQLConnection.connect")
512    def test_get_systemid(self, conn):
513        """
514        Simple test for retrieving the systemid from the database
515        """
516        # Build and configure a server
517        server = build_real_server()
518        conn.return_value.server_version = 90600
519
520        # expect no errors
521        server.postgres.get_systemid()
522        cursor_mock = conn.return_value.cursor.return_value
523        cursor_mock.execute.assert_called_once_with(
524            "SELECT system_identifier::text FROM pg_control_system()"
525        )
526        # reset the mock for the second test
527        conn.reset_mock()
528
529        # Test 2: Setup the mock to trigger an exception
530        # expect the method to return None
531        cursor_mock.execute.side_effect = psycopg2.Error
532        # Check that the method returns None as result
533        assert server.postgres.get_systemid() is None
534        # reset the mock for the third test
535        conn.reset_mock()
536
537        # Test 3: setup the mock to return a PostgreSQL version that
538        # don't support pg_control_system()
539        conn.return_value.server_version = 90500
540        assert server.postgres.get_systemid() is None
541
542    @patch("barman.postgres.PostgreSQLConnection.connect")
543    def test_get_tablespaces(self, conn):
544        """
545        Simple test for pg_start_backup method of the RsyncBackupExecutor class
546        """
547        # Build a server
548        server = build_real_server()
549        cursor_mock = conn.return_value.cursor.return_value
550        cursor_mock.fetchall.return_value = [("tbs1", "1234", "/tmp")]
551        # Expect no errors
552        conn.return_value.server_version = 90400
553        tbs = server.postgres.get_tablespaces()
554        # check for the correct call on the execute method
555        cursor_mock.execute.assert_called_once_with(
556            "SELECT spcname, oid, "
557            "pg_tablespace_location(oid) AS spclocation "
558            "FROM pg_tablespace "
559            "WHERE pg_tablespace_location(oid) != ''"
560        )
561        assert tbs == [("tbs1", "1234", "/tmp")]
562        conn.reset_mock()
563
564        # 8.3 test
565        conn.return_value.server_version = 80300
566        cursor_mock.fetchall.return_value = [("tbs2", "5234", "/tmp1")]
567        tbs = server.postgres.get_tablespaces()
568        # check for the correct call on the execute method
569        cursor_mock.execute.assert_called_once_with(
570            "SELECT spcname, oid, spclocation "
571            "FROM pg_tablespace WHERE spclocation != ''"
572        )
573        assert tbs == [("tbs2", "5234", "/tmp1")]
574
575        conn.reset_mock()
576        # test error management
577        cursor_mock.execute.side_effect = psycopg2.Error
578        assert server.postgres.get_tablespaces() is None
579
580    @patch("barman.postgres.PostgreSQLConnection.connect")
581    def test_get_archiver_stats(self, conn):
582        """
583        Simple test for pg_start_backup method of the RsyncBackupExecutor class
584        """
585        # Build a server
586        server = build_real_server()
587        cursor_mock = conn.return_value.cursor.return_value
588        # expect None as result for server version <9.4
589        conn.return_value.server_version = 80300
590        assert server.postgres.get_archiver_stats() is None
591
592        # expect no errors with version >= 9.4
593        conn.reset_mock()
594        conn.return_value.server_version = 90400
595        cursor_mock.fetchone.return_value = {"a": "b"}
596        assert server.postgres.get_archiver_stats() == {"a": "b"}
597        # check for the correct call on the execute method
598        cursor_mock.execute.assert_called_once_with(
599            "SELECT *, "
600            "current_setting('archive_mode') IN ('on', 'always') "
601            "AND (last_failed_wal IS NULL "
602            "OR last_failed_wal LIKE '%.history' "
603            "AND substring(last_failed_wal from 1 for 8) "
604            "<= substring(last_archived_wal from 1 for 8) "
605            "OR last_failed_time <= last_archived_time) "
606            "AS is_archiving, "
607            "CAST (archived_count AS NUMERIC) "
608            "/ EXTRACT (EPOCH FROM age(now(), stats_reset)) "
609            "AS current_archived_wals_per_second "
610            "FROM pg_stat_archiver"
611        )
612        conn.reset_mock()
613
614        # test error management
615        cursor_mock.execute.side_effect = psycopg2.Error
616        assert server.postgres.get_archiver_stats() is None
617
618    @patch("barman.postgres.PostgreSQLConnection.connect")
619    def test_get_configuration_files(self, conn_mock):
620        """
621        simple test for the get_configuration_files method
622        """
623        # Build a server
624        server = build_real_server()
625        conn_mock.return_value.server_version = 80400
626        cursor_mock = conn_mock.return_value.cursor.return_value
627        test_conf_files = [
628            ("config_file", "/tmp/postgresql.conf"),
629            ("hba_file", "/tmp/pg_hba.conf"),
630            ("ident_file", "/tmp/pg_ident.conf"),
631        ]
632        cursor_mock.fetchall.side_effect = [test_conf_files, [("/test/file",)]]
633        retval = server.postgres.get_configuration_files()
634
635        assert retval == server.postgres.configuration_files
636        assert server.postgres.configuration_files == dict(
637            test_conf_files + [("included_files", ["/test/file"])]
638        )
639        cursor_mock.execute.assert_any_call(
640            "SELECT name, setting FROM pg_settings "
641            "WHERE name IN ('config_file', 'hba_file', 'ident_file')"
642        )
643        cursor_mock.execute.assert_any_call(
644            "SELECT DISTINCT sourcefile AS included_file "
645            "FROM pg_settings "
646            "WHERE sourcefile IS NOT NULL "
647            "AND sourcefile NOT IN "
648            "(SELECT setting FROM pg_settings "
649            "WHERE name = 'config_file') "
650            "ORDER BY 1"
651        )
652
653        # Call it again, should not fetch the data twice
654        conn_mock.reset_mock()
655        retval = server.postgres.get_configuration_files()
656        assert retval == server.postgres.configuration_files
657        assert not cursor_mock.execute.called
658
659        # Reset mock and configuration files
660        conn_mock.reset_mock()
661        server.postgres.configuration_files = None
662
663        # Test error management
664        cursor_mock.execute.side_effect = PostgresConnectionError
665        assert server.postgres.get_configuration_files() == {}
666
667        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
668        assert server.postgres.get_configuration_files() == {}
669
670    @patch("barman.postgres.PostgreSQLConnection.connect")
671    def test_has_pgespresso(self, conn_mock):
672        """
673        simple test for has_pgespresso property
674        """
675        # Build a server
676        server = build_real_server()
677        cursor_mock = conn_mock.return_value.cursor.return_value
678
679        # Too old
680        conn_mock.return_value.server_version = 90000
681        assert not server.postgres.has_pgespresso
682
683        # Extension present
684        conn_mock.return_value.server_version = 90100
685        cursor_mock.fetchone.return_value = [1]
686        assert server.postgres.has_pgespresso
687        cursor_mock.execute.assert_called_once_with(
688            "SELECT count(*) FROM pg_extension WHERE extname = 'pgespresso'"
689        )
690
691        # Extension not present
692        cursor_mock.fetchone.return_value = [0]
693        assert not server.postgres.has_pgespresso
694
695        # Reset mock
696        conn_mock.reset_mock()
697
698        # Test error management
699        cursor_mock.execute.side_effect = PostgresConnectionError
700        assert server.postgres.has_pgespresso is None
701
702        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
703        assert server.postgres.has_pgespresso is None
704
705    @patch("barman.postgres.PostgreSQLConnection.connect")
706    def test_is_in_recovery(self, conn_mock):
707        """
708        simple test for is_in_recovery property
709        """
710        # Build a server
711        server = build_real_server()
712        cursor_mock = conn_mock.return_value.cursor.return_value
713
714        # Too old
715        conn_mock.return_value.server_version = 80400
716        assert not server.postgres.is_in_recovery
717
718        # In recovery
719        conn_mock.return_value.server_version = 90100
720        cursor_mock.fetchone.return_value = [True]
721        assert server.postgres.is_in_recovery
722        cursor_mock.execute.assert_called_once_with("SELECT pg_is_in_recovery()")
723
724        # Not in recovery
725        cursor_mock.fetchone.return_value = [False]
726        assert not server.postgres.is_in_recovery
727
728        # Reset mock
729        conn_mock.reset_mock()
730
731        # Test error management
732        cursor_mock.execute.side_effect = PostgresConnectionError
733        assert server.postgres.is_in_recovery is None
734
735        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
736        assert server.postgres.is_in_recovery is None
737
738    @patch("barman.postgres.PostgreSQLConnection.connect")
739    @patch(
740        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
741    )
742    def test_current_xlog_info(self, is_in_recovery_mock, conn_mock):
743        """
744        Test correct select xlog_loc
745        """
746        # Build and configure a server using a mock
747        server = build_real_server()
748        cursor_mock = conn_mock.return_value.cursor.return_value
749        timestamp = datetime.datetime(2016, 3, 30, 17, 4, 20, 271376)
750        current_xlog_info = dict(
751            location="0/35000528",
752            file_name="000000010000000000000035",
753            file_offset=1320,
754            timestamp=timestamp,
755        )
756        cursor_mock.fetchone.return_value = current_xlog_info
757
758        # Test call on master, PostgreSQL older than 10
759        conn_mock.return_value.server_version = 90300
760        is_in_recovery_mock.return_value = False
761        remote_loc = server.postgres.current_xlog_info
762        assert remote_loc == current_xlog_info
763        cursor_mock.execute.assert_called_once_with(
764            "SELECT location, (pg_xlogfile_name_offset(location)).*, "
765            "CURRENT_TIMESTAMP AS timestamp "
766            "FROM pg_current_xlog_location() AS location"
767        )
768
769        # Check call on standby, PostgreSQL older than 10
770        conn_mock.reset_mock()
771        conn_mock.return_value.server_version = 90300
772        is_in_recovery_mock.return_value = True
773        current_xlog_info["file_name"] = None
774        current_xlog_info["file_offset"] = None
775        remote_loc = server.postgres.current_xlog_info
776        assert remote_loc == current_xlog_info
777        cursor_mock.execute.assert_called_once_with(
778            "SELECT location, NULL AS file_name, NULL AS file_offset, "
779            "CURRENT_TIMESTAMP AS timestamp "
780            "FROM pg_last_xlog_replay_location() AS location"
781        )
782
783        # Test call on master, PostgreSQL 10
784        conn_mock.reset_mock()
785        conn_mock.return_value.server_version = 100000
786        is_in_recovery_mock.return_value = False
787        remote_loc = server.postgres.current_xlog_info
788        assert remote_loc == current_xlog_info
789        cursor_mock.execute.assert_called_once_with(
790            "SELECT location, (pg_walfile_name_offset(location)).*, "
791            "CURRENT_TIMESTAMP AS timestamp "
792            "FROM pg_current_wal_lsn() AS location"
793        )
794
795        # Check call on standby, PostgreSQL 10
796        conn_mock.reset_mock()
797        conn_mock.return_value.server_version = 100000
798        is_in_recovery_mock.return_value = True
799        current_xlog_info["file_name"] = None
800        current_xlog_info["file_offset"] = None
801        remote_loc = server.postgres.current_xlog_info
802        assert remote_loc == current_xlog_info
803        cursor_mock.execute.assert_called_once_with(
804            "SELECT location, NULL AS file_name, NULL AS file_offset, "
805            "CURRENT_TIMESTAMP AS timestamp "
806            "FROM pg_last_wal_replay_lsn() AS location"
807        )
808
809        # Reset mock
810        conn_mock.reset_mock()
811
812        # Test error management
813        cursor_mock.execute.side_effect = PostgresConnectionError
814        assert server.postgres.current_xlog_info is None
815
816        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
817        assert server.postgres.current_xlog_info is None
818
819    @patch("barman.postgres.PostgreSQLConnection.connect")
820    @patch(
821        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
822    )
823    def test_current_xlog_file_name(self, is_in_recovery_mock, conn_mock):
824        """
825        simple test for current_xlog property
826        """
827        # Build a server
828        server = build_real_server()
829        conn_mock.return_value.server_version = 90300
830        cursor_mock = conn_mock.return_value.cursor.return_value
831
832        timestamp = datetime.datetime(2016, 3, 30, 17, 4, 20, 271376)
833        cursor_mock.fetchone.return_value = dict(
834            location="0/35000528",
835            file_name="000000010000000000000035",
836            file_offset=1320,
837            timestamp=timestamp,
838        )
839
840        # Special way to mock a property
841        is_in_recovery_mock.return_value = False
842        assert server.postgres.current_xlog_file_name == ("000000010000000000000035")
843
844        # Reset mock
845        conn_mock.reset_mock()
846
847        # Test error management
848        cursor_mock.execute.side_effect = PostgresConnectionError
849        assert server.postgres.current_xlog_file_name is None
850
851        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
852        assert server.postgres.current_xlog_file_name is None
853
854    @patch("barman.postgres.psycopg2.connect")
855    @patch(
856        "barman.postgres.PostgreSQLConnection.xlog_segment_size",
857        new_callable=PropertyMock,
858    )
859    @patch(
860        "barman.postgres.PostgreSQLConnection.checkpoint_timeout",
861        new_callable=PropertyMock,
862    )
863    @patch(
864        "barman.postgres.PostgreSQLConnection.archive_timeout",
865        new_callable=PropertyMock,
866    )
867    @patch(
868        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
869    )
870    @patch(
871        "barman.postgres.PostgreSQLConnection.has_backup_privileges",
872        new_callable=PropertyMock,
873    )
874    @patch(
875        "barman.postgres.PostgreSQLConnection.is_superuser", new_callable=PropertyMock
876    )
877    @patch(
878        "barman.postgres.PostgreSQLConnection.server_txt_version",
879        new_callable=PropertyMock,
880    )
881    @patch(
882        "barman.postgres.PostgreSQLConnection.has_pgespresso", new_callable=PropertyMock
883    )
884    @patch(
885        "barman.postgres.PostgreSQLConnection.current_xlog_info",
886        new_callable=PropertyMock,
887    )
888    @patch(
889        "barman.postgres.PostgreSQLConnection.current_size", new_callable=PropertyMock
890    )
891    @patch("barman.postgres.PostgreSQLConnection.get_configuration_files")
892    @patch("barman.postgres.PostgreSQLConnection.get_setting")
893    @patch("barman.postgres.PostgreSQLConnection.get_synchronous_standby_names")
894    @patch("barman.postgres.PostgreSQLConnection.get_systemid")
895    def test_get_remote_status(
896        self,
897        get_systemid_mock,
898        get_synchronous_standby_names_mock,
899        get_setting_mock,
900        get_configuration_files_mock,
901        current_size_mock,
902        current_xlog_info,
903        has_pgespresso_mock,
904        server_txt_version_mock,
905        is_superuser_mock,
906        has_backup_privileges_mock,
907        is_in_recovery_mock,
908        archive_timeout_mock,
909        checkpoint_timeout_mock,
910        xlog_segment_size,
911        conn_mock,
912    ):
913        """
914        simple test for the fetch_remote_status method
915        """
916        # Build a server
917        server = build_real_server()
918        current_xlog_info.return_value = {
919            "location": "DE/ADBEEF",
920            "file_name": "00000001000000DE00000000",
921            "file_offset": 11386607,
922            "timestamp": datetime.datetime(2016, 3, 30, 17, 4, 20, 271376),
923        }
924        current_size_mock.return_value = 497354072
925        has_pgespresso_mock.return_value = True
926        server_txt_version_mock.return_value = "9.5.0"
927        is_in_recovery_mock.return_value = False
928        has_backup_privileges_mock.return_value = True
929        is_superuser_mock.return_value = True
930        get_configuration_files_mock.return_value = {"a": "b"}
931        get_synchronous_standby_names_mock.return_value = []
932        conn_mock.return_value.server_version = 90500
933        archive_timeout_mock.return_value = 300
934        checkpoint_timeout_mock.return_value = 600
935        xlog_segment_size.return_value = 2 << 22
936        get_systemid_mock.return_value = 6721602258895701769
937
938        settings = {
939            "data_directory": "a directory",
940            "wal_level": "a wal_level value",
941            "hot_standby": "a hot_standby value",
942            "max_wal_senders": "a max_wal_senderse value",
943            "data_checksums": "a data_checksums",
944            "max_replication_slots": "a max_replication_slots value",
945            "wal_compression": "a wal_compression value",
946            "wal_keep_segments": "a wal_keep_segments value",
947            "wal_keep_size": "a wal_keep_size value",
948        }
949
950        get_setting_mock.side_effect = lambda x: settings.get(x, "unknown")
951
952        # Test PostgreSQL < 9.6
953        result = server.postgres.fetch_remote_status()
954        assert result == {
955            "a": "b",
956            "is_superuser": True,
957            "has_backup_privileges": True,
958            "is_in_recovery": False,
959            "current_lsn": "DE/ADBEEF",
960            "current_xlog": "00000001000000DE00000000",
961            "data_directory": "a directory",
962            "pgespresso_installed": True,
963            "server_txt_version": "9.5.0",
964            "wal_level": "a wal_level value",
965            "current_size": 497354072,
966            "replication_slot_support": True,
967            "replication_slot": None,
968            "synchronous_standby_names": [],
969            "archive_timeout": 300,
970            "checkpoint_timeout": 600,
971            "wal_keep_segments": "a wal_keep_segments value",
972            "hot_standby": "a hot_standby value",
973            "max_wal_senders": "a max_wal_senderse value",
974            "data_checksums": "a data_checksums",
975            "max_replication_slots": "a max_replication_slots value",
976            "wal_compression": "a wal_compression value",
977            "xlog_segment_size": 8388608,
978            "postgres_systemid": None,
979        }
980
981        # Test PostgreSQL 9.6
982        conn_mock.return_value.server_version = 90600
983        result = server.postgres.fetch_remote_status()
984        assert result == {
985            "a": "b",
986            "is_superuser": True,
987            "has_backup_privileges": True,
988            "is_in_recovery": False,
989            "current_lsn": "DE/ADBEEF",
990            "current_xlog": "00000001000000DE00000000",
991            "data_directory": "a directory",
992            "pgespresso_installed": True,
993            "server_txt_version": "9.5.0",
994            "wal_level": "a wal_level value",
995            "current_size": 497354072,
996            "replication_slot_support": True,
997            "replication_slot": None,
998            "synchronous_standby_names": [],
999            "archive_timeout": 300,
1000            "checkpoint_timeout": 600,
1001            "wal_keep_segments": "a wal_keep_segments value",
1002            "hot_standby": "a hot_standby value",
1003            "max_wal_senders": "a max_wal_senderse value",
1004            "data_checksums": "a data_checksums",
1005            "max_replication_slots": "a max_replication_slots value",
1006            "wal_compression": "a wal_compression value",
1007            "xlog_segment_size": 8388608,
1008            "postgres_systemid": 6721602258895701769,
1009        }
1010
1011        # Test PostgreSQL 13
1012        conn_mock.return_value.server_version = 130000
1013        result = server.postgres.fetch_remote_status()
1014        assert result == {
1015            "a": "b",
1016            "is_superuser": True,
1017            "has_backup_privileges": True,
1018            "is_in_recovery": False,
1019            "current_lsn": "DE/ADBEEF",
1020            "current_xlog": "00000001000000DE00000000",
1021            "data_directory": "a directory",
1022            "pgespresso_installed": True,
1023            "server_txt_version": "9.5.0",
1024            "wal_level": "a wal_level value",
1025            "current_size": 497354072,
1026            "replication_slot_support": True,
1027            "replication_slot": None,
1028            "synchronous_standby_names": [],
1029            "archive_timeout": 300,
1030            "checkpoint_timeout": 600,
1031            "wal_keep_size": "a wal_keep_size value",
1032            "hot_standby": "a hot_standby value",
1033            "max_wal_senders": "a max_wal_senderse value",
1034            "data_checksums": "a data_checksums",
1035            "max_replication_slots": "a max_replication_slots value",
1036            "wal_compression": "a wal_compression value",
1037            "xlog_segment_size": 8388608,
1038            "postgres_systemid": 6721602258895701769,
1039        }
1040
1041        # Test error management
1042        server.postgres.close()
1043        conn_mock.side_effect = psycopg2.DatabaseError
1044        assert server.postgres.fetch_remote_status() == {
1045            "is_superuser": None,
1046            "is_in_recovery": None,
1047            "current_xlog": None,
1048            "data_directory": None,
1049            "pgespresso_installed": None,
1050            "server_txt_version": None,
1051            "replication_slot_support": None,
1052            "replication_slot": None,
1053            "synchronous_standby_names": None,
1054            "postgres_systemid": None,
1055        }
1056
1057        get_setting_mock.side_effect = psycopg2.ProgrammingError
1058        assert server.postgres.fetch_remote_status() == {
1059            "is_superuser": None,
1060            "is_in_recovery": None,
1061            "current_xlog": None,
1062            "data_directory": None,
1063            "pgespresso_installed": None,
1064            "server_txt_version": None,
1065            "replication_slot_support": None,
1066            "replication_slot": None,
1067            "synchronous_standby_names": None,
1068            "postgres_systemid": None,
1069        }
1070
1071    @patch("barman.postgres.PostgreSQLConnection.connect")
1072    @patch(
1073        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
1074    )
1075    @patch(
1076        "barman.postgres.PostgreSQLConnection.is_superuser", new_callable=PropertyMock
1077    )
1078    def test_checkpoint(self, is_superuser_mock, is_in_recovery_mock, conn_mock):
1079        """
1080        Simple test for the execution of a checkpoint on a given server
1081        """
1082        # Build a server
1083        server = build_real_server()
1084        cursor_mock = conn_mock.return_value.cursor.return_value
1085        is_in_recovery_mock.return_value = False
1086        is_superuser_mock.return_value = True
1087        # Execute the checkpoint method
1088        server.postgres.checkpoint()
1089        # Check for the right invocation
1090        cursor_mock.execute.assert_called_with("CHECKPOINT")
1091
1092        cursor_mock.reset_mock()
1093        # Missing required permissions
1094        is_in_recovery_mock.return_value = False
1095        is_superuser_mock.return_value = False
1096        with pytest.raises(PostgresSuperuserRequired):
1097            server.postgres.checkpoint()
1098        assert not cursor_mock.execute.called
1099
1100    @patch("barman.postgres.PostgreSQLConnection.connect")
1101    @patch(
1102        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
1103    )
1104    @patch(
1105        "barman.postgres.PostgreSQLConnection.has_backup_privileges",
1106        new_callable=PropertyMock,
1107    )
1108    def test_switch_wal(
1109        self, has_backup_privileges_mock, is_in_recovery_mock, conn_mock
1110    ):
1111        """
1112        Simple test for the execution of a switch of a xlog on a given server
1113        """
1114        # Build a server
1115        server = build_real_server()
1116        cursor_mock = conn_mock.return_value.cursor.return_value
1117        is_in_recovery_mock.return_value = False
1118        has_backup_privileges_mock.return_value = True
1119
1120        # Test for the response of a correct switch for PostgreSQL < 10
1121        conn_mock.return_value.server_version = 90100
1122        cursor_mock.fetchone.side_effect = [
1123            ("000000010000000000000001",),
1124            ("000000010000000000000002",),
1125        ]
1126        xlog = server.postgres.switch_wal()
1127
1128        # Check for the right invocation for PostgreSQL < 10
1129        assert xlog == "000000010000000000000001"
1130        cursor_mock.execute.assert_has_calls(
1131            [
1132                call("SELECT pg_xlogfile_name(pg_current_xlog_insert_location())"),
1133                call("SELECT pg_xlogfile_name(pg_switch_xlog())"),
1134                call("SELECT pg_xlogfile_name(pg_current_xlog_insert_location())"),
1135            ]
1136        )
1137
1138        # Test for the response of a correct switch for PostgreSQL 10
1139        conn_mock.return_value.server_version = 100000
1140        cursor_mock.reset_mock()
1141        cursor_mock.fetchone.side_effect = [
1142            ("000000010000000000000001",),
1143            ("000000010000000000000002",),
1144        ]
1145        xlog = server.postgres.switch_wal()
1146
1147        # Check for the right invocation for PostgreSQL 10
1148        assert xlog == "000000010000000000000001"
1149        cursor_mock.execute.assert_has_calls(
1150            [
1151                call("SELECT pg_walfile_name(pg_current_wal_insert_lsn())"),
1152                call("SELECT pg_walfile_name(pg_switch_wal())"),
1153                call("SELECT pg_walfile_name(pg_current_wal_insert_lsn())"),
1154            ]
1155        )
1156
1157        cursor_mock.reset_mock()
1158        # The switch has not been executed
1159        cursor_mock.fetchone.side_effect = [
1160            ("000000010000000000000001",),
1161            ("000000010000000000000001",),
1162        ]
1163        xlog = server.postgres.switch_wal()
1164        # Check for the right invocation
1165        assert xlog == ""
1166
1167        cursor_mock.reset_mock()
1168        # Missing required permissions
1169        is_in_recovery_mock.return_value = False
1170        has_backup_privileges_mock.return_value = False
1171        with pytest.raises(BackupFunctionsAccessRequired):
1172            server.postgres.switch_wal()
1173        # Check for the right invocation
1174        assert not cursor_mock.execute.called
1175
1176        cursor_mock.reset_mock()
1177        # Server in recovery
1178        is_in_recovery_mock.return_value = True
1179        has_backup_privileges_mock.return_value = True
1180        with pytest.raises(PostgresIsInRecovery):
1181            server.postgres.switch_wal()
1182        # Check for the right invocation
1183        assert not cursor_mock.execute.called
1184
1185    @patch("barman.postgres.PostgreSQLConnection.connect")
1186    @patch(
1187        "barman.postgres.PostgreSQLConnection.server_version", new_callable=PropertyMock
1188    )
1189    @patch(
1190        "barman.postgres.PostgreSQLConnection.has_backup_privileges",
1191        new_callable=PropertyMock,
1192    )
1193    def test_get_replication_stats(
1194        self, has_backup_privileges_mock, server_version_mock, conn_mock
1195    ):
1196        """
1197        Simple test for the execution of get_replication_stats on a server
1198        """
1199        # Build a server
1200        server = build_real_server()
1201        cursor_mock = conn_mock.return_value.cursor.return_value
1202        has_backup_privileges_mock.return_value = True
1203
1204        # 10 ALL
1205        cursor_mock.reset_mock()
1206        server_version_mock.return_value = 100000
1207        standby_info = server.postgres.get_replication_stats(
1208            PostgreSQLConnection.ANY_STREAMING_CLIENT
1209        )
1210        assert standby_info is cursor_mock.fetchall.return_value
1211        cursor_mock.execute.assert_called_once_with(
1212            "SELECT r.*, rs.slot_name, "
1213            "pg_is_in_recovery() AS is_in_recovery, "
1214            "CASE WHEN pg_is_in_recovery() "
1215            "  THEN pg_last_wal_receive_lsn() "
1216            "  ELSE pg_current_wal_lsn() "
1217            "END AS current_lsn "
1218            "FROM pg_stat_replication r "
1219            "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
1220            "WHERE (rs.slot_type IS NULL OR rs.slot_type = 'physical') "
1221            "ORDER BY sync_state DESC, sync_priority"
1222        )
1223
1224        # 10 ALL WALSTREAMER
1225        cursor_mock.reset_mock()
1226        server_version_mock.return_value = 100000
1227        standby_info = server.postgres.get_replication_stats(
1228            PostgreSQLConnection.WALSTREAMER
1229        )
1230        assert standby_info is cursor_mock.fetchall.return_value
1231        cursor_mock.execute.assert_called_once_with(
1232            "SELECT r.*, rs.slot_name, "
1233            "pg_is_in_recovery() AS is_in_recovery, "
1234            "CASE WHEN pg_is_in_recovery() "
1235            "  THEN pg_last_wal_receive_lsn() "
1236            "  ELSE pg_current_wal_lsn() "
1237            "END AS current_lsn "
1238            "FROM pg_stat_replication r "
1239            "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
1240            "WHERE (rs.slot_type IS NULL OR rs.slot_type = 'physical') "
1241            "AND replay_lsn IS NULL "
1242            "ORDER BY sync_state DESC, sync_priority"
1243        )
1244
1245        # 10 ALL STANDBY
1246        cursor_mock.reset_mock()
1247        server_version_mock.return_value = 100000
1248        standby_info = server.postgres.get_replication_stats(
1249            PostgreSQLConnection.STANDBY
1250        )
1251        assert standby_info is cursor_mock.fetchall.return_value
1252        cursor_mock.execute.assert_called_once_with(
1253            "SELECT r.*, rs.slot_name, "
1254            "pg_is_in_recovery() AS is_in_recovery, "
1255            "CASE WHEN pg_is_in_recovery() "
1256            "  THEN pg_last_wal_receive_lsn() "
1257            "  ELSE pg_current_wal_lsn() "
1258            "END AS current_lsn "
1259            "FROM pg_stat_replication r "
1260            "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
1261            "WHERE (rs.slot_type IS NULL OR rs.slot_type = 'physical') "
1262            "AND replay_lsn IS NOT NULL "
1263            "ORDER BY sync_state DESC, sync_priority"
1264        )
1265
1266        # 9.5 ALL
1267        cursor_mock.reset_mock()
1268        server_version_mock.return_value = 90500
1269        standby_info = server.postgres.get_replication_stats(
1270            PostgreSQLConnection.ANY_STREAMING_CLIENT
1271        )
1272        assert standby_info is cursor_mock.fetchall.return_value
1273        cursor_mock.execute.assert_called_once_with(
1274            "SELECT pid, usesysid, usename, application_name, client_addr, "
1275            "client_hostname, client_port, "
1276            "backend_start, backend_xmin, state, "
1277            "sent_location AS sent_lsn, "
1278            "write_location AS write_lsn, "
1279            "flush_location AS flush_lsn, "
1280            "replay_location AS replay_lsn, "
1281            "sync_priority, sync_state, rs.slot_name, "
1282            "pg_is_in_recovery() AS is_in_recovery, "
1283            "CASE WHEN pg_is_in_recovery() "
1284            "  THEN pg_last_xlog_receive_location() "
1285            "  ELSE pg_current_xlog_location() "
1286            "END AS current_lsn "
1287            "FROM pg_stat_replication r "
1288            "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
1289            "WHERE (rs.slot_type IS NULL OR rs.slot_type = 'physical') "
1290            "ORDER BY sync_state DESC, sync_priority"
1291        )
1292
1293        # 9.4 ALL
1294        cursor_mock.reset_mock()
1295        server_version_mock.return_value = 90400
1296        standby_info = server.postgres.get_replication_stats(
1297            PostgreSQLConnection.ANY_STREAMING_CLIENT
1298        )
1299        assert standby_info is cursor_mock.fetchall.return_value
1300        cursor_mock.execute.assert_called_once_with(
1301            "SELECT pid, usesysid, usename, application_name, client_addr, "
1302            "client_hostname, client_port, "
1303            "backend_start, backend_xmin, state, "
1304            "sent_location AS sent_lsn, "
1305            "write_location AS write_lsn, "
1306            "flush_location AS flush_lsn, "
1307            "replay_location AS replay_lsn, "
1308            "sync_priority, sync_state, "
1309            "pg_is_in_recovery() AS is_in_recovery, "
1310            "CASE WHEN pg_is_in_recovery() "
1311            "  THEN pg_last_xlog_receive_location() "
1312            "  ELSE pg_current_xlog_location() "
1313            "END AS current_lsn "
1314            "FROM pg_stat_replication r "
1315            "ORDER BY sync_state DESC, sync_priority"
1316        )
1317
1318        # 9.4 WALSTREAMER
1319        cursor_mock.reset_mock()
1320        server_version_mock.return_value = 90400
1321        standby_info = server.postgres.get_replication_stats(
1322            PostgreSQLConnection.WALSTREAMER
1323        )
1324        assert standby_info is cursor_mock.fetchall.return_value
1325        cursor_mock.execute.assert_called_once_with(
1326            "SELECT pid, usesysid, usename, application_name, client_addr, "
1327            "client_hostname, client_port, "
1328            "backend_start, backend_xmin, state, "
1329            "sent_location AS sent_lsn, "
1330            "write_location AS write_lsn, "
1331            "flush_location AS flush_lsn, "
1332            "replay_location AS replay_lsn, "
1333            "sync_priority, sync_state, "
1334            "pg_is_in_recovery() AS is_in_recovery, "
1335            "CASE WHEN pg_is_in_recovery() "
1336            "  THEN pg_last_xlog_receive_location() "
1337            "  ELSE pg_current_xlog_location() "
1338            "END AS current_lsn "
1339            "FROM pg_stat_replication r "
1340            "WHERE replay_location IS NULL "
1341            "ORDER BY sync_state DESC, sync_priority"
1342        )
1343
1344        # 9.4 STANDBY
1345        cursor_mock.reset_mock()
1346        server_version_mock.return_value = 90400
1347        standby_info = server.postgres.get_replication_stats(
1348            PostgreSQLConnection.STANDBY
1349        )
1350        assert standby_info is cursor_mock.fetchall.return_value
1351        cursor_mock.execute.assert_called_once_with(
1352            "SELECT pid, usesysid, usename, application_name, client_addr, "
1353            "client_hostname, client_port, "
1354            "backend_start, backend_xmin, state, "
1355            "sent_location AS sent_lsn, "
1356            "write_location AS write_lsn, "
1357            "flush_location AS flush_lsn, "
1358            "replay_location AS replay_lsn, "
1359            "sync_priority, sync_state, "
1360            "pg_is_in_recovery() AS is_in_recovery, "
1361            "CASE WHEN pg_is_in_recovery() "
1362            "  THEN pg_last_xlog_receive_location() "
1363            "  ELSE pg_current_xlog_location() "
1364            "END AS current_lsn "
1365            "FROM pg_stat_replication r "
1366            "WHERE replay_location IS NOT NULL "
1367            "ORDER BY sync_state DESC, sync_priority"
1368        )
1369
1370        # 9.2 ALL
1371        cursor_mock.reset_mock()
1372        server_version_mock.return_value = 90200
1373        standby_info = server.postgres.get_replication_stats(
1374            PostgreSQLConnection.ANY_STREAMING_CLIENT
1375        )
1376        assert standby_info is cursor_mock.fetchall.return_value
1377        cursor_mock.execute.assert_called_once_with(
1378            "SELECT pid, usesysid, usename, application_name, client_addr, "
1379            "client_hostname, client_port, backend_start, "
1380            "CAST (NULL AS xid) AS backend_xmin, "
1381            "state, "
1382            "sent_location AS sent_lsn, "
1383            "write_location AS write_lsn, "
1384            "flush_location AS flush_lsn, "
1385            "replay_location AS replay_lsn, "
1386            "sync_priority, sync_state, "
1387            "pg_is_in_recovery() AS is_in_recovery, "
1388            "CASE WHEN pg_is_in_recovery() "
1389            "  THEN pg_last_xlog_receive_location() "
1390            "  ELSE pg_current_xlog_location() "
1391            "END AS current_lsn "
1392            "FROM pg_stat_replication r "
1393            "ORDER BY sync_state DESC, sync_priority"
1394        )
1395
1396        # 9.2 WALSTREAMER
1397        cursor_mock.reset_mock()
1398        server_version_mock.return_value = 90200
1399        standby_info = server.postgres.get_replication_stats(
1400            PostgreSQLConnection.WALSTREAMER
1401        )
1402        assert standby_info is cursor_mock.fetchall.return_value
1403        cursor_mock.execute.assert_called_once_with(
1404            "SELECT pid, usesysid, usename, application_name, client_addr, "
1405            "client_hostname, client_port, backend_start, "
1406            "CAST (NULL AS xid) AS backend_xmin, "
1407            "state, "
1408            "sent_location AS sent_lsn, "
1409            "write_location AS write_lsn, "
1410            "flush_location AS flush_lsn, "
1411            "replay_location AS replay_lsn, "
1412            "sync_priority, sync_state, "
1413            "pg_is_in_recovery() AS is_in_recovery, "
1414            "CASE WHEN pg_is_in_recovery() "
1415            "  THEN pg_last_xlog_receive_location() "
1416            "  ELSE pg_current_xlog_location() "
1417            "END AS current_lsn "
1418            "FROM pg_stat_replication r "
1419            "WHERE replay_location IS NULL "
1420            "ORDER BY sync_state DESC, sync_priority"
1421        )
1422
1423        # 9.2 STANDBY
1424        cursor_mock.reset_mock()
1425        server_version_mock.return_value = 90200
1426        standby_info = server.postgres.get_replication_stats(
1427            PostgreSQLConnection.STANDBY
1428        )
1429        assert standby_info is cursor_mock.fetchall.return_value
1430        cursor_mock.execute.assert_called_once_with(
1431            "SELECT pid, usesysid, usename, application_name, client_addr, "
1432            "client_hostname, client_port, backend_start, "
1433            "CAST (NULL AS xid) AS backend_xmin, "
1434            "state, "
1435            "sent_location AS sent_lsn, "
1436            "write_location AS write_lsn, "
1437            "flush_location AS flush_lsn, "
1438            "replay_location AS replay_lsn, "
1439            "sync_priority, sync_state, "
1440            "pg_is_in_recovery() AS is_in_recovery, "
1441            "CASE WHEN pg_is_in_recovery() "
1442            "  THEN pg_last_xlog_receive_location() "
1443            "  ELSE pg_current_xlog_location() "
1444            "END AS current_lsn "
1445            "FROM pg_stat_replication r "
1446            "WHERE replay_location IS NOT NULL "
1447            "ORDER BY sync_state DESC, sync_priority"
1448        )
1449
1450        # 9.1 ALL
1451        cursor_mock.reset_mock()
1452        server_version_mock.return_value = 90100
1453        standby_info = server.postgres.get_replication_stats(
1454            PostgreSQLConnection.ANY_STREAMING_CLIENT
1455        )
1456        assert standby_info is cursor_mock.fetchall.return_value
1457        cursor_mock.execute.assert_called_once_with(
1458            "SELECT procpid AS pid, usesysid, usename, application_name, "
1459            "client_addr, client_hostname, client_port, backend_start, "
1460            "CAST (NULL AS xid) AS backend_xmin, "
1461            "state, "
1462            "sent_location AS sent_lsn, "
1463            "write_location AS write_lsn, "
1464            "flush_location AS flush_lsn, "
1465            "replay_location AS replay_lsn, "
1466            "sync_priority, sync_state, "
1467            "pg_is_in_recovery() AS is_in_recovery, "
1468            "CASE WHEN pg_is_in_recovery() "
1469            "  THEN pg_last_xlog_receive_location() "
1470            "  ELSE pg_current_xlog_location() "
1471            "END AS current_lsn "
1472            "FROM pg_stat_replication r "
1473            "ORDER BY sync_state DESC, sync_priority"
1474        )
1475
1476        # 9.1 WALSTREAMER
1477        cursor_mock.reset_mock()
1478        server_version_mock.return_value = 90100
1479        standby_info = server.postgres.get_replication_stats(
1480            PostgreSQLConnection.WALSTREAMER
1481        )
1482        assert standby_info is cursor_mock.fetchall.return_value
1483        cursor_mock.execute.assert_called_once_with(
1484            "SELECT procpid AS pid, usesysid, usename, application_name, "
1485            "client_addr, client_hostname, client_port, backend_start, "
1486            "CAST (NULL AS xid) AS backend_xmin, "
1487            "state, "
1488            "sent_location AS sent_lsn, "
1489            "write_location AS write_lsn, "
1490            "flush_location AS flush_lsn, "
1491            "replay_location AS replay_lsn, "
1492            "sync_priority, sync_state, "
1493            "pg_is_in_recovery() AS is_in_recovery, "
1494            "CASE WHEN pg_is_in_recovery() "
1495            "  THEN pg_last_xlog_receive_location() "
1496            "  ELSE pg_current_xlog_location() "
1497            "END AS current_lsn "
1498            "FROM pg_stat_replication r "
1499            "WHERE replay_location IS NULL "
1500            "ORDER BY sync_state DESC, sync_priority"
1501        )
1502
1503        # 9.1 STANDBY
1504        cursor_mock.reset_mock()
1505        server_version_mock.return_value = 90100
1506        standby_info = server.postgres.get_replication_stats(
1507            PostgreSQLConnection.STANDBY
1508        )
1509        assert standby_info is cursor_mock.fetchall.return_value
1510        cursor_mock.execute.assert_called_once_with(
1511            "SELECT procpid AS pid, usesysid, usename, application_name, "
1512            "client_addr, client_hostname, client_port, backend_start, "
1513            "CAST (NULL AS xid) AS backend_xmin, "
1514            "state, "
1515            "sent_location AS sent_lsn, "
1516            "write_location AS write_lsn, "
1517            "flush_location AS flush_lsn, "
1518            "replay_location AS replay_lsn, "
1519            "sync_priority, sync_state, "
1520            "pg_is_in_recovery() AS is_in_recovery, "
1521            "CASE WHEN pg_is_in_recovery() "
1522            "  THEN pg_last_xlog_receive_location() "
1523            "  ELSE pg_current_xlog_location() "
1524            "END AS current_lsn "
1525            "FROM pg_stat_replication r "
1526            "WHERE replay_location IS NOT NULL "
1527            "ORDER BY sync_state DESC, sync_priority"
1528        )
1529
1530        cursor_mock.reset_mock()
1531        # Missing required permissions
1532        has_backup_privileges_mock.return_value = False
1533        with pytest.raises(BackupFunctionsAccessRequired):
1534            server.postgres.get_replication_stats(
1535                PostgreSQLConnection.ANY_STREAMING_CLIENT
1536            )
1537        # Check for the right invocation
1538        assert not cursor_mock.execute.called
1539
1540        cursor_mock.reset_mock()
1541        # Too old version (9.0)
1542        has_backup_privileges_mock.return_value = True
1543        server_version_mock.return_value = 90000
1544        with pytest.raises(PostgresUnsupportedFeature):
1545            server.postgres.get_replication_stats(
1546                PostgreSQLConnection.ANY_STREAMING_CLIENT
1547            )
1548        # Check for the right invocation
1549        assert not cursor_mock.execute.called
1550
1551    @patch("barman.postgres.PostgreSQLConnection.connect")
1552    @patch(
1553        "barman.postgres.PostgreSQLConnection.server_version", new_callable=PropertyMock
1554    )
1555    @patch(
1556        "barman.postgres.PostgreSQLConnection.is_superuser", new_callable=PropertyMock
1557    )
1558    def test_get_replication_slot(
1559        self, is_superuser_mock, server_version_mock, conn_mock
1560    ):
1561        """
1562        Simple test for the execution of get_replication_slots on a server
1563        """
1564        # Build a server
1565        server = build_real_server()
1566        server.config.slot_name = "test"
1567        cursor_mock = conn_mock.return_value.cursor.return_value
1568        is_superuser_mock.return_value = True
1569
1570        # Supported version 9.4
1571        cursor_mock.reset_mock()
1572        server_version_mock.return_value = 90400
1573        replication_slot = server.postgres.get_replication_slot(server.config.slot_name)
1574        assert replication_slot is cursor_mock.fetchone.return_value
1575        cursor_mock.execute.assert_called_once_with(
1576            "SELECT slot_name, "
1577            "active, "
1578            "restart_lsn "
1579            "FROM pg_replication_slots "
1580            "WHERE slot_type = 'physical' "
1581            "AND slot_name = '%s'" % server.config.slot_name
1582        )
1583
1584        # Too old version (3.0)
1585        server_version_mock.return_value = 90300
1586        with pytest.raises(PostgresUnsupportedFeature):
1587            server.postgres.get_replication_slot(server.config.slot_name)
1588
1589    @patch("barman.postgres.PostgreSQLConnection.get_setting")
1590    @patch("barman.postgres.PostgreSQLConnection.connect")
1591    def test_get_synchronous_standby_names(self, conn_mock, setting_mock):
1592        """
1593        Simple test for retrieving settings from the database
1594        """
1595        # Build and configure a server
1596        server = build_real_server()
1597
1598        # Unsupported version: 9.0
1599        conn_mock.return_value.server_version = 90000
1600
1601        with pytest.raises(PostgresUnsupportedFeature):
1602            server.postgres.get_synchronous_standby_names()
1603
1604        # Supported version: 9.1
1605        conn_mock.return_value.server_version = 90100
1606
1607        setting_mock.return_value = "a, bc, def"
1608        names = server.postgres.get_synchronous_standby_names()
1609        setting_mock.assert_called_once_with("synchronous_standby_names")
1610        assert names == ["a", "bc", "def"]
1611
1612        setting_mock.reset_mock()
1613        setting_mock.return_value = "a,bc,def"
1614        names = server.postgres.get_synchronous_standby_names()
1615        setting_mock.assert_called_once_with("synchronous_standby_names")
1616        assert names == ["a", "bc", "def"]
1617
1618        setting_mock.reset_mock()
1619        setting_mock.return_value = " a, bc, def "
1620        names = server.postgres.get_synchronous_standby_names()
1621        setting_mock.assert_called_once_with("synchronous_standby_names")
1622        assert names == ["a", "bc", "def"]
1623
1624        setting_mock.reset_mock()
1625        setting_mock.return_value = "2(a, bc, def)"
1626        names = server.postgres.get_synchronous_standby_names()
1627        setting_mock.assert_called_once_with("synchronous_standby_names")
1628        assert names == ["a", "bc", "def"]
1629
1630        setting_mock.reset_mock()
1631        setting_mock.return_value = " 1 ( a, bc, def ) "
1632        names = server.postgres.get_synchronous_standby_names()
1633        setting_mock.assert_called_once_with("synchronous_standby_names")
1634        assert names == ["a", "bc", "def"]
1635
1636        setting_mock.reset_mock()
1637        setting_mock.return_value = " a "
1638        names = server.postgres.get_synchronous_standby_names()
1639        setting_mock.assert_called_once_with("synchronous_standby_names")
1640        assert names == ["a"]
1641
1642        setting_mock.reset_mock()
1643        setting_mock.return_value = "1(a)"
1644        names = server.postgres.get_synchronous_standby_names()
1645        setting_mock.assert_called_once_with("synchronous_standby_names")
1646        assert names == ["a"]
1647
1648        setting_mock.reset_mock()
1649        setting_mock.return_value = '1(a, "b-c")'
1650        names = server.postgres.get_synchronous_standby_names()
1651        setting_mock.assert_called_once_with("synchronous_standby_names")
1652        assert names == ["a", "b-c"]
1653
1654        setting_mock.reset_mock()
1655        setting_mock.return_value = "*"
1656        names = server.postgres.get_synchronous_standby_names()
1657        setting_mock.assert_called_once_with("synchronous_standby_names")
1658        assert names == ["*"]
1659
1660    @patch("barman.postgres.PostgreSQLConnection.connect")
1661    def test_xlog_segment_size(self, conn_mock):
1662        """
1663        Test the xlog_segment_size method
1664        """
1665
1666        default_wal_file_size = 16777216
1667
1668        # Build a server
1669        server = build_real_server()
1670        conn_mock.return_value.server_version = 110000
1671        cursor_mock = conn_mock.return_value.cursor.return_value
1672        cursor_mock.fetchone.side_effect = [[str(default_wal_file_size)]]
1673
1674        result = server.postgres.xlog_segment_size
1675        assert result == default_wal_file_size
1676
1677        execute_calls = [
1678            call("SELECT setting FROM pg_settings WHERE name='wal_segment_size'"),
1679        ]
1680        cursor_mock.execute.assert_has_calls(execute_calls)
1681
1682    @patch("barman.postgres.PostgreSQLConnection.connect")
1683    def test_xlog_segment_size_10(self, conn_mock):
1684        """
1685        Test the xlog_segment_size method
1686        """
1687
1688        default_wal_file_size = 16777216
1689        default_wal_block_size = 8192
1690        default_wal_segments_number = 2048
1691
1692        # Build a server
1693        server = build_real_server()
1694        conn_mock.return_value.server_version = 100000
1695        cursor_mock = conn_mock.return_value.cursor.return_value
1696        cursor_mock.fetchone.side_effect = [
1697            [str(default_wal_segments_number)],
1698            [str(default_wal_block_size)],
1699        ]
1700
1701        result = server.postgres.xlog_segment_size
1702        assert result == default_wal_file_size
1703
1704        execute_calls = [
1705            call("SELECT setting FROM pg_settings WHERE name='wal_segment_size'"),
1706            call("SELECT setting FROM pg_settings WHERE name='wal_block_size'"),
1707        ]
1708        cursor_mock.execute.assert_has_calls(execute_calls)
1709
1710    @patch("barman.postgres.PostgreSQLConnection.connect")
1711    def test_xlog_segment_size_83(self, conn_mock):
1712        """
1713        If you use PostgreSQL 8.3 you can't change the WAL segment size even
1714        at compilation level. Barman shouldn't ask the server for this data,
1715        as this will result in an error
1716        """
1717
1718        # Build a server
1719        server = build_real_server()
1720        conn_mock.return_value.server_version = 80300
1721        cursor_mock = conn_mock.return_value.cursor.return_value
1722
1723        result = server.postgres.xlog_segment_size
1724        assert result == DEFAULT_XLOG_SEG_SIZE
1725
1726        cursor_mock.execute.assert_not_called()
1727
1728    @patch("barman.postgres.PostgreSQLConnection.connect")
1729    def test_name_map(self, conn_mock):
1730        """
1731        Test the `name_map` behaviour
1732        :return:
1733        """
1734        server = build_real_server()
1735
1736        conn_mock.return_value.server_version = 100000
1737        map_10 = server.postgres.name_map
1738        assert map_10
1739
1740        conn_mock.return_value.server_version = 90300
1741        map_93 = server.postgres.name_map
1742        assert map_93
1743
1744        conn_mock.side_effect = PostgresConnectionError
1745        map_error = server.postgres.name_map
1746        assert map_10 == map_error
1747
1748    @patch("barman.postgres.PostgreSQLConnection.connect")
1749    def test_switch_wal_function(self, conn_mock):
1750        """
1751        Test the `switch_wal_function` name
1752        :return:
1753        """
1754        server = build_real_server()
1755
1756        conn_mock.return_value.server_version = 90300
1757        assert server.postgres.name_map["pg_switch_wal"] == "pg_switch_xlog"
1758
1759        conn_mock.return_value.server_version = 100000
1760        assert server.postgres.name_map["pg_switch_wal"] == "pg_switch_wal"
1761
1762    @patch("barman.postgres.PostgreSQLConnection.connect")
1763    def test_xlogfile_name_function(self, conn_mock):
1764        """
1765        Test the `xlogfile_name_function` property.
1766        :return:
1767        """
1768        server = build_real_server()
1769
1770        conn_mock.return_value.server_version = 90300
1771        assert server.postgres.name_map["pg_walfile_name"] == "pg_xlogfile_name"
1772
1773        conn_mock.return_value.server_version = 100000
1774        assert server.postgres.name_map["pg_walfile_name"] == "pg_walfile_name"
1775
1776    @patch("barman.postgres.PostgreSQLConnection.connect")
1777    def test_xlogfile_name_offset_function(self, conn_mock):
1778        """
1779        Test the `xlogfile_name_function` property.
1780        :return:
1781        """
1782        server = build_real_server()
1783
1784        conn_mock.return_value.server_version = 90300
1785        assert (
1786            server.postgres.name_map["pg_walfile_name_offset"]
1787            == "pg_xlogfile_name_offset"
1788        )
1789
1790        conn_mock.return_value.server_version = 100000
1791        assert (
1792            server.postgres.name_map["pg_walfile_name_offset"]
1793            == "pg_walfile_name_offset"
1794        )
1795
1796    @patch("barman.postgres.PostgreSQLConnection.connect")
1797    def test_xlog_directory(self, conn_mock):
1798        """
1799        Test the `xlog_directory` property.
1800        :return:
1801        """
1802        server = build_real_server()
1803
1804        conn_mock.return_value.server_version = 90300
1805        assert server.postgres.name_map["pg_wal"] == "pg_xlog"
1806
1807        conn_mock.return_value.server_version = 100000
1808        assert server.postgres.name_map["pg_wal"] == "pg_wal"
1809
1810    @patch("barman.postgres.PostgreSQLConnection.connect")
1811    def test_last_xlog_replay_location_function(self, conn_mock):
1812        """
1813        Test the `last_xlog_replay_location_function` property.
1814        :return:
1815        """
1816        server = build_real_server()
1817
1818        conn_mock.return_value.server_version = 90300
1819        assert (
1820            server.postgres.name_map["pg_last_wal_replay_lsn"]
1821            == "pg_last_xlog_replay_location"
1822        )
1823
1824        conn_mock.return_value.server_version = 100000
1825        assert (
1826            server.postgres.name_map["pg_last_wal_replay_lsn"]
1827            == "pg_last_wal_replay_lsn"
1828        )
1829
1830    @patch("barman.postgres.PostgreSQLConnection.connect")
1831    def test_current_xlog_location_function(self, conn_mock):
1832        """
1833        Test the `current_xlog_location_function` property
1834        :return:
1835        """
1836        server = build_real_server()
1837
1838        conn_mock.return_value.server_version = 90300
1839        assert (
1840            server.postgres.name_map["pg_current_wal_lsn"] == "pg_current_xlog_location"
1841        )
1842
1843        conn_mock.return_value.server_version = 100000
1844        assert server.postgres.name_map["pg_current_wal_lsn"] == "pg_current_wal_lsn"
1845
1846    @patch("barman.postgres.PostgreSQLConnection.connect")
1847    def test_current_xlog_insert_location_function(self, conn_mock):
1848        """
1849        Test the `current_xlog_insert_location_function` property
1850        :return:
1851        """
1852        server = build_real_server()
1853
1854        conn_mock.return_value.server_version = 90300
1855        assert (
1856            server.postgres.name_map["pg_current_wal_insert_lsn"]
1857            == "pg_current_xlog_insert_location"
1858        )
1859
1860        conn_mock.return_value.server_version = 100000
1861        assert (
1862            server.postgres.name_map["pg_current_wal_insert_lsn"]
1863            == "pg_current_wal_insert_lsn"
1864        )
1865
1866    @patch("barman.postgres.PostgreSQLConnection.connect")
1867    def test_last_xlog_receive_location_function(self, conn_mock):
1868        """
1869        Test the `current_xlog_insert_location_function` property
1870        :return:
1871        """
1872        server = build_real_server()
1873
1874        conn_mock.return_value.server_version = 90300
1875        assert (
1876            server.postgres.name_map["pg_last_wal_receive_lsn"]
1877            == "pg_last_xlog_receive_location"
1878        )
1879
1880        conn_mock.return_value.server_version = 100000
1881        assert (
1882            server.postgres.name_map["pg_last_wal_receive_lsn"]
1883            == "pg_last_wal_receive_lsn"
1884        )
1885
1886
1887# noinspection PyMethodMayBeStatic
1888class TestStreamingConnection(object):
1889    def test_connection_error(self):
1890        """
1891        simple test for streaming_archiver without streaming_conninfo
1892        """
1893        # Test with wrong configuration
1894        server = build_real_server(
1895            main_conf={"streaming_archiver": True, "streaming_conninfo": ""}
1896        )
1897        assert server.config.msg_list
1898        assert (
1899            "Streaming connection: Missing 'streaming_conninfo' "
1900            "parameter for server 'main'" in server.config.msg_list
1901        )
1902        server = build_real_server(
1903            main_conf={
1904                "streaming_archiver": True,
1905                "streaming_conninfo": "host=/test "
1906                "port=5496 "
1907                "user=test "
1908                "dbname=test_db",
1909            }
1910        )
1911        assert server.streaming.conn_parameters["dbname"] == "replication"
1912        assert (
1913            server.streaming.conninfo == "dbname=replication "
1914            "host=/test "
1915            "options=-cdatestyle=iso "
1916            "port=5496 "
1917            "replication=true "
1918            "user=test"
1919        )
1920
1921    @patch("barman.postgres.psycopg2.connect")
1922    def test_fetch_remote_status(self, conn_mock):
1923        """
1924        simple test for the fetch_remote_status method
1925        """
1926        # Build a server
1927        server = build_real_server(
1928            main_conf={"streaming_archiver": True, "streaming_conninfo": "dummy=param"}
1929        )
1930
1931        # Too old PostgreSQL
1932        conn_mock.return_value.server_version = 90100
1933        result = server.streaming.fetch_remote_status()
1934        assert result["streaming_supported"] is False
1935        assert result["streaming"] is None
1936
1937        # Working streaming connection
1938        conn_mock.return_value.server_version = 90300
1939        cursor_mock = conn_mock.return_value.cursor.return_value
1940        cursor_mock.fetchone.return_value = ("12345", 1, "DE/ADBEEF")
1941        result = server.streaming.fetch_remote_status()
1942        cursor_mock.execute.assert_called_with("IDENTIFY_SYSTEM")
1943        assert result["streaming_supported"] is True
1944        assert result["streaming"] is True
1945
1946        # Working non-streaming connection
1947        conn_mock.reset_mock()
1948        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
1949        result = server.streaming.fetch_remote_status()
1950        cursor_mock.execute.assert_called_with("IDENTIFY_SYSTEM")
1951        assert result["streaming_supported"] is True
1952        assert result["streaming"] is False
1953
1954        # Connection failed
1955        server.streaming.close()
1956        conn_mock.reset_mock()
1957        conn_mock.side_effect = psycopg2.DatabaseError
1958        result = server.streaming.fetch_remote_status()
1959        assert result["streaming_supported"] is None
1960        assert result["streaming"] is None
1961
1962    @patch("barman.postgres.PostgreSQL.connect")
1963    def test_streaming_server_txt_version(self, conn_mock):
1964        """
1965        simple test for the server_txt_version property
1966        """
1967        # Build a server
1968        server = build_real_server(
1969            main_conf={"streaming_archiver": True, "streaming_conninfo": "dummy=param"}
1970        )
1971
1972        # Connection error
1973        conn_mock.side_effect = PostgresConnectionError
1974        assert server.streaming.server_txt_version is None
1975
1976        # Good connection
1977        conn_mock.side_effect = None
1978
1979        conn_mock.return_value.server_version = 80300
1980        assert server.streaming.server_txt_version == "8.3.0"
1981
1982        conn_mock.return_value.server_version = 90000
1983        assert server.streaming.server_txt_version == "9.0.0"
1984
1985        conn_mock.return_value.server_version = 90005
1986        assert server.streaming.server_txt_version == "9.0.5"
1987
1988        conn_mock.return_value.server_version = 100001
1989        assert server.streaming.server_txt_version == "10.1"
1990
1991        conn_mock.return_value.server_version = 110011
1992        assert server.streaming.server_txt_version == "11.11"
1993
1994        conn_mock.return_value.server_version = 0
1995        assert server.streaming.server_txt_version == "0.0.0"
1996
1997    @patch("barman.postgres.psycopg2.connect")
1998    def test_streaming_create_repslot(self, connect_mock):
1999        # Build a server
2000        server = build_real_server(
2001            main_conf={"streaming_archiver": True, "streaming_conninfo": "dummy=param"}
2002        )
2003
2004        # Test replication slot creation
2005        cursor_mock = connect_mock.return_value.cursor.return_value
2006        server.streaming.create_physical_repslot("test_repslot")
2007        cursor_mock.execute.assert_called_once_with(
2008            "CREATE_REPLICATION_SLOT test_repslot PHYSICAL"
2009        )
2010
2011        # Test replication slot already existent
2012        cursor_mock = connect_mock.return_value.cursor.return_value
2013        cursor_mock.execute.side_effect = MockProgrammingError(DUPLICATE_OBJECT)
2014
2015        with pytest.raises(PostgresDuplicateReplicationSlot):
2016            server.streaming.create_physical_repslot("test_repslot")
2017            cursor_mock.execute.assert_called_once_with(
2018                "CREATE_REPLICATION_SLOT test_repslot PHYSICAL"
2019            )
2020
2021    @patch("barman.postgres.psycopg2.connect")
2022    def test_streaming_drop_repslot(self, connect_mock):
2023        # Build a server
2024        server = build_real_server(
2025            main_conf={"streaming_archiver": True, "streaming_conninfo": "dummy=param"}
2026        )
2027
2028        # Test replication slot creation
2029        cursor_mock = connect_mock.return_value.cursor.return_value
2030        server.streaming.drop_repslot("test_repslot")
2031        cursor_mock.execute.assert_called_once_with(
2032            "DROP_REPLICATION_SLOT test_repslot"
2033        )
2034
2035        # Test replication slot already existent
2036        cursor_mock = connect_mock.return_value.cursor.return_value
2037        cursor_mock.execute.side_effect = MockProgrammingError(UNDEFINED_OBJECT)
2038
2039        with pytest.raises(PostgresInvalidReplicationSlot):
2040            server.streaming.drop_repslot("test_repslot")
2041            cursor_mock.execute.assert_called_once_with(
2042                "DROP_REPLICATION_SLOT test_repslot"
2043            )
2044
2045        server.streaming.close()
2046