# Testing of logical decoding using SQL interface and/or pg_recvlogical
#
# Most logical decoding tests are in contrib/test_decoding. This module
# is for work that doesn't fit well there, like where server restarts
# are required.
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 12;
use Config;

# Initialize master node
my $node_master = get_new_node('master');
$node_master->init(allows_streaming => 1);
$node_master->append_conf(
	'postgresql.conf', qq(
wal_level = logical
));
$node_master->start;
my $backup_name = 'master_backup';

# Set up, in order: the table to decode, a logical slot using the
# test_decoding plugin, and ten rows of changes for that slot to pick up.
$node_master->safe_psql('postgres', $_)
  for (
	qq[CREATE TABLE decoding_test(x integer, y text);],
	qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');],
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
  );

# Basic decoding works: ten inserted rows plus the BEGIN and COMMIT lines.
my ($decoded) = $node_master->safe_psql('postgres',
	qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
my @decoded_lines = split /^/m, $decoded;
is(scalar(@decoded_lines),
	12, 'Decoding produced 12 rows inc BEGIN/COMMIT');

# If we immediately crash the server we might lose the progress we just made
# and replay the same changes again. But a clean shutdown should never repeat
# the same changes when we use the SQL decoding interface.
$node_master->restart('fast');

# There are no new writes, so the result should be empty.
$decoded = $node_master->safe_psql('postgres',
	qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
chomp($decoded);
is($decoded, '', 'Decoding after fast restart repeats no rows');

# Insert some rows and verify that we get the same results from pg_recvlogical
# and the SQL interface.
$node_master->safe_psql('postgres',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
);

# Output expected from the test_decoding plugin for the four rows inserted
# above, with the options passed below ('include-xids' 0, 'skip-empty-xacts' 1).
my $expected = q{BEGIN
table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
COMMIT};

# Use the peek variant here: the same changes are read again through
# pg_recvlogical further down and must still be available from the slot.
my $stdout_sql = $node_master->safe_psql('postgres',
	qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
);
is($stdout_sql, $expected, 'got expected output from SQL decoding session');

# Highest LSN among the changes currently pending in the slot; used as the
# stop position for pg_recvlogical.
my $endpos = $node_master->safe_psql('postgres',
	"SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
);
# Report progress through the test harness rather than printing raw text
# into the TAP stream.
note "waiting to replay $endpos";

# Insert some rows after $endpos, which we won't read.
$node_master->safe_psql('postgres',
	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
);

# NOTE(review): the fourth argument (180) is presumably a timeout in
# seconds -- confirm against PostgresNode::pg_recvlogical_upto.
my $stdout_recv = $node_master->pg_recvlogical_upto(
	'postgres', 'test_slot', $endpos, 180,
	'include-xids'     => '0',
	'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, $expected,
	'got same expected output from pg_recvlogical decoding session');

# Wait for the slot to be released before connecting to it again.
$node_master->poll_query_until('postgres',
	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'test_slot' AND active_pid IS NULL)"
) or die "slot never became inactive";

# A second run up to the same position must return nothing, showing that
# the first run's changes were confirmed as consumed.
$stdout_recv = $node_master->pg_recvlogical_upto(
	'postgres', 'test_slot', $endpos, 180,
	'include-xids'     => '0',
	'skip-empty-xacts' => '1');
chomp($stdout_recv);
is($stdout_recv, '',
	'pg_recvlogical acknowledged changes');

$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');

# The slot was created while connected to 'postgres', so reading it from
# 'otherdb' must be rejected. psql() is expected to yield 3 on a failed
# statement (psql's exit status for a script error) -- confirm against
# PostgresNode::psql if this ever changes.
is( $node_master->psql(
		'otherdb',
		"SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
	),
	3,
	'replaying logical slot from another database fails');

$node_master->safe_psql('otherdb',
	qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]
);

# Number of pg_replication_slots rows for otherdb_slot: '1' while the slot
# exists, '0' once it is gone. Querying the view directly lets the
# "still exists" and "was dropped" checks below assert opposite outcomes.
my $otherdb_slot_count_sql =
  "SELECT count(*) FROM pg_replication_slots WHERE slot_name = 'otherdb_slot'";

# make sure you can't drop a slot while active
SKIP:
{

	# some Windows Perls at least don't like IPC::Run's start/kill_kill regime.
	skip "Test fails on Windows perl", 2 if $Config{osname} eq 'MSWin32';

	# Keep a streaming pg_recvlogical attached so the slot stays active.
	my $pg_recvlogical = IPC::Run::start(
		[
			'pg_recvlogical', '-d', $node_master->connstr('otherdb'),
			'-S', 'otherdb_slot', '-f', '-', '--start'
		]);
	$node_master->poll_query_until('otherdb',
		"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)"
	) or die "slot never became active";
	is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
		3, 'dropping a DB with active logical slots fails');
	$pg_recvlogical->kill_kill;

	# The failed DROP DATABASE must have left the slot in place.
	is($node_master->safe_psql('postgres', $otherdb_slot_count_sql),
		'1', 'logical slot still exists');
}

$node_master->poll_query_until('otherdb',
	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)"
) or die "slot never became inactive";

is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
	0, 'dropping a DB with inactive logical slots succeeds');

# Dropping the database must have removed its logical slot as well.
is($node_master->safe_psql('postgres', $otherdb_slot_count_sql),
	'0', 'logical slot was actually dropped with DB');

# Test logical slot advancing and its durability.
my $logical_slot = 'logical_slot';
$node_master->safe_psql('postgres',
	"SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);");

# Generate some WAL for the slot to be advanced over. Use safe_psql so that
# a failure in this setup aborts the test instead of going unnoticed.
$node_master->safe_psql('postgres', "
	CREATE TABLE tab_logical_slot (a int);
	INSERT INTO tab_logical_slot VALUES (generate_series(1,10));");

my $current_lsn = $node_master->safe_psql('postgres',
	"SELECT pg_current_wal_lsn();");
chomp($current_lsn);

# psql() rather than safe_psql(): the return code itself is what is tested.
my $psql_rc = $node_master->psql('postgres',
	"SELECT pg_replication_slot_advance('$logical_slot', '$current_lsn'::pg_lsn);");
is($psql_rc, '0', 'slot advancing with logical slot');

my $logical_restart_lsn_pre = $node_master->safe_psql('postgres',
	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';");
chomp($logical_restart_lsn_pre);

# Slot advance should persist across clean restarts.
$node_master->restart;
my $logical_restart_lsn_post = $node_master->safe_psql('postgres',
	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';");
chomp($logical_restart_lsn_post);

# String comparison of the two LSNs; is() reports both values on mismatch.
is($logical_restart_lsn_post, $logical_restart_lsn_pre,
	"logical slot advance persists across restarts");

# done with the node
$node_master->stop;