1# Copyright (c) 2021, PostgreSQL Global Development Group
2
3# Tests for already-propagated WAL segments ending in incomplete WAL records.
4
5use strict;
6use warnings;
7
8use FindBin;
9use PostgresNode;
10use TestLib;
11use Test::More;
12
plan(tests => 3);

# Scenario: build a physical replica that lacks the last WAL file, then
# restart the primary so that it writes a divergent WAL file, and check
# that the replica replays the "overwrite contrecord" found in that new
# file.

# Create and start the primary, configured for streaming and with
# wal_keep_segments set to 16.
my $node = PostgresNode->get_new_node('primary');
$node->init(allows_streaming => 1);
$node->append_conf('postgresql.conf', 'wal_keep_segments=16');
$node->start();

# Table that receives the filler rows used to generate WAL.
$node->safe_psql('postgres', 'create table filler (a int, b text)');
26
# Use up what is left of the current WAL segment.  The loop keeps inserting
# filler rows and exits once fewer than two blocks remain before the end of
# the segment, so that only the start of a largish record can still fit.
my $fill_segment_sql = q{
DO $$
DECLARE
    wal_segsize int :=
        (max(setting) filter (where name = 'wal_segment_size'))::int *
        (max(setting) filter (where name = 'wal_block_size'))::int from pg_settings ;
    remain int;
    iters  int := 0;
BEGIN
    LOOP
        INSERT into filler
        select g, repeat(md5(g::text), (random() * 60 + 1)::int)
        from generate_series(1, 10) g;

        remain := wal_segsize - (pg_current_wal_insert_lsn() - '0/0') % wal_segsize;
        IF remain < 2 * setting::int from pg_settings where name = 'block_size' THEN
            RAISE log 'exiting after % iterations, % bytes to end of WAL segment', iters, remain;
            EXIT;
        END IF;
        iters := iters + 1;
    END LOOP;
END
$$;
};
$node->safe_psql('postgres', $fill_segment_sql);
54
# Record the WAL file that holds the current insert position, then emit a
# logical message whose payload is 'xyzxz' repeated 123456 times.  The
# message is intended to be too large for the room left in that file, so
# it has to continue into the next one.
my $initfile = $node->safe_psql('postgres',
	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
$node->safe_psql('postgres',
qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
);

# The insert position must now be in a different WAL file.
my $endfile = $node->safe_psql('postgres',
	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');

# WAL file names are strings of hexadecimal digits, so they have to be
# compared as strings.  A numeric comparison would truncate a name at its
# first A-F digit (with an "isn't numeric" warning) and could treat two
# distinct names as equal.  isnt() compares with "ne" and prints both
# values if the check fails.
isnt($initfile, $endfile, "$initfile differs from $endfile");
65
# Stop abruptly so that no stop checkpoint is written.  With the server
# down the tail WAL file can be removed; on the next startup the large
# message should be overwritten with new contents.
$node->stop('immediate');

my $tail_segment = $node->basedir . "/pgdata/pg_wal/$endfile";
unlink($tail_segment)
  or die "could not unlink $tail_segment: $!";

# Take a cold filesystem backup at this point and build a streaming
# standby from it.
$node->backup_fs_cold('backup');
my $node_standby = PostgresNode->get_new_node('standby');
$node_standby->init_from_backup($node, 'backup', has_streaming => 1);

# Bring up the standby first, then the primary.
$node_standby->start();
$node->start();
81
# Generate new WAL on the restarted primary: a one-row table plus a short
# logical message.
$node->safe_psql('postgres',
	qq{create table foo (a text); insert into foo values ('hello')});
$node->safe_psql('postgres',
	qq{SELECT pg_logical_emit_message(true, 'test 026', 'AABBCC')});

# Wait until the standby's replay position has reached the primary's
# current WAL location; give up if the poll does not succeed.
my $until_lsn = $node->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
my $caughtup_query =
  "SELECT '$until_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
$node_standby->poll_query_until('postgres', $caughtup_query)
  or die "Timed out while waiting for standby to catch up";

# The row inserted after the restart must be visible on the standby.
# is() is used instead of ok(... eq ...) so that a failure reports the
# value actually read from the standby.
is($node_standby->safe_psql('postgres', 'select * from foo'),
	'hello', 'standby replays past overwritten contrecord');
95
# The standby's server log has to contain the message reporting that the
# missing contrecord was skipped.
my $log = slurp_file($node_standby->logfile);
like($log, qr{successfully skipped missing contrecord at},
	"found log line in standby");

# Shut both nodes down, primary first.
$node->stop();
$node_standby->stop();
105