1 /*-------------------------------------------------------------------------
2 *
3 * walsender.c
4 *
5 * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 * care of sending XLOG from the primary server to a single recipient.
7 * (Note that there can be more than one walsender process concurrently.)
8 * It is started by the postmaster when the walreceiver of a standby server
9 * connects to the primary server and requests XLOG streaming replication.
10 *
11 * A walsender is similar to a regular backend, ie. there is a one-to-one
12 * relationship between a connection and a walsender process, but instead
13 * of processing SQL queries, it understands a small set of special
14 * replication-mode commands. The START_REPLICATION command begins streaming
15 * WAL to the client. While streaming, the walsender keeps reading XLOG
16 * records from the disk and sends them to the standby server over the
17 * COPY protocol, until either side ends the replication by exiting COPY
18 * mode (or until the connection is closed).
19 *
20 * Normal termination is by SIGTERM, which instructs the walsender to
21 * close the connection and exit(0) at the next convenient moment. Emergency
22 * termination is by SIGQUIT; like any backend, the walsender will simply
23 * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24 * are treated as not a crash but approximately normal termination;
25 * the walsender will exit quickly without sending any more XLOG records.
26 *
27 * If the server is shut down, checkpointer sends us
28 * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29 * the backend is idle or running an SQL query, this causes it to shut down;
30 * if logical replication is in progress, all existing WAL records are
31 * processed first and then it shuts down. Otherwise this causes the walsender
32 * to switch to the "stopping" state. In this state, the walsender will reject
33 * any further replication commands. The checkpointer begins the shutdown
34 * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35 * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36 * walsender to send any outstanding WAL, including the shutdown checkpoint
37 * record, wait for it to be replicated to the standby, and then exit.
38 *
39 *
40 * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
41 *
42 * IDENTIFICATION
43 * src/backend/replication/walsender.c
44 *
45 *-------------------------------------------------------------------------
46 */
47 #include "postgres.h"
48
49 #include <signal.h>
50 #include <unistd.h>
51
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "replication/basebackup.h"
70 #include "replication/decode.h"
71 #include "replication/logical.h"
72 #include "replication/logicalfuncs.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
76 #include "replication/walreceiver.h"
77 #include "replication/walsender.h"
78 #include "replication/walsender_private.h"
79 #include "storage/condition_variable.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/builtins.h"
88 #include "utils/guc.h"
89 #include "utils/memutils.h"
90 #include "utils/pg_lsn.h"
91 #include "utils/portal.h"
92 #include "utils/ps_status.h"
93 #include "utils/resowner.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96
97 /*
98 * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
99 *
100 * We don't have a good idea of what a good value would be; there's some
101 * overhead per message in both walsender and walreceiver, but on the other
102 * hand sending large batches makes walsender less responsive to signals
103 * because signals are checked only between messages. 128kB (with
104 * default 8k blocks) seems like a reasonable guess for now.
105 */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107
108 /* Array of WalSnds in shared memory */
109 WalSndCtlData *WalSndCtl = NULL;
110
111 /* My slot in the shared memory array */
112 WalSnd *MyWalSnd = NULL;
113
114 /* Global state */
115 bool am_walsender = false; /* Am I a walsender process? */
116 bool am_cascading_walsender = false; /* Am I cascading WAL to another
117 * standby? */
118 bool am_db_walsender = false; /* Connected to a database? */
119
120 /* User-settable parameters for walsender */
121 int max_wal_senders = 0; /* the maximum number of concurrent
122 * walsenders */
123 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124 * data message */
125 bool log_replication_commands = false;
126
127 /*
128 * State for WalSndWakeupRequest
129 */
130 bool wake_wal_senders = false;
131
132 /*
133 * These variables are used similarly to openLogFile/SegNo/Off,
134 * but for walsender to read the XLOG.
135 */
136 static int sendFile = -1;
137 static XLogSegNo sendSegNo = 0;
138 static uint32 sendOff = 0;
139
140 /* Timeline ID of the currently open file */
141 static TimeLineID curFileTimeLine = 0;
142
143 /*
144 * These variables keep track of the state of the timeline we're currently
145 * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
146 * the timeline is not the latest timeline on this server, and the server's
147 * history forked off from that timeline at sendTimeLineValidUpto.
148 */
149 static TimeLineID sendTimeLine = 0;
150 static TimeLineID sendTimeLineNextTLI = 0;
151 static bool sendTimeLineIsHistoric = false;
152 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
153
154 /*
155 * How far have we sent WAL already? This is also advertised in
156 * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
157 */
158 static XLogRecPtr sentPtr = InvalidXLogRecPtr;
159
160 /* Buffers for constructing outgoing messages and processing reply messages. */
161 static StringInfoData output_message;
162 static StringInfoData reply_message;
163 static StringInfoData tmpbuf;
164
165 /* Timestamp of last ProcessRepliesIfAny(). */
166 static TimestampTz last_processing = 0;
167
168 /*
169 * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
170 * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
171 */
172 static TimestampTz last_reply_timestamp = 0;
173
174 /* Have we sent a heartbeat message asking for reply, since last reply? */
175 static bool waiting_for_ping_response = false;
176
177 /*
178 * While streaming WAL in Copy mode, streamingDoneSending is set to true
179 * after we have sent CopyDone. We should not send any more CopyData messages
180 * after that. streamingDoneReceiving is set to true when we receive CopyDone
181 * from the other end. When both become true, it's time to exit Copy mode.
182 */
183 static bool streamingDoneSending;
184 static bool streamingDoneReceiving;
185
186 /* Are we there yet? */
187 static bool WalSndCaughtUp = false;
188
189 /* Flags set by signal handlers for later service in main loop */
190 static volatile sig_atomic_t got_SIGUSR2 = false;
191 static volatile sig_atomic_t got_STOPPING = false;
192
193 /*
194 * This is set while we are streaming. When not set
195 * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
196 * the main loop is responsible for checking got_STOPPING and terminating when
197 * it's set (after streaming any remaining WAL).
198 */
199 static volatile sig_atomic_t replication_active = false;
200
201 static LogicalDecodingContext *logical_decoding_ctx = NULL;
202 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
203
204 /* A sample associating a WAL location with the time it was written. */
205 typedef struct
206 {
207 XLogRecPtr lsn;
208 TimestampTz time;
209 } WalTimeSample;
210
211 /* The size of our buffer of time samples. */
212 #define LAG_TRACKER_BUFFER_SIZE 8192
213
214 /* A mechanism for tracking replication lag. */
215 static struct
216 {
217 XLogRecPtr last_lsn;
218 WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
219 int write_head;
220 int read_heads[NUM_SYNC_REP_WAIT_MODE];
221 WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
222 } LagTracker;
223
224 /* Signal handlers */
225 static void WalSndLastCycleHandler(SIGNAL_ARGS);
226
227 /* Prototypes for private functions */
228 typedef void (*WalSndSendDataCallback) (void);
229 static void WalSndLoop(WalSndSendDataCallback send_data);
230 static void InitWalSenderSlot(void);
231 static void WalSndKill(int code, Datum arg);
232 static void WalSndShutdown(void) pg_attribute_noreturn();
233 static void XLogSendPhysical(void);
234 static void XLogSendLogical(void);
235 static void WalSndDone(WalSndSendDataCallback send_data);
236 static XLogRecPtr GetStandbyFlushRecPtr(void);
237 static void IdentifySystem(void);
238 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
239 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
240 static void StartReplication(StartReplicationCmd *cmd);
241 static void StartLogicalReplication(StartReplicationCmd *cmd);
242 static void ProcessStandbyMessage(void);
243 static void ProcessStandbyReplyMessage(void);
244 static void ProcessStandbyHSFeedbackMessage(void);
245 static void ProcessRepliesIfAny(void);
246 static void WalSndKeepalive(bool requestReply);
247 static void WalSndKeepaliveIfNecessary(void);
248 static void WalSndCheckTimeOut(void);
249 static long WalSndComputeSleeptime(TimestampTz now);
250 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
251 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
252 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
253 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
254 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
255 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
256 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
257
258 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
259
260
261 /* Initialize walsender process before entering the main command loop */
262 void
InitWalSender(void)263 InitWalSender(void)
264 {
265 am_cascading_walsender = RecoveryInProgress();
266
267 /* Create a per-walsender data structure in shared memory */
268 InitWalSenderSlot();
269
270 /* Set up resource owner */
271 CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
272
273 /*
274 * Let postmaster know that we're a WAL sender. Once we've declared us as
275 * a WAL sender process, postmaster will let us outlive the bgwriter and
276 * kill us last in the shutdown sequence, so we get a chance to stream all
277 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
278 * there's no going back, and we mustn't write any WAL records after this.
279 */
280 MarkPostmasterChildWalSender();
281 SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
282
283 /* Initialize empty timestamp buffer for lag tracking. */
284 memset(&LagTracker, 0, sizeof(LagTracker));
285 }
286
287 /*
288 * Clean up after an error.
289 *
290 * WAL sender processes don't use transactions like regular backends do.
291 * This function does any cleanup required after an error in a WAL sender
292 * process, similar to what transaction abort does in a regular backend.
293 */
294 void
WalSndErrorCleanup(void)295 WalSndErrorCleanup(void)
296 {
297 LWLockReleaseAll();
298 ConditionVariableCancelSleep();
299 pgstat_report_wait_end();
300
301 if (sendFile >= 0)
302 {
303 close(sendFile);
304 sendFile = -1;
305 }
306
307 if (MyReplicationSlot != NULL)
308 ReplicationSlotRelease();
309
310 ReplicationSlotCleanup();
311
312 replication_active = false;
313
314 if (got_STOPPING || got_SIGUSR2)
315 proc_exit(0);
316
317 /* Revert back to startup state */
318 WalSndSetState(WALSNDSTATE_STARTUP);
319 }
320
321 /*
322 * Handle a client's connection abort in an orderly manner.
323 */
324 static void
WalSndShutdown(void)325 WalSndShutdown(void)
326 {
327 /*
328 * Reset whereToSendOutput to prevent ereport from attempting to send any
329 * more messages to the standby.
330 */
331 if (whereToSendOutput == DestRemote)
332 whereToSendOutput = DestNone;
333
334 proc_exit(0);
335 abort(); /* keep the compiler quiet */
336 }
337
338 /*
339 * Handle the IDENTIFY_SYSTEM command.
340 */
341 static void
IdentifySystem(void)342 IdentifySystem(void)
343 {
344 char sysid[32];
345 char xloc[MAXFNAMELEN];
346 XLogRecPtr logptr;
347 char *dbname = NULL;
348 DestReceiver *dest;
349 TupOutputState *tstate;
350 TupleDesc tupdesc;
351 Datum values[4];
352 bool nulls[4];
353
354 /*
355 * Reply with a result set with one row, four columns. First col is system
356 * ID, second is timeline ID, third is current xlog location and the
357 * fourth contains the database name if we are connected to one.
358 */
359
360 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
361 GetSystemIdentifier());
362
363 am_cascading_walsender = RecoveryInProgress();
364 if (am_cascading_walsender)
365 {
366 /* this also updates ThisTimeLineID */
367 logptr = GetStandbyFlushRecPtr();
368 }
369 else
370 logptr = GetFlushRecPtr();
371
372 snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
373
374 if (MyDatabaseId != InvalidOid)
375 {
376 MemoryContext cur = CurrentMemoryContext;
377
378 /* syscache access needs a transaction env. */
379 StartTransactionCommand();
380 /* make dbname live outside TX context */
381 MemoryContextSwitchTo(cur);
382 dbname = get_database_name(MyDatabaseId);
383 CommitTransactionCommand();
384 /* CommitTransactionCommand switches to TopMemoryContext */
385 MemoryContextSwitchTo(cur);
386 }
387
388 dest = CreateDestReceiver(DestRemoteSimple);
389 MemSet(nulls, false, sizeof(nulls));
390
391 /* need a tuple descriptor representing four columns */
392 tupdesc = CreateTemplateTupleDesc(4, false);
393 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
394 TEXTOID, -1, 0);
395 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
396 INT4OID, -1, 0);
397 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
398 TEXTOID, -1, 0);
399 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
400 TEXTOID, -1, 0);
401
402 /* prepare for projection of tuples */
403 tstate = begin_tup_output_tupdesc(dest, tupdesc);
404
405 /* column 1: system identifier */
406 values[0] = CStringGetTextDatum(sysid);
407
408 /* column 2: timeline */
409 values[1] = Int32GetDatum(ThisTimeLineID);
410
411 /* column 3: wal location */
412 values[2] = CStringGetTextDatum(xloc);
413
414 /* column 4: database name, or NULL if none */
415 if (dbname)
416 values[3] = CStringGetTextDatum(dbname);
417 else
418 nulls[3] = true;
419
420 /* send it to dest */
421 do_tup_output(tstate, values, nulls);
422
423 end_tup_output(tstate);
424 }
425
426
427 /*
428 * Handle TIMELINE_HISTORY command.
429 */
430 static void
SendTimeLineHistory(TimeLineHistoryCmd * cmd)431 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
432 {
433 StringInfoData buf;
434 char histfname[MAXFNAMELEN];
435 char path[MAXPGPATH];
436 int fd;
437 off_t histfilelen;
438 off_t bytesleft;
439 Size len;
440
441 /*
442 * Reply with a result set with one row, and two columns. The first col is
443 * the name of the history file, 2nd is the contents.
444 */
445
446 TLHistoryFileName(histfname, cmd->timeline);
447 TLHistoryFilePath(path, cmd->timeline);
448
449 /* Send a RowDescription message */
450 pq_beginmessage(&buf, 'T');
451 pq_sendint(&buf, 2, 2); /* 2 fields */
452
453 /* first field */
454 pq_sendstring(&buf, "filename"); /* col name */
455 pq_sendint(&buf, 0, 4); /* table oid */
456 pq_sendint(&buf, 0, 2); /* attnum */
457 pq_sendint(&buf, TEXTOID, 4); /* type oid */
458 pq_sendint(&buf, -1, 2); /* typlen */
459 pq_sendint(&buf, 0, 4); /* typmod */
460 pq_sendint(&buf, 0, 2); /* format code */
461
462 /* second field */
463 pq_sendstring(&buf, "content"); /* col name */
464 pq_sendint(&buf, 0, 4); /* table oid */
465 pq_sendint(&buf, 0, 2); /* attnum */
466 /*
467 * While this is labeled as BYTEAOID, it is the same output format
468 * as TEXTOID above.
469 */
470 pq_sendint(&buf, BYTEAOID, 4); /* type oid */
471 pq_sendint(&buf, -1, 2); /* typlen */
472 pq_sendint(&buf, 0, 4); /* typmod */
473 pq_sendint(&buf, 0, 2); /* format code */
474 pq_endmessage(&buf);
475
476 /* Send a DataRow message */
477 pq_beginmessage(&buf, 'D');
478 pq_sendint(&buf, 2, 2); /* # of columns */
479 len = strlen(histfname);
480 pq_sendint(&buf, len, 4); /* col1 len */
481 pq_sendbytes(&buf, histfname, len);
482
483 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
484 if (fd < 0)
485 ereport(ERROR,
486 (errcode_for_file_access(),
487 errmsg("could not open file \"%s\": %m", path)));
488
489 /* Determine file length and send it to client */
490 histfilelen = lseek(fd, 0, SEEK_END);
491 if (histfilelen < 0)
492 ereport(ERROR,
493 (errcode_for_file_access(),
494 errmsg("could not seek to end of file \"%s\": %m", path)));
495 if (lseek(fd, 0, SEEK_SET) != 0)
496 ereport(ERROR,
497 (errcode_for_file_access(),
498 errmsg("could not seek to beginning of file \"%s\": %m", path)));
499
500 pq_sendint(&buf, histfilelen, 4); /* col2 len */
501
502 bytesleft = histfilelen;
503 while (bytesleft > 0)
504 {
505 PGAlignedBlock rbuf;
506 int nread;
507
508 pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
509 nread = read(fd, rbuf.data, sizeof(rbuf));
510 pgstat_report_wait_end();
511 if (nread <= 0)
512 ereport(ERROR,
513 (errcode_for_file_access(),
514 errmsg("could not read file \"%s\": %m",
515 path)));
516 pq_sendbytes(&buf, rbuf.data, nread);
517 bytesleft -= nread;
518 }
519 CloseTransientFile(fd);
520
521 pq_endmessage(&buf);
522 }
523
524 /*
525 * Handle START_REPLICATION command.
526 *
527 * At the moment, this never returns, but an ereport(ERROR) will take us back
528 * to the main loop.
529 */
530 static void
StartReplication(StartReplicationCmd * cmd)531 StartReplication(StartReplicationCmd *cmd)
532 {
533 StringInfoData buf;
534 XLogRecPtr FlushPtr;
535
536 if (ThisTimeLineID == 0)
537 ereport(ERROR,
538 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
539 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
540
541 /*
542 * We assume here that we're logging enough information in the WAL for
543 * log-shipping, since this is checked in PostmasterMain().
544 *
545 * NOTE: wal_level can only change at shutdown, so in most cases it is
546 * difficult for there to be WAL data that we can still see that was
547 * written at wal_level='minimal'.
548 */
549
550 if (cmd->slotname)
551 {
552 ReplicationSlotAcquire(cmd->slotname, true);
553 if (SlotIsLogical(MyReplicationSlot))
554 ereport(ERROR,
555 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
556 (errmsg("cannot use a logical replication slot for physical replication"))));
557 }
558
559 /*
560 * Select the timeline. If it was given explicitly by the client, use
561 * that. Otherwise use the timeline of the last replayed record, which is
562 * kept in ThisTimeLineID.
563 */
564 if (am_cascading_walsender)
565 {
566 /* this also updates ThisTimeLineID */
567 FlushPtr = GetStandbyFlushRecPtr();
568 }
569 else
570 FlushPtr = GetFlushRecPtr();
571
572 if (cmd->timeline != 0)
573 {
574 XLogRecPtr switchpoint;
575
576 sendTimeLine = cmd->timeline;
577 if (sendTimeLine == ThisTimeLineID)
578 {
579 sendTimeLineIsHistoric = false;
580 sendTimeLineValidUpto = InvalidXLogRecPtr;
581 }
582 else
583 {
584 List *timeLineHistory;
585
586 sendTimeLineIsHistoric = true;
587
588 /*
589 * Check that the timeline the client requested exists, and the
590 * requested start location is on that timeline.
591 */
592 timeLineHistory = readTimeLineHistory(ThisTimeLineID);
593 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
594 &sendTimeLineNextTLI);
595 list_free_deep(timeLineHistory);
596
597 /*
598 * Found the requested timeline in the history. Check that
599 * requested startpoint is on that timeline in our history.
600 *
601 * This is quite loose on purpose. We only check that we didn't
602 * fork off the requested timeline before the switchpoint. We
603 * don't check that we switched *to* it before the requested
604 * starting point. This is because the client can legitimately
605 * request to start replication from the beginning of the WAL
606 * segment that contains switchpoint, but on the new timeline, so
607 * that it doesn't end up with a partial segment. If you ask for
608 * too old a starting point, you'll get an error later when we
609 * fail to find the requested WAL segment in pg_wal.
610 *
611 * XXX: we could be more strict here and only allow a startpoint
612 * that's older than the switchpoint, if it's still in the same
613 * WAL segment.
614 */
615 if (!XLogRecPtrIsInvalid(switchpoint) &&
616 switchpoint < cmd->startpoint)
617 {
618 ereport(ERROR,
619 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
620 (uint32) (cmd->startpoint >> 32),
621 (uint32) (cmd->startpoint),
622 cmd->timeline),
623 errdetail("This server's history forked from timeline %u at %X/%X.",
624 cmd->timeline,
625 (uint32) (switchpoint >> 32),
626 (uint32) (switchpoint))));
627 }
628 sendTimeLineValidUpto = switchpoint;
629 }
630 }
631 else
632 {
633 sendTimeLine = ThisTimeLineID;
634 sendTimeLineValidUpto = InvalidXLogRecPtr;
635 sendTimeLineIsHistoric = false;
636 }
637
638 streamingDoneSending = streamingDoneReceiving = false;
639
640 /* If there is nothing to stream, don't even enter COPY mode */
641 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
642 {
643 /*
644 * When we first start replication the standby will be behind the
645 * primary. For some applications, for example synchronous
646 * replication, it is important to have a clear state for this initial
647 * catchup mode, so we can trigger actions when we change streaming
648 * state later. We may stay in this state for a long time, which is
649 * exactly why we want to be able to monitor whether or not we are
650 * still here.
651 */
652 WalSndSetState(WALSNDSTATE_CATCHUP);
653
654 /* Send a CopyBothResponse message, and start streaming */
655 pq_beginmessage(&buf, 'W');
656 pq_sendbyte(&buf, 0);
657 pq_sendint(&buf, 0, 2);
658 pq_endmessage(&buf);
659 pq_flush();
660
661 /*
662 * Don't allow a request to stream from a future point in WAL that
663 * hasn't been flushed to disk in this server yet.
664 */
665 if (FlushPtr < cmd->startpoint)
666 {
667 ereport(ERROR,
668 (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
669 (uint32) (cmd->startpoint >> 32),
670 (uint32) (cmd->startpoint),
671 (uint32) (FlushPtr >> 32),
672 (uint32) (FlushPtr))));
673 }
674
675 /* Start streaming from the requested point */
676 sentPtr = cmd->startpoint;
677
678 /* Initialize shared memory status, too */
679 SpinLockAcquire(&MyWalSnd->mutex);
680 MyWalSnd->sentPtr = sentPtr;
681 SpinLockRelease(&MyWalSnd->mutex);
682
683 SyncRepInitConfig();
684
685 /* Main loop of walsender */
686 replication_active = true;
687
688 WalSndLoop(XLogSendPhysical);
689
690 replication_active = false;
691 if (got_STOPPING)
692 proc_exit(0);
693 WalSndSetState(WALSNDSTATE_STARTUP);
694
695 Assert(streamingDoneSending && streamingDoneReceiving);
696 }
697
698 if (cmd->slotname)
699 ReplicationSlotRelease();
700
701 /*
702 * Copy is finished now. Send a single-row result set indicating the next
703 * timeline.
704 */
705 if (sendTimeLineIsHistoric)
706 {
707 char startpos_str[8 + 1 + 8 + 1];
708 DestReceiver *dest;
709 TupOutputState *tstate;
710 TupleDesc tupdesc;
711 Datum values[2];
712 bool nulls[2];
713
714 snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
715 (uint32) (sendTimeLineValidUpto >> 32),
716 (uint32) sendTimeLineValidUpto);
717
718 dest = CreateDestReceiver(DestRemoteSimple);
719 MemSet(nulls, false, sizeof(nulls));
720
721 /*
722 * Need a tuple descriptor representing two columns. int8 may seem
723 * like a surprising data type for this, but in theory int4 would not
724 * be wide enough for this, as TimeLineID is unsigned.
725 */
726 tupdesc = CreateTemplateTupleDesc(2, false);
727 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
728 INT8OID, -1, 0);
729 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
730 TEXTOID, -1, 0);
731
732 /* prepare for projection of tuple */
733 tstate = begin_tup_output_tupdesc(dest, tupdesc);
734
735 values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
736 values[1] = CStringGetTextDatum(startpos_str);
737
738 /* send it to dest */
739 do_tup_output(tstate, values, nulls);
740
741 end_tup_output(tstate);
742 }
743
744 /* Send CommandComplete message */
745 pq_puttextmessage('C', "START_STREAMING");
746 }
747
748 /*
749 * read_page callback for logical decoding contexts, as a walsender process.
750 *
751 * Inside the walsender we can do better than logical_read_local_xlog_page,
752 * which has to do a plain sleep/busy loop, because the walsender's latch gets
753 * set every time WAL is flushed.
754 */
755 static int
logical_read_xlog_page(XLogReaderState * state,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char * cur_page,TimeLineID * pageTLI)756 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
757 XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
758 {
759 XLogRecPtr flushptr;
760 int count;
761
762 XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
763 sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
764 sendTimeLine = state->currTLI;
765 sendTimeLineValidUpto = state->currTLIValidUntil;
766 sendTimeLineNextTLI = state->nextTLI;
767
768 /* make sure we have enough WAL available */
769 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
770
771 /* fail if not (implies we are going to shut down) */
772 if (flushptr < targetPagePtr + reqLen)
773 return -1;
774
775 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
776 count = XLOG_BLCKSZ; /* more than one block available */
777 else
778 count = flushptr - targetPagePtr; /* part of the page available */
779
780 /* now actually read the data, we know it's there */
781 XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
782
783 return count;
784 }
785
786 /*
787 * Process extra options given to CREATE_REPLICATION_SLOT.
788 */
789 static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd * cmd,bool * reserve_wal,CRSSnapshotAction * snapshot_action)790 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
791 bool *reserve_wal,
792 CRSSnapshotAction *snapshot_action)
793 {
794 ListCell *lc;
795 bool snapshot_action_given = false;
796 bool reserve_wal_given = false;
797
798 /* Parse options */
799 foreach(lc, cmd->options)
800 {
801 DefElem *defel = (DefElem *) lfirst(lc);
802
803 if (strcmp(defel->defname, "export_snapshot") == 0)
804 {
805 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
806 ereport(ERROR,
807 (errcode(ERRCODE_SYNTAX_ERROR),
808 errmsg("conflicting or redundant options")));
809
810 snapshot_action_given = true;
811 *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
812 CRS_NOEXPORT_SNAPSHOT;
813 }
814 else if (strcmp(defel->defname, "use_snapshot") == 0)
815 {
816 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
817 ereport(ERROR,
818 (errcode(ERRCODE_SYNTAX_ERROR),
819 errmsg("conflicting or redundant options")));
820
821 snapshot_action_given = true;
822 *snapshot_action = CRS_USE_SNAPSHOT;
823 }
824 else if (strcmp(defel->defname, "reserve_wal") == 0)
825 {
826 if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
827 ereport(ERROR,
828 (errcode(ERRCODE_SYNTAX_ERROR),
829 errmsg("conflicting or redundant options")));
830
831 reserve_wal_given = true;
832 *reserve_wal = true;
833 }
834 else
835 elog(ERROR, "unrecognized option: %s", defel->defname);
836 }
837 }
838
839 /*
840 * Create a new replication slot.
841 */
842 static void
CreateReplicationSlot(CreateReplicationSlotCmd * cmd)843 CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
844 {
845 const char *snapshot_name = NULL;
846 char xloc[MAXFNAMELEN];
847 char *slot_name;
848 bool reserve_wal = false;
849 CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
850 DestReceiver *dest;
851 TupOutputState *tstate;
852 TupleDesc tupdesc;
853 Datum values[4];
854 bool nulls[4];
855
856 Assert(!MyReplicationSlot);
857
858 parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
859
860 /* setup state for XLogReadPage */
861 sendTimeLineIsHistoric = false;
862 sendTimeLine = ThisTimeLineID;
863
864 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
865 {
866 ReplicationSlotCreate(cmd->slotname, false,
867 cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
868 }
869 else
870 {
871 CheckLogicalDecodingRequirements();
872
873 /*
874 * Initially create persistent slot as ephemeral - that allows us to
875 * nicely handle errors during initialization because it'll get
876 * dropped if this transaction fails. We'll make it persistent at the
877 * end. Temporary slots can be created as temporary from beginning as
878 * they get dropped on error as well.
879 */
880 ReplicationSlotCreate(cmd->slotname, true,
881 cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
882 }
883
884 if (cmd->kind == REPLICATION_KIND_LOGICAL)
885 {
886 LogicalDecodingContext *ctx;
887 bool need_full_snapshot = false;
888
889 /*
890 * Do options check early so that we can bail before calling the
891 * DecodingContextFindStartpoint which can take long time.
892 */
893 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
894 {
895 if (IsTransactionBlock())
896 ereport(ERROR,
897 (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
898 "must not be called inside a transaction")));
899
900 need_full_snapshot = true;
901 }
902 else if (snapshot_action == CRS_USE_SNAPSHOT)
903 {
904 if (!IsTransactionBlock())
905 ereport(ERROR,
906 (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
907 "must be called inside a transaction")));
908
909 if (XactIsoLevel != XACT_REPEATABLE_READ)
910 ereport(ERROR,
911 (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
912 "must be called in REPEATABLE READ isolation mode transaction")));
913
914 if (FirstSnapshotSet)
915 ereport(ERROR,
916 (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
917 "must be called before any query")));
918
919 if (IsSubTransaction())
920 ereport(ERROR,
921 (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
922 "must not be called in a subtransaction")));
923
924 need_full_snapshot = true;
925 }
926
927 ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
928 logical_read_xlog_page,
929 WalSndPrepareWrite, WalSndWriteData,
930 WalSndUpdateProgress);
931
932 /*
933 * Signal that we don't need the timeout mechanism. We're just
934 * creating the replication slot and don't yet accept feedback
935 * messages or send keepalives. As we possibly need to wait for
936 * further WAL the walsender would otherwise possibly be killed too
937 * soon.
938 */
939 last_reply_timestamp = 0;
940
941 /* build initial snapshot, might take a while */
942 DecodingContextFindStartpoint(ctx);
943
944 /*
945 * Export or use the snapshot if we've been asked to do so.
946 *
947 * NB. We will convert the snapbuild.c kind of snapshot to normal
948 * snapshot when doing this.
949 */
950 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
951 {
952 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
953 }
954 else if (snapshot_action == CRS_USE_SNAPSHOT)
955 {
956 Snapshot snap;
957
958 snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
959 RestoreTransactionSnapshot(snap, MyProc);
960 }
961
962 /* don't need the decoding context anymore */
963 FreeDecodingContext(ctx);
964
965 if (!cmd->temporary)
966 ReplicationSlotPersist();
967 }
968 else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
969 {
970 ReplicationSlotReserveWal();
971
972 ReplicationSlotMarkDirty();
973
974 /* Write this slot to disk if it's a permanent one. */
975 if (!cmd->temporary)
976 ReplicationSlotSave();
977 }
978
979 snprintf(xloc, sizeof(xloc), "%X/%X",
980 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
981 (uint32) MyReplicationSlot->data.confirmed_flush);
982
983 dest = CreateDestReceiver(DestRemoteSimple);
984 MemSet(nulls, false, sizeof(nulls));
985
986 /*----------
987 * Need a tuple descriptor representing four columns:
988 * - first field: the slot name
989 * - second field: LSN at which we became consistent
990 * - third field: exported snapshot's name
991 * - fourth field: output plugin
992 *----------
993 */
994 tupdesc = CreateTemplateTupleDesc(4, false);
995 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
996 TEXTOID, -1, 0);
997 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
998 TEXTOID, -1, 0);
999 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1000 TEXTOID, -1, 0);
1001 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1002 TEXTOID, -1, 0);
1003
1004 /* prepare for projection of tuples */
1005 tstate = begin_tup_output_tupdesc(dest, tupdesc);
1006
1007 /* slot_name */
1008 slot_name = NameStr(MyReplicationSlot->data.name);
1009 values[0] = CStringGetTextDatum(slot_name);
1010
1011 /* consistent wal location */
1012 values[1] = CStringGetTextDatum(xloc);
1013
1014 /* snapshot name, or NULL if none */
1015 if (snapshot_name != NULL)
1016 values[2] = CStringGetTextDatum(snapshot_name);
1017 else
1018 nulls[2] = true;
1019
1020 /* plugin, or NULL if none */
1021 if (cmd->plugin != NULL)
1022 values[3] = CStringGetTextDatum(cmd->plugin);
1023 else
1024 nulls[3] = true;
1025
1026 /* send it to dest */
1027 do_tup_output(tstate, values, nulls);
1028 end_tup_output(tstate);
1029
1030 ReplicationSlotRelease();
1031 }
1032
1033 /*
1034 * Get rid of a replication slot that is no longer wanted.
1035 */
1036 static void
DropReplicationSlot(DropReplicationSlotCmd * cmd)1037 DropReplicationSlot(DropReplicationSlotCmd *cmd)
1038 {
1039 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1040 EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1041 }
1042
/*
 * Load previously initiated logical slot and prepare for sending data (via
 * WalSndLoop).
 *
 * Acquires the slot named in cmd, switches the connection into COPY BOTH
 * mode, creates a logical decoding context positioned at cmd->startpoint
 * (with the plugin options from cmd->options) and then streams via
 * WalSndLoop(XLogSendLogical).  When the loop returns, the decoding context
 * is freed and the slot released.  If a stop has been requested
 * (got_STOPPING) the process exits with proc_exit(0); otherwise the
 * walsender returns to STARTUP state and sends CommandComplete ("COPY 0").
 */
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;

	/* make sure that our requirements are still fulfilled */
	CheckLogicalDecodingRequirements();

	Assert(!MyReplicationSlot);

	ReplicationSlotAcquire(cmd->slotname, true);

	/*
	 * Force a disconnect, so that the decoding code doesn't need to care
	 * about an eventual switch from running in recovery, to running in a
	 * normal environment. Client code is expected to handle reconnects.
	 *
	 * Setting got_STOPPING here means the check after WalSndLoop() below
	 * will end the process instead of returning to command mode.
	 */
	if (am_cascading_walsender && !RecoveryInProgress())
	{
		ereport(LOG,
				(errmsg("terminating walsender process after promotion")));
		got_STOPPING = true;
	}

	WalSndSetState(WALSNDSTATE_CATCHUP);

	/*
	 * Send a CopyBothResponse message, and start streaming.  The body is a
	 * format byte of 0 followed by a zero (int16) column count.
	 */
	pq_beginmessage(&buf, 'W');
	pq_sendbyte(&buf, 0);
	pq_sendint(&buf, 0, 2);
	pq_endmessage(&buf);
	pq_flush();

	/*
	 * Initialize position to the last ack'ed one, then the xlog records begin
	 * to be shipped from that position.
	 */
	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
												 logical_read_xlog_page,
												 WalSndPrepareWrite,
												 WalSndWriteData,
												 WalSndUpdateProgress);

	/* Start reading WAL from the oldest required WAL. */
	logical_startptr = MyReplicationSlot->data.restart_lsn;

	/*
	 * Report the location after which we'll send out further commits as the
	 * current sentPtr.
	 */
	sentPtr = MyReplicationSlot->data.confirmed_flush;

	/*
	 * Also update the sent position status in shared memory.
	 *
	 * NOTE(review): the shared value is seeded from restart_lsn while the
	 * local sentPtr above uses confirmed_flush; confirm the difference is
	 * intended (it affects what monitoring sees until the first send).
	 */
	SpinLockAcquire(&MyWalSnd->mutex);
	MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
	SpinLockRelease(&MyWalSnd->mutex);

	replication_active = true;

	SyncRepInitConfig();

	/* Main loop of walsender */
	WalSndLoop(XLogSendLogical);

	FreeDecodingContext(logical_decoding_ctx);
	ReplicationSlotRelease();

	replication_active = false;
	if (got_STOPPING)
		proc_exit(0);
	WalSndSetState(WALSNDSTATE_STARTUP);

	/* Get out of COPY mode (CommandComplete). */
	EndCommand("COPY 0", DestRemote);
}
1122
1123 /*
1124 * LogicalDecodingContext 'prepare_write' callback.
1125 *
1126 * Prepare a write into a StringInfo.
1127 *
1128 * Don't do anything lasting in here, it's quite possible that nothing will be done
1129 * with the data.
1130 */
1131 static void
WalSndPrepareWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1132 WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1133 {
1134 /* can't have sync rep confused by sending the same LSN several times */
1135 if (!last_write)
1136 lsn = InvalidXLogRecPtr;
1137
1138 resetStringInfo(ctx->out);
1139
1140 pq_sendbyte(ctx->out, 'w');
1141 pq_sendint64(ctx->out, lsn); /* dataStart */
1142 pq_sendint64(ctx->out, lsn); /* walEnd */
1143
1144 /*
1145 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1146 * reserve space here.
1147 */
1148 pq_sendint64(ctx->out, 0); /* sendtime */
1149 }
1150
/*
 * LogicalDecodingContext 'write' callback.
 *
 * Actually write out data previously prepared by WalSndPrepareWrite out to
 * the network. Take as long as needed, but process replies from the other
 * side and check timeouts during that.
 *
 * ctx->out must begin with the header laid out by WalSndPrepareWrite: one
 * type byte, the dataStart and walEnd int64 fields, then an int64 sendtime
 * placeholder that is overwritten here.  The lsn, xid and last_write
 * arguments are not used by this function.
 *
 * Fast path: once the message is queued, one non-blocking flush is tried;
 * if nothing remains pending and less than half of wal_sender_timeout has
 * passed since last_reply_timestamp, return immediately.  Otherwise loop,
 * servicing replies, timeouts, keepalives, interrupts and config reloads,
 * until the output buffer has drained.  Flush failures end the walsender
 * via WalSndShutdown().
 */
static void
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
				bool last_write)
{
	TimestampTz now;

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * This is somewhat ugly, but the protocol is set as it's already used for
	 * several releases by streaming physical replication.
	 *
	 * The timestamp is encoded with pq_sendint64 into tmpbuf and then copied
	 * over the placeholder that follows the type byte and the two LSN fields.
	 */
	resetStringInfo(&tmpbuf);
	now = GetCurrentTimestamp();
	pq_sendint64(&tmpbuf, now);
	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	/* output previously gathered data in a CopyData packet */
	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);

	CHECK_FOR_INTERRUPTS();

	/* Try to flush pending output to the client */
	if (pq_flush_if_writable() != 0)
		WalSndShutdown();

	/* Try taking fast path unless we get too close to walsender timeout. */
	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
										  wal_sender_timeout / 2) &&
		!pq_is_send_pending())
	{
		return;
	}

	/* If we have pending write here, go to slow path */
	for (;;)
	{
		int			wakeEvents;
		long		sleeptime;

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/* Everything has been handed to the socket: done. */
		if (!pq_is_send_pending())
			break;

		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		/* Wake when the socket can take more data or a reply arrives. */
		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;

		/* Sleep until something happens or we time out */
		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime,
						  WAIT_EVENT_WAL_SENDER_WRITE_DATA);

		/*
		 * Emergency bailout if postmaster has died. This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();
	}

	/*
	 * reactivate latch so WalSndLoop knows to continue (the loop above may
	 * have consumed a wakeup via ResetLatch)
	 */
	SetLatch(MyLatch);
}
1248
1249 /*
1250 * LogicalDecodingContext 'progress_update' callback.
1251 *
1252 * Write the current position to the log tracker (see XLogSendPhysical).
1253 */
1254 static void
WalSndUpdateProgress(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid)1255 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
1256 {
1257 static TimestampTz sendTime = 0;
1258 TimestampTz now = GetCurrentTimestamp();
1259
1260 /*
1261 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1262 * avoid flooding the lag tracker when we commit frequently.
1263 */
1264 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1265 if (!TimestampDifferenceExceeds(sendTime, now,
1266 WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1267 return;
1268
1269 LagTrackerWrite(lsn, now);
1270 sendTime = now;
1271 }
1272
/*
 * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
 *
 * Returns end LSN of flushed WAL. Normally this will be >= loc, but
 * if we detect a shutdown request (either from postmaster or client)
 * we will return early, so caller must always check.
 *
 * The last observed position is cached in the static RecentFlushPtr, so
 * calls for locations already known to be available return without looking
 * at shared state.  While recovery is in progress (cascading walsender) the
 * replay position is used instead of the flush position.
 *
 * The loop ends when loc is covered, when got_STOPPING is set (after one
 * final refresh of RecentFlushPtr), or when CopyDone has been both received
 * and sent with no output pending.  While waiting, it services replies,
 * config reloads, timeouts and keepalives, and sets WalSndCaughtUp.
 */
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
	int			wakeEvents;
	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;

	/*
	 * Fast path to avoid acquiring the spinlock in case we already know we
	 * have enough WAL available. This is particularly interesting if we're
	 * far behind.
	 */
	if (RecentFlushPtr != InvalidXLogRecPtr &&
		loc <= RecentFlushPtr)
		return RecentFlushPtr;

	/* Get a more recent flush pointer. */
	if (!RecoveryInProgress())
		RecentFlushPtr = GetFlushRecPtr();
	else
		RecentFlushPtr = GetXLogReplayRecPtr(NULL);

	for (;;)
	{
		long		sleeptime;

		/*
		 * Emergency bailout if postmaster has died. This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we're shutting down, trigger pending WAL to be written out,
		 * otherwise we'd possibly end up waiting for WAL that never gets
		 * written, because walwriter has shut down already.
		 */
		if (got_STOPPING)
			XLogBackgroundFlush();

		/* Update our idea of the currently flushed position. */
		if (!RecoveryInProgress())
			RecentFlushPtr = GetFlushRecPtr();
		else
			RecentFlushPtr = GetXLogReplayRecPtr(NULL);

		/*
		 * If postmaster asked us to stop, don't wait anymore.
		 *
		 * It's important to do this check after the recomputation of
		 * RecentFlushPtr, so we can send all remaining data before shutting
		 * down.
		 */
		if (got_STOPPING)
			break;

		/*
		 * We only send regular messages to the client for full decoded
		 * transactions, but a synchronous replication and walsender shutdown
		 * possibly are waiting for a later location. So, before sleeping, we
		 * send a ping containing the flush location. If the receiver is
		 * otherwise idle, this keepalive will trigger a reply. Processing the
		 * reply will update these MyWalSnd locations.
		 *
		 * This is evaluated on every iteration, so a keepalive may go out
		 * before each sleep until the reported write/flush positions catch
		 * up with sentPtr.
		 */
		if (MyWalSnd->flush < sentPtr &&
			MyWalSnd->write < sentPtr &&
			!waiting_for_ping_response)
			WalSndKeepalive(false);

		/* check whether we're done */
		if (loc <= RecentFlushPtr)
			break;

		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
		WalSndCaughtUp = true;

		/*
		 * Try to flush any pending output to the client.
		 */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming, so fail the current WAL fetch request.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * Sleep until something happens or we time out. Also wait for the
		 * socket becoming writable, if there's still pending output.
		 * Otherwise we might sit on sendable output data while waiting for
		 * new WAL to be generated. (But if we have nothing to send, we don't
		 * want to wake on socket-writable.)
		 */
		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_READABLE | WL_TIMEOUT;

		if (pq_is_send_pending())
			wakeEvents |= WL_SOCKET_WRITEABLE;

		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime,
						  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
	}

	/* reactivate latch so WalSndLoop knows to continue */
	SetLatch(MyLatch);
	return RecentFlushPtr;
}
1417
1418 /*
1419 * Execute an incoming replication command.
1420 *
1421 * Returns true if the cmd_string was recognized as WalSender command, false
1422 * if not.
1423 */
1424 bool
exec_replication_command(const char * cmd_string)1425 exec_replication_command(const char *cmd_string)
1426 {
1427 int parse_rc;
1428 Node *cmd_node;
1429 MemoryContext cmd_context;
1430 MemoryContext old_context;
1431
1432 /*
1433 * If WAL sender has been told that shutdown is getting close, switch its
1434 * status accordingly to handle the next replication commands correctly.
1435 */
1436 if (got_STOPPING)
1437 WalSndSetState(WALSNDSTATE_STOPPING);
1438
1439 /*
1440 * Throw error if in stopping mode. We need prevent commands that could
1441 * generate WAL while the shutdown checkpoint is being written. To be
1442 * safe, we just prohibit all new commands.
1443 */
1444 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1445 ereport(ERROR,
1446 (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1447
1448 /*
1449 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1450 * command arrives. Clean up the old stuff if there's anything.
1451 */
1452 SnapBuildClearExportedSnapshot();
1453
1454 CHECK_FOR_INTERRUPTS();
1455
1456 /*
1457 * Parse the command.
1458 */
1459 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1460 "Replication command context",
1461 ALLOCSET_DEFAULT_SIZES);
1462 old_context = MemoryContextSwitchTo(cmd_context);
1463
1464 replication_scanner_init(cmd_string);
1465 parse_rc = replication_yyparse();
1466 if (parse_rc != 0)
1467 ereport(ERROR,
1468 (errcode(ERRCODE_SYNTAX_ERROR),
1469 errmsg_internal("replication command parser returned %d",
1470 parse_rc)));
1471 replication_scanner_finish();
1472
1473 cmd_node = replication_parse_result;
1474
1475 /*
1476 * If it's a SQL command, just clean up our mess and return false; the
1477 * caller will take care of executing it.
1478 */
1479 if (IsA(cmd_node, SQLCmd))
1480 {
1481 if (MyDatabaseId == InvalidOid)
1482 ereport(ERROR,
1483 (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1484
1485 MemoryContextSwitchTo(old_context);
1486 MemoryContextDelete(cmd_context);
1487
1488 /* Tell the caller that this wasn't a WalSender command. */
1489 return false;
1490 }
1491
1492 /*
1493 * Report query to various monitoring facilities. For this purpose, we
1494 * report replication commands just like SQL commands.
1495 */
1496 debug_query_string = cmd_string;
1497
1498 pgstat_report_activity(STATE_RUNNING, cmd_string);
1499
1500 /*
1501 * Log replication command if log_replication_commands is enabled. Even
1502 * when it's disabled, log the command with DEBUG1 level for backward
1503 * compatibility.
1504 */
1505 ereport(log_replication_commands ? LOG : DEBUG1,
1506 (errmsg("received replication command: %s", cmd_string)));
1507
1508 /*
1509 * Disallow replication commands in aborted transaction blocks.
1510 */
1511 if (IsAbortedTransactionBlockState())
1512 ereport(ERROR,
1513 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1514 errmsg("current transaction is aborted, "
1515 "commands ignored until end of transaction block")));
1516
1517 CHECK_FOR_INTERRUPTS();
1518
1519 /*
1520 * Allocate buffers that will be used for each outgoing and incoming
1521 * message. We do this just once per command to reduce palloc overhead.
1522 */
1523 initStringInfo(&output_message);
1524 initStringInfo(&reply_message);
1525 initStringInfo(&tmpbuf);
1526
1527 switch (cmd_node->type)
1528 {
1529 case T_IdentifySystemCmd:
1530 IdentifySystem();
1531 break;
1532
1533 case T_BaseBackupCmd:
1534 PreventTransactionChain(true, "BASE_BACKUP");
1535 SendBaseBackup((BaseBackupCmd *) cmd_node);
1536 break;
1537
1538 case T_CreateReplicationSlotCmd:
1539 CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1540 break;
1541
1542 case T_DropReplicationSlotCmd:
1543 DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1544 break;
1545
1546 case T_StartReplicationCmd:
1547 {
1548 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1549
1550 PreventTransactionChain(true, "START_REPLICATION");
1551
1552 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1553 StartReplication(cmd);
1554 else
1555 StartLogicalReplication(cmd);
1556 break;
1557 }
1558
1559 case T_TimeLineHistoryCmd:
1560 PreventTransactionChain(true, "TIMELINE_HISTORY");
1561 SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1562 break;
1563
1564 case T_VariableShowStmt:
1565 {
1566 DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1567 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1568
1569 /* syscache access needs a transaction environment */
1570 StartTransactionCommand();
1571 GetPGVariable(n->name, dest);
1572 CommitTransactionCommand();
1573 }
1574 break;
1575
1576 default:
1577 elog(ERROR, "unrecognized replication command node tag: %u",
1578 cmd_node->type);
1579 }
1580
1581 /* done */
1582 MemoryContextSwitchTo(old_context);
1583 MemoryContextDelete(cmd_context);
1584
1585 /* Send CommandComplete message */
1586 EndCommand("SELECT", DestRemote);
1587
1588 /* Report to pgstat that this process is now idle */
1589 pgstat_report_activity(STATE_IDLE, NULL);
1590 debug_query_string = NULL;
1591
1592 return true;
1593 }
1594
/*
 * Process any incoming messages while streaming. Also checks if the remote
 * end has closed the connection.
 *
 * Messages are read without blocking until no more input is available or a
 * CopyDone has been received.  Handled message types:
 *   'd' - CopyData carrying a standby message, see ProcessStandbyMessage()
 *   'c' - CopyDone; answered with our own CopyDone if not already sent
 *   'X' - the standby is closing the connection; the process exits
 * Any other type is a FATAL protocol violation.  EOF or a read error is
 * logged as COMMERROR and the process exits with proc_exit(0).
 *
 * last_processing is stamped once on entry.  If at least one message was
 * handled, it becomes last_reply_timestamp and waiting_for_ping_response is
 * cleared.
 */
static void
ProcessRepliesIfAny(void)
{
	unsigned char firstchar;
	int			r;
	bool		received = false;

	last_processing = GetCurrentTimestamp();

	/*
	 * If we already received a CopyDone from the frontend, any subsequent
	 * message is the beginning of a new command, and should be processed in
	 * the main processing loop.
	 */
	while (!streamingDoneReceiving)
	{
		pq_startmsgread();
		r = pq_getbyte_if_available(&firstchar);
		if (r < 0)
		{
			/* unexpected error or EOF */
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("unexpected EOF on standby connection")));
			proc_exit(0);
		}
		if (r == 0)
		{
			/* no data available without blocking */
			pq_endmsgread();
			break;
		}

		/* Read the message contents */
		resetStringInfo(&reply_message);
		if (pq_getmessage(&reply_message, 0))
		{
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("unexpected EOF on standby connection")));
			proc_exit(0);
		}

		/* Handle the very limited subset of commands expected in this phase */
		switch (firstchar)
		{
				/*
				 * 'd' means a standby reply wrapped in a CopyData packet.
				 */
			case 'd':
				ProcessStandbyMessage();
				received = true;
				break;

				/*
				 * CopyDone means the standby requested to finish streaming.
				 * Reply with CopyDone, if we had not sent that already.
				 * Setting streamingDoneReceiving also ends the read loop.
				 */
			case 'c':
				if (!streamingDoneSending)
				{
					pq_putmessage_noblock('c', NULL, 0);
					streamingDoneSending = true;
				}

				streamingDoneReceiving = true;
				received = true;
				break;

				/*
				 * 'X' means that the standby is closing down the socket.
				 */
			case 'X':
				proc_exit(0);

			default:
				ereport(FATAL,
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg("invalid standby message type \"%c\"",
								firstchar)));
		}
	}

	/*
	 * Save the last reply timestamp if we've received at least one reply.
	 */
	if (received)
	{
		last_reply_timestamp = last_processing;
		waiting_for_ping_response = false;
	}
}
1691
1692 /*
1693 * Process a status update message received from standby.
1694 */
1695 static void
ProcessStandbyMessage(void)1696 ProcessStandbyMessage(void)
1697 {
1698 char msgtype;
1699
1700 /*
1701 * Check message type from the first byte.
1702 */
1703 msgtype = pq_getmsgbyte(&reply_message);
1704
1705 switch (msgtype)
1706 {
1707 case 'r':
1708 ProcessStandbyReplyMessage();
1709 break;
1710
1711 case 'h':
1712 ProcessStandbyHSFeedbackMessage();
1713 break;
1714
1715 default:
1716 ereport(COMMERROR,
1717 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1718 errmsg("unexpected message type \"%c\"", msgtype)));
1719 proc_exit(0);
1720 }
1721 }
1722
1723 /*
1724 * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1725 */
1726 static void
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)1727 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1728 {
1729 bool changed = false;
1730 ReplicationSlot *slot = MyReplicationSlot;
1731
1732 Assert(lsn != InvalidXLogRecPtr);
1733 SpinLockAcquire(&slot->mutex);
1734 if (slot->data.restart_lsn != lsn)
1735 {
1736 changed = true;
1737 slot->data.restart_lsn = lsn;
1738 }
1739 SpinLockRelease(&slot->mutex);
1740
1741 if (changed)
1742 {
1743 ReplicationSlotMarkDirty();
1744 ReplicationSlotsComputeRequiredLSN();
1745 }
1746
1747 /*
1748 * One could argue that the slot should be saved to disk now, but that'd
1749 * be energy wasted - the worst lost information can do here is give us
1750 * wrong information in a statistics view - we'll just potentially be more
1751 * conservative in removing files.
1752 */
1753 }
1754
1755 /*
1756 * Regular reply from standby advising of WAL locations on standby server.
1757 */
1758 static void
ProcessStandbyReplyMessage(void)1759 ProcessStandbyReplyMessage(void)
1760 {
1761 XLogRecPtr writePtr,
1762 flushPtr,
1763 applyPtr;
1764 bool replyRequested;
1765 TimeOffset writeLag,
1766 flushLag,
1767 applyLag;
1768 bool clearLagTimes;
1769 TimestampTz now;
1770
1771 static bool fullyAppliedLastTime = false;
1772
1773 /* the caller already consumed the msgtype byte */
1774 writePtr = pq_getmsgint64(&reply_message);
1775 flushPtr = pq_getmsgint64(&reply_message);
1776 applyPtr = pq_getmsgint64(&reply_message);
1777 (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1778 replyRequested = pq_getmsgbyte(&reply_message);
1779
1780 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1781 (uint32) (writePtr >> 32), (uint32) writePtr,
1782 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1783 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1784 replyRequested ? " (reply requested)" : "");
1785
1786 /* See if we can compute the round-trip lag for these positions. */
1787 now = GetCurrentTimestamp();
1788 writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1789 flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1790 applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1791
1792 /*
1793 * If the standby reports that it has fully replayed the WAL in two
1794 * consecutive reply messages, then the second such message must result
1795 * from wal_receiver_status_interval expiring on the standby. This is a
1796 * convenient time to forget the lag times measured when it last
1797 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1798 * until more WAL traffic arrives.
1799 */
1800 clearLagTimes = false;
1801 if (applyPtr == sentPtr)
1802 {
1803 if (fullyAppliedLastTime)
1804 clearLagTimes = true;
1805 fullyAppliedLastTime = true;
1806 }
1807 else
1808 fullyAppliedLastTime = false;
1809
1810 /* Send a reply if the standby requested one. */
1811 if (replyRequested)
1812 WalSndKeepalive(false);
1813
1814 /*
1815 * Update shared state for this WalSender process based on reply data from
1816 * standby.
1817 */
1818 {
1819 WalSnd *walsnd = MyWalSnd;
1820
1821 SpinLockAcquire(&walsnd->mutex);
1822 walsnd->write = writePtr;
1823 walsnd->flush = flushPtr;
1824 walsnd->apply = applyPtr;
1825 if (writeLag != -1 || clearLagTimes)
1826 walsnd->writeLag = writeLag;
1827 if (flushLag != -1 || clearLagTimes)
1828 walsnd->flushLag = flushLag;
1829 if (applyLag != -1 || clearLagTimes)
1830 walsnd->applyLag = applyLag;
1831 SpinLockRelease(&walsnd->mutex);
1832 }
1833
1834 if (!am_cascading_walsender)
1835 SyncRepReleaseWaiters();
1836
1837 /*
1838 * Advance our local xmin horizon when the client confirmed a flush.
1839 */
1840 if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1841 {
1842 if (SlotIsLogical(MyReplicationSlot))
1843 LogicalConfirmReceivedLocation(flushPtr);
1844 else
1845 PhysicalConfirmReceivedLocation(flushPtr);
1846 }
1847 }
1848
1849 /* compute new replication slot xmin horizon if needed */
1850 static void
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin,TransactionId feedbackCatalogXmin)1851 PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
1852 {
1853 bool changed = false;
1854 ReplicationSlot *slot = MyReplicationSlot;
1855
1856 SpinLockAcquire(&slot->mutex);
1857 MyPgXact->xmin = InvalidTransactionId;
1858
1859 /*
1860 * For physical replication we don't need the interlock provided by xmin
1861 * and effective_xmin since the consequences of a missed increase are
1862 * limited to query cancellations, so set both at once.
1863 */
1864 if (!TransactionIdIsNormal(slot->data.xmin) ||
1865 !TransactionIdIsNormal(feedbackXmin) ||
1866 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1867 {
1868 changed = true;
1869 slot->data.xmin = feedbackXmin;
1870 slot->effective_xmin = feedbackXmin;
1871 }
1872 if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1873 !TransactionIdIsNormal(feedbackCatalogXmin) ||
1874 TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1875 {
1876 changed = true;
1877 slot->data.catalog_xmin = feedbackCatalogXmin;
1878 slot->effective_catalog_xmin = feedbackCatalogXmin;
1879 }
1880 SpinLockRelease(&slot->mutex);
1881
1882 if (changed)
1883 {
1884 ReplicationSlotMarkDirty();
1885 ReplicationSlotsComputeRequiredXmin(false);
1886 }
1887 }
1888
1889 /*
1890 * Check that the provided xmin/epoch are sane, that is, not in the future
1891 * and not so far back as to be already wrapped around.
1892 *
1893 * Epoch of nextXid should be same as standby, or if the counter has
1894 * wrapped, then one greater than standby.
1895 *
1896 * This check doesn't care about whether clog exists for these xids
1897 * at all.
1898 */
1899 static bool
TransactionIdInRecentPast(TransactionId xid,uint32 epoch)1900 TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
1901 {
1902 TransactionId nextXid;
1903 uint32 nextEpoch;
1904
1905 GetNextXidAndEpoch(&nextXid, &nextEpoch);
1906
1907 if (xid <= nextXid)
1908 {
1909 if (epoch != nextEpoch)
1910 return false;
1911 }
1912 else
1913 {
1914 if (epoch + 1 != nextEpoch)
1915 return false;
1916 }
1917
1918 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1919 return false; /* epoch OK, but it's wrapped around */
1920
1921 return true;
1922 }
1923
1924 /*
1925 * Hot Standby feedback
1926 */
1927 static void
ProcessStandbyHSFeedbackMessage(void)1928 ProcessStandbyHSFeedbackMessage(void)
1929 {
1930 TransactionId feedbackXmin;
1931 uint32 feedbackEpoch;
1932 TransactionId feedbackCatalogXmin;
1933 uint32 feedbackCatalogEpoch;
1934
1935 /*
1936 * Decipher the reply message. The caller already consumed the msgtype
1937 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1938 * of this message.
1939 */
1940 (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1941 feedbackXmin = pq_getmsgint(&reply_message, 4);
1942 feedbackEpoch = pq_getmsgint(&reply_message, 4);
1943 feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1944 feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1945
1946 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1947 feedbackXmin,
1948 feedbackEpoch,
1949 feedbackCatalogXmin,
1950 feedbackCatalogEpoch);
1951
1952 /*
1953 * Unset WalSender's xmins if the feedback message values are invalid.
1954 * This happens when the downstream turned hot_standby_feedback off.
1955 */
1956 if (!TransactionIdIsNormal(feedbackXmin)
1957 && !TransactionIdIsNormal(feedbackCatalogXmin))
1958 {
1959 MyPgXact->xmin = InvalidTransactionId;
1960 if (MyReplicationSlot != NULL)
1961 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1962 return;
1963 }
1964
1965 /*
1966 * Check that the provided xmin/epoch are sane, that is, not in the future
1967 * and not so far back as to be already wrapped around. Ignore if not.
1968 */
1969 if (TransactionIdIsNormal(feedbackXmin) &&
1970 !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1971 return;
1972
1973 if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1974 !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1975 return;
1976
1977 /*
1978 * Set the WalSender's xmin equal to the standby's requested xmin, so that
1979 * the xmin will be taken into account by GetOldestXmin. This will hold
1980 * back the removal of dead rows and thereby prevent the generation of
1981 * cleanup conflicts on the standby server.
1982 *
1983 * There is a small window for a race condition here: although we just
1984 * checked that feedbackXmin precedes nextXid, the nextXid could have
1985 * gotten advanced between our fetching it and applying the xmin below,
1986 * perhaps far enough to make feedbackXmin wrap around. In that case the
1987 * xmin we set here would be "in the future" and have no effect. No point
1988 * in worrying about this since it's too late to save the desired data
1989 * anyway. Assuming that the standby sends us an increasing sequence of
1990 * xmins, this could only happen during the first reply cycle, else our
1991 * own xmin would prevent nextXid from advancing so far.
1992 *
1993 * We don't bother taking the ProcArrayLock here. Setting the xmin field
1994 * is assumed atomic, and there's no real need to prevent a concurrent
1995 * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1996 * safe, and if we're moving it backwards, well, the data is at risk
1997 * already since a VACUUM could have just finished calling GetOldestXmin.)
1998 *
1999 * If we're using a replication slot we reserve the xmin via that,
2000 * otherwise via the walsender's PGXACT entry. We can only track the
2001 * catalog xmin separately when using a slot, so we store the least of the
2002 * two provided when not using a slot.
2003 *
2004 * XXX: It might make sense to generalize the ephemeral slot concept and
2005 * always use the slot mechanism to handle the feedback xmin.
2006 */
2007 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2008 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2009 else
2010 {
2011 if (TransactionIdIsNormal(feedbackCatalogXmin)
2012 && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2013 MyPgXact->xmin = feedbackCatalogXmin;
2014 else
2015 MyPgXact->xmin = feedbackXmin;
2016 }
2017 }
2018
2019 /*
2020 * Compute how long send/receive loops should sleep.
2021 *
2022 * If wal_sender_timeout is enabled we want to wake up in time to send
2023 * keepalives and to abort the connection if wal_sender_timeout has been
2024 * reached.
2025 */
2026 static long
WalSndComputeSleeptime(TimestampTz now)2027 WalSndComputeSleeptime(TimestampTz now)
2028 {
2029 long sleeptime = 10000; /* 10 s */
2030
2031 if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2032 {
2033 TimestampTz wakeup_time;
2034
2035 /*
2036 * At the latest stop sleeping once wal_sender_timeout has been
2037 * reached.
2038 */
2039 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2040 wal_sender_timeout);
2041
2042 /*
2043 * If no ping has been sent yet, wakeup when it's time to do so.
2044 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2045 * the timeout passed without a response.
2046 */
2047 if (!waiting_for_ping_response)
2048 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2049 wal_sender_timeout / 2);
2050
2051 /* Compute relative time until wakeup. */
2052 sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2053 }
2054
2055 return sleeptime;
2056 }
2057
2058 /*
2059 * Check whether there have been responses by the client within
2060 * wal_sender_timeout and shutdown if not. Using last_processing as the
2061 * reference point avoids counting server-side stalls against the client.
2062 * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2063 * postdate last_processing by more than wal_sender_timeout. If that happens,
2064 * the client must reply almost immediately to avoid a timeout. This rarely
2065 * affects the default configuration, under which clients spontaneously send a
2066 * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2067 * could eliminate that problem by recognizing timeout expiration at
2068 * wal_sender_timeout/2 after the keepalive.
2069 */
2070 static void
WalSndCheckTimeOut(void)2071 WalSndCheckTimeOut(void)
2072 {
2073 TimestampTz timeout;
2074
2075 /* don't bail out if we're doing something that doesn't require timeouts */
2076 if (last_reply_timestamp <= 0)
2077 return;
2078
2079 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2080 wal_sender_timeout);
2081
2082 if (wal_sender_timeout > 0 && last_processing >= timeout)
2083 {
2084 /*
2085 * Since typically expiration of replication timeout means
2086 * communication problem, we don't send the error message to the
2087 * standby.
2088 */
2089 ereport(COMMERROR,
2090 (errmsg("terminating walsender process due to replication timeout")));
2091
2092 WalSndShutdown();
2093 }
2094 }
2095
/*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * send_data is the callback that queues the next chunk of output in the
 * libpq output buffer and updates WalSndCaughtUp (e.g. XLogSendPhysical or
 * XLogSendLogical).
 *
 * The loop returns only after CopyDone has been both received from and sent
 * to the client and all pending output has been flushed.  Other ways out:
 * exit(1) if the postmaster died, WalSndShutdown() if flushing fails or the
 * replication timeout expires, and WalSndDone() (which may proc_exit) once
 * got_SIGUSR2 is set and we are caught up.
 */
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
	/*
	 * Initialize the last reply timestamp. That enables timeout processing
	 * from hereon.
	 */
	last_reply_timestamp = GetCurrentTimestamp();
	waiting_for_ping_response = false;

	/*
	 * Loop until we reach the end of this timeline or the client requests to
	 * stop streaming.
	 */
	for (;;)
	{
		/*
		 * Emergency bailout if postmaster has died. This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/*
		 * If we don't have any pending data in the output buffer, try to send
		 * some more. If there is some, we don't bother to call send_data
		 * again until we've flushed it ... but we'd better assume we are not
		 * caught up.
		 */
		if (!pq_is_send_pending())
			send_data();
		else
			WalSndCaughtUp = false;

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/* If nothing remains to be sent right now ... */
		if (WalSndCaughtUp && !pq_is_send_pending())
		{
			/*
			 * If we're in catchup state, move to streaming. This is an
			 * important state change for users to know about, since before
			 * this point data loss might occur if the primary dies and we
			 * need to failover to the standby. The state change is also
			 * important for synchronous replication, since commits that
			 * started to wait at that point might wait for some time.
			 */
			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
			{
				ereport(DEBUG1,
						(errmsg("\"%s\" has now caught up with upstream server",
								application_name)));
				WalSndSetState(WALSNDSTATE_STREAMING);
			}

			/*
			 * When SIGUSR2 arrives, we send any outstanding logs up to the
			 * shutdown checkpoint record (i.e., the latest record), wait for
			 * them to be replicated to the standby, and exit. This may be a
			 * normal termination at shutdown, or a promotion, the walsender
			 * is not sure which.  WalSndDone() exits only once everything is
			 * confirmed replicated; otherwise it returns and we keep looping.
			 */
			if (got_SIGUSR2)
				WalSndDone(send_data);
		}

		/* Check for replication timeout. */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * We don't block if not caught up, unless there is unsent data
		 * pending in which case we'd better block until the socket is
		 * write-ready. This test is only needed for the case where the
		 * send_data callback handled a subset of the available data but then
		 * pq_flush_if_writable flushed it all --- we should immediately try
		 * to send more.
		 */
		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
		{
			long		sleeptime;
			int			wakeEvents;

			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;

			/* Only watch for client input while the client may still send. */
			if (!streamingDoneReceiving)
				wakeEvents |= WL_SOCKET_READABLE;

			/*
			 * Use fresh timestamp, not last_processing, to reduce the chance
			 * of reaching wal_sender_timeout before sending a keepalive.
			 */
			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

			/* Wake up as soon as buffered output can be written. */
			if (pq_is_send_pending())
				wakeEvents |= WL_SOCKET_WRITEABLE;

			/* Sleep until something happens or we time out */
			WaitLatchOrSocket(MyLatch, wakeEvents,
							  MyProcPort->sock, sleeptime,
							  WAIT_EVENT_WAL_SENDER_MAIN);
		}
	}
	return;
}
2231
2232 /* Initialize a per-walsender data structure for this walsender process */
2233 static void
InitWalSenderSlot(void)2234 InitWalSenderSlot(void)
2235 {
2236 int i;
2237
2238 /*
2239 * WalSndCtl should be set up already (we inherit this by fork() or
2240 * EXEC_BACKEND mechanism from the postmaster).
2241 */
2242 Assert(WalSndCtl != NULL);
2243 Assert(MyWalSnd == NULL);
2244
2245 /*
2246 * Find a free walsender slot and reserve it. If this fails, we must be
2247 * out of WalSnd structures.
2248 */
2249 for (i = 0; i < max_wal_senders; i++)
2250 {
2251 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2252
2253 SpinLockAcquire(&walsnd->mutex);
2254
2255 if (walsnd->pid != 0)
2256 {
2257 SpinLockRelease(&walsnd->mutex);
2258 continue;
2259 }
2260 else
2261 {
2262 /*
2263 * Found a free slot. Reserve it for us.
2264 */
2265 walsnd->pid = MyProcPid;
2266 walsnd->state = WALSNDSTATE_STARTUP;
2267 walsnd->sentPtr = InvalidXLogRecPtr;
2268 walsnd->needreload = false;
2269 walsnd->write = InvalidXLogRecPtr;
2270 walsnd->flush = InvalidXLogRecPtr;
2271 walsnd->apply = InvalidXLogRecPtr;
2272 walsnd->writeLag = -1;
2273 walsnd->flushLag = -1;
2274 walsnd->applyLag = -1;
2275 walsnd->sync_standby_priority = 0;
2276 walsnd->latch = &MyProc->procLatch;
2277 SpinLockRelease(&walsnd->mutex);
2278 /* don't need the lock anymore */
2279 MyWalSnd = (WalSnd *) walsnd;
2280
2281 break;
2282 }
2283 }
2284 if (MyWalSnd == NULL)
2285 ereport(FATAL,
2286 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2287 errmsg("number of requested standby connections "
2288 "exceeds max_wal_senders (currently %d)",
2289 max_wal_senders)));
2290
2291 /* Arrange to clean up at walsender exit */
2292 on_shmem_exit(WalSndKill, 0);
2293 }
2294
2295 /* Destroy the per-walsender data structure for this walsender process */
2296 static void
WalSndKill(int code,Datum arg)2297 WalSndKill(int code, Datum arg)
2298 {
2299 WalSnd *walsnd = MyWalSnd;
2300
2301 Assert(walsnd != NULL);
2302
2303 MyWalSnd = NULL;
2304
2305 SpinLockAcquire(&walsnd->mutex);
2306 /* clear latch while holding the spinlock, so it can safely be read */
2307 walsnd->latch = NULL;
2308 /* Mark WalSnd struct as no longer being in use. */
2309 walsnd->pid = 0;
2310 SpinLockRelease(&walsnd->mutex);
2311 }
2312
2313 /*
2314 * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2315 *
2316 * XXX probably this should be improved to suck data directly from the
2317 * WAL buffers when possible.
2318 *
2319 * Will open, and keep open, one WAL segment stored in the global file
2320 * descriptor sendFile. This means if XLogRead is used once, there will
2321 * always be one descriptor left open until the process ends, but never
2322 * more than one.
2323 */
2324 static void
XLogRead(char * buf,XLogRecPtr startptr,Size count)2325 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2326 {
2327 char *p;
2328 XLogRecPtr recptr;
2329 Size nbytes;
2330 XLogSegNo segno;
2331
2332 retry:
2333 p = buf;
2334 recptr = startptr;
2335 nbytes = count;
2336
2337 while (nbytes > 0)
2338 {
2339 uint32 startoff;
2340 int segbytes;
2341 int readbytes;
2342
2343 startoff = recptr % XLogSegSize;
2344
2345 if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2346 {
2347 char path[MAXPGPATH];
2348
2349 /* Switch to another logfile segment */
2350 if (sendFile >= 0)
2351 close(sendFile);
2352
2353 XLByteToSeg(recptr, sendSegNo);
2354
2355 /*-------
2356 * When reading from a historic timeline, and there is a timeline
2357 * switch within this segment, read from the WAL segment belonging
2358 * to the new timeline.
2359 *
2360 * For example, imagine that this server is currently on timeline
2361 * 5, and we're streaming timeline 4. The switch from timeline 4
2362 * to 5 happened at 0/13002088. In pg_wal, we have these files:
2363 *
2364 * ...
2365 * 000000040000000000000012
2366 * 000000040000000000000013
2367 * 000000050000000000000013
2368 * 000000050000000000000014
2369 * ...
2370 *
2371 * In this situation, when requested to send the WAL from
2372 * segment 0x13, on timeline 4, we read the WAL from file
2373 * 000000050000000000000013. Archive recovery prefers files from
2374 * newer timelines, so if the segment was restored from the
2375 * archive on this server, the file belonging to the old timeline,
2376 * 000000040000000000000013, might not exist. Their contents are
2377 * equal up to the switchpoint, because at a timeline switch, the
2378 * used portion of the old segment is copied to the new file.
2379 *-------
2380 */
2381 curFileTimeLine = sendTimeLine;
2382 if (sendTimeLineIsHistoric)
2383 {
2384 XLogSegNo endSegNo;
2385
2386 XLByteToSeg(sendTimeLineValidUpto, endSegNo);
2387 if (sendSegNo == endSegNo)
2388 curFileTimeLine = sendTimeLineNextTLI;
2389 }
2390
2391 XLogFilePath(path, curFileTimeLine, sendSegNo);
2392
2393 sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2394 if (sendFile < 0)
2395 {
2396 /*
2397 * If the file is not found, assume it's because the standby
2398 * asked for a too old WAL segment that has already been
2399 * removed or recycled.
2400 */
2401 if (errno == ENOENT)
2402 ereport(ERROR,
2403 (errcode_for_file_access(),
2404 errmsg("requested WAL segment %s has already been removed",
2405 XLogFileNameP(curFileTimeLine, sendSegNo))));
2406 else
2407 ereport(ERROR,
2408 (errcode_for_file_access(),
2409 errmsg("could not open file \"%s\": %m",
2410 path)));
2411 }
2412 sendOff = 0;
2413 }
2414
2415 /* Need to seek in the file? */
2416 if (sendOff != startoff)
2417 {
2418 if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2419 ereport(ERROR,
2420 (errcode_for_file_access(),
2421 errmsg("could not seek in log segment %s to offset %u: %m",
2422 XLogFileNameP(curFileTimeLine, sendSegNo),
2423 startoff)));
2424 sendOff = startoff;
2425 }
2426
2427 /* How many bytes are within this segment? */
2428 if (nbytes > (XLogSegSize - startoff))
2429 segbytes = XLogSegSize - startoff;
2430 else
2431 segbytes = nbytes;
2432
2433 pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
2434 readbytes = read(sendFile, p, segbytes);
2435 pgstat_report_wait_end();
2436 if (readbytes <= 0)
2437 {
2438 ereport(ERROR,
2439 (errcode_for_file_access(),
2440 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2441 XLogFileNameP(curFileTimeLine, sendSegNo),
2442 sendOff, (unsigned long) segbytes)));
2443 }
2444
2445 /* Update state for read */
2446 recptr += readbytes;
2447
2448 sendOff += readbytes;
2449 nbytes -= readbytes;
2450 p += readbytes;
2451 }
2452
2453 /*
2454 * After reading into the buffer, check that what we read was valid. We do
2455 * this after reading, because even though the segment was present when we
2456 * opened it, it might get recycled or removed while we read it. The
2457 * read() succeeds in that case, but the data we tried to read might
2458 * already have been overwritten with new WAL records.
2459 */
2460 XLByteToSeg(startptr, segno);
2461 CheckXLogRemoved(segno, ThisTimeLineID);
2462
2463 /*
2464 * During recovery, the currently-open WAL file might be replaced with the
2465 * file of the same name retrieved from archive. So we always need to
2466 * check what we read was valid after reading into the buffer. If it's
2467 * invalid, we try to open and read the file again.
2468 */
2469 if (am_cascading_walsender)
2470 {
2471 WalSnd *walsnd = MyWalSnd;
2472 bool reload;
2473
2474 SpinLockAcquire(&walsnd->mutex);
2475 reload = walsnd->needreload;
2476 walsnd->needreload = false;
2477 SpinLockRelease(&walsnd->mutex);
2478
2479 if (reload && sendFile >= 0)
2480 {
2481 close(sendFile);
2482 sendFile = -1;
2483
2484 goto retry;
2485 }
2486 }
2487 }
2488
/*
 * Send out the WAL in its normal physical/stored form.
 *
 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
 * but not yet sent to the client, and buffer it in the libpq output
 * buffer.
 *
 * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
 * otherwise WalSndCaughtUp is set to false.
 *
 * Other side effects visible here:
 * - the walsender is switched to WALSNDSTATE_STOPPING once got_STOPPING
 *   is set;
 * - when a historic timeline has been sent up to its switch point, the
 *   open WAL file is closed, CopyDone is queued and streamingDoneSending
 *   is set;
 * - after a 'w' message is queued, sentPtr is advanced, published in
 *   MyWalSnd, and shown in the ps display if update_process_title is on.
 */
static void
XLogSendPhysical(void)
{
	XLogRecPtr	SendRqstPtr;
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;
	Size		nbytes;

	/* If requested switch the WAL sender to the stopping state. */
	if (got_STOPPING)
		WalSndSetState(WALSNDSTATE_STOPPING);

	/* CopyDone already sent: nothing more to stream. */
	if (streamingDoneSending)
	{
		WalSndCaughtUp = true;
		return;
	}

	/* Figure out how far we can safely send the WAL. */
	if (sendTimeLineIsHistoric)
	{
		/*
		 * Streaming an old timeline that's in this server's history, but is
		 * not the one we're currently inserting or replaying. It can be
		 * streamed up to the point where we switched off that timeline.
		 */
		SendRqstPtr = sendTimeLineValidUpto;
	}
	else if (am_cascading_walsender)
	{
		/*
		 * Streaming the latest timeline on a standby.
		 *
		 * Attempt to send all WAL that has already been replayed, so that we
		 * know it's valid. If we're receiving WAL through streaming
		 * replication, it's also OK to send any WAL that has been received
		 * but not replayed.
		 *
		 * The timeline we're recovering from can change, or we can be
		 * promoted. In either case, the current timeline becomes historic. We
		 * need to detect that so that we don't try to stream past the point
		 * where we switched to another timeline. We check for promotion or
		 * timeline switch after calculating FlushPtr, to avoid a race
		 * condition: if the timeline becomes historic just after we checked
		 * that it was still current, it's still OK to stream it up to the
		 * FlushPtr that was calculated before it became historic.
		 */
		bool		becameHistoric = false;

		SendRqstPtr = GetStandbyFlushRecPtr();

		if (!RecoveryInProgress())
		{
			/*
			 * We have been promoted. RecoveryInProgress() updated
			 * ThisTimeLineID to the new current timeline.
			 */
			am_cascading_walsender = false;
			becameHistoric = true;
		}
		else
		{
			/*
			 * Still a cascading standby. But is the timeline we're sending
			 * still the one recovery is recovering from? ThisTimeLineID was
			 * updated by the GetStandbyFlushRecPtr() call above.
			 */
			if (sendTimeLine != ThisTimeLineID)
				becameHistoric = true;
		}

		if (becameHistoric)
		{
			/*
			 * The timeline we were sending has become historic. Read the
			 * timeline history file of the new timeline to see where exactly
			 * we forked off from the timeline we were sending.
			 */
			List	   *history;

			history = readTimeLineHistory(ThisTimeLineID);
			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);

			Assert(sendTimeLine < sendTimeLineNextTLI);
			list_free_deep(history);

			sendTimeLineIsHistoric = true;

			SendRqstPtr = sendTimeLineValidUpto;
		}
	}
	else
	{
		/*
		 * Streaming the current timeline on a master.
		 *
		 * Attempt to send all data that's already been written out and
		 * fsync'd to disk. We cannot go further than what's been written out
		 * given the current implementation of XLogRead(). And in any case
		 * it's unsafe to send WAL that is not securely down to disk on the
		 * master: if the master subsequently crashes and restarts, standbys
		 * must not have applied any WAL that got lost on the master.
		 */
		SendRqstPtr = GetFlushRecPtr();
	}

	/*
	 * Record the current system time as an approximation of the time at which
	 * this WAL location was written for the purposes of lag tracking.
	 *
	 * In theory we could make XLogFlush() record a time in shmem whenever WAL
	 * is flushed and we could get that time as well as the LSN when we call
	 * GetFlushRecPtr() above (and likewise for the cascading standby
	 * equivalent), but rather than putting any new code into the hot WAL path
	 * it seems good enough to capture the time here. We should reach this
	 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
	 * may take some time, we read the WAL flush pointer and take the time
	 * very close together here so that we'll get a later position if it is
	 * still moving.
	 *
	 * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
	 * this gives us a cheap approximation for the WAL flush time for this
	 * LSN.
	 *
	 * Note that the LSN is not necessarily the LSN for the data contained in
	 * the present message; it's the end of the WAL, which might be further
	 * ahead. All the lag tracking machinery cares about is finding out when
	 * that arbitrary LSN is eventually reported as written, flushed and
	 * applied, so that it can measure the elapsed time.
	 */
	LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());

	/*
	 * If this is a historic timeline and we've reached the point where we
	 * forked to the next timeline, stop streaming.
	 *
	 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
	 * startup process will normally replay all WAL that has been received
	 * from the master, before promoting, but if the WAL streaming is
	 * terminated at a WAL page boundary, the valid portion of the timeline
	 * might end in the middle of a WAL record. We might've already sent the
	 * first half of that partial WAL record to the cascading standby, so that
	 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
	 * replay the partial WAL record either, so it can still follow our
	 * timeline switch.
	 */
	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
	{
		/* close the current file. */
		if (sendFile >= 0)
			close(sendFile);
		sendFile = -1;

		/* Send CopyDone */
		pq_putmessage_noblock('c', NULL, 0);
		streamingDoneSending = true;

		WalSndCaughtUp = true;

		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		return;
	}

	/* Do we have any work to do? */
	Assert(sentPtr <= SendRqstPtr);
	if (SendRqstPtr <= sentPtr)
	{
		WalSndCaughtUp = true;
		return;
	}

	/*
	 * Figure out how much to send in one message. If there's no more than
	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
	 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
	 *
	 * The rounding is not only for performance reasons. Walreceiver relies on
	 * the fact that we never split a WAL record across two messages. Since a
	 * long WAL record is split at page boundary into continuation records,
	 * page boundary is always a safe cut-off point. We also assume that
	 * SendRqstPtr never points to the middle of a WAL record.
	 */
	startptr = sentPtr;
	endptr = startptr;
	endptr += MAX_SEND_SIZE;

	/* if we went beyond SendRqstPtr, back off */
	if (SendRqstPtr <= endptr)
	{
		endptr = SendRqstPtr;

		/*
		 * On a historic timeline, report "not caught up" even though this
		 * message reaches SendRqstPtr.  WalSndLoop() then calls us again
		 * without sleeping, and the end-of-timeline check above sends
		 * CopyDone.
		 */
		if (sendTimeLineIsHistoric)
			WalSndCaughtUp = false;
		else
			WalSndCaughtUp = true;
	}
	else
	{
		/* round down to page boundary. */
		endptr -= (endptr % XLOG_BLCKSZ);
		WalSndCaughtUp = false;
	}

	nbytes = endptr - startptr;
	Assert(nbytes <= MAX_SEND_SIZE);

	/*
	 * OK to read and send the slice.  Message layout: 'w', dataStart,
	 * walEnd, sendtime (three int64 fields), followed by the WAL bytes.
	 */
	resetStringInfo(&output_message);
	pq_sendbyte(&output_message, 'w');

	pq_sendint64(&output_message, startptr);	/* dataStart */
	pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
	pq_sendint64(&output_message, 0);	/* sendtime, filled in last */

	/*
	 * Read the log directly into the output buffer to avoid extra memcpy
	 * calls.
	 */
	enlargeStringInfo(&output_message, nbytes);
	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
	output_message.len += nbytes;
	output_message.data[output_message.len] = '\0';

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * It overwrites the placeholder right after the 'w' byte and the two
	 * preceding int64 fields.
	 */
	resetStringInfo(&tmpbuf);
	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
	memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	pq_putmessage_noblock('d', output_message.data, output_message.len);

	sentPtr = endptr;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}

	/* Report progress of XLOG streaming in PS display */
	if (update_process_title)
	{
		char		activitymsg[50];

		snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
				 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		set_ps_display(activitymsg, false);
	}

	return;
}
2758
/*
 * Stream out logically decoded data.
 *
 * Reads the next WAL record through logical_decoding_ctx->reader.  If a
 * record is returned, it is passed to LogicalDecodingProcessRecord() and
 * sentPtr is advanced to the reader's EndRecPtr.  The actual output is
 * produced by the output plugin through the logical decoding write API.
 * WalSndCaughtUp is set once the reader has reached the current flush
 * pointer.  If we are caught up while stopping, got_SIGUSR2 is set so that
 * WalSndLoop() finishes via WalSndDone().  Finally, sentPtr is published in
 * MyWalSnd.  An invalid WAL record raises ERROR.
 */
static void
XLogSendLogical(void)
{
	XLogRecord *record;
	char	   *errm;
	XLogRecPtr	flushPtr;

	/*
	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
	 * true in WalSndWaitForWal, if we're actually waiting. We also set to
	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
	 * didn't wait - i.e. when we're shutting down.
	 */
	WalSndCaughtUp = false;

	/*
	 * logical_startptr is used for this read only and then reset.
	 * NOTE(review): presumably XLogReadRecord() continues after the previous
	 * record when given InvalidXLogRecPtr; confirm against xlogreader.c.
	 */
	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
	logical_startptr = InvalidXLogRecPtr;

	/* xlog record was invalid */
	if (errm != NULL)
		elog(ERROR, "%s", errm);

	/*
	 * We'll use the current flush point to determine whether we've caught up.
	 */
	flushPtr = GetFlushRecPtr();

	if (record != NULL)
	{
		/*
		 * Note the lack of any call to LagTrackerWrite() which is handled by
		 * WalSndUpdateProgress which is called by output plugin through
		 * logical decoding write api.
		 */
		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);

		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
	}

	/*
	 * Set flag if we're caught up.  This only ever sets the flag: it may
	 * already have been set to true during XLogReadRecord() above (see the
	 * comment at the top), and that must not be overwritten.
	 */
	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
		WalSndCaughtUp = true;

	/*
	 * If we're caught up and have been requested to stop, have WalSndLoop()
	 * terminate the connection in an orderly manner, after writing out all
	 * the pending data.
	 */
	if (WalSndCaughtUp && got_STOPPING)
		got_SIGUSR2 = true;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}
}
2822
2823 /*
2824 * Shutdown if the sender is caught up.
2825 *
2826 * NB: This should only be called when the shutdown signal has been received
2827 * from postmaster.
2828 *
2829 * Note that if we determine that there's still more data to send, this
2830 * function will return control to the caller.
2831 */
2832 static void
WalSndDone(WalSndSendDataCallback send_data)2833 WalSndDone(WalSndSendDataCallback send_data)
2834 {
2835 XLogRecPtr replicatedPtr;
2836
2837 /* ... let's just be real sure we're caught up ... */
2838 send_data();
2839
2840 /*
2841 * To figure out whether all WAL has successfully been replicated, check
2842 * flush location if valid, write otherwise. Tools like pg_receivewal will
2843 * usually (unless in synchronous mode) return an invalid flush location.
2844 */
2845 replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2846 MyWalSnd->write : MyWalSnd->flush;
2847
2848 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2849 !pq_is_send_pending())
2850 {
2851 /* Inform the standby that XLOG streaming is done */
2852 EndCommand("COPY 0", DestRemote);
2853 pq_flush();
2854
2855 proc_exit(0);
2856 }
2857 if (!waiting_for_ping_response)
2858 WalSndKeepalive(true);
2859 }
2860
2861 /*
2862 * Returns the latest point in WAL that has been safely flushed to disk, and
2863 * can be sent to the standby. This should only be called when in recovery,
2864 * ie. we're streaming to a cascaded standby.
2865 *
2866 * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2867 * replayed WAL record.
2868 */
2869 static XLogRecPtr
GetStandbyFlushRecPtr(void)2870 GetStandbyFlushRecPtr(void)
2871 {
2872 XLogRecPtr replayPtr;
2873 TimeLineID replayTLI;
2874 XLogRecPtr receivePtr;
2875 TimeLineID receiveTLI;
2876 XLogRecPtr result;
2877
2878 /*
2879 * We can safely send what's already been replayed. Also, if walreceiver
2880 * is streaming WAL from the same timeline, we can send anything that it
2881 * has streamed, but hasn't been replayed yet.
2882 */
2883
2884 receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2885 replayPtr = GetXLogReplayRecPtr(&replayTLI);
2886
2887 ThisTimeLineID = replayTLI;
2888
2889 result = replayPtr;
2890 if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2891 result = receivePtr;
2892
2893 return result;
2894 }
2895
2896 /*
2897 * Request walsenders to reload the currently-open WAL file
2898 */
2899 void
WalSndRqstFileReload(void)2900 WalSndRqstFileReload(void)
2901 {
2902 int i;
2903
2904 for (i = 0; i < max_wal_senders; i++)
2905 {
2906 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2907
2908 SpinLockAcquire(&walsnd->mutex);
2909 if (walsnd->pid == 0)
2910 {
2911 SpinLockRelease(&walsnd->mutex);
2912 continue;
2913 }
2914 walsnd->needreload = true;
2915 SpinLockRelease(&walsnd->mutex);
2916 }
2917 }
2918
2919 /*
2920 * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2921 */
2922 void
HandleWalSndInitStopping(void)2923 HandleWalSndInitStopping(void)
2924 {
2925 Assert(am_walsender);
2926
2927 /*
2928 * If replication has not yet started, die like with SIGTERM. If
2929 * replication is active, only set a flag and wake up the main loop. It
2930 * will send any outstanding WAL, wait for it to be replicated to the
2931 * standby, and then exit gracefully.
2932 */
2933 if (!replication_active)
2934 kill(MyProcPid, SIGTERM);
2935 else
2936 got_STOPPING = true;
2937 }
2938
2939 /*
2940 * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2941 * sender should already have been switched to WALSNDSTATE_STOPPING at
2942 * this point.
2943 */
2944 static void
WalSndLastCycleHandler(SIGNAL_ARGS)2945 WalSndLastCycleHandler(SIGNAL_ARGS)
2946 {
2947 int save_errno = errno;
2948
2949 got_SIGUSR2 = true;
2950 SetLatch(MyLatch);
2951
2952 errno = save_errno;
2953 }
2954
/*
 * Set up signal handlers for a walsender process.
 *
 * SIGUSR2 gets the walsender-specific WalSndLastCycleHandler.  The other
 * handlers installed here (PostgresSigHupHandler, StatementCancelHandler,
 * die, quickdie, procsignal_sigusr1_handler) are defined outside this file.
 * Signals the postmaster accepts but a walsender does not use are reset to
 * their default disposition.
 */
void
WalSndSignals(void)
{
	/* Set up signal handlers */
	pqsignal(SIGHUP, PostgresSigHupHandler);	/* set flag to read config
												 * file */
	pqsignal(SIGINT, StatementCancelHandler);	/* query cancel */
	pqsignal(SIGTERM, die);		/* request shutdown */
	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
	InitializeTimeouts();		/* establishes SIGALRM handler */
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
												 * shutdown */

	/* Reset some signals that are accepted by postmaster but not here */
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
}
2978
2979 /* Report shared-memory space needed by WalSndShmemInit */
2980 Size
WalSndShmemSize(void)2981 WalSndShmemSize(void)
2982 {
2983 Size size = 0;
2984
2985 size = offsetof(WalSndCtlData, walsnds);
2986 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2987
2988 return size;
2989 }
2990
2991 /* Allocate and initialize walsender-related shared memory */
2992 void
WalSndShmemInit(void)2993 WalSndShmemInit(void)
2994 {
2995 bool found;
2996 int i;
2997
2998 WalSndCtl = (WalSndCtlData *)
2999 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3000
3001 if (!found)
3002 {
3003 /* First time through, so initialize */
3004 MemSet(WalSndCtl, 0, WalSndShmemSize());
3005
3006 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3007 SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3008
3009 for (i = 0; i < max_wal_senders; i++)
3010 {
3011 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3012
3013 SpinLockInit(&walsnd->mutex);
3014 }
3015 }
3016 }
3017
3018 /*
3019 * Wake up all walsenders
3020 *
3021 * This will be called inside critical sections, so throwing an error is not
3022 * advisable.
3023 */
3024 void
WalSndWakeup(void)3025 WalSndWakeup(void)
3026 {
3027 int i;
3028
3029 for (i = 0; i < max_wal_senders; i++)
3030 {
3031 Latch *latch;
3032 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3033
3034 /*
3035 * Get latch pointer with spinlock held, for the unlikely case that
3036 * pointer reads aren't atomic (as they're 8 bytes).
3037 */
3038 SpinLockAcquire(&walsnd->mutex);
3039 latch = walsnd->latch;
3040 SpinLockRelease(&walsnd->mutex);
3041
3042 if (latch != NULL)
3043 SetLatch(latch);
3044 }
3045 }
3046
3047 /*
3048 * Signal all walsenders to move to stopping state.
3049 *
3050 * This will trigger walsenders to move to a state where no further WAL can be
3051 * generated. See this file's header for details.
3052 */
3053 void
WalSndInitStopping(void)3054 WalSndInitStopping(void)
3055 {
3056 int i;
3057
3058 for (i = 0; i < max_wal_senders; i++)
3059 {
3060 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3061 pid_t pid;
3062
3063 SpinLockAcquire(&walsnd->mutex);
3064 pid = walsnd->pid;
3065 SpinLockRelease(&walsnd->mutex);
3066
3067 if (pid == 0)
3068 continue;
3069
3070 SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3071 }
3072 }
3073
3074 /*
3075 * Wait that all the WAL senders have quit or reached the stopping state. This
3076 * is used by the checkpointer to control when the shutdown checkpoint can
3077 * safely be performed.
3078 */
3079 void
WalSndWaitStopping(void)3080 WalSndWaitStopping(void)
3081 {
3082 for (;;)
3083 {
3084 int i;
3085 bool all_stopped = true;
3086
3087 for (i = 0; i < max_wal_senders; i++)
3088 {
3089 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3090
3091 SpinLockAcquire(&walsnd->mutex);
3092
3093 if (walsnd->pid == 0)
3094 {
3095 SpinLockRelease(&walsnd->mutex);
3096 continue;
3097 }
3098
3099 if (walsnd->state != WALSNDSTATE_STOPPING)
3100 {
3101 all_stopped = false;
3102 SpinLockRelease(&walsnd->mutex);
3103 break;
3104 }
3105 SpinLockRelease(&walsnd->mutex);
3106 }
3107
3108 /* safe to leave if confirmation is done for all WAL senders */
3109 if (all_stopped)
3110 return;
3111
3112 pg_usleep(10000L); /* wait for 10 msec */
3113 }
3114 }
3115
3116 /* Set state for current walsender (only called in walsender) */
3117 void
WalSndSetState(WalSndState state)3118 WalSndSetState(WalSndState state)
3119 {
3120 WalSnd *walsnd = MyWalSnd;
3121
3122 Assert(am_walsender);
3123
3124 if (walsnd->state == state)
3125 return;
3126
3127 SpinLockAcquire(&walsnd->mutex);
3128 walsnd->state = state;
3129 SpinLockRelease(&walsnd->mutex);
3130 }
3131
3132 /*
3133 * Return a string constant representing the state. This is used
3134 * in system views, and should *not* be translated.
3135 */
3136 static const char *
WalSndGetStateString(WalSndState state)3137 WalSndGetStateString(WalSndState state)
3138 {
3139 switch (state)
3140 {
3141 case WALSNDSTATE_STARTUP:
3142 return "startup";
3143 case WALSNDSTATE_BACKUP:
3144 return "backup";
3145 case WALSNDSTATE_CATCHUP:
3146 return "catchup";
3147 case WALSNDSTATE_STREAMING:
3148 return "streaming";
3149 case WALSNDSTATE_STOPPING:
3150 return "stopping";
3151 }
3152 return "UNKNOWN";
3153 }
3154
3155 static Interval *
offset_to_interval(TimeOffset offset)3156 offset_to_interval(TimeOffset offset)
3157 {
3158 Interval *result = palloc(sizeof(Interval));
3159
3160 result->month = 0;
3161 result->day = 0;
3162 result->time = offset;
3163
3164 return result;
3165 }
3166
3167 /*
3168 * Returns activity of walsenders, including pids and xlog locations sent to
3169 * standby servers.
3170 */
3171 Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)3172 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3173 {
3174 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3175 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3176 TupleDesc tupdesc;
3177 Tuplestorestate *tupstore;
3178 MemoryContext per_query_ctx;
3179 MemoryContext oldcontext;
3180 SyncRepStandbyData *sync_standbys;
3181 int num_standbys;
3182 int i;
3183
3184 /* check to see if caller supports us returning a tuplestore */
3185 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3186 ereport(ERROR,
3187 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3188 errmsg("set-valued function called in context that cannot accept a set")));
3189 if (!(rsinfo->allowedModes & SFRM_Materialize))
3190 ereport(ERROR,
3191 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3192 errmsg("materialize mode required, but it is not " \
3193 "allowed in this context")));
3194
3195 /* Build a tuple descriptor for our result type */
3196 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3197 elog(ERROR, "return type must be a row type");
3198
3199 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3200 oldcontext = MemoryContextSwitchTo(per_query_ctx);
3201
3202 tupstore = tuplestore_begin_heap(true, false, work_mem);
3203 rsinfo->returnMode = SFRM_Materialize;
3204 rsinfo->setResult = tupstore;
3205 rsinfo->setDesc = tupdesc;
3206
3207 MemoryContextSwitchTo(oldcontext);
3208
3209 /*
3210 * Get the currently active synchronous standbys. This could be out of
3211 * date before we're done, but we'll use the data anyway.
3212 */
3213 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3214
3215 for (i = 0; i < max_wal_senders; i++)
3216 {
3217 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3218 XLogRecPtr sentPtr;
3219 XLogRecPtr write;
3220 XLogRecPtr flush;
3221 XLogRecPtr apply;
3222 TimeOffset writeLag;
3223 TimeOffset flushLag;
3224 TimeOffset applyLag;
3225 int priority;
3226 int pid;
3227 WalSndState state;
3228 bool is_sync_standby;
3229 Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3230 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3231 int j;
3232
3233 /* Collect data from shared memory */
3234 SpinLockAcquire(&walsnd->mutex);
3235 if (walsnd->pid == 0)
3236 {
3237 SpinLockRelease(&walsnd->mutex);
3238 continue;
3239 }
3240 pid = walsnd->pid;
3241 sentPtr = walsnd->sentPtr;
3242 state = walsnd->state;
3243 write = walsnd->write;
3244 flush = walsnd->flush;
3245 apply = walsnd->apply;
3246 writeLag = walsnd->writeLag;
3247 flushLag = walsnd->flushLag;
3248 applyLag = walsnd->applyLag;
3249 priority = walsnd->sync_standby_priority;
3250 SpinLockRelease(&walsnd->mutex);
3251
3252 /*
3253 * Detect whether walsender is/was considered synchronous. We can
3254 * provide some protection against stale data by checking the PID
3255 * along with walsnd_index.
3256 */
3257 is_sync_standby = false;
3258 for (j = 0; j < num_standbys; j++)
3259 {
3260 if (sync_standbys[j].walsnd_index == i &&
3261 sync_standbys[j].pid == pid)
3262 {
3263 is_sync_standby = true;
3264 break;
3265 }
3266 }
3267
3268 memset(nulls, 0, sizeof(nulls));
3269 values[0] = Int32GetDatum(pid);
3270
3271 if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3272 {
3273 /*
3274 * Only superusers and members of pg_read_all_stats can see details.
3275 * Other users only get the pid value to know it's a walsender,
3276 * but no details.
3277 */
3278 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3279 }
3280 else
3281 {
3282 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3283
3284 if (XLogRecPtrIsInvalid(sentPtr))
3285 nulls[2] = true;
3286 values[2] = LSNGetDatum(sentPtr);
3287
3288 if (XLogRecPtrIsInvalid(write))
3289 nulls[3] = true;
3290 values[3] = LSNGetDatum(write);
3291
3292 if (XLogRecPtrIsInvalid(flush))
3293 nulls[4] = true;
3294 values[4] = LSNGetDatum(flush);
3295
3296 if (XLogRecPtrIsInvalid(apply))
3297 nulls[5] = true;
3298 values[5] = LSNGetDatum(apply);
3299
3300 /*
3301 * Treat a standby such as a pg_basebackup background process
3302 * which always returns an invalid flush location, as an
3303 * asynchronous standby.
3304 */
3305 priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3306
3307 if (writeLag < 0)
3308 nulls[6] = true;
3309 else
3310 values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3311
3312 if (flushLag < 0)
3313 nulls[7] = true;
3314 else
3315 values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3316
3317 if (applyLag < 0)
3318 nulls[8] = true;
3319 else
3320 values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3321
3322 values[9] = Int32GetDatum(priority);
3323
3324 /*
3325 * More easily understood version of standby state. This is purely
3326 * informational.
3327 *
3328 * In quorum-based sync replication, the role of each standby
3329 * listed in synchronous_standby_names can be changing very
3330 * frequently. Any standbys considered as "sync" at one moment can
3331 * be switched to "potential" ones at the next moment. So, it's
3332 * basically useless to report "sync" or "potential" as their sync
3333 * states. We report just "quorum" for them.
3334 */
3335 if (priority == 0)
3336 values[10] = CStringGetTextDatum("async");
3337 else if (is_sync_standby)
3338 values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3339 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3340 else
3341 values[10] = CStringGetTextDatum("potential");
3342 }
3343
3344 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3345 }
3346
3347 /* clean up and return the tuplestore */
3348 tuplestore_donestoring(tupstore);
3349
3350 return (Datum) 0;
3351 }
3352
/*
 * Send a keepalive message to standby.
 *
 * The message is a CopyData ('d') payload consisting of, in order: the byte
 * 'k', the current sentPtr (int64), the current timestamp (int64), and a
 * one-byte reply-request flag (1 or 0).  It is handed to
 * pq_putmessage_noblock(); callers such as WalSndKeepaliveIfNecessary flush
 * the output explicitly afterwards.
 *
 * If requestReply is set, the message requests the other party to send
 * a message back to us, for heartbeat purposes.  We also set a flag to
 * let nearby code know that we're waiting for that response, to avoid
 * repeated requests.
 */
static void
WalSndKeepalive(bool requestReply)
{
	elog(DEBUG2, "sending replication keepalive");

	/* construct the message... */
	resetStringInfo(&output_message);
	pq_sendbyte(&output_message, 'k');
	pq_sendint64(&output_message, sentPtr);
	pq_sendint64(&output_message, GetCurrentTimestamp());
	pq_sendbyte(&output_message, requestReply ? 1 : 0);

	/* ... and send it wrapped in CopyData */
	pq_putmessage_noblock('d', output_message.data, output_message.len);

	/* Set local flag */
	if (requestReply)
		waiting_for_ping_response = true;
}
3380
3381 /*
3382 * Send keepalive message if too much time has elapsed.
3383 */
3384 static void
WalSndKeepaliveIfNecessary(void)3385 WalSndKeepaliveIfNecessary(void)
3386 {
3387 TimestampTz ping_time;
3388
3389 /*
3390 * Don't send keepalive messages if timeouts are globally disabled or
3391 * we're doing something not partaking in timeouts.
3392 */
3393 if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3394 return;
3395
3396 if (waiting_for_ping_response)
3397 return;
3398
3399 /*
3400 * If half of wal_sender_timeout has lapsed without receiving any reply
3401 * from the standby, send a keep-alive message to the standby requesting
3402 * an immediate reply.
3403 */
3404 ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3405 wal_sender_timeout / 2);
3406 if (last_processing >= ping_time)
3407 {
3408 WalSndKeepalive(true);
3409
3410 /* Try to flush pending output to the client */
3411 if (pq_flush_if_writable() != 0)
3412 WalSndShutdown();
3413 }
3414 }
3415
3416 /*
3417 * Record the end of the WAL and the time it was flushed locally, so that
3418 * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3419 * eventually reported to have been written, flushed and applied by the
3420 * standby in a reply message.
3421 */
3422 static void
LagTrackerWrite(XLogRecPtr lsn,TimestampTz local_flush_time)3423 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3424 {
3425 bool buffer_full;
3426 int new_write_head;
3427 int i;
3428
3429 if (!am_walsender)
3430 return;
3431
3432 /*
3433 * If the lsn hasn't advanced since last time, then do nothing. This way
3434 * we only record a new sample when new WAL has been written.
3435 */
3436 if (LagTracker.last_lsn == lsn)
3437 return;
3438 LagTracker.last_lsn = lsn;
3439
3440 /*
3441 * If advancing the write head of the circular buffer would crash into any
3442 * of the read heads, then the buffer is full. In other words, the
3443 * slowest reader (presumably apply) is the one that controls the release
3444 * of space.
3445 */
3446 new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3447 buffer_full = false;
3448 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3449 {
3450 if (new_write_head == LagTracker.read_heads[i])
3451 buffer_full = true;
3452 }
3453
3454 /*
3455 * If the buffer is full, for now we just rewind by one slot and overwrite
3456 * the last sample, as a simple (if somewhat uneven) way to lower the
3457 * sampling rate. There may be better adaptive compaction algorithms.
3458 */
3459 if (buffer_full)
3460 {
3461 new_write_head = LagTracker.write_head;
3462 if (LagTracker.write_head > 0)
3463 LagTracker.write_head--;
3464 else
3465 LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3466 }
3467
3468 /* Store a sample at the current write head position. */
3469 LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3470 LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3471 LagTracker.write_head = new_write_head;
3472 }
3473
/*
 * Find out how much time has elapsed between the moment WAL location 'lsn'
 * (or the highest known earlier LSN) was flushed locally and the time 'now'.
 * We have a separate read head for each of the reported LSN locations we
 * receive in replies from standby; 'head' controls which read head is
 * used.  Whenever a read head crosses an LSN which was written into the
 * lag buffer with LagTrackerWrite, we can use the associated timestamp to
 * find out the time this LSN (or an earlier one) was flushed locally, and
 * therefore compute the lag.
 *
 * Side effects: advances LagTracker.read_heads[head] past every unread
 * sample whose lsn is <= 'lsn', remembers the last consumed sample in
 * LagTracker.last_read[head], and clears last_read[head].time once this read
 * head has caught up with the write head.
 *
 * If no sample was crossed, the local flush time is estimated instead.  When
 * a previous sample is known, it is linearly interpolated between
 * last_read[head] and the next unread sample; otherwise the next unread
 * sample's time is used.
 *
 * Return -1 if no new sample data is available (this includes the cases
 * where the clock appears to have gone backwards or the reported LSN moved
 * backwards), and otherwise the elapsed time in microseconds.
 */
static TimeOffset
LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
{
	TimestampTz time = 0;

	/* Read all unread samples up to this LSN or end of buffer. */
	while (LagTracker.read_heads[head] != LagTracker.write_head &&
		   LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
	{
		time = LagTracker.buffer[LagTracker.read_heads[head]].time;
		LagTracker.last_read[head] =
			LagTracker.buffer[LagTracker.read_heads[head]];
		LagTracker.read_heads[head] =
			(LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
	}

	/*
	 * If the lag tracker is empty, that means the standby has processed
	 * everything we've ever sent so we should now clear 'last_read'.  If we
	 * didn't do that, we'd risk using a stale and irrelevant sample for
	 * interpolation at the beginning of the next burst of WAL after a period
	 * of idleness.
	 */
	if (LagTracker.read_heads[head] == LagTracker.write_head)
		LagTracker.last_read[head].time = 0;

	if (time > now)
	{
		/* If the clock somehow went backwards, treat as not found. */
		return -1;
	}
	else if (time == 0)
	{
		/*
		 * We didn't cross a time.  If there is a future sample that we
		 * haven't reached yet, and we've already reached at least one sample,
		 * let's interpolate the local flushed time.  This is mainly useful
		 * for reporting a completely stuck apply position as having
		 * increasing lag, since otherwise we'd have to wait for it to
		 * eventually start moving again and cross one of our samples before
		 * we can show the lag increasing.
		 */
		if (LagTracker.read_heads[head] == LagTracker.write_head)
		{
			/* There are no future samples, so we can't interpolate. */
			return -1;
		}
		else if (LagTracker.last_read[head].time != 0)
		{
			/* We can interpolate between last_read and the next sample. */
			double		fraction;
			WalTimeSample prev = LagTracker.last_read[head];
			WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];

			if (lsn < prev.lsn)
			{
				/*
				 * Reported LSNs shouldn't normally go backwards, but it's
				 * possible when there is a timeline change.  Treat as not
				 * found.
				 */
				return -1;
			}

			Assert(prev.lsn < next.lsn);

			if (prev.time > next.time)
			{
				/* If the clock somehow went backwards, treat as not found. */
				return -1;
			}

			/* See how far we are between the previous and next samples. */
			fraction =
				(double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);

			/* Scale the local flush time proportionally. */
			time = (TimestampTz)
				((double) prev.time + (next.time - prev.time) * fraction);
		}
		else
		{
			/*
			 * We have only a future sample, implying that we were entirely
			 * caught up and now there is a new burst of WAL and the standby
			 * hasn't processed the first sample yet.  Until the standby
			 * reaches the future sample the best we can do is report the
			 * hypothetical lag if that sample were to be replayed now.
			 */
			time = LagTracker.buffer[LagTracker.read_heads[head]].time;
		}
	}

	/* Return the elapsed time since local flush time in microseconds. */
	Assert(time != 0);
	return now - time;
}
3584