# -*- coding: utf-8 -*-
# © Copyright EnterpriseDB UK Limited 2013-2021
#
# This file is part of Barman.
#
# Barman is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Barman is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Barman. If not, see <http://www.gnu.org/licenses/>.

import datetime

import psycopg2
import pytest
from mock import PropertyMock, call, patch
from psycopg2.errorcodes import DUPLICATE_OBJECT, UNDEFINED_OBJECT

from barman.exceptions import (
    PostgresConnectionError,
    PostgresDuplicateReplicationSlot,
    PostgresException,
    PostgresInvalidReplicationSlot,
    PostgresIsInRecovery,
    BackupFunctionsAccessRequired,
    PostgresSuperuserRequired,
    PostgresUnsupportedFeature,
)
from barman.postgres import PostgreSQLConnection
from barman.xlog import DEFAULT_XLOG_SEG_SIZE
from testing_helpers import build_real_server


class MockProgrammingError(psycopg2.ProgrammingError):
    """
    Mock class for psycopg2 ProgrammingError

    :param pgcode: value to expose through the ``pgcode`` attribute
    :param pgerror: value to expose through the ``pgerror`` attribute
    """

    def __init__(self, pgcode=None, pgerror=None):
        # pgcode and pgerror are read-only attributes and the ProgrammingError
        # class is written in native code. The only way to set these
        # attributes is to use the private method '__setstate__', which is
        # also native
        self.__setstate__({"pgcode": pgcode, "pgerror": pgerror})


# noinspection PyMethodMayBeStatic
class TestPostgres(object):
    """
    Tests for the ``server.postgres`` connection object.

    Every test builds a server with ``build_real_server`` and replaces either
    ``psycopg2.connect`` or ``PostgreSQLConnection.connect`` with a mock, so
    the assertions only check the SQL sent to the cursor and the way results
    and errors are handled.
    """

    def test_connection_error(self):
        """
        Simple test for a missing conninfo
        """
        # Test with wrong configuration: an empty conninfo must be reported
        # in the configuration message list
        server = build_real_server(main_conf={"conninfo": ""})
        assert server.config.msg_list
        assert (
            "PostgreSQL connection: Missing 'conninfo' parameter "
            "for server 'main'" in server.config.msg_list
        )

    @patch("barman.postgres.psycopg2.connect")
    def test_connect_and_close(self, pg_connect_mock):
        """
        Check the behaviour of the connect and close methods: connection
        errors, connection caching, reconnection and closing

        :param pg_connect_mock: a mock that replaces psycopg2.connect
        """
        # Setup server
        server = build_real_server()
        server.postgres.conninfo = "valid conninfo"
        conn_mock = pg_connect_mock.return_value
        conn_mock.server_version = 90401
        conn_mock.closed = False
        cursor_mock = conn_mock.cursor.return_value

        # Connection failure
        pg_connect_mock.side_effect = psycopg2.DatabaseError
        with pytest.raises(PostgresConnectionError):
            server.postgres.connect()

        # Good connection but error setting the application name
        pg_connect_mock.side_effect = None
        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
        with pytest.raises(PostgresConnectionError):
            server.postgres.connect()

        # Good connection
        cursor_mock.execute.side_effect = None
        conn = server.postgres.connect()
        pg_connect_mock.assert_called_with("valid conninfo")
        assert conn is conn_mock

        # call again and make sure it returns the cached connection
        pg_connect_mock.reset_mock()

        new_conn = server.postgres.connect()

        assert new_conn is conn_mock
        assert not pg_connect_mock.called

        # call again with a broken connection: the first cursor() call fails,
        # so a new call to psycopg2.connect is expected
        pg_connect_mock.reset_mock()
        conn_mock.cursor.side_effect = [psycopg2.DatabaseError, cursor_mock]

        new_conn = server.postgres.connect()

        assert new_conn is conn_mock
        pg_connect_mock.assert_called_with("valid conninfo")

        # close it
        pg_connect_mock.reset_mock()
        conn_mock.cursor.side_effect = None
        conn_mock.closed = False

        server.postgres.close()

        assert conn_mock.close.called

        # close it with an already closed connection
        pg_connect_mock.reset_mock()
        conn_mock.closed = True

        server.postgres.close()

        assert not conn_mock.close.called

        # open again and verify that psycopg2.connect is called once more
        pg_connect_mock.reset_mock()
        conn_mock.closed = False

        server.postgres.connect()

        pg_connect_mock.assert_called_with("valid conninfo")

        server.postgres.close()

        assert conn_mock.close.called

    @patch("barman.postgres.psycopg2.connect")
    def test_connect_error(self, connect_mock):
        """
        Check the behaviour of the connect method on error

        :param connect_mock: a mock that replaces psycopg2.connect
        """
        # Setup server
        server = build_real_server()
        server.postgres.conninfo = "not valid conninfo"
        connect_mock.side_effect = psycopg2.DatabaseError
        # expect connect to raise a PostgresConnectionError
        with pytest.raises(PostgresConnectionError):
            server.postgres.connect()
        connect_mock.assert_called_with("not valid conninfo")

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_server_txt_version(self, conn_mock):
        """
        Simple test for the server_txt_version property

        :param conn_mock: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value

        # Connection error
        conn_mock.side_effect = PostgresConnectionError
        assert server.postgres.server_txt_version is None

        # Communication error
        conn_mock.side_effect = None
        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
        assert server.postgres.server_txt_version is None

        # Good connection
        cursor_mock.execute.side_effect = None
        cursor_mock.fetchone.return_value = (
            "PostgreSQL 9.4.5 on x86_64-apple-darwin15.0.0, compiled by "
            "Apple LLVM version 7.0.0 (clang-700.1.76), 64-bit",
        )

        assert server.postgres.server_txt_version == "9.4.5"
        cursor_mock.execute.assert_called_with("SELECT version()")

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_server_txt_version_epas(self, conn_mock):
        """
        Verify server_txt_version returns the correct Postgres version
        against EPAS 9.6 and 10, which both return "EnterpriseDB" in the
        response to `SELECT version();`.

        Versions 11 and above return the Postgres version string with the
        EnterpriseDB details appended so do not require special handling.

        :param conn_mock: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value

        # EPAS 9.6 returns an extra version field which must be discarded
        cursor_mock.fetchone.return_value = (
            "EnterpriseDB 9.6.23.31 on x86_64-pc-linux-gnu, compiled by "
            "gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-44), 64-bit",
        )
        assert server.postgres.server_txt_version == "9.6.23"

        # EPAS 10 also returns an extra field relative to the corresponding
        # PostgreSQL version and it must be discarded
        cursor_mock.fetchone.return_value = (
            "EnterpriseDB 10.18.28 on x86_64-pc-linux-gnu, compiled by "
            "gcc (GCC) 4.8.5 20150623 (Red Hat 4.8.5-36), 64-bit",
        )
        assert server.postgres.server_txt_version == "10.18"

    @patch("barman.postgres.PostgreSQLConnection.connect")
    @patch(
        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
    )
    def test_create_restore_point(self, is_in_recovery_mock, conn_mock):
        """
        Basic test for the create_restore_point method

        :param is_in_recovery_mock: a mock for the is_in_recovery property
        :param conn_mock: a mock that imitates a connection to PostgreSQL
        """
        # Simulate a master connection
        is_in_recovery_mock.return_value = False

        server = build_real_server()
        # Test 1: Postgres 9.0 expect None as result
        conn_mock.return_value.server_version = 90000

        restore_point = server.postgres.create_restore_point("Test_20151026T092241")
        assert restore_point is None

        # Simulate a standby connection
        is_in_recovery_mock.return_value = True

        # Test 2: Postgres 9.1 in recovery (standby) expect None as result
        conn_mock.return_value.server_version = 90100

        restore_point = server.postgres.create_restore_point("Test_20151026T092241")
        assert restore_point is None

        # Test 3: Postgres 9.1 on a master, check mock calls
        is_in_recovery_mock.return_value = False

        assert server.postgres.create_restore_point("Test_20151026T092241")

        cursor_mock = conn_mock.return_value.cursor.return_value
        cursor_mock.execute.assert_called_with(
            "SELECT pg_create_restore_point(%s)", ["Test_20151026T092241"]
        )
        assert cursor_mock.fetchone.called

        # Test error management
        cursor_mock.execute.side_effect = psycopg2.Error
        assert server.postgres.create_restore_point("Test_20151026T092241") is None

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_stop_exclusive_backup(self, conn_mock):
        """
        Basic test for the stop_exclusive_backup method

        :param conn_mock: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()

        # Test call on master, PostgreSQL older than 10
        conn_mock.return_value.server_version = 90300
        # Expect no errors on normal call
        assert server.postgres.stop_exclusive_backup()
        # check the correct invocation of the execute method
        cursor_mock = conn_mock.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, "
            "(pg_xlogfile_name_offset(location)).*, "
            "now() AS timestamp "
            "FROM pg_stop_backup() AS location"
        )

        # Test call on master, PostgreSQL 10
        conn_mock.reset_mock()
        conn_mock.return_value.server_version = 100000
        # Expect no errors on normal call
        assert server.postgres.stop_exclusive_backup()
        # check the correct invocation of the execute method
        cursor_mock = conn_mock.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, "
            "(pg_walfile_name_offset(location)).*, "
            "now() AS timestamp "
            "FROM pg_stop_backup() AS location"
        )

        # Test Error: Setup the mock to trigger an exception
        # expect the method to raise a PostgresException
        conn_mock.reset_mock()
        cursor_mock.execute.side_effect = psycopg2.Error
        # Check that the method raises a PostgresException
        with pytest.raises(PostgresException):
            server.postgres.stop_exclusive_backup()

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_stop_concurrent_backup(self, conn):
        """
        Basic test for the stop_concurrent_backup method

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()

        # Expect no errors on normal call
        assert server.postgres.stop_concurrent_backup()

        # check the correct invocation of the execute method
        cursor_mock = conn.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT end_row.lsn AS location, "
            "(SELECT CASE WHEN pg_is_in_recovery() "
            "THEN min_recovery_end_timeline "
            "ELSE timeline_id END "
            "FROM pg_control_checkpoint(), pg_control_recovery()"
            ") AS timeline, "
            "end_row.labelfile AS backup_label, "
            "now() AS timestamp "
            "FROM pg_stop_backup(FALSE) AS end_row"
        )

        # Test 2: Setup the mock to trigger an exception
        # expect the method to raise a PostgresException
        conn.reset_mock()
        cursor_mock.execute.side_effect = psycopg2.Error
        # Check that the method raises a PostgresException
        with pytest.raises(PostgresException):
            server.postgres.stop_concurrent_backup()

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_pgespresso_stop_backup(self, conn):
        """
        Basic test for pgespresso_stop_backup method

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()

        # Test 1: Expect no error and the correct call sequence
        assert server.postgres.pgespresso_stop_backup("test_label")

        cursor_mock = conn.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT pgespresso_stop_backup(%s) AS end_wal, now() AS timestamp",
            ("test_label",),
        )

        # Test 2: Setup the mock to trigger an exception
        # expect the method to raise PostgresException
        conn.reset_mock()
        cursor_mock.execute.side_effect = psycopg2.Error
        # Check that the method raises a PostgresException
        with pytest.raises(PostgresException):
            server.postgres.pgespresso_stop_backup("test_label")

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_start_exclusive_backup(self, conn):
        """
        Simple test for the start_exclusive_backup method

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        backup_label = "test label"

        # Expect no errors
        conn.return_value.server_version = 90300
        assert server.postgres.start_exclusive_backup(backup_label)

        # check for the correct call on the execute method
        cursor_mock = conn.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, "
            "(pg_xlogfile_name_offset(location)).*, "
            "now() AS timestamp "
            "FROM pg_start_backup(%s,%s) AS location",
            ("test label", False),
        )
        conn.return_value.rollback.assert_has_calls([call(), call()])
        # reset the mock for the next test
        conn.reset_mock()

        # 8.3 test: pg_start_backup is expected to receive only the label
        conn.return_value.server_version = 80300
        assert server.postgres.start_exclusive_backup(backup_label)
        # check for the correct call on the execute method
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, "
            "(pg_xlogfile_name_offset(location)).*, "
            "now() AS timestamp "
            "FROM pg_start_backup(%s) AS location",
            ("test label",),
        )
        conn.return_value.rollback.assert_has_calls([call(), call()])
        # reset the mock for the next test
        conn.reset_mock()

        # 10 test: the pg_walfile_* function name is expected
        conn.return_value.server_version = 100000
        assert server.postgres.start_exclusive_backup(backup_label)
        # check for the correct call on the execute method
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, "
            "(pg_walfile_name_offset(location)).*, "
            "now() AS timestamp "
            "FROM pg_start_backup(%s,%s) AS location",
            ("test label", False),
        )
        conn.return_value.rollback.assert_has_calls([call(), call()])
        # reset the mock for the next test
        conn.reset_mock()

        # test error management
        cursor_mock.execute.side_effect = psycopg2.Error
        with pytest.raises(PostgresException):
            server.postgres.start_exclusive_backup(backup_label)
        conn.return_value.rollback.assert_called_once_with()

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_start_concurrent_backup(self, conn):
        """
        Simple test for the start_concurrent_backup method

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        label = "test label"

        # Expect no errors
        assert server.postgres.start_concurrent_backup(label)

        # check for the correct call on the execute method
        cursor_mock = conn.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, "
            "(SELECT timeline_id "
            "FROM pg_control_checkpoint()) AS timeline, "
            "now() AS timestamp "
            "FROM pg_start_backup(%s, %s, FALSE) AS location",
            ("test label", False),
        )
        conn.return_value.rollback.assert_has_calls([call(), call()])
        # reset the mock for the next test
        conn.reset_mock()

        # test error management
        cursor_mock.execute.side_effect = psycopg2.Error
        with pytest.raises(PostgresException):
            server.postgres.start_concurrent_backup(label)
        conn.return_value.rollback.assert_called_once_with()

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_pgespresso_start_backup(self, conn):
        """
        Simple test for the pgespresso_start_backup method

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        # Build and configure a server
        server = build_real_server()
        backup_label = "test label"

        # expect no errors
        assert server.postgres.pgespresso_start_backup(backup_label)

        cursor_mock = conn.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT pgespresso_start_backup(%s,%s) AS backup_label, "
            "now() AS timestamp",
            (backup_label, server.config.immediate_checkpoint),
        )
        conn.return_value.rollback.assert_has_calls([call(), call()])
        # reset the mock for the next test
        conn.reset_mock()

        # Test 2: Setup the mock to trigger an exception
        # expect the method to raise an exception
        cursor_mock.execute.side_effect = psycopg2.Error
        # Check that the method raises an exception
        with pytest.raises(Exception):
            server.postgres.pgespresso_start_backup("test_label")
        conn.return_value.rollback.assert_called_once_with()

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_get_setting(self, conn):
        """
        Simple test for retrieving settings from the database

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        # Build and configure a server
        server = build_real_server()

        # expect no errors
        server.postgres.get_setting("test_setting")
        cursor_mock = conn.return_value.cursor.return_value
        # the setting name is expected to be double-quoted, with any embedded
        # double quote doubled
        cursor_mock.execute.assert_called_once_with(
            'SHOW "%s"' % "test_setting".replace('"', '""')
        )
        # reset the mock for the second test
        conn.reset_mock()

        # Test 2: Setup the mock to trigger an exception
        # expect the method to return None
        cursor_mock.execute.side_effect = psycopg2.Error
        # Check that the method returns None as result
        assert server.postgres.get_setting("test_setting") is None

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_get_systemid(self, conn):
        """
        Simple test for retrieving the systemid from the database

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        # Build and configure a server
        server = build_real_server()
        conn.return_value.server_version = 90600

        # expect no errors
        server.postgres.get_systemid()
        cursor_mock = conn.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT system_identifier::text FROM pg_control_system()"
        )
        # reset the mock for the second test
        conn.reset_mock()

        # Test 2: Setup the mock to trigger an exception
        # expect the method to return None
        cursor_mock.execute.side_effect = psycopg2.Error
        # Check that the method returns None as result
        assert server.postgres.get_systemid() is None
        # reset the mock for the third test
        conn.reset_mock()

        # Test 3: setup the mock to return a PostgreSQL version that
        # doesn't support pg_control_system()
        conn.return_value.server_version = 90500
        assert server.postgres.get_systemid() is None

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_get_tablespaces(self, conn):
        """
        Simple test for the get_tablespaces method

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        cursor_mock = conn.return_value.cursor.return_value
        cursor_mock.fetchall.return_value = [("tbs1", "1234", "/tmp")]
        # Expect no errors
        conn.return_value.server_version = 90400
        tbs = server.postgres.get_tablespaces()
        # check for the correct call on the execute method
        cursor_mock.execute.assert_called_once_with(
            "SELECT spcname, oid, "
            "pg_tablespace_location(oid) AS spclocation "
            "FROM pg_tablespace "
            "WHERE pg_tablespace_location(oid) != ''"
        )
        assert tbs == [("tbs1", "1234", "/tmp")]
        conn.reset_mock()

        # 8.3 test: the spclocation column is expected to be read directly
        conn.return_value.server_version = 80300
        cursor_mock.fetchall.return_value = [("tbs2", "5234", "/tmp1")]
        tbs = server.postgres.get_tablespaces()
        # check for the correct call on the execute method
        cursor_mock.execute.assert_called_once_with(
            "SELECT spcname, oid, spclocation "
            "FROM pg_tablespace WHERE spclocation != ''"
        )
        assert tbs == [("tbs2", "5234", "/tmp1")]

        conn.reset_mock()
        # test error management
        cursor_mock.execute.side_effect = psycopg2.Error
        assert server.postgres.get_tablespaces() is None

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_get_archiver_stats(self, conn):
        """
        Simple test for the get_archiver_stats method

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        cursor_mock = conn.return_value.cursor.return_value
        # expect None as result for server version <9.4
        conn.return_value.server_version = 80300
        assert server.postgres.get_archiver_stats() is None

        # expect no errors with version >= 9.4
        conn.reset_mock()
        conn.return_value.server_version = 90400
        cursor_mock.fetchone.return_value = {"a": "b"}
        assert server.postgres.get_archiver_stats() == {"a": "b"}
        # check for the correct call on the execute method
        cursor_mock.execute.assert_called_once_with(
            "SELECT *, "
            "current_setting('archive_mode') IN ('on', 'always') "
            "AND (last_failed_wal IS NULL "
            "OR last_failed_wal LIKE '%.history' "
            "AND substring(last_failed_wal from 1 for 8) "
            "<= substring(last_archived_wal from 1 for 8) "
            "OR last_failed_time <= last_archived_time) "
            "AS is_archiving, "
            "CAST (archived_count AS NUMERIC) "
            "/ EXTRACT (EPOCH FROM age(now(), stats_reset)) "
            "AS current_archived_wals_per_second "
            "FROM pg_stat_archiver"
        )
        conn.reset_mock()

        # test error management
        cursor_mock.execute.side_effect = psycopg2.Error
        assert server.postgres.get_archiver_stats() is None

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_get_configuration_files(self, conn_mock):
        """
        Simple test for the get_configuration_files method

        :param conn_mock: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        conn_mock.return_value.server_version = 80400
        cursor_mock = conn_mock.return_value.cursor.return_value
        test_conf_files = [
            ("config_file", "/tmp/postgresql.conf"),
            ("hba_file", "/tmp/pg_hba.conf"),
            ("ident_file", "/tmp/pg_ident.conf"),
        ]
        # first fetchall() answers the settings query, the second one the
        # included files query
        cursor_mock.fetchall.side_effect = [test_conf_files, [("/test/file",)]]
        retval = server.postgres.get_configuration_files()

        assert retval == server.postgres.configuration_files
        assert server.postgres.configuration_files == dict(
            test_conf_files + [("included_files", ["/test/file"])]
        )
        cursor_mock.execute.assert_any_call(
            "SELECT name, setting FROM pg_settings "
            "WHERE name IN ('config_file', 'hba_file', 'ident_file')"
        )
        cursor_mock.execute.assert_any_call(
            "SELECT DISTINCT sourcefile AS included_file "
            "FROM pg_settings "
            "WHERE sourcefile IS NOT NULL "
            "AND sourcefile NOT IN "
            "(SELECT setting FROM pg_settings "
            "WHERE name = 'config_file') "
            "ORDER BY 1"
        )

        # Call it again, should not fetch the data twice
        conn_mock.reset_mock()
        retval = server.postgres.get_configuration_files()
        assert retval == server.postgres.configuration_files
        assert not cursor_mock.execute.called

        # Reset mock and configuration files
        conn_mock.reset_mock()
        server.postgres.configuration_files = None

        # Test error management
        cursor_mock.execute.side_effect = PostgresConnectionError
        assert server.postgres.get_configuration_files() == {}

        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
        assert server.postgres.get_configuration_files() == {}

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_has_pgespresso(self, conn_mock):
        """
        Simple test for the has_pgespresso property

        :param conn_mock: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value

        # Too old
        conn_mock.return_value.server_version = 90000
        assert not server.postgres.has_pgespresso

        # Extension present
        conn_mock.return_value.server_version = 90100
        cursor_mock.fetchone.return_value = [1]
        assert server.postgres.has_pgespresso
        cursor_mock.execute.assert_called_once_with(
            "SELECT count(*) FROM pg_extension WHERE extname = 'pgespresso'"
        )

        # Extension not present
        cursor_mock.fetchone.return_value = [0]
        assert not server.postgres.has_pgespresso

        # Reset mock
        conn_mock.reset_mock()

        # Test error management
        cursor_mock.execute.side_effect = PostgresConnectionError
        assert server.postgres.has_pgespresso is None

        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
        assert server.postgres.has_pgespresso is None

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_is_in_recovery(self, conn_mock):
        """
        Simple test for the is_in_recovery property

        :param conn_mock: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value

        # Too old
        conn_mock.return_value.server_version = 80400
        assert not server.postgres.is_in_recovery

        # In recovery
        conn_mock.return_value.server_version = 90100
        cursor_mock.fetchone.return_value = [True]
        assert server.postgres.is_in_recovery
        cursor_mock.execute.assert_called_once_with("SELECT pg_is_in_recovery()")

        # Not in recovery
        cursor_mock.fetchone.return_value = [False]
        assert not server.postgres.is_in_recovery

        # Reset mock
        conn_mock.reset_mock()

        # Test error management
        cursor_mock.execute.side_effect = PostgresConnectionError
        assert server.postgres.is_in_recovery is None

        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
        assert server.postgres.is_in_recovery is None

    @patch("barman.postgres.PostgreSQLConnection.connect")
    @patch(
        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
    )
    def test_current_xlog_info(self, is_in_recovery_mock, conn_mock):
        """
        Test that the current_xlog_info property issues the expected query
        on master and standby, before and from PostgreSQL 10

        :param is_in_recovery_mock: a mock for the is_in_recovery property
        :param conn_mock: a mock that imitates a connection to PostgreSQL
        """
        # Build and configure a server using a mock
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value
        timestamp = datetime.datetime(2016, 3, 30, 17, 4, 20, 271376)
        current_xlog_info = dict(
            location="0/35000528",
            file_name="000000010000000000000035",
            file_offset=1320,
            timestamp=timestamp,
        )
        cursor_mock.fetchone.return_value = current_xlog_info

        # Test call on master, PostgreSQL older than 10
        conn_mock.return_value.server_version = 90300
        is_in_recovery_mock.return_value = False
        remote_loc = server.postgres.current_xlog_info
        assert remote_loc == current_xlog_info
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, (pg_xlogfile_name_offset(location)).*, "
            "CURRENT_TIMESTAMP AS timestamp "
            "FROM pg_current_xlog_location() AS location"
        )

        # Check call on standby, PostgreSQL older than 10
        conn_mock.reset_mock()
        conn_mock.return_value.server_version = 90300
        is_in_recovery_mock.return_value = True
        current_xlog_info["file_name"] = None
        current_xlog_info["file_offset"] = None
        remote_loc = server.postgres.current_xlog_info
        assert remote_loc == current_xlog_info
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, NULL AS file_name, NULL AS file_offset, "
            "CURRENT_TIMESTAMP AS timestamp "
            "FROM pg_last_xlog_replay_location() AS location"
        )

        # Test call on master, PostgreSQL 10
        conn_mock.reset_mock()
        conn_mock.return_value.server_version = 100000
        is_in_recovery_mock.return_value = False
        remote_loc = server.postgres.current_xlog_info
        assert remote_loc == current_xlog_info
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, (pg_walfile_name_offset(location)).*, "
            "CURRENT_TIMESTAMP AS timestamp "
            "FROM pg_current_wal_lsn() AS location"
        )

        # Check call on standby, PostgreSQL 10
        conn_mock.reset_mock()
        conn_mock.return_value.server_version = 100000
        is_in_recovery_mock.return_value = True
        current_xlog_info["file_name"] = None
        current_xlog_info["file_offset"] = None
        remote_loc = server.postgres.current_xlog_info
        assert remote_loc == current_xlog_info
        cursor_mock.execute.assert_called_once_with(
            "SELECT location, NULL AS file_name, NULL AS file_offset, "
            "CURRENT_TIMESTAMP AS timestamp "
            "FROM pg_last_wal_replay_lsn() AS location"
        )

        # Reset mock
        conn_mock.reset_mock()

        # Test error management
        cursor_mock.execute.side_effect = PostgresConnectionError
        assert server.postgres.current_xlog_info is None

        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
        assert server.postgres.current_xlog_info is None

    @patch("barman.postgres.PostgreSQLConnection.connect")
    @patch(
        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
    )
    def test_current_xlog_file_name(self, is_in_recovery_mock, conn_mock):
        """
        Simple test for the current_xlog_file_name property

        :param is_in_recovery_mock: a mock for the is_in_recovery property
        :param conn_mock: a mock that imitates a connection to PostgreSQL
        """
        # Build a server
        server = build_real_server()
        conn_mock.return_value.server_version = 90300
        cursor_mock = conn_mock.return_value.cursor.return_value

        timestamp = datetime.datetime(2016, 3, 30, 17, 4, 20, 271376)
        cursor_mock.fetchone.return_value = dict(
            location="0/35000528",
            file_name="000000010000000000000035",
            file_offset=1320,
            timestamp=timestamp,
        )

        # is_in_recovery is a property, so it is patched with a PropertyMock
        # and configured through return_value
        is_in_recovery_mock.return_value = False
        assert server.postgres.current_xlog_file_name == ("000000010000000000000035")

        # Reset mock
        conn_mock.reset_mock()

        # Test error management
        cursor_mock.execute.side_effect = PostgresConnectionError
        assert server.postgres.current_xlog_file_name is None

        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
        assert server.postgres.current_xlog_file_name is None

    @patch("barman.postgres.psycopg2.connect")
    @patch(
        "barman.postgres.PostgreSQLConnection.xlog_segment_size",
        new_callable=PropertyMock,
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.checkpoint_timeout",
        new_callable=PropertyMock,
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.archive_timeout",
        new_callable=PropertyMock,
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.has_backup_privileges",
        new_callable=PropertyMock,
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.is_superuser", new_callable=PropertyMock
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.server_txt_version",
        new_callable=PropertyMock,
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.has_pgespresso", new_callable=PropertyMock
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.current_xlog_info",
        new_callable=PropertyMock,
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.current_size", new_callable=PropertyMock
    )
    @patch("barman.postgres.PostgreSQLConnection.get_configuration_files")
    @patch("barman.postgres.PostgreSQLConnection.get_setting")
    @patch("barman.postgres.PostgreSQLConnection.get_synchronous_standby_names")
    @patch("barman.postgres.PostgreSQLConnection.get_systemid")
    def test_get_remote_status(
        self,
        get_systemid_mock,
        get_synchronous_standby_names_mock,
        get_setting_mock,
        get_configuration_files_mock,
        current_size_mock,
        current_xlog_info,
        has_pgespresso_mock,
        server_txt_version_mock,
        is_superuser_mock,
        has_backup_privileges_mock,
        is_in_recovery_mock,
        archive_timeout_mock,
        checkpoint_timeout_mock,
        xlog_segment_size,
        conn_mock,
    ):
        """
        Simple test for the fetch_remote_status method

        The patch decorators are applied bottom-up, so the mock arguments
        are listed in the reverse order of the decorators: the first one
        belongs to get_systemid and the last one (conn_mock) replaces
        psycopg2.connect.
        """
        # Build a server
        server = build_real_server()
        current_xlog_info.return_value = {
            "location": "DE/ADBEEF",
            "file_name": "00000001000000DE00000000",
            "file_offset": 11386607,
            "timestamp": datetime.datetime(2016, 3, 30, 17, 4, 20, 271376),
        }
        current_size_mock.return_value = 497354072
        has_pgespresso_mock.return_value = True
        server_txt_version_mock.return_value = "9.5.0"
        is_in_recovery_mock.return_value = False
        has_backup_privileges_mock.return_value = True
        is_superuser_mock.return_value = True
        get_configuration_files_mock.return_value = {"a": "b"}
        get_synchronous_standby_names_mock.return_value = []
        conn_mock.return_value.server_version = 90500
        archive_timeout_mock.return_value = 300
        checkpoint_timeout_mock.return_value = 600
        # 2 << 22 == 8388608, the value asserted as xlog_segment_size below
        xlog_segment_size.return_value = 2 << 22
        get_systemid_mock.return_value = 6721602258895701769

        settings = {
            "data_directory": "a directory",
            "wal_level": "a wal_level value",
            "hot_standby": "a hot_standby value",
            "max_wal_senders": "a max_wal_senderse value",
            "data_checksums": "a data_checksums",
            "max_replication_slots": "a max_replication_slots value",
            "wal_compression": "a wal_compression value",
            "wal_keep_segments": "a wal_keep_segments value",
            "wal_keep_size": "a wal_keep_size value",
        }

        # Any setting that is not listed above is reported as "unknown"
        get_setting_mock.side_effect = lambda x: settings.get(x, "unknown")

        # Test PostgreSQL < 9.6: postgres_systemid is expected to be None
        result = server.postgres.fetch_remote_status()
        assert result == {
            "a": "b",
            "is_superuser": True,
            "has_backup_privileges": True,
            "is_in_recovery": False,
            "current_lsn": "DE/ADBEEF",
            "current_xlog": "00000001000000DE00000000",
            "data_directory": "a directory",
            "pgespresso_installed": True,
            "server_txt_version": "9.5.0",
            "wal_level": "a wal_level value",
            "current_size": 497354072,
            "replication_slot_support": True,
            "replication_slot": None,
            "synchronous_standby_names": [],
            "archive_timeout": 300,
            "checkpoint_timeout": 600,
            "wal_keep_segments": "a wal_keep_segments value",
            "hot_standby": "a hot_standby value",
            "max_wal_senders": "a max_wal_senderse value",
            "data_checksums": "a data_checksums",
            "max_replication_slots": "a max_replication_slots value",
            "wal_compression": "a wal_compression value",
            "xlog_segment_size": 8388608,
            "postgres_systemid": None,
        }

        # Test PostgreSQL 9.6: postgres_systemid is expected to be reported
        conn_mock.return_value.server_version = 90600
        result = server.postgres.fetch_remote_status()
        assert result == {
            "a": "b",
            "is_superuser": True,
            "has_backup_privileges": True,
            "is_in_recovery": False,
            "current_lsn": "DE/ADBEEF",
            "current_xlog": "00000001000000DE00000000",
            "data_directory": "a directory",
            "pgespresso_installed": True,
            "server_txt_version": "9.5.0",
            "wal_level": "a wal_level value",
            "current_size": 497354072,
            "replication_slot_support": True,
            "replication_slot": None,
            "synchronous_standby_names": [],
            "archive_timeout": 300,
            "checkpoint_timeout": 600,
            "wal_keep_segments": "a wal_keep_segments value",
            "hot_standby": "a hot_standby value",
            "max_wal_senders": "a max_wal_senderse value",
            "data_checksums": "a data_checksums",
            "max_replication_slots": "a max_replication_slots value",
            "wal_compression": "a wal_compression value",
            "xlog_segment_size": 8388608,
            "postgres_systemid": 6721602258895701769,
        }

        # Test PostgreSQL 13: wal_keep_size is expected in place of
        # wal_keep_segments
        conn_mock.return_value.server_version = 130000
        result = server.postgres.fetch_remote_status()
        assert result == {
            "a": "b",
            "is_superuser": True,
            "has_backup_privileges": True,
            "is_in_recovery": False,
            "current_lsn": "DE/ADBEEF",
            "current_xlog": "00000001000000DE00000000",
            "data_directory": "a directory",
            "pgespresso_installed": True,
            "server_txt_version": "9.5.0",
            "wal_level": "a wal_level value",
            "current_size": 497354072,
            "replication_slot_support": True,
            "replication_slot": None,
            "synchronous_standby_names": [],
            "archive_timeout": 300,
            "checkpoint_timeout": 600,
            "wal_keep_size": "a wal_keep_size value",
            "hot_standby": "a hot_standby value",
            "max_wal_senders": "a max_wal_senderse value",
            "data_checksums": "a data_checksums",
            "max_replication_slots": "a max_replication_slots value",
            "wal_compression": "a wal_compression value",
            "xlog_segment_size": 8388608,
            "postgres_systemid": 6721602258895701769,
        }

        # Test error management: close the connection and make every new
        # call to psycopg2.connect fail
        server.postgres.close()
        conn_mock.side_effect = psycopg2.DatabaseError
        assert server.postgres.fetch_remote_status() == {
            "is_superuser": None,
            "is_in_recovery": None,
            "current_xlog": None,
            "data_directory": None,
            "pgespresso_installed": None,
            "server_txt_version": None,
            "replication_slot_support": None,
            "replication_slot": None,
            "synchronous_standby_names": None,
            "postgres_systemid": None,
        }

        # Same expectation when reading a setting raises a ProgrammingError
        get_setting_mock.side_effect = psycopg2.ProgrammingError
        assert server.postgres.fetch_remote_status() == {
            "is_superuser": None,
            "is_in_recovery": None,
            "current_xlog": None,
            "data_directory": None,
            "pgespresso_installed": None,
            "server_txt_version": None,
            "replication_slot_support": None,
            "replication_slot": None,
            "synchronous_standby_names": None,
            "postgres_systemid": None,
        }

    @patch("barman.postgres.PostgreSQLConnection.connect")
    @patch(
        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.is_superuser", new_callable=PropertyMock
    )
    def test_checkpoint(self, is_superuser_mock, is_in_recovery_mock, conn_mock):
        """
        Simple test for the execution of a checkpoint on a given server

        Checks that a superuser connection issues the CHECKPOINT statement
        and that a non-superuser connection raises
        PostgresSuperuserRequired without executing anything.
        """
        # Build a server
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value
        is_in_recovery_mock.return_value = False
        is_superuser_mock.return_value = True
        # Execute the checkpoint method
        server.postgres.checkpoint()
        # Check for the right invocation
        cursor_mock.execute.assert_called_with("CHECKPOINT")

        cursor_mock.reset_mock()
        # Missing required permissions
        is_in_recovery_mock.return_value = False
        is_superuser_mock.return_value = False
        with pytest.raises(PostgresSuperuserRequired):
            server.postgres.checkpoint()
        assert not cursor_mock.execute.called

    @patch("barman.postgres.PostgreSQLConnection.connect")
    @patch(
        "barman.postgres.PostgreSQLConnection.is_in_recovery", new_callable=PropertyMock
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.has_backup_privileges",
        new_callable=PropertyMock,
    )
    def test_switch_wal(
        self, has_backup_privileges_mock, is_in_recovery_mock, conn_mock
    ):
        """
        Simple test for the execution of a switch of a xlog on a given server

        Covers the statements issued for PostgreSQL < 10 and for
        PostgreSQL 10, the case in which the WAL file does not change, and
        the two refusal cases (missing privileges, server in recovery).
        """
        # Build a server
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value
        is_in_recovery_mock.return_value = False
        has_backup_privileges_mock.return_value = True

        # Test for the response of a correct switch for PostgreSQL < 10
        conn_mock.return_value.server_version = 90100
        # Two different file names are returned by consecutive fetchone
        # calls; the first one is the value expected from switch_wal
        cursor_mock.fetchone.side_effect = [
            ("000000010000000000000001",),
            ("000000010000000000000002",),
        ]
        xlog = server.postgres.switch_wal()

        # Check for the right invocation for PostgreSQL < 10
        assert xlog == "000000010000000000000001"
        cursor_mock.execute.assert_has_calls(
            [
                call("SELECT pg_xlogfile_name(pg_current_xlog_insert_location())"),
                call("SELECT pg_xlogfile_name(pg_switch_xlog())"),
                call("SELECT pg_xlogfile_name(pg_current_xlog_insert_location())"),
            ]
        )

        # Test for the response of a correct switch for PostgreSQL 10
        conn_mock.return_value.server_version = 100000
        cursor_mock.reset_mock()
        cursor_mock.fetchone.side_effect = [
            ("000000010000000000000001",),
            ("000000010000000000000002",),
        ]
        xlog = server.postgres.switch_wal()

        # Check for the right invocation for PostgreSQL 10
        assert xlog == "000000010000000000000001"
        cursor_mock.execute.assert_has_calls(
            [
                call("SELECT pg_walfile_name(pg_current_wal_insert_lsn())"),
                call("SELECT pg_walfile_name(pg_switch_wal())"),
                call("SELECT pg_walfile_name(pg_current_wal_insert_lsn())"),
            ]
        )

        cursor_mock.reset_mock()
        # The switch has not been executed
        # (the same file name is returned by both fetchone calls)
        cursor_mock.fetchone.side_effect = [
            ("000000010000000000000001",),
            ("000000010000000000000001",),
        ]
        xlog = server.postgres.switch_wal()
        # Check for the right invocation
        assert xlog == ""

        cursor_mock.reset_mock()
        # Missing required permissions
        is_in_recovery_mock.return_value = False
        has_backup_privileges_mock.return_value = False
        with pytest.raises(BackupFunctionsAccessRequired):
            server.postgres.switch_wal()
        # Check for the right invocation
        assert not cursor_mock.execute.called

        cursor_mock.reset_mock()
        # Server in recovery
        is_in_recovery_mock.return_value = True
        has_backup_privileges_mock.return_value = True
        with pytest.raises(PostgresIsInRecovery):
            server.postgres.switch_wal()
        # Check for the right invocation
        assert not cursor_mock.execute.called

    @patch("barman.postgres.PostgreSQLConnection.connect")
    @patch(
        "barman.postgres.PostgreSQLConnection.server_version", new_callable=PropertyMock
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.has_backup_privileges",
        new_callable=PropertyMock,
    )
    def test_get_replication_stats(
        self, has_backup_privileges_mock, server_version_mock, conn_mock
    ):
        """
        Simple test for the execution of get_replication_stats on a server

        For each mocked server version, and for each client type
        (ANY_STREAMING_CLIENT, WALSTREAMER, STANDBY), the exact SQL
        statement sent to the cursor is verified. The final two sections
        cover missing privileges and an unsupported server version.
        """
        # Build a server
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value
        has_backup_privileges_mock.return_value = True

        # 10 ALL
        cursor_mock.reset_mock()
        server_version_mock.return_value = 100000
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.ANY_STREAMING_CLIENT
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT r.*, rs.slot_name, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_wal_receive_lsn() "
            " ELSE pg_current_wal_lsn() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
            "WHERE (rs.slot_type IS NULL OR rs.slot_type = 'physical') "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 10 WALSTREAMER
        cursor_mock.reset_mock()
        server_version_mock.return_value = 100000
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.WALSTREAMER
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT r.*, rs.slot_name, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_wal_receive_lsn() "
            " ELSE pg_current_wal_lsn() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
            "WHERE (rs.slot_type IS NULL OR rs.slot_type = 'physical') "
            "AND replay_lsn IS NULL "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 10 STANDBY
        cursor_mock.reset_mock()
        server_version_mock.return_value = 100000
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.STANDBY
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT r.*, rs.slot_name, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_wal_receive_lsn() "
            " ELSE pg_current_wal_lsn() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
            "WHERE (rs.slot_type IS NULL OR rs.slot_type = 'physical') "
            "AND replay_lsn IS NOT NULL "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.5 ALL
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90500
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.ANY_STREAMING_CLIENT
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT pid, usesysid, usename, application_name, client_addr, "
            "client_hostname, client_port, "
            "backend_start, backend_xmin, state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, rs.slot_name, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "LEFT JOIN pg_replication_slots rs ON (r.pid = rs.active_pid) "
            "WHERE (rs.slot_type IS NULL OR rs.slot_type = 'physical') "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.4 ALL
        # (no join with pg_replication_slots is expected for this version)
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90400
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.ANY_STREAMING_CLIENT
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT pid, usesysid, usename, application_name, client_addr, "
            "client_hostname, client_port, "
            "backend_start, backend_xmin, state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.4 WALSTREAMER
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90400
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.WALSTREAMER
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT pid, usesysid, usename, application_name, client_addr, "
            "client_hostname, client_port, "
            "backend_start, backend_xmin, state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "WHERE replay_location IS NULL "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.4 STANDBY
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90400
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.STANDBY
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT pid, usesysid, usename, application_name, client_addr, "
            "client_hostname, client_port, "
            "backend_start, backend_xmin, state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "WHERE replay_location IS NOT NULL "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.2 ALL
        # (backend_xmin is expected to be replaced by a NULL placeholder)
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90200
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.ANY_STREAMING_CLIENT
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT pid, usesysid, usename, application_name, client_addr, "
            "client_hostname, client_port, backend_start, "
            "CAST (NULL AS xid) AS backend_xmin, "
            "state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.2 WALSTREAMER
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90200
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.WALSTREAMER
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT pid, usesysid, usename, application_name, client_addr, "
            "client_hostname, client_port, backend_start, "
            "CAST (NULL AS xid) AS backend_xmin, "
            "state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "WHERE replay_location IS NULL "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.2 STANDBY
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90200
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.STANDBY
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT pid, usesysid, usename, application_name, client_addr, "
            "client_hostname, client_port, backend_start, "
            "CAST (NULL AS xid) AS backend_xmin, "
            "state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "WHERE replay_location IS NOT NULL "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.1 ALL
        # (the pid column is expected to be read from procpid)
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90100
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.ANY_STREAMING_CLIENT
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT procpid AS pid, usesysid, usename, application_name, "
            "client_addr, client_hostname, client_port, backend_start, "
            "CAST (NULL AS xid) AS backend_xmin, "
            "state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.1 WALSTREAMER
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90100
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.WALSTREAMER
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT procpid AS pid, usesysid, usename, application_name, "
            "client_addr, client_hostname, client_port, backend_start, "
            "CAST (NULL AS xid) AS backend_xmin, "
            "state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "WHERE replay_location IS NULL "
            "ORDER BY sync_state DESC, sync_priority"
        )

        # 9.1 STANDBY
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90100
        standby_info = server.postgres.get_replication_stats(
            PostgreSQLConnection.STANDBY
        )
        assert standby_info is cursor_mock.fetchall.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT procpid AS pid, usesysid, usename, application_name, "
            "client_addr, client_hostname, client_port, backend_start, "
            "CAST (NULL AS xid) AS backend_xmin, "
            "state, "
            "sent_location AS sent_lsn, "
            "write_location AS write_lsn, "
            "flush_location AS flush_lsn, "
            "replay_location AS replay_lsn, "
            "sync_priority, sync_state, "
            "pg_is_in_recovery() AS is_in_recovery, "
            "CASE WHEN pg_is_in_recovery() "
            " THEN pg_last_xlog_receive_location() "
            " ELSE pg_current_xlog_location() "
            "END AS current_lsn "
            "FROM pg_stat_replication r "
            "WHERE replay_location IS NOT NULL "
            "ORDER BY sync_state DESC, sync_priority"
        )

        cursor_mock.reset_mock()
        # Missing required permissions
        has_backup_privileges_mock.return_value = False
        with pytest.raises(BackupFunctionsAccessRequired):
            server.postgres.get_replication_stats(
                PostgreSQLConnection.ANY_STREAMING_CLIENT
            )
        # Check for the right invocation
        assert not cursor_mock.execute.called

        cursor_mock.reset_mock()
        # Too old version (9.0)
        has_backup_privileges_mock.return_value = True
        server_version_mock.return_value = 90000
        with pytest.raises(PostgresUnsupportedFeature):
            server.postgres.get_replication_stats(
                PostgreSQLConnection.ANY_STREAMING_CLIENT
            )
        # Check for the right invocation
        assert not cursor_mock.execute.called

    @patch("barman.postgres.PostgreSQLConnection.connect")
    @patch(
        "barman.postgres.PostgreSQLConnection.server_version", new_callable=PropertyMock
    )
    @patch(
        "barman.postgres.PostgreSQLConnection.is_superuser", new_callable=PropertyMock
    )
    def test_get_replication_slot(
        self, is_superuser_mock, server_version_mock, conn_mock
    ):
        """
        Simple test for the execution of get_replication_slot on a server

        Checks the query issued on PostgreSQL 9.4 and that
        PostgresUnsupportedFeature is raised on PostgreSQL 9.3.
        """
        # Build a server
        server = build_real_server()
        server.config.slot_name = "test"
        cursor_mock = conn_mock.return_value.cursor.return_value
        is_superuser_mock.return_value = True

        # Supported version 9.4
        cursor_mock.reset_mock()
        server_version_mock.return_value = 90400
        replication_slot = server.postgres.get_replication_slot(server.config.slot_name)
        assert replication_slot is cursor_mock.fetchone.return_value
        cursor_mock.execute.assert_called_once_with(
            "SELECT slot_name, "
            "active, "
            "restart_lsn "
            "FROM pg_replication_slots "
            "WHERE slot_type = 'physical' "
            "AND slot_name = '%s'" % server.config.slot_name
        )

        # Too old version (9.3)
        server_version_mock.return_value = 90300
        with pytest.raises(PostgresUnsupportedFeature):
            server.postgres.get_replication_slot(server.config.slot_name)

    @patch("barman.postgres.PostgreSQLConnection.get_setting")
    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_get_synchronous_standby_names(self, conn_mock, setting_mock):
        """
        Simple test for the parsing of the synchronous_standby_names setting

        The setting value is provided through the mocked get_setting
        method, in several formats (plain list, different spacing,
        "N(...)" form, quoted names and "*"), and the resulting list of
        names is verified. PostgreSQL 9.0 is expected to be rejected.
        """
        # Build and configure a server
        server = build_real_server()

        # Unsupported version: 9.0
        conn_mock.return_value.server_version = 90000

        with pytest.raises(PostgresUnsupportedFeature):
            server.postgres.get_synchronous_standby_names()

        # Supported version: 9.1
        conn_mock.return_value.server_version = 90100

        # Comma separated list
        setting_mock.return_value = "a, bc, def"
        names = server.postgres.get_synchronous_standby_names()
        setting_mock.assert_called_once_with("synchronous_standby_names")
        assert names == ["a", "bc", "def"]

        # Comma separated list without spaces
        setting_mock.reset_mock()
        setting_mock.return_value = "a,bc,def"
        names = server.postgres.get_synchronous_standby_names()
        setting_mock.assert_called_once_with("synchronous_standby_names")
        assert names == ["a", "bc", "def"]

        # Leading and trailing spaces
        setting_mock.reset_mock()
        setting_mock.return_value = " a, bc, def "
        names = server.postgres.get_synchronous_standby_names()
        setting_mock.assert_called_once_with("synchronous_standby_names")
        assert names == ["a", "bc", "def"]

        # "N(...)" form: the leading number is not part of the names
        setting_mock.reset_mock()
        setting_mock.return_value = "2(a, bc, def)"
        names = server.postgres.get_synchronous_standby_names()
        setting_mock.assert_called_once_with("synchronous_standby_names")
        assert names == ["a", "bc", "def"]

        # "N(...)" form with extra spaces
        setting_mock.reset_mock()
        setting_mock.return_value = " 1 ( a, bc, def ) "
        names = server.postgres.get_synchronous_standby_names()
        setting_mock.assert_called_once_with("synchronous_standby_names")
        assert names == ["a", "bc", "def"]

        # Single name surrounded by spaces
        setting_mock.reset_mock()
        setting_mock.return_value = " a "
        names = server.postgres.get_synchronous_standby_names()
        setting_mock.assert_called_once_with("synchronous_standby_names")
        assert names == ["a"]

        # Single name in the "N(...)" form
        setting_mock.reset_mock()
        setting_mock.return_value = "1(a)"
        names = server.postgres.get_synchronous_standby_names()
        setting_mock.assert_called_once_with("synchronous_standby_names")
        assert names == ["a"]

        # Double quoted name: the quotes are expected to be removed
        setting_mock.reset_mock()
        setting_mock.return_value = '1(a, "b-c")'
        names = server.postgres.get_synchronous_standby_names()
        setting_mock.assert_called_once_with("synchronous_standby_names")
        assert names == ["a", "b-c"]

        # Wildcard
        setting_mock.reset_mock()
        setting_mock.return_value = "*"
        names = server.postgres.get_synchronous_standby_names()
        setting_mock.assert_called_once_with("synchronous_standby_names")
        assert names == ["*"]

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_xlog_segment_size(self, conn_mock):
        """
        Test the xlog_segment_size property with PostgreSQL 11

        A single query on the 'wal_segment_size' setting is expected, and
        its value is expected to be returned as an integer.
        """

        default_wal_file_size = 16777216

        # Build a server
        server = build_real_server()
        conn_mock.return_value.server_version = 110000
        cursor_mock = conn_mock.return_value.cursor.return_value
        # The setting is returned by the cursor as a string
        cursor_mock.fetchone.side_effect = [[str(default_wal_file_size)]]

        result = server.postgres.xlog_segment_size
        assert result == default_wal_file_size

        execute_calls = [
            call("SELECT setting FROM pg_settings WHERE name='wal_segment_size'"),
        ]
        cursor_mock.execute.assert_has_calls(execute_calls)

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_xlog_segment_size_10(self, conn_mock):
        """
        Test the xlog_segment_size property with PostgreSQL 10

        Both 'wal_segment_size' and 'wal_block_size' are expected to be
        queried. The expected result (16777216) matches the product of the
        two mocked values (2048 * 8192).
        """

        default_wal_file_size = 16777216
        default_wal_block_size = 8192
        default_wal_segments_number = 2048

        # Build a server
        server = build_real_server()
        conn_mock.return_value.server_version = 100000
        cursor_mock = conn_mock.return_value.cursor.return_value
        # Values returned, in order, by the two queries asserted below
        cursor_mock.fetchone.side_effect = [
            [str(default_wal_segments_number)],
            [str(default_wal_block_size)],
        ]

        result = server.postgres.xlog_segment_size
        assert result == default_wal_file_size

        execute_calls = [
            call("SELECT setting FROM pg_settings WHERE name='wal_segment_size'"),
            call("SELECT setting FROM pg_settings WHERE name='wal_block_size'"),
        ]
        cursor_mock.execute.assert_has_calls(execute_calls)

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_xlog_segment_size_83(self, conn_mock):
        """
        If you use PostgreSQL 8.3 you can't change the WAL segment size even
        at compilation level. Barman shouldn't ask the server for this data,
        as this will result in an error
        """

        # Build a server
        server = build_real_server()
        conn_mock.return_value.server_version = 80300
        cursor_mock = conn_mock.return_value.cursor.return_value

        # The default value must be returned without running any query
        result = server.postgres.xlog_segment_size
        assert result == DEFAULT_XLOG_SEG_SIZE

        cursor_mock.execute.assert_not_called()

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_name_map(self, conn_mock):
        """
        Test the `name_map` behaviour

        A non-empty map is expected for both PostgreSQL 10 and 9.3. When
        the connection fails, the map is expected to be equal to the
        PostgreSQL 10 one.
        """
        server = build_real_server()

        conn_mock.return_value.server_version = 100000
        map_10 = server.postgres.name_map
        assert map_10

        conn_mock.return_value.server_version = 90300
        map_93 = server.postgres.name_map
        assert map_93

        # Connection failure: fall back to the same names used for 10
        conn_mock.side_effect = PostgresConnectionError
        map_error = server.postgres.name_map
        assert map_10 == map_error

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_switch_wal_function(self, conn_mock):
        """
        Test the `pg_switch_wal` entry of `name_map`

        Expected to be `pg_switch_xlog` on 9.3 and `pg_switch_wal` on 10.
        """
        server = build_real_server()

        conn_mock.return_value.server_version = 90300
        assert server.postgres.name_map["pg_switch_wal"] == "pg_switch_xlog"

        conn_mock.return_value.server_version = 100000
        assert server.postgres.name_map["pg_switch_wal"] == "pg_switch_wal"

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_xlogfile_name_function(self, conn_mock):
        """
        Test the `pg_walfile_name` entry of `name_map`

        Expected to be `pg_xlogfile_name` on 9.3 and `pg_walfile_name`
        on 10.
        """
        server = build_real_server()

        conn_mock.return_value.server_version = 90300
        assert server.postgres.name_map["pg_walfile_name"] == "pg_xlogfile_name"

        conn_mock.return_value.server_version = 100000
        assert server.postgres.name_map["pg_walfile_name"] == "pg_walfile_name"

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_xlogfile_name_offset_function(self, conn_mock):
        """
        Test the `pg_walfile_name_offset` entry of `name_map`

        Expected to be `pg_xlogfile_name_offset` on 9.3 and
        `pg_walfile_name_offset` on 10.
        """
        server = build_real_server()

        conn_mock.return_value.server_version = 90300
        assert (
            server.postgres.name_map["pg_walfile_name_offset"]
            == "pg_xlogfile_name_offset"
        )

        conn_mock.return_value.server_version = 100000
        assert (
            server.postgres.name_map["pg_walfile_name_offset"]
            == "pg_walfile_name_offset"
        )

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_xlog_directory(self, conn_mock):
        """
        Test the `pg_wal` entry of `name_map`

        Expected to be `pg_xlog` on 9.3 and `pg_wal` on 10.
        """
        server = build_real_server()

        conn_mock.return_value.server_version = 90300
        assert server.postgres.name_map["pg_wal"] == "pg_xlog"

        conn_mock.return_value.server_version = 100000
        assert server.postgres.name_map["pg_wal"] == "pg_wal"

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_last_xlog_replay_location_function(self, conn_mock):
        """
        Test the `pg_last_wal_replay_lsn` entry of `name_map`

        Expected to be `pg_last_xlog_replay_location` on 9.3 and
        `pg_last_wal_replay_lsn` on 10.
        """
        server = build_real_server()

        conn_mock.return_value.server_version = 90300
        assert (
            server.postgres.name_map["pg_last_wal_replay_lsn"]
            == "pg_last_xlog_replay_location"
        )

        conn_mock.return_value.server_version = 100000
        assert (
            server.postgres.name_map["pg_last_wal_replay_lsn"]
            == "pg_last_wal_replay_lsn"
        )

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_current_xlog_location_function(self, conn_mock):
        """
        Test the `pg_current_wal_lsn` entry of `name_map`

        Expected to be `pg_current_xlog_location` on 9.3 and
        `pg_current_wal_lsn` on 10.
        """
        server = build_real_server()

        conn_mock.return_value.server_version = 90300
        assert (
            server.postgres.name_map["pg_current_wal_lsn"] == "pg_current_xlog_location"
        )

        conn_mock.return_value.server_version = 100000
        assert server.postgres.name_map["pg_current_wal_lsn"] == "pg_current_wal_lsn"

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_current_xlog_insert_location_function(self, conn_mock):
        """
        Test the `pg_current_wal_insert_lsn` entry of `name_map`

        Expected to be `pg_current_xlog_insert_location` on 9.3 and
        `pg_current_wal_insert_lsn` on 10.
        """
        server = build_real_server()

        conn_mock.return_value.server_version = 90300
        assert (
            server.postgres.name_map["pg_current_wal_insert_lsn"]
            == "pg_current_xlog_insert_location"
        )

        conn_mock.return_value.server_version = 100000
        assert (
            server.postgres.name_map["pg_current_wal_insert_lsn"]
            == "pg_current_wal_insert_lsn"
        )

    @patch("barman.postgres.PostgreSQLConnection.connect")
    def test_last_xlog_receive_location_function(self, conn_mock):
        """
        Test the `pg_last_wal_receive_lsn` entry of `name_map`

        Expected to be `pg_last_xlog_receive_location` on 9.3 and
        `pg_last_wal_receive_lsn` on 10.
        """
        server = build_real_server()

        conn_mock.return_value.server_version = 90300
        assert (
            server.postgres.name_map["pg_last_wal_receive_lsn"]
            == "pg_last_xlog_receive_location"
        )

        conn_mock.return_value.server_version = 100000
        assert (
            server.postgres.name_map["pg_last_wal_receive_lsn"]
            == "pg_last_wal_receive_lsn"
        )


# noinspection PyMethodMayBeStatic
class TestStreamingConnection(object):
    def test_connection_error(self):
        """
        simple test for streaming_archiver without streaming_conninfo

        Also checks that, when a streaming_conninfo is provided, the
        requested dbname is replaced with "replication" and the expected
        connection string is built.
        """
        # Test with wrong configuration
        server = build_real_server(
            main_conf={"streaming_archiver": True, "streaming_conninfo": ""}
        )
        assert server.config.msg_list
        assert (
            "Streaming connection: Missing 'streaming_conninfo' "
            "parameter for server 'main'" in server.config.msg_list
        )
        # Test with a valid configuration: the "test_db" dbname is
        # expected to be overridden
        server = build_real_server(
            main_conf={
                "streaming_archiver": True,
                "streaming_conninfo": "host=/test "
                "port=5496 "
                "user=test "
                "dbname=test_db",
            }
        )
        assert server.streaming.conn_parameters["dbname"] == "replication"
        assert (
            server.streaming.conninfo == "dbname=replication "
            "host=/test "
            "options=-cdatestyle=iso "
            "port=5496 "
            "replication=true "
            "user=test"
        )

    @patch("barman.postgres.psycopg2.connect")
    def test_fetch_remote_status(self, conn_mock):
        """
        simple test for the fetch_remote_status method

        Covers a too old server (9.1), a working streaming connection, a
        connection on which IDENTIFY_SYSTEM fails and a connection
        failure.
        """
        # Build a server
        server = build_real_server(
            main_conf={"streaming_archiver": True, "streaming_conninfo": "dummy=param"}
        )

        # Too old PostgreSQL
        conn_mock.return_value.server_version = 90100
        result = server.streaming.fetch_remote_status()
        assert result["streaming_supported"] is False
        assert result["streaming"] is None

        # Working streaming connection
        conn_mock.return_value.server_version = 90300
        cursor_mock = conn_mock.return_value.cursor.return_value
        cursor_mock.fetchone.return_value = ("12345", 1, "DE/ADBEEF")
        result = server.streaming.fetch_remote_status()
        cursor_mock.execute.assert_called_with("IDENTIFY_SYSTEM")
        assert result["streaming_supported"] is True
        assert result["streaming"] is True

        # Working non-streaming connection
        # (the IDENTIFY_SYSTEM command is rejected with a ProgrammingError)
        conn_mock.reset_mock()
        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
        result = server.streaming.fetch_remote_status()
        cursor_mock.execute.assert_called_with("IDENTIFY_SYSTEM")
        assert result["streaming_supported"] is True
        assert result["streaming"] is False

        # Connection failed
        server.streaming.close()
        conn_mock.reset_mock()
        conn_mock.side_effect = psycopg2.DatabaseError
        result = server.streaming.fetch_remote_status()
        assert result["streaming_supported"] is None
        assert result["streaming"] is None

    @patch("barman.postgres.PostgreSQL.connect")
    def test_streaming_server_txt_version(self, conn_mock):
        """
        simple test for the server_txt_version property

        Checks the textual version built from the numeric server_version:
        three components before PostgreSQL 10, two components from
        PostgreSQL 10 onwards, and None on connection error.
        """
        # Build a server
        server = build_real_server(
            main_conf={"streaming_archiver": True, "streaming_conninfo": "dummy=param"}
        )

        # Connection error
        conn_mock.side_effect = PostgresConnectionError
        assert server.streaming.server_txt_version is None

        # Good connection
        conn_mock.side_effect = None

        conn_mock.return_value.server_version = 80300
        assert server.streaming.server_txt_version == "8.3.0"

        conn_mock.return_value.server_version = 90000
        assert server.streaming.server_txt_version == "9.0.0"

        conn_mock.return_value.server_version = 90005
        assert server.streaming.server_txt_version == "9.0.5"

        conn_mock.return_value.server_version = 100001
        assert server.streaming.server_txt_version == "10.1"

        conn_mock.return_value.server_version = 110011
        assert server.streaming.server_txt_version == "11.11"

        conn_mock.return_value.server_version = 0
        assert server.streaming.server_txt_version == "0.0.0"
1997 @patch("barman.postgres.psycopg2.connect") 1998 def test_streaming_create_repslot(self, connect_mock): 1999 # Build a server 2000 server = build_real_server( 2001 main_conf={"streaming_archiver": True, "streaming_conninfo": "dummy=param"} 2002 ) 2003 2004 # Test replication slot creation 2005 cursor_mock = connect_mock.return_value.cursor.return_value 2006 server.streaming.create_physical_repslot("test_repslot") 2007 cursor_mock.execute.assert_called_once_with( 2008 "CREATE_REPLICATION_SLOT test_repslot PHYSICAL" 2009 ) 2010 2011 # Test replication slot already existent 2012 cursor_mock = connect_mock.return_value.cursor.return_value 2013 cursor_mock.execute.side_effect = MockProgrammingError(DUPLICATE_OBJECT) 2014 2015 with pytest.raises(PostgresDuplicateReplicationSlot): 2016 server.streaming.create_physical_repslot("test_repslot") 2017 cursor_mock.execute.assert_called_once_with( 2018 "CREATE_REPLICATION_SLOT test_repslot PHYSICAL" 2019 ) 2020 2021 @patch("barman.postgres.psycopg2.connect") 2022 def test_streaming_drop_repslot(self, connect_mock): 2023 # Build a server 2024 server = build_real_server( 2025 main_conf={"streaming_archiver": True, "streaming_conninfo": "dummy=param"} 2026 ) 2027 2028 # Test replication slot creation 2029 cursor_mock = connect_mock.return_value.cursor.return_value 2030 server.streaming.drop_repslot("test_repslot") 2031 cursor_mock.execute.assert_called_once_with( 2032 "DROP_REPLICATION_SLOT test_repslot" 2033 ) 2034 2035 # Test replication slot already existent 2036 cursor_mock = connect_mock.return_value.cursor.return_value 2037 cursor_mock.execute.side_effect = MockProgrammingError(UNDEFINED_OBJECT) 2038 2039 with pytest.raises(PostgresInvalidReplicationSlot): 2040 server.streaming.drop_repslot("test_repslot") 2041 cursor_mock.execute.assert_called_once_with( 2042 "DROP_REPLICATION_SLOT test_repslot" 2043 ) 2044 2045 server.streaming.close() 2046