1# Testing of logical decoding using SQL interface and/or pg_recvlogical
2#
3# Most logical decoding tests are in contrib/test_decoding. This module
4# is for work that doesn't fit well there, like where server restarts
5# are required.
6use strict;
7use warnings;
8use PostgresNode;
9use TestLib;
10use Test::More tests => 12;
11use Config;
12
13# Initialize master node
14my $node_master = get_new_node('master');
15$node_master->init(allows_streaming => 1);
16$node_master->append_conf(
17	'postgresql.conf', qq(
18wal_level = logical
19));
20$node_master->start;
21my $backup_name = 'master_backup';
22
23$node_master->safe_psql('postgres',
24	qq[CREATE TABLE decoding_test(x integer, y text);]);
25
26$node_master->safe_psql('postgres',
27	qq[SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');]
28);
29
30$node_master->safe_psql('postgres',
31	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]
32);
33
34# Basic decoding works
35my ($result) = $node_master->safe_psql('postgres',
36	qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
37is(scalar(my @foobar = split /^/m, $result),
38	12, 'Decoding produced 12 rows inc BEGIN/COMMIT');
39
40# If we immediately crash the server we might lose the progress we just made
41# and replay the same changes again. But a clean shutdown should never repeat
42# the same changes when we use the SQL decoding interface.
43$node_master->restart('fast');
44
45# There are no new writes, so the result should be empty.
46$result = $node_master->safe_psql('postgres',
47	qq[SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);]);
48chomp($result);
49is($result, '', 'Decoding after fast restart repeats no rows');
50
51# Insert some rows and verify that we get the same results from pg_recvlogical
52# and the SQL interface.
53$node_master->safe_psql('postgres',
54	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
55);
56
57my $expected = q{BEGIN
58table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
59table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
60table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
61table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
62COMMIT};
63
64my $stdout_sql = $node_master->safe_psql('postgres',
65	qq[SELECT data FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
66);
67is($stdout_sql, $expected, 'got expected output from SQL decoding session');
68
69my $endpos = $node_master->safe_psql('postgres',
70	"SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
71);
72print "waiting to replay $endpos\n";
73
74# Insert some rows after $endpos, which we won't read.
75$node_master->safe_psql('postgres',
76	qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
77);
78
79my $stdout_recv = $node_master->pg_recvlogical_upto(
80	'postgres', 'test_slot', $endpos, 180,
81	'include-xids'     => '0',
82	'skip-empty-xacts' => '1');
83chomp($stdout_recv);
84is($stdout_recv, $expected,
85	'got same expected output from pg_recvlogical decoding session');
86
87$node_master->poll_query_until('postgres',
88	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'test_slot' AND active_pid IS NULL)"
89) or die "slot never became inactive";
90
91$stdout_recv = $node_master->pg_recvlogical_upto(
92	'postgres', 'test_slot', $endpos, 180,
93	'include-xids'     => '0',
94	'skip-empty-xacts' => '1');
95chomp($stdout_recv);
96is($stdout_recv, '',
97	'pg_recvlogical acknowledged changes');
98
99$node_master->safe_psql('postgres', 'CREATE DATABASE otherdb');
100
101is( $node_master->psql(
102		'otherdb',
103		"SELECT lsn FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
104	),
105	3,
106	'replaying logical slot from another database fails');
107
108$node_master->safe_psql('otherdb',
109	qq[SELECT pg_create_logical_replication_slot('otherdb_slot', 'test_decoding');]
110);
111
112# make sure you can't drop a slot while active
113SKIP:
114{
115
116	# some Windows Perls at least don't like IPC::Run's start/kill_kill regime.
117	skip "Test fails on Windows perl", 2 if $Config{osname} eq 'MSWin32';
118
119	my $pg_recvlogical = IPC::Run::start(
120		[
121			'pg_recvlogical', '-d', $node_master->connstr('otherdb'),
122			'-S', 'otherdb_slot', '-f', '-', '--start'
123		]);
124	$node_master->poll_query_until('otherdb',
125		"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NOT NULL)"
126	) or die "slot never became active";
127	is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
128		3, 'dropping a DB with active logical slots fails');
129	$pg_recvlogical->kill_kill;
130	is($node_master->slot('otherdb_slot')->{'slot_name'},
131		undef, 'logical slot still exists');
132}
133
134$node_master->poll_query_until('otherdb',
135	"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'otherdb_slot' AND active_pid IS NULL)"
136) or die "slot never became inactive";
137
138is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
139	0, 'dropping a DB with inactive logical slots succeeds');
140is($node_master->slot('otherdb_slot')->{'slot_name'},
141	undef, 'logical slot was actually dropped with DB');
142
143# Test logical slot advancing and its durability.
144my $logical_slot = 'logical_slot';
145$node_master->safe_psql('postgres',
146	"SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);");
147$node_master->psql('postgres', "
148	CREATE TABLE tab_logical_slot (a int);
149	INSERT INTO tab_logical_slot VALUES (generate_series(1,10));");
150my $current_lsn = $node_master->safe_psql('postgres',
151	"SELECT pg_current_wal_lsn();");
152chomp($current_lsn);
153my $psql_rc = $node_master->psql('postgres',
154	"SELECT pg_replication_slot_advance('$logical_slot', '$current_lsn'::pg_lsn);");
155is($psql_rc, '0', 'slot advancing with logical slot');
156my $logical_restart_lsn_pre = $node_master->safe_psql('postgres',
157	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';");
158chomp($logical_restart_lsn_pre);
159# Slot advance should persist across clean restarts.
160$node_master->restart;
161my $logical_restart_lsn_post = $node_master->safe_psql('postgres',
162	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$logical_slot';");
163chomp($logical_restart_lsn_post);
164ok(($logical_restart_lsn_pre cmp $logical_restart_lsn_post) == 0,
165	"logical slot advance persists across restarts");
166
167# done with the node
168$node_master->stop;
169