1 /*-------------------------------------------------------------------------
2 *
3 * walsender.c
4 *
5 * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 * care of sending XLOG from the primary server to a single recipient.
7 * (Note that there can be more than one walsender process concurrently.)
8 * It is started by the postmaster when the walreceiver of a standby server
9 * connects to the primary server and requests XLOG streaming replication.
10 *
11 * A walsender is similar to a regular backend, ie. there is a one-to-one
12 * relationship between a connection and a walsender process, but instead
13 * of processing SQL queries, it understands a small set of special
14 * replication-mode commands. The START_REPLICATION command begins streaming
15 * WAL to the client. While streaming, the walsender keeps reading XLOG
16 * records from the disk and sends them to the standby server over the
17 * COPY protocol, until either side ends the replication by exiting COPY
18 * mode (or until the connection is closed).
19 *
20 * Normal termination is by SIGTERM, which instructs the walsender to
21 * close the connection and exit(0) at the next convenient moment. Emergency
22 * termination is by SIGQUIT; like any backend, the walsender will simply
23 * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24 * are treated as not a crash but approximately normal termination;
25 * the walsender will exit quickly without sending any more XLOG records.
26 *
 * If the server is shut down, the checkpointer sends us
 * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
 * the backend is idle or is running an SQL query, this causes the backend
 * to shut down; if logical replication is in progress, all existing WAL
 * records are processed, followed by a shutdown. Otherwise, this causes the
 * walsender to switch to the "stopping" state. In this state, the walsender
 * will reject any further replication commands. The checkpointer begins the
 * shutdown checkpoint once all walsenders are confirmed as stopping. When
 * the shutdown checkpoint finishes, the postmaster sends us SIGUSR2. This
 * instructs the walsender to send any outstanding WAL, including the
 * shutdown checkpoint record, wait for it to be replicated to the standby,
 * and then exit.
38 *
39 *
40 * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group
41 *
42 * IDENTIFICATION
43 * src/backend/replication/walsender.c
44 *
45 *-------------------------------------------------------------------------
46 */
47 #include "postgres.h"
48
49 #include <signal.h>
50 #include <unistd.h>
51
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "replication/basebackup.h"
70 #include "replication/decode.h"
71 #include "replication/logical.h"
72 #include "replication/logicalfuncs.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
76 #include "replication/walreceiver.h"
77 #include "replication/walsender.h"
78 #include "replication/walsender_private.h"
79 #include "storage/condition_variable.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/builtins.h"
88 #include "utils/guc.h"
89 #include "utils/memutils.h"
90 #include "utils/pg_lsn.h"
91 #include "utils/portal.h"
92 #include "utils/ps_status.h"
93 #include "utils/resowner.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96
97 /*
98 * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
99 *
100 * We don't have a good idea of what a good value would be; there's some
101 * overhead per message in both walsender and walreceiver, but on the other
102 * hand sending large batches makes walsender less responsive to signals
103 * because signals are checked only between messages. 128kB (with
104 * default 8k blocks) seems like a reasonable guess for now.
105 */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107
108 /* Array of WalSnds in shared memory */
109 WalSndCtlData *WalSndCtl = NULL;
110
111 /* My slot in the shared memory array */
112 WalSnd *MyWalSnd = NULL;
113
114 /* Global state */
115 bool am_walsender = false; /* Am I a walsender process? */
116 bool am_cascading_walsender = false; /* Am I cascading WAL to another
117 * standby? */
118 bool am_db_walsender = false; /* Connected to a database? */
119
120 /* User-settable parameters for walsender */
121 int max_wal_senders = 0; /* the maximum number of concurrent
122 * walsenders */
123 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124 * data message */
125 bool log_replication_commands = false;
126
127 /*
128 * State for WalSndWakeupRequest
129 */
130 bool wake_wal_senders = false;
131
132 /*
133 * These variables are used similarly to openLogFile/SegNo/Off,
134 * but for walsender to read the XLOG.
135 */
136 static int sendFile = -1;
137 static XLogSegNo sendSegNo = 0;
138 static uint32 sendOff = 0;
139
140 /* Timeline ID of the currently open file */
141 static TimeLineID curFileTimeLine = 0;
142
143 /*
144 * These variables keep track of the state of the timeline we're currently
145 * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
146 * the timeline is not the latest timeline on this server, and the server's
147 * history forked off from that timeline at sendTimeLineValidUpto.
148 */
149 static TimeLineID sendTimeLine = 0;
150 static TimeLineID sendTimeLineNextTLI = 0;
151 static bool sendTimeLineIsHistoric = false;
152 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
153
154 /*
155 * How far have we sent WAL already? This is also advertised in
156 * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
157 */
158 static XLogRecPtr sentPtr = InvalidXLogRecPtr;
159
160 /* Buffers for constructing outgoing messages and processing reply messages. */
161 static StringInfoData output_message;
162 static StringInfoData reply_message;
163 static StringInfoData tmpbuf;
164
165 /* Timestamp of last ProcessRepliesIfAny(). */
166 static TimestampTz last_processing = 0;
167
168 /*
169 * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
170 * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
171 */
172 static TimestampTz last_reply_timestamp = 0;
173
174 /* Have we sent a heartbeat message asking for reply, since last reply? */
175 static bool waiting_for_ping_response = false;
176
177 /*
178 * While streaming WAL in Copy mode, streamingDoneSending is set to true
179 * after we have sent CopyDone. We should not send any more CopyData messages
180 * after that. streamingDoneReceiving is set to true when we receive CopyDone
181 * from the other end. When both become true, it's time to exit Copy mode.
182 */
183 static bool streamingDoneSending;
184 static bool streamingDoneReceiving;
185
186 /* Are we there yet? */
187 static bool WalSndCaughtUp = false;
188
189 /* Flags set by signal handlers for later service in main loop */
190 static volatile sig_atomic_t got_SIGUSR2 = false;
191 static volatile sig_atomic_t got_STOPPING = false;
192
193 /*
194 * This is set while we are streaming. When not set
195 * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
196 * the main loop is responsible for checking got_STOPPING and terminating when
197 * it's set (after streaming any remaining WAL).
198 */
199 static volatile sig_atomic_t replication_active = false;
200
201 static LogicalDecodingContext *logical_decoding_ctx = NULL;
202 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
203
204 /* A sample associating a WAL location with the time it was written. */
205 typedef struct
206 {
207 XLogRecPtr lsn;
208 TimestampTz time;
209 } WalTimeSample;
210
211 /* The size of our buffer of time samples. */
212 #define LAG_TRACKER_BUFFER_SIZE 8192
213
214 /* A mechanism for tracking replication lag. */
215 static struct
216 {
217 XLogRecPtr last_lsn;
218 WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
219 int write_head;
220 int read_heads[NUM_SYNC_REP_WAIT_MODE];
221 WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
222 } LagTracker;
223
224 /* Signal handlers */
225 static void WalSndLastCycleHandler(SIGNAL_ARGS);
226
227 /* Prototypes for private functions */
228 typedef void (*WalSndSendDataCallback) (void);
229 static void WalSndLoop(WalSndSendDataCallback send_data);
230 static void InitWalSenderSlot(void);
231 static void WalSndKill(int code, Datum arg);
232 static void WalSndShutdown(void) pg_attribute_noreturn();
233 static void XLogSendPhysical(void);
234 static void XLogSendLogical(void);
235 static void WalSndDone(WalSndSendDataCallback send_data);
236 static XLogRecPtr GetStandbyFlushRecPtr(void);
237 static void IdentifySystem(void);
238 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
239 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
240 static void StartReplication(StartReplicationCmd *cmd);
241 static void StartLogicalReplication(StartReplicationCmd *cmd);
242 static void ProcessStandbyMessage(void);
243 static void ProcessStandbyReplyMessage(void);
244 static void ProcessStandbyHSFeedbackMessage(void);
245 static void ProcessRepliesIfAny(void);
246 static void WalSndKeepalive(bool requestReply);
247 static void WalSndKeepaliveIfNecessary(void);
248 static void WalSndCheckTimeOut(void);
249 static long WalSndComputeSleeptime(TimestampTz now);
250 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
251 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
252 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
253 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
254 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
255 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
256 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
257
258 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
259
260
261 /* Initialize walsender process before entering the main command loop */
262 void
InitWalSender(void)263 InitWalSender(void)
264 {
265 am_cascading_walsender = RecoveryInProgress();
266
267 /* Create a per-walsender data structure in shared memory */
268 InitWalSenderSlot();
269
270 /* Set up resource owner */
271 CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
272
273 /*
274 * Let postmaster know that we're a WAL sender. Once we've declared us as
275 * a WAL sender process, postmaster will let us outlive the bgwriter and
276 * kill us last in the shutdown sequence, so we get a chance to stream all
277 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
278 * there's no going back, and we mustn't write any WAL records after this.
279 */
280 MarkPostmasterChildWalSender();
281 SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
282
283 /* Initialize empty timestamp buffer for lag tracking. */
284 memset(&LagTracker, 0, sizeof(LagTracker));
285 }
286
287 /*
288 * Clean up after an error.
289 *
290 * WAL sender processes don't use transactions like regular backends do.
291 * This function does any cleanup required after an error in a WAL sender
292 * process, similar to what transaction abort does in a regular backend.
293 */
294 void
WalSndErrorCleanup(void)295 WalSndErrorCleanup(void)
296 {
297 LWLockReleaseAll();
298 ConditionVariableCancelSleep();
299 pgstat_report_wait_end();
300
301 if (sendFile >= 0)
302 {
303 close(sendFile);
304 sendFile = -1;
305 }
306
307 if (MyReplicationSlot != NULL)
308 ReplicationSlotRelease();
309
310 ReplicationSlotCleanup();
311
312 replication_active = false;
313
314 if (got_STOPPING || got_SIGUSR2)
315 proc_exit(0);
316
317 /* Revert back to startup state */
318 WalSndSetState(WALSNDSTATE_STARTUP);
319 }
320
321 /*
322 * Handle a client's connection abort in an orderly manner.
323 */
324 static void
WalSndShutdown(void)325 WalSndShutdown(void)
326 {
327 /*
328 * Reset whereToSendOutput to prevent ereport from attempting to send any
329 * more messages to the standby.
330 */
331 if (whereToSendOutput == DestRemote)
332 whereToSendOutput = DestNone;
333
334 proc_exit(0);
335 abort(); /* keep the compiler quiet */
336 }
337
338 /*
339 * Handle the IDENTIFY_SYSTEM command.
340 */
341 static void
IdentifySystem(void)342 IdentifySystem(void)
343 {
344 char sysid[32];
345 char xloc[MAXFNAMELEN];
346 XLogRecPtr logptr;
347 char *dbname = NULL;
348 DestReceiver *dest;
349 TupOutputState *tstate;
350 TupleDesc tupdesc;
351 Datum values[4];
352 bool nulls[4];
353
354 /*
355 * Reply with a result set with one row, four columns. First col is system
356 * ID, second is timeline ID, third is current xlog location and the
357 * fourth contains the database name if we are connected to one.
358 */
359
360 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
361 GetSystemIdentifier());
362
363 am_cascading_walsender = RecoveryInProgress();
364 if (am_cascading_walsender)
365 {
366 /* this also updates ThisTimeLineID */
367 logptr = GetStandbyFlushRecPtr();
368 }
369 else
370 logptr = GetFlushRecPtr();
371
372 snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
373
374 if (MyDatabaseId != InvalidOid)
375 {
376 MemoryContext cur = CurrentMemoryContext;
377
378 /* syscache access needs a transaction env. */
379 StartTransactionCommand();
380 /* make dbname live outside TX context */
381 MemoryContextSwitchTo(cur);
382 dbname = get_database_name(MyDatabaseId);
383 CommitTransactionCommand();
384 /* CommitTransactionCommand switches to TopMemoryContext */
385 MemoryContextSwitchTo(cur);
386 }
387
388 dest = CreateDestReceiver(DestRemoteSimple);
389 MemSet(nulls, false, sizeof(nulls));
390
391 /* need a tuple descriptor representing four columns */
392 tupdesc = CreateTemplateTupleDesc(4, false);
393 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
394 TEXTOID, -1, 0);
395 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
396 INT4OID, -1, 0);
397 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
398 TEXTOID, -1, 0);
399 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
400 TEXTOID, -1, 0);
401
402 /* prepare for projection of tuples */
403 tstate = begin_tup_output_tupdesc(dest, tupdesc);
404
405 /* column 1: system identifier */
406 values[0] = CStringGetTextDatum(sysid);
407
408 /* column 2: timeline */
409 values[1] = Int32GetDatum(ThisTimeLineID);
410
411 /* column 3: wal location */
412 values[2] = CStringGetTextDatum(xloc);
413
414 /* column 4: database name, or NULL if none */
415 if (dbname)
416 values[3] = CStringGetTextDatum(dbname);
417 else
418 nulls[3] = true;
419
420 /* send it to dest */
421 do_tup_output(tstate, values, nulls);
422
423 end_tup_output(tstate);
424 }
425
426
427 /*
428 * Handle TIMELINE_HISTORY command.
429 */
430 static void
SendTimeLineHistory(TimeLineHistoryCmd * cmd)431 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
432 {
433 StringInfoData buf;
434 char histfname[MAXFNAMELEN];
435 char path[MAXPGPATH];
436 int fd;
437 off_t histfilelen;
438 off_t bytesleft;
439 Size len;
440
441 /*
442 * Reply with a result set with one row, and two columns. The first col is
443 * the name of the history file, 2nd is the contents.
444 */
445
446 TLHistoryFileName(histfname, cmd->timeline);
447 TLHistoryFilePath(path, cmd->timeline);
448
449 /* Send a RowDescription message */
450 pq_beginmessage(&buf, 'T');
451 pq_sendint16(&buf, 2); /* 2 fields */
452
453 /* first field */
454 pq_sendstring(&buf, "filename"); /* col name */
455 pq_sendint32(&buf, 0); /* table oid */
456 pq_sendint16(&buf, 0); /* attnum */
457 pq_sendint32(&buf, TEXTOID); /* type oid */
458 pq_sendint16(&buf, -1); /* typlen */
459 pq_sendint32(&buf, 0); /* typmod */
460 pq_sendint16(&buf, 0); /* format code */
461
462 /* second field */
463 pq_sendstring(&buf, "content"); /* col name */
464 pq_sendint32(&buf, 0); /* table oid */
465 pq_sendint16(&buf, 0); /* attnum */
466 /*
467 * While this is labeled as BYTEAOID, it is the same output format
468 * as TEXTOID above.
469 */
470 pq_sendint32(&buf, BYTEAOID); /* type oid */
471 pq_sendint16(&buf, -1); /* typlen */
472 pq_sendint32(&buf, 0); /* typmod */
473 pq_sendint16(&buf, 0); /* format code */
474 pq_endmessage(&buf);
475
476 /* Send a DataRow message */
477 pq_beginmessage(&buf, 'D');
478 pq_sendint16(&buf, 2); /* # of columns */
479 len = strlen(histfname);
480 pq_sendint32(&buf, len); /* col1 len */
481 pq_sendbytes(&buf, histfname, len);
482
483 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
484 if (fd < 0)
485 ereport(ERROR,
486 (errcode_for_file_access(),
487 errmsg("could not open file \"%s\": %m", path)));
488
489 /* Determine file length and send it to client */
490 histfilelen = lseek(fd, 0, SEEK_END);
491 if (histfilelen < 0)
492 ereport(ERROR,
493 (errcode_for_file_access(),
494 errmsg("could not seek to end of file \"%s\": %m", path)));
495 if (lseek(fd, 0, SEEK_SET) != 0)
496 ereport(ERROR,
497 (errcode_for_file_access(),
498 errmsg("could not seek to beginning of file \"%s\": %m", path)));
499
500 pq_sendint32(&buf, histfilelen); /* col2 len */
501
502 bytesleft = histfilelen;
503 while (bytesleft > 0)
504 {
505 PGAlignedBlock rbuf;
506 int nread;
507
508 pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
509 nread = read(fd, rbuf.data, sizeof(rbuf));
510 pgstat_report_wait_end();
511 if (nread <= 0)
512 ereport(ERROR,
513 (errcode_for_file_access(),
514 errmsg("could not read file \"%s\": %m",
515 path)));
516 pq_sendbytes(&buf, rbuf.data, nread);
517 bytesleft -= nread;
518 }
519 CloseTransientFile(fd);
520
521 pq_endmessage(&buf);
522 }
523
524 /*
525 * Handle START_REPLICATION command.
526 *
527 * At the moment, this never returns, but an ereport(ERROR) will take us back
528 * to the main loop.
529 */
530 static void
StartReplication(StartReplicationCmd * cmd)531 StartReplication(StartReplicationCmd *cmd)
532 {
533 StringInfoData buf;
534 XLogRecPtr FlushPtr;
535
536 if (ThisTimeLineID == 0)
537 ereport(ERROR,
538 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
539 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
540
541 /*
542 * We assume here that we're logging enough information in the WAL for
543 * log-shipping, since this is checked in PostmasterMain().
544 *
545 * NOTE: wal_level can only change at shutdown, so in most cases it is
546 * difficult for there to be WAL data that we can still see that was
547 * written at wal_level='minimal'.
548 */
549
550 if (cmd->slotname)
551 {
552 ReplicationSlotAcquire(cmd->slotname, true);
553 if (SlotIsLogical(MyReplicationSlot))
554 ereport(ERROR,
555 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
556 (errmsg("cannot use a logical replication slot for physical replication"))));
557 }
558
559 /*
560 * Select the timeline. If it was given explicitly by the client, use
561 * that. Otherwise use the timeline of the last replayed record, which is
562 * kept in ThisTimeLineID.
563 */
564 if (am_cascading_walsender)
565 {
566 /* this also updates ThisTimeLineID */
567 FlushPtr = GetStandbyFlushRecPtr();
568 }
569 else
570 FlushPtr = GetFlushRecPtr();
571
572 if (cmd->timeline != 0)
573 {
574 XLogRecPtr switchpoint;
575
576 sendTimeLine = cmd->timeline;
577 if (sendTimeLine == ThisTimeLineID)
578 {
579 sendTimeLineIsHistoric = false;
580 sendTimeLineValidUpto = InvalidXLogRecPtr;
581 }
582 else
583 {
584 List *timeLineHistory;
585
586 sendTimeLineIsHistoric = true;
587
588 /*
589 * Check that the timeline the client requested exists, and the
590 * requested start location is on that timeline.
591 */
592 timeLineHistory = readTimeLineHistory(ThisTimeLineID);
593 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
594 &sendTimeLineNextTLI);
595 list_free_deep(timeLineHistory);
596
597 /*
598 * Found the requested timeline in the history. Check that
599 * requested startpoint is on that timeline in our history.
600 *
601 * This is quite loose on purpose. We only check that we didn't
602 * fork off the requested timeline before the switchpoint. We
603 * don't check that we switched *to* it before the requested
604 * starting point. This is because the client can legitimately
605 * request to start replication from the beginning of the WAL
606 * segment that contains switchpoint, but on the new timeline, so
607 * that it doesn't end up with a partial segment. If you ask for
608 * too old a starting point, you'll get an error later when we
609 * fail to find the requested WAL segment in pg_wal.
610 *
611 * XXX: we could be more strict here and only allow a startpoint
612 * that's older than the switchpoint, if it's still in the same
613 * WAL segment.
614 */
615 if (!XLogRecPtrIsInvalid(switchpoint) &&
616 switchpoint < cmd->startpoint)
617 {
618 ereport(ERROR,
619 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
620 (uint32) (cmd->startpoint >> 32),
621 (uint32) (cmd->startpoint),
622 cmd->timeline),
623 errdetail("This server's history forked from timeline %u at %X/%X.",
624 cmd->timeline,
625 (uint32) (switchpoint >> 32),
626 (uint32) (switchpoint))));
627 }
628 sendTimeLineValidUpto = switchpoint;
629 }
630 }
631 else
632 {
633 sendTimeLine = ThisTimeLineID;
634 sendTimeLineValidUpto = InvalidXLogRecPtr;
635 sendTimeLineIsHistoric = false;
636 }
637
638 streamingDoneSending = streamingDoneReceiving = false;
639
640 /* If there is nothing to stream, don't even enter COPY mode */
641 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
642 {
643 /*
644 * When we first start replication the standby will be behind the
645 * primary. For some applications, for example synchronous
646 * replication, it is important to have a clear state for this initial
647 * catchup mode, so we can trigger actions when we change streaming
648 * state later. We may stay in this state for a long time, which is
649 * exactly why we want to be able to monitor whether or not we are
650 * still here.
651 */
652 WalSndSetState(WALSNDSTATE_CATCHUP);
653
654 /* Send a CopyBothResponse message, and start streaming */
655 pq_beginmessage(&buf, 'W');
656 pq_sendbyte(&buf, 0);
657 pq_sendint16(&buf, 0);
658 pq_endmessage(&buf);
659 pq_flush();
660
661 /*
662 * Don't allow a request to stream from a future point in WAL that
663 * hasn't been flushed to disk in this server yet.
664 */
665 if (FlushPtr < cmd->startpoint)
666 {
667 ereport(ERROR,
668 (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
669 (uint32) (cmd->startpoint >> 32),
670 (uint32) (cmd->startpoint),
671 (uint32) (FlushPtr >> 32),
672 (uint32) (FlushPtr))));
673 }
674
675 /* Start streaming from the requested point */
676 sentPtr = cmd->startpoint;
677
678 /* Initialize shared memory status, too */
679 SpinLockAcquire(&MyWalSnd->mutex);
680 MyWalSnd->sentPtr = sentPtr;
681 SpinLockRelease(&MyWalSnd->mutex);
682
683 SyncRepInitConfig();
684
685 /* Main loop of walsender */
686 replication_active = true;
687
688 WalSndLoop(XLogSendPhysical);
689
690 replication_active = false;
691 if (got_STOPPING)
692 proc_exit(0);
693 WalSndSetState(WALSNDSTATE_STARTUP);
694
695 Assert(streamingDoneSending && streamingDoneReceiving);
696 }
697
698 if (cmd->slotname)
699 ReplicationSlotRelease();
700
701 /*
702 * Copy is finished now. Send a single-row result set indicating the next
703 * timeline.
704 */
705 if (sendTimeLineIsHistoric)
706 {
707 char startpos_str[8 + 1 + 8 + 1];
708 DestReceiver *dest;
709 TupOutputState *tstate;
710 TupleDesc tupdesc;
711 Datum values[2];
712 bool nulls[2];
713
714 snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
715 (uint32) (sendTimeLineValidUpto >> 32),
716 (uint32) sendTimeLineValidUpto);
717
718 dest = CreateDestReceiver(DestRemoteSimple);
719 MemSet(nulls, false, sizeof(nulls));
720
721 /*
722 * Need a tuple descriptor representing two columns. int8 may seem
723 * like a surprising data type for this, but in theory int4 would not
724 * be wide enough for this, as TimeLineID is unsigned.
725 */
726 tupdesc = CreateTemplateTupleDesc(2, false);
727 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
728 INT8OID, -1, 0);
729 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
730 TEXTOID, -1, 0);
731
732 /* prepare for projection of tuple */
733 tstate = begin_tup_output_tupdesc(dest, tupdesc);
734
735 values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
736 values[1] = CStringGetTextDatum(startpos_str);
737
738 /* send it to dest */
739 do_tup_output(tstate, values, nulls);
740
741 end_tup_output(tstate);
742 }
743
744 /* Send CommandComplete message */
745 pq_puttextmessage('C', "START_STREAMING");
746 }
747
748 /*
749 * read_page callback for logical decoding contexts, as a walsender process.
750 *
751 * Inside the walsender we can do better than logical_read_local_xlog_page,
752 * which has to do a plain sleep/busy loop, because the walsender's latch gets
753 * set every time WAL is flushed.
754 */
755 static int
logical_read_xlog_page(XLogReaderState * state,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char * cur_page,TimeLineID * pageTLI)756 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
757 XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
758 {
759 XLogRecPtr flushptr;
760 int count;
761
762 XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
763 sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
764 sendTimeLine = state->currTLI;
765 sendTimeLineValidUpto = state->currTLIValidUntil;
766 sendTimeLineNextTLI = state->nextTLI;
767
768 /* make sure we have enough WAL available */
769 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
770
771 /* fail if not (implies we are going to shut down) */
772 if (flushptr < targetPagePtr + reqLen)
773 return -1;
774
775 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
776 count = XLOG_BLCKSZ; /* more than one block available */
777 else
778 count = flushptr - targetPagePtr; /* part of the page available */
779
780 /* now actually read the data, we know it's there */
781 XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
782
783 return count;
784 }
785
786 /*
787 * Process extra options given to CREATE_REPLICATION_SLOT.
788 */
789 static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd * cmd,bool * reserve_wal,CRSSnapshotAction * snapshot_action)790 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
791 bool *reserve_wal,
792 CRSSnapshotAction *snapshot_action)
793 {
794 ListCell *lc;
795 bool snapshot_action_given = false;
796 bool reserve_wal_given = false;
797
798 /* Parse options */
799 foreach(lc, cmd->options)
800 {
801 DefElem *defel = (DefElem *) lfirst(lc);
802
803 if (strcmp(defel->defname, "export_snapshot") == 0)
804 {
805 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
806 ereport(ERROR,
807 (errcode(ERRCODE_SYNTAX_ERROR),
808 errmsg("conflicting or redundant options")));
809
810 snapshot_action_given = true;
811 *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
812 CRS_NOEXPORT_SNAPSHOT;
813 }
814 else if (strcmp(defel->defname, "use_snapshot") == 0)
815 {
816 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
817 ereport(ERROR,
818 (errcode(ERRCODE_SYNTAX_ERROR),
819 errmsg("conflicting or redundant options")));
820
821 snapshot_action_given = true;
822 *snapshot_action = CRS_USE_SNAPSHOT;
823 }
824 else if (strcmp(defel->defname, "reserve_wal") == 0)
825 {
826 if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
827 ereport(ERROR,
828 (errcode(ERRCODE_SYNTAX_ERROR),
829 errmsg("conflicting or redundant options")));
830
831 reserve_wal_given = true;
832 *reserve_wal = true;
833 }
834 else
835 elog(ERROR, "unrecognized option: %s", defel->defname);
836 }
837 }
838
839 /*
840 * Create a new replication slot.
841 */
842 static void
CreateReplicationSlot(CreateReplicationSlotCmd * cmd)843 CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
844 {
845 const char *snapshot_name = NULL;
846 char xloc[MAXFNAMELEN];
847 char *slot_name;
848 bool reserve_wal = false;
849 CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
850 DestReceiver *dest;
851 TupOutputState *tstate;
852 TupleDesc tupdesc;
853 Datum values[4];
854 bool nulls[4];
855
856 Assert(!MyReplicationSlot);
857
858 parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
859
860 /* setup state for XLogReadPage */
861 sendTimeLineIsHistoric = false;
862 sendTimeLine = ThisTimeLineID;
863
864 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
865 {
866 ReplicationSlotCreate(cmd->slotname, false,
867 cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
868 }
869 else
870 {
871 CheckLogicalDecodingRequirements();
872
873 /*
874 * Initially create persistent slot as ephemeral - that allows us to
875 * nicely handle errors during initialization because it'll get
876 * dropped if this transaction fails. We'll make it persistent at the
877 * end. Temporary slots can be created as temporary from beginning as
878 * they get dropped on error as well.
879 */
880 ReplicationSlotCreate(cmd->slotname, true,
881 cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
882 }
883
884 if (cmd->kind == REPLICATION_KIND_LOGICAL)
885 {
886 LogicalDecodingContext *ctx;
887 bool need_full_snapshot = false;
888
889 /*
890 * Do options check early so that we can bail before calling the
891 * DecodingContextFindStartpoint which can take long time.
892 */
893 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
894 {
895 if (IsTransactionBlock())
896 ereport(ERROR,
897 (errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
898 "must not be called inside a transaction")));
899
900 need_full_snapshot = true;
901 }
902 else if (snapshot_action == CRS_USE_SNAPSHOT)
903 {
904 if (!IsTransactionBlock())
905 ereport(ERROR,
906 (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
907 "must be called inside a transaction")));
908
909 if (XactIsoLevel != XACT_REPEATABLE_READ)
910 ereport(ERROR,
911 (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
912 "must be called in REPEATABLE READ isolation mode transaction")));
913
914 if (FirstSnapshotSet)
915 ereport(ERROR,
916 (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
917 "must be called before any query")));
918
919 if (IsSubTransaction())
920 ereport(ERROR,
921 (errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
922 "must not be called in a subtransaction")));
923
924 need_full_snapshot = true;
925 }
926
927 ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
928 logical_read_xlog_page,
929 WalSndPrepareWrite, WalSndWriteData,
930 WalSndUpdateProgress);
931
932 /*
933 * Signal that we don't need the timeout mechanism. We're just
934 * creating the replication slot and don't yet accept feedback
935 * messages or send keepalives. As we possibly need to wait for
936 * further WAL the walsender would otherwise possibly be killed too
937 * soon.
938 */
939 last_reply_timestamp = 0;
940
941 /* build initial snapshot, might take a while */
942 DecodingContextFindStartpoint(ctx);
943
944 /*
945 * Export or use the snapshot if we've been asked to do so.
946 *
947 * NB. We will convert the snapbuild.c kind of snapshot to normal
948 * snapshot when doing this.
949 */
950 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
951 {
952 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
953 }
954 else if (snapshot_action == CRS_USE_SNAPSHOT)
955 {
956 Snapshot snap;
957
958 snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
959 RestoreTransactionSnapshot(snap, MyProc);
960 }
961
962 /* don't need the decoding context anymore */
963 FreeDecodingContext(ctx);
964
965 if (!cmd->temporary)
966 ReplicationSlotPersist();
967 }
968 else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
969 {
970 ReplicationSlotReserveWal();
971
972 ReplicationSlotMarkDirty();
973
974 /* Write this slot to disk if it's a permanent one. */
975 if (!cmd->temporary)
976 ReplicationSlotSave();
977 }
978
979 snprintf(xloc, sizeof(xloc), "%X/%X",
980 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
981 (uint32) MyReplicationSlot->data.confirmed_flush);
982
983 dest = CreateDestReceiver(DestRemoteSimple);
984 MemSet(nulls, false, sizeof(nulls));
985
986 /*----------
987 * Need a tuple descriptor representing four columns:
988 * - first field: the slot name
989 * - second field: LSN at which we became consistent
990 * - third field: exported snapshot's name
991 * - fourth field: output plugin
992 *----------
993 */
994 tupdesc = CreateTemplateTupleDesc(4, false);
995 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
996 TEXTOID, -1, 0);
997 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
998 TEXTOID, -1, 0);
999 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1000 TEXTOID, -1, 0);
1001 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1002 TEXTOID, -1, 0);
1003
1004 /* prepare for projection of tuples */
1005 tstate = begin_tup_output_tupdesc(dest, tupdesc);
1006
1007 /* slot_name */
1008 slot_name = NameStr(MyReplicationSlot->data.name);
1009 values[0] = CStringGetTextDatum(slot_name);
1010
1011 /* consistent wal location */
1012 values[1] = CStringGetTextDatum(xloc);
1013
1014 /* snapshot name, or NULL if none */
1015 if (snapshot_name != NULL)
1016 values[2] = CStringGetTextDatum(snapshot_name);
1017 else
1018 nulls[2] = true;
1019
1020 /* plugin, or NULL if none */
1021 if (cmd->plugin != NULL)
1022 values[3] = CStringGetTextDatum(cmd->plugin);
1023 else
1024 nulls[3] = true;
1025
1026 /* send it to dest */
1027 do_tup_output(tstate, values, nulls);
1028 end_tup_output(tstate);
1029
1030 ReplicationSlotRelease();
1031 }
1032
1033 /*
1034 * Get rid of a replication slot that is no longer wanted.
1035 */
1036 static void
DropReplicationSlot(DropReplicationSlotCmd * cmd)1037 DropReplicationSlot(DropReplicationSlotCmd *cmd)
1038 {
1039 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1040 EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1041 }
1042
1043 /*
1044 * Load previously initiated logical slot and prepare for sending data (via
1045 * WalSndLoop).
1046 */
1047 static void
StartLogicalReplication(StartReplicationCmd * cmd)1048 StartLogicalReplication(StartReplicationCmd *cmd)
1049 {
1050 StringInfoData buf;
1051
1052 /* make sure that our requirements are still fulfilled */
1053 CheckLogicalDecodingRequirements();
1054
1055 Assert(!MyReplicationSlot);
1056
1057 ReplicationSlotAcquire(cmd->slotname, true);
1058
1059 /*
1060 * Force a disconnect, so that the decoding code doesn't need to care
1061 * about an eventual switch from running in recovery, to running in a
1062 * normal environment. Client code is expected to handle reconnects.
1063 */
1064 if (am_cascading_walsender && !RecoveryInProgress())
1065 {
1066 ereport(LOG,
1067 (errmsg("terminating walsender process after promotion")));
1068 got_STOPPING = true;
1069 }
1070
1071 /*
1072 * Create our decoding context, making it start at the previously ack'ed
1073 * position.
1074 *
1075 * Do this before sending CopyBoth, so that any errors are reported early.
1076 */
1077 logical_decoding_ctx =
1078 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1079 logical_read_xlog_page,
1080 WalSndPrepareWrite, WalSndWriteData,
1081 WalSndUpdateProgress);
1082
1083
1084 WalSndSetState(WALSNDSTATE_CATCHUP);
1085
1086 /* Send a CopyBothResponse message, and start streaming */
1087 pq_beginmessage(&buf, 'W');
1088 pq_sendbyte(&buf, 0);
1089 pq_sendint16(&buf, 0);
1090 pq_endmessage(&buf);
1091 pq_flush();
1092
1093
1094 /* Start reading WAL from the oldest required WAL. */
1095 logical_startptr = MyReplicationSlot->data.restart_lsn;
1096
1097 /*
1098 * Report the location after which we'll send out further commits as the
1099 * current sentPtr.
1100 */
1101 sentPtr = MyReplicationSlot->data.confirmed_flush;
1102
1103 /* Also update the sent position status in shared memory */
1104 SpinLockAcquire(&MyWalSnd->mutex);
1105 MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1106 SpinLockRelease(&MyWalSnd->mutex);
1107
1108 replication_active = true;
1109
1110 SyncRepInitConfig();
1111
1112 /* Main loop of walsender */
1113 WalSndLoop(XLogSendLogical);
1114
1115 FreeDecodingContext(logical_decoding_ctx);
1116 ReplicationSlotRelease();
1117
1118 replication_active = false;
1119 if (got_STOPPING)
1120 proc_exit(0);
1121 WalSndSetState(WALSNDSTATE_STARTUP);
1122
1123 /* Get out of COPY mode (CommandComplete). */
1124 EndCommand("COPY 0", DestRemote);
1125 }
1126
1127 /*
1128 * LogicalDecodingContext 'prepare_write' callback.
1129 *
1130 * Prepare a write into a StringInfo.
1131 *
1132 * Don't do anything lasting in here, it's quite possible that nothing will be done
1133 * with the data.
1134 */
1135 static void
WalSndPrepareWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1136 WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1137 {
1138 /* can't have sync rep confused by sending the same LSN several times */
1139 if (!last_write)
1140 lsn = InvalidXLogRecPtr;
1141
1142 resetStringInfo(ctx->out);
1143
1144 pq_sendbyte(ctx->out, 'w');
1145 pq_sendint64(ctx->out, lsn); /* dataStart */
1146 pq_sendint64(ctx->out, lsn); /* walEnd */
1147
1148 /*
1149 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1150 * reserve space here.
1151 */
1152 pq_sendint64(ctx->out, 0); /* sendtime */
1153 }
1154
/*
 * LogicalDecodingContext 'write' callback.
 *
 * Actually write out data previously prepared by WalSndPrepareWrite out to
 * the network. Take as long as needed, but process replies from the other
 * side and check timeouts during that.
 *
 * ctx supplies the outgoing message in ctx->out.  The lsn, xid and
 * last_write arguments belong to the callback signature but are not
 * referenced in this function.
 *
 * The function returns right away (fast path) when no output remains
 * pending and less than half of wal_sender_timeout has passed since
 * last_reply_timestamp.  Otherwise it loops, handling client replies,
 * timeouts, keepalives and configuration reloads, until nothing is left to
 * send, and finally sets the process latch.
 */
static void
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
                bool last_write)
{
    TimestampTz now;

    /*
     * Fill the send timestamp last, so that it is taken as late as possible.
     * This is somewhat ugly, but the protocol is set as it's already used for
     * several releases by streaming physical replication.
     *
     * The timestamp is serialized into tmpbuf and then copied over the
     * placeholder reserved by WalSndPrepareWrite, which follows the one-byte
     * message type and the two 64-bit LSN fields.
     */
    resetStringInfo(&tmpbuf);
    now = GetCurrentTimestamp();
    pq_sendint64(&tmpbuf, now);
    memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
           tmpbuf.data, sizeof(int64));

    /* output previously gathered data in a CopyData packet */
    pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);

    CHECK_FOR_INTERRUPTS();

    /*
     * Try to flush pending output to the client.  A nonzero result is
     * treated as a failed connection; WalSndShutdown() presumably does not
     * return (confirm against its definition).
     */
    if (pq_flush_if_writable() != 0)
        WalSndShutdown();

    /* Try taking fast path unless we get too close to walsender timeout. */
    if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
                                          wal_sender_timeout / 2) &&
        !pq_is_send_pending())
    {
        return;
    }

    /* If we have pending write here, go to slow path */
    for (;;)
    {
        int         wakeEvents;
        long        sleeptime;

        /* Check for input from the client */
        ProcessRepliesIfAny();

        /* die if timeout was reached */
        WalSndCheckTimeOut();

        /* Send keepalive if the time has come */
        WalSndKeepaliveIfNecessary();

        /* leave the loop as soon as everything has been handed off */
        if (!pq_is_send_pending())
            break;

        sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

        /* wake on latch, postmaster death, socket activity or timeout */
        wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
            WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;

        /* Sleep until something happens or we time out */
        WaitLatchOrSocket(MyLatch, wakeEvents,
                          MyProcPort->sock, sleeptime,
                          WAIT_EVENT_WAL_SENDER_WRITE_DATA);

        /*
         * Emergency bailout if postmaster has died.  This is to avoid the
         * necessity for manual cleanup of all postmaster children.
         */
        if (!PostmasterIsAlive())
            exit(1);

        /* Clear any already-pending wakeups */
        ResetLatch(MyLatch);

        CHECK_FOR_INTERRUPTS();

        /* Process any requests or signals received recently */
        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
            SyncRepInitConfig();
        }

        /* Try to flush pending output to the client */
        if (pq_flush_if_writable() != 0)
            WalSndShutdown();
    }

    /*
     * reactivate latch so WalSndLoop knows to continue (only reached via the
     * slow path, since the fast path returns earlier)
     */
    SetLatch(MyLatch);
}
1252
1253 /*
1254 * LogicalDecodingContext 'update_progress' callback.
1255 *
1256 * Write the current position to the lag tracker (see XLogSendPhysical).
1257 */
1258 static void
WalSndUpdateProgress(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid)1259 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
1260 {
1261 static TimestampTz sendTime = 0;
1262 TimestampTz now = GetCurrentTimestamp();
1263
1264 /*
1265 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1266 * avoid flooding the lag tracker when we commit frequently.
1267 */
1268 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1269 if (!TimestampDifferenceExceeds(sendTime, now,
1270 WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1271 return;
1272
1273 LagTrackerWrite(lsn, now);
1274 sendTime = now;
1275 }
1276
/*
 * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
 *
 * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
 * if we detect a shutdown request (either from postmaster or client)
 * we will return early, so caller must always check.
 *
 * loc is the WAL position the caller needs.  The returned value is the most
 * recently observed flush position (the replay position while recovery is
 * in progress), which is also cached in a static variable so that later
 * calls can return without waiting.
 *
 * Except on the initial fast path, the process latch is set before
 * returning.
 */
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
    int         wakeEvents;
    /* last known flush/replay position; persists between calls */
    static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;

    /*
     * Fast path to avoid acquiring the spinlock in case we already know we
     * have enough WAL available.  This is particularly interesting if we're
     * far behind.
     */
    if (RecentFlushPtr != InvalidXLogRecPtr &&
        loc <= RecentFlushPtr)
        return RecentFlushPtr;

    /* Get a more recent flush pointer. */
    if (!RecoveryInProgress())
        RecentFlushPtr = GetFlushRecPtr();
    else
        RecentFlushPtr = GetXLogReplayRecPtr(NULL);

    for (;;)
    {
        long        sleeptime;

        /*
         * Emergency bailout if postmaster has died.  This is to avoid the
         * necessity for manual cleanup of all postmaster children.
         */
        if (!PostmasterIsAlive())
            exit(1);

        /* Clear any already-pending wakeups */
        ResetLatch(MyLatch);

        CHECK_FOR_INTERRUPTS();

        /* Process any requests or signals received recently */
        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
            SyncRepInitConfig();
        }

        /* Check for input from the client */
        ProcessRepliesIfAny();

        /*
         * If we're shutting down, trigger pending WAL to be written out,
         * otherwise we'd possibly end up waiting for WAL that never gets
         * written, because walwriter has shut down already.
         */
        if (got_STOPPING)
            XLogBackgroundFlush();

        /* Update our idea of the currently flushed position. */
        if (!RecoveryInProgress())
            RecentFlushPtr = GetFlushRecPtr();
        else
            RecentFlushPtr = GetXLogReplayRecPtr(NULL);

        /*
         * If postmaster asked us to stop, don't wait anymore.
         *
         * It's important to do this check after the recomputation of
         * RecentFlushPtr, so we can send all remaining data before shutting
         * down.
         */
        if (got_STOPPING)
            break;

        /*
         * We only send regular messages to the client for full decoded
         * transactions, but a synchronous replication and walsender shutdown
         * possibly are waiting for a later location.  So, before sleeping, we
         * send a ping containing the flush location.  If the receiver is
         * otherwise idle, this keepalive will trigger a reply.  Processing
         * the reply will update these MyWalSnd locations.
         *
         * Note that the MyWalSnd fields are read here without taking
         * MyWalSnd->mutex.
         */
        if (MyWalSnd->flush < sentPtr &&
            MyWalSnd->write < sentPtr &&
            !waiting_for_ping_response)
            WalSndKeepalive(false);

        /* check whether we're done */
        if (loc <= RecentFlushPtr)
            break;

        /* Waiting for new WAL. Since we need to wait, we're now caught up. */
        WalSndCaughtUp = true;

        /*
         * Try to flush any pending output to the client.
         */
        if (pq_flush_if_writable() != 0)
            WalSndShutdown();

        /*
         * If we have received CopyDone from the client, sent CopyDone
         * ourselves, and the output buffer is empty, it's time to exit
         * streaming, so fail the current WAL fetch request.
         */
        if (streamingDoneReceiving && streamingDoneSending &&
            !pq_is_send_pending())
            break;

        /* die if timeout was reached */
        WalSndCheckTimeOut();

        /* Send keepalive if the time has come */
        WalSndKeepaliveIfNecessary();

        /*
         * Sleep until something happens or we time out.  Also wait for the
         * socket becoming writable, if there's still pending output.
         * Otherwise we might sit on sendable output data while waiting for
         * new WAL to be generated.  (But if we have nothing to send, we don't
         * want to wake on socket-writable.)
         */
        sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

        wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
            WL_SOCKET_READABLE | WL_TIMEOUT;

        if (pq_is_send_pending())
            wakeEvents |= WL_SOCKET_WRITEABLE;

        WaitLatchOrSocket(MyLatch, wakeEvents,
                          MyProcPort->sock, sleeptime,
                          WAIT_EVENT_WAL_SENDER_WAIT_WAL);
    }

    /* reactivate latch so WalSndLoop knows to continue */
    SetLatch(MyLatch);
    return RecentFlushPtr;
}
1421
/*
 * Execute an incoming replication command.
 *
 * cmd_string is the raw command text received from the client.  It is
 * parsed with the replication grammar inside a temporary memory context
 * that is deleted again before returning.
 *
 * Returns true if the cmd_string was recognized as WalSender command, false
 * if not.  In the latter case (a SQL command) nothing has been executed and
 * the caller is expected to run it.
 *
 * Errors are raised with ereport(ERROR)/elog(ERROR) when the walsender is in
 * stopping mode, when parsing fails, when SQL is sent on a connection without
 * a database, when the transaction block is in aborted state, or when the
 * parsed node type is not handled below.
 */
bool
exec_replication_command(const char *cmd_string)
{
    int         parse_rc;
    Node       *cmd_node;
    MemoryContext cmd_context;
    MemoryContext old_context;

    /*
     * If WAL sender has been told that shutdown is getting close, switch its
     * status accordingly to handle the next replication commands correctly.
     */
    if (got_STOPPING)
        WalSndSetState(WALSNDSTATE_STOPPING);

    /*
     * Throw error if in stopping mode.  We need to prevent commands that
     * could generate WAL while the shutdown checkpoint is being written.  To
     * be safe, we just prohibit all new commands.
     */
    if (MyWalSnd->state == WALSNDSTATE_STOPPING)
        ereport(ERROR,
                (errmsg("cannot execute new commands while WAL sender is in stopping mode")));

    /*
     * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
     * command arrives.  Clean up the old stuff if there's anything.
     */
    SnapBuildClearExportedSnapshot();

    CHECK_FOR_INTERRUPTS();

    /*
     * Parse the command.  All parser allocations go into a dedicated child
     * context of the current memory context.
     */
    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_SIZES);
    old_context = MemoryContextSwitchTo(cmd_context);

    replication_scanner_init(cmd_string);
    parse_rc = replication_yyparse();

    /*
     * NOTE(review): on a parse failure the error is raised before
     * replication_scanner_finish() runs; presumably error cleanup makes that
     * harmless -- confirm.
     */
    if (parse_rc != 0)
        ereport(ERROR,
                (errcode(ERRCODE_SYNTAX_ERROR),
                 errmsg_internal("replication command parser returned %d",
                                 parse_rc)));
    replication_scanner_finish();

    cmd_node = replication_parse_result;

    /*
     * If it's a SQL command, just clean up our mess and return false; the
     * caller will take care of executing it.
     */
    if (IsA(cmd_node, SQLCmd))
    {
        /* without a database there is nothing SQL could operate on */
        if (MyDatabaseId == InvalidOid)
            ereport(ERROR,
                    (errmsg("cannot execute SQL commands in WAL sender for physical replication")));

        MemoryContextSwitchTo(old_context);
        MemoryContextDelete(cmd_context);

        /* Tell the caller that this wasn't a WalSender command. */
        return false;
    }

    /*
     * Report query to various monitoring facilities.  For this purpose, we
     * report replication commands just like SQL commands.
     */
    debug_query_string = cmd_string;

    pgstat_report_activity(STATE_RUNNING, cmd_string);

    /*
     * Log replication command if log_replication_commands is enabled.  Even
     * when it's disabled, log the command with DEBUG1 level for backward
     * compatibility.
     */
    ereport(log_replication_commands ? LOG : DEBUG1,
            (errmsg("received replication command: %s", cmd_string)));

    /*
     * Disallow replication commands in aborted transaction blocks.
     */
    if (IsAbortedTransactionBlockState())
        ereport(ERROR,
                (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
                 errmsg("current transaction is aborted, "
                        "commands ignored until end of transaction block")));

    CHECK_FOR_INTERRUPTS();

    /*
     * Allocate buffers that will be used for each outgoing and incoming
     * message.  We do this just once per command to reduce palloc overhead.
     *
     * cmd_context is still the current memory context at this point, so the
     * buffers presumably live in it and go away when it is deleted below
     * (confirm against initStringInfo's allocation behavior).
     */
    initStringInfo(&output_message);
    initStringInfo(&reply_message);
    initStringInfo(&tmpbuf);

    /* dispatch on the node type produced by the parser */
    switch (cmd_node->type)
    {
        case T_IdentifySystemCmd:
            IdentifySystem();
            break;

        case T_BaseBackupCmd:
            PreventInTransactionBlock(true, "BASE_BACKUP");
            SendBaseBackup((BaseBackupCmd *) cmd_node);
            break;

        case T_CreateReplicationSlotCmd:
            CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
            break;

        case T_DropReplicationSlotCmd:
            DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
            break;

        case T_StartReplicationCmd:
            {
                StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;

                PreventInTransactionBlock(true, "START_REPLICATION");

                /* physical and logical streaming have separate entry points */
                if (cmd->kind == REPLICATION_KIND_PHYSICAL)
                    StartReplication(cmd);
                else
                    StartLogicalReplication(cmd);
                break;
            }

        case T_TimeLineHistoryCmd:
            PreventInTransactionBlock(true, "TIMELINE_HISTORY");
            SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
            break;

        case T_VariableShowStmt:
            {
                DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
                VariableShowStmt *n = (VariableShowStmt *) cmd_node;

                /* syscache access needs a transaction environment */
                StartTransactionCommand();
                GetPGVariable(n->name, dest);
                CommitTransactionCommand();
            }
            break;

        default:
            elog(ERROR, "unrecognized replication command node tag: %u",
                 cmd_node->type);
    }

    /* done */
    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(cmd_context);

    /*
     * Send CommandComplete message.
     *
     * NOTE(review): DropReplicationSlot() and StartLogicalReplication() call
     * EndCommand() themselves before returning, so for those commands this
     * looks like a second completion message -- confirm that clients are
     * expected to tolerate it.
     */
    EndCommand("SELECT", DestRemote);

    /* Report to pgstat that this process is now idle */
    pgstat_report_activity(STATE_IDLE, NULL);
    debug_query_string = NULL;

    return true;
}
1598
/*
 * Process any incoming messages while streaming.  Also checks if the remote
 * end has closed the connection.
 *
 * Reads messages for as long as one is available without blocking and
 * CopyDone has not yet been received from the client.  The time of this call
 * is stored in last_processing; if at least one message was handled it also
 * becomes last_reply_timestamp and waiting_for_ping_response is cleared.
 *
 * The process exits on EOF, on a read error, or on an 'X' message; any
 * other unexpected message type raises a FATAL error.
 */
static void
ProcessRepliesIfAny(void)
{
    unsigned char firstchar;
    int         r;
    bool        received = false;

    last_processing = GetCurrentTimestamp();

    /*
     * If we already received a CopyDone from the frontend, any subsequent
     * message is the beginning of a new command, and should be processed in
     * the main processing loop.
     */
    while (!streamingDoneReceiving)
    {
        pq_startmsgread();
        r = pq_getbyte_if_available(&firstchar);
        if (r < 0)
        {
            /* unexpected error or EOF */
            ereport(COMMERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected EOF on standby connection")));
            proc_exit(0);
        }
        if (r == 0)
        {
            /* no data available without blocking */
            pq_endmsgread();
            break;
        }

        /* Read the message contents */
        resetStringInfo(&reply_message);
        if (pq_getmessage(&reply_message, 0))
        {
            /* a nonzero result is handled the same way as EOF above */
            ereport(COMMERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg("unexpected EOF on standby connection")));
            proc_exit(0);
        }

        /* Handle the very limited subset of commands expected in this phase */
        switch (firstchar)
        {
                /*
                 * 'd' means a standby reply wrapped in a CopyData packet.
                 */
            case 'd':
                ProcessStandbyMessage();
                received = true;
                break;

                /*
                 * CopyDone means the standby requested to finish streaming.
                 * Reply with CopyDone, if we had not sent that already.
                 */
            case 'c':
                if (!streamingDoneSending)
                {
                    pq_putmessage_noblock('c', NULL, 0);
                    streamingDoneSending = true;
                }

                /* setting this also ends the enclosing while loop */
                streamingDoneReceiving = true;
                received = true;
                break;

                /*
                 * 'X' means that the standby is closing down the socket.
                 *
                 * There is no break here; this relies on proc_exit() not
                 * returning.
                 */
            case 'X':
                proc_exit(0);

            default:
                ereport(FATAL,
                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
                         errmsg("invalid standby message type \"%c\"",
                                firstchar)));
        }
    }

    /*
     * Save the last reply timestamp if we've received at least one reply.
     */
    if (received)
    {
        last_reply_timestamp = last_processing;
        waiting_for_ping_response = false;
    }
}
1695
1696 /*
1697 * Process a status update message received from standby.
1698 */
1699 static void
ProcessStandbyMessage(void)1700 ProcessStandbyMessage(void)
1701 {
1702 char msgtype;
1703
1704 /*
1705 * Check message type from the first byte.
1706 */
1707 msgtype = pq_getmsgbyte(&reply_message);
1708
1709 switch (msgtype)
1710 {
1711 case 'r':
1712 ProcessStandbyReplyMessage();
1713 break;
1714
1715 case 'h':
1716 ProcessStandbyHSFeedbackMessage();
1717 break;
1718
1719 default:
1720 ereport(COMMERROR,
1721 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1722 errmsg("unexpected message type \"%c\"", msgtype)));
1723 proc_exit(0);
1724 }
1725 }
1726
1727 /*
1728 * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1729 */
1730 static void
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)1731 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1732 {
1733 bool changed = false;
1734 ReplicationSlot *slot = MyReplicationSlot;
1735
1736 Assert(lsn != InvalidXLogRecPtr);
1737 SpinLockAcquire(&slot->mutex);
1738 if (slot->data.restart_lsn != lsn)
1739 {
1740 changed = true;
1741 slot->data.restart_lsn = lsn;
1742 }
1743 SpinLockRelease(&slot->mutex);
1744
1745 if (changed)
1746 {
1747 ReplicationSlotMarkDirty();
1748 ReplicationSlotsComputeRequiredLSN();
1749 }
1750
1751 /*
1752 * One could argue that the slot should be saved to disk now, but that'd
1753 * be energy wasted - the worst lost information can do here is give us
1754 * wrong information in a statistics view - we'll just potentially be more
1755 * conservative in removing files.
1756 */
1757 }
1758
1759 /*
1760 * Regular reply from standby advising of WAL locations on standby server.
1761 */
1762 static void
ProcessStandbyReplyMessage(void)1763 ProcessStandbyReplyMessage(void)
1764 {
1765 XLogRecPtr writePtr,
1766 flushPtr,
1767 applyPtr;
1768 bool replyRequested;
1769 TimeOffset writeLag,
1770 flushLag,
1771 applyLag;
1772 bool clearLagTimes;
1773 TimestampTz now;
1774
1775 static bool fullyAppliedLastTime = false;
1776
1777 /* the caller already consumed the msgtype byte */
1778 writePtr = pq_getmsgint64(&reply_message);
1779 flushPtr = pq_getmsgint64(&reply_message);
1780 applyPtr = pq_getmsgint64(&reply_message);
1781 (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1782 replyRequested = pq_getmsgbyte(&reply_message);
1783
1784 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1785 (uint32) (writePtr >> 32), (uint32) writePtr,
1786 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1787 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1788 replyRequested ? " (reply requested)" : "");
1789
1790 /* See if we can compute the round-trip lag for these positions. */
1791 now = GetCurrentTimestamp();
1792 writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1793 flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1794 applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1795
1796 /*
1797 * If the standby reports that it has fully replayed the WAL in two
1798 * consecutive reply messages, then the second such message must result
1799 * from wal_receiver_status_interval expiring on the standby. This is a
1800 * convenient time to forget the lag times measured when it last
1801 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1802 * until more WAL traffic arrives.
1803 */
1804 clearLagTimes = false;
1805 if (applyPtr == sentPtr)
1806 {
1807 if (fullyAppliedLastTime)
1808 clearLagTimes = true;
1809 fullyAppliedLastTime = true;
1810 }
1811 else
1812 fullyAppliedLastTime = false;
1813
1814 /* Send a reply if the standby requested one. */
1815 if (replyRequested)
1816 WalSndKeepalive(false);
1817
1818 /*
1819 * Update shared state for this WalSender process based on reply data from
1820 * standby.
1821 */
1822 {
1823 WalSnd *walsnd = MyWalSnd;
1824
1825 SpinLockAcquire(&walsnd->mutex);
1826 walsnd->write = writePtr;
1827 walsnd->flush = flushPtr;
1828 walsnd->apply = applyPtr;
1829 if (writeLag != -1 || clearLagTimes)
1830 walsnd->writeLag = writeLag;
1831 if (flushLag != -1 || clearLagTimes)
1832 walsnd->flushLag = flushLag;
1833 if (applyLag != -1 || clearLagTimes)
1834 walsnd->applyLag = applyLag;
1835 SpinLockRelease(&walsnd->mutex);
1836 }
1837
1838 if (!am_cascading_walsender)
1839 SyncRepReleaseWaiters();
1840
1841 /*
1842 * Advance our local xmin horizon when the client confirmed a flush.
1843 */
1844 if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1845 {
1846 if (SlotIsLogical(MyReplicationSlot))
1847 LogicalConfirmReceivedLocation(flushPtr);
1848 else
1849 PhysicalConfirmReceivedLocation(flushPtr);
1850 }
1851 }
1852
1853 /* compute new replication slot xmin horizon if needed */
1854 static void
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin,TransactionId feedbackCatalogXmin)1855 PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
1856 {
1857 bool changed = false;
1858 ReplicationSlot *slot = MyReplicationSlot;
1859
1860 SpinLockAcquire(&slot->mutex);
1861 MyPgXact->xmin = InvalidTransactionId;
1862
1863 /*
1864 * For physical replication we don't need the interlock provided by xmin
1865 * and effective_xmin since the consequences of a missed increase are
1866 * limited to query cancellations, so set both at once.
1867 */
1868 if (!TransactionIdIsNormal(slot->data.xmin) ||
1869 !TransactionIdIsNormal(feedbackXmin) ||
1870 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1871 {
1872 changed = true;
1873 slot->data.xmin = feedbackXmin;
1874 slot->effective_xmin = feedbackXmin;
1875 }
1876 if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1877 !TransactionIdIsNormal(feedbackCatalogXmin) ||
1878 TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1879 {
1880 changed = true;
1881 slot->data.catalog_xmin = feedbackCatalogXmin;
1882 slot->effective_catalog_xmin = feedbackCatalogXmin;
1883 }
1884 SpinLockRelease(&slot->mutex);
1885
1886 if (changed)
1887 {
1888 ReplicationSlotMarkDirty();
1889 ReplicationSlotsComputeRequiredXmin(false);
1890 }
1891 }
1892
1893 /*
1894 * Check that the provided xmin/epoch are sane, that is, not in the future
1895 * and not so far back as to be already wrapped around.
1896 *
1897 * Epoch of nextXid should be same as standby, or if the counter has
1898 * wrapped, then one greater than standby.
1899 *
1900 * This check doesn't care about whether clog exists for these xids
1901 * at all.
1902 */
1903 static bool
TransactionIdInRecentPast(TransactionId xid,uint32 epoch)1904 TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
1905 {
1906 TransactionId nextXid;
1907 uint32 nextEpoch;
1908
1909 GetNextXidAndEpoch(&nextXid, &nextEpoch);
1910
1911 if (xid <= nextXid)
1912 {
1913 if (epoch != nextEpoch)
1914 return false;
1915 }
1916 else
1917 {
1918 if (epoch + 1 != nextEpoch)
1919 return false;
1920 }
1921
1922 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1923 return false; /* epoch OK, but it's wrapped around */
1924
1925 return true;
1926 }
1927
1928 /*
1929 * Hot Standby feedback
1930 */
1931 static void
ProcessStandbyHSFeedbackMessage(void)1932 ProcessStandbyHSFeedbackMessage(void)
1933 {
1934 TransactionId feedbackXmin;
1935 uint32 feedbackEpoch;
1936 TransactionId feedbackCatalogXmin;
1937 uint32 feedbackCatalogEpoch;
1938
1939 /*
1940 * Decipher the reply message. The caller already consumed the msgtype
1941 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1942 * of this message.
1943 */
1944 (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1945 feedbackXmin = pq_getmsgint(&reply_message, 4);
1946 feedbackEpoch = pq_getmsgint(&reply_message, 4);
1947 feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1948 feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1949
1950 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1951 feedbackXmin,
1952 feedbackEpoch,
1953 feedbackCatalogXmin,
1954 feedbackCatalogEpoch);
1955
1956 /*
1957 * Unset WalSender's xmins if the feedback message values are invalid.
1958 * This happens when the downstream turned hot_standby_feedback off.
1959 */
1960 if (!TransactionIdIsNormal(feedbackXmin)
1961 && !TransactionIdIsNormal(feedbackCatalogXmin))
1962 {
1963 MyPgXact->xmin = InvalidTransactionId;
1964 if (MyReplicationSlot != NULL)
1965 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1966 return;
1967 }
1968
1969 /*
1970 * Check that the provided xmin/epoch are sane, that is, not in the future
1971 * and not so far back as to be already wrapped around. Ignore if not.
1972 */
1973 if (TransactionIdIsNormal(feedbackXmin) &&
1974 !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1975 return;
1976
1977 if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1978 !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1979 return;
1980
1981 /*
1982 * Set the WalSender's xmin equal to the standby's requested xmin, so that
1983 * the xmin will be taken into account by GetOldestXmin. This will hold
1984 * back the removal of dead rows and thereby prevent the generation of
1985 * cleanup conflicts on the standby server.
1986 *
1987 * There is a small window for a race condition here: although we just
1988 * checked that feedbackXmin precedes nextXid, the nextXid could have
1989 * gotten advanced between our fetching it and applying the xmin below,
1990 * perhaps far enough to make feedbackXmin wrap around. In that case the
1991 * xmin we set here would be "in the future" and have no effect. No point
1992 * in worrying about this since it's too late to save the desired data
1993 * anyway. Assuming that the standby sends us an increasing sequence of
1994 * xmins, this could only happen during the first reply cycle, else our
1995 * own xmin would prevent nextXid from advancing so far.
1996 *
1997 * We don't bother taking the ProcArrayLock here. Setting the xmin field
1998 * is assumed atomic, and there's no real need to prevent a concurrent
1999 * GetOldestXmin. (If we're moving our xmin forward, this is obviously
2000 * safe, and if we're moving it backwards, well, the data is at risk
2001 * already since a VACUUM could have just finished calling GetOldestXmin.)
2002 *
2003 * If we're using a replication slot we reserve the xmin via that,
2004 * otherwise via the walsender's PGXACT entry. We can only track the
2005 * catalog xmin separately when using a slot, so we store the least of the
2006 * two provided when not using a slot.
2007 *
2008 * XXX: It might make sense to generalize the ephemeral slot concept and
2009 * always use the slot mechanism to handle the feedback xmin.
2010 */
2011 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2012 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2013 else
2014 {
2015 if (TransactionIdIsNormal(feedbackCatalogXmin)
2016 && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2017 MyPgXact->xmin = feedbackCatalogXmin;
2018 else
2019 MyPgXact->xmin = feedbackXmin;
2020 }
2021 }
2022
2023 /*
2024 * Compute how long send/receive loops should sleep.
2025 *
2026 * If wal_sender_timeout is enabled we want to wake up in time to send
2027 * keepalives and to abort the connection if wal_sender_timeout has been
2028 * reached.
2029 */
2030 static long
WalSndComputeSleeptime(TimestampTz now)2031 WalSndComputeSleeptime(TimestampTz now)
2032 {
2033 long sleeptime = 10000; /* 10 s */
2034
2035 if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2036 {
2037 TimestampTz wakeup_time;
2038
2039 /*
2040 * At the latest stop sleeping once wal_sender_timeout has been
2041 * reached.
2042 */
2043 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2044 wal_sender_timeout);
2045
2046 /*
2047 * If no ping has been sent yet, wakeup when it's time to do so.
2048 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2049 * the timeout passed without a response.
2050 */
2051 if (!waiting_for_ping_response)
2052 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2053 wal_sender_timeout / 2);
2054
2055 /* Compute relative time until wakeup. */
2056 sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2057 }
2058
2059 return sleeptime;
2060 }
2061
2062 /*
2063 * Check whether there have been responses by the client within
2064 * wal_sender_timeout and shutdown if not. Using last_processing as the
2065 * reference point avoids counting server-side stalls against the client.
2066 * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2067 * postdate last_processing by more than wal_sender_timeout. If that happens,
2068 * the client must reply almost immediately to avoid a timeout. This rarely
2069 * affects the default configuration, under which clients spontaneously send a
2070 * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2071 * could eliminate that problem by recognizing timeout expiration at
2072 * wal_sender_timeout/2 after the keepalive.
2073 */
2074 static void
WalSndCheckTimeOut(void)2075 WalSndCheckTimeOut(void)
2076 {
2077 TimestampTz timeout;
2078
2079 /* don't bail out if we're doing something that doesn't require timeouts */
2080 if (last_reply_timestamp <= 0)
2081 return;
2082
2083 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2084 wal_sender_timeout);
2085
2086 if (wal_sender_timeout > 0 && last_processing >= timeout)
2087 {
2088 /*
2089 * Since typically expiration of replication timeout means
2090 * communication problem, we don't send the error message to the
2091 * standby.
2092 */
2093 ereport(COMMERROR,
2094 (errmsg("terminating walsender process due to replication timeout")));
2095
2096 WalSndShutdown();
2097 }
2098 }
2099
/*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * 'send_data' is the callback used to queue more WAL into the output buffer;
 * the loop consults WalSndCaughtUp after calling it to decide whether there
 * is more to send right away.
 *
 * Each iteration handles pending signals and client replies, tries to send
 * and flush data, checks the replication timeout, sends a keepalive if due,
 * and finally sleeps if there is nothing to do or the socket is not
 * write-ready.  The loop is left once CopyDone has been both received and
 * sent and the output buffer is empty.
 */
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
	/*
	 * Initialize the last reply timestamp. That enables timeout processing
	 * from hereon.
	 */
	last_reply_timestamp = GetCurrentTimestamp();
	waiting_for_ping_response = false;

	/*
	 * Loop until we reach the end of this timeline or the client requests to
	 * stop streaming.
	 */
	for (;;)
	{
		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/*
		 * If we don't have any pending data in the output buffer, try to send
		 * some more.  If there is some, we don't bother to call send_data
		 * again until we've flushed it ... but we'd better assume we are not
		 * caught up.
		 */
		if (!pq_is_send_pending())
			send_data();
		else
			WalSndCaughtUp = false;

		/* Try to flush pending output to the client; give up on failure */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/* If nothing remains to be sent right now ... */
		if (WalSndCaughtUp && !pq_is_send_pending())
		{
			/*
			 * If we're in catchup state, move to streaming.  This is an
			 * important state change for users to know about, since before
			 * this point data loss might occur if the primary dies and we
			 * need to failover to the standby. The state change is also
			 * important for synchronous replication, since commits that
			 * started to wait at that point might wait for some time.
			 */
			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
			{
				ereport(DEBUG1,
						(errmsg("\"%s\" has now caught up with upstream server",
								application_name)));
				WalSndSetState(WALSNDSTATE_STREAMING);
			}

			/*
			 * When SIGUSR2 arrives, we send any outstanding logs up to the
			 * shutdown checkpoint record (i.e., the latest record), wait for
			 * them to be replicated to the standby, and exit. This may be a
			 * normal termination at shutdown, or a promotion, the walsender
			 * is not sure which.
			 */
			if (got_SIGUSR2)
				WalSndDone(send_data);
		}

		/* Check for replication timeout. */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * We don't block if not caught up, unless there is unsent data
		 * pending in which case we'd better block until the socket is
		 * write-ready.  This test is only needed for the case where the
		 * send_data callback handled a subset of the available data but then
		 * pq_flush_if_writable flushed it all --- we should immediately try
		 * to send more.
		 */
		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
		{
			long		sleeptime;
			int			wakeEvents;

			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;

			/* Only wait for client input while we still expect some. */
			if (!streamingDoneReceiving)
				wakeEvents |= WL_SOCKET_READABLE;

			/*
			 * Use fresh timestamp, not last_processed, to reduce the chance
			 * of reaching wal_sender_timeout before sending a keepalive.
			 */
			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

			/* With output still queued, also wake when the socket drains. */
			if (pq_is_send_pending())
				wakeEvents |= WL_SOCKET_WRITEABLE;

			/* Sleep until something happens or we time out */
			WaitLatchOrSocket(MyLatch, wakeEvents,
							  MyProcPort->sock, sleeptime,
							  WAIT_EVENT_WAL_SENDER_MAIN);
		}
	}
	return;
}
2235
2236 /* Initialize a per-walsender data structure for this walsender process */
2237 static void
InitWalSenderSlot(void)2238 InitWalSenderSlot(void)
2239 {
2240 int i;
2241
2242 /*
2243 * WalSndCtl should be set up already (we inherit this by fork() or
2244 * EXEC_BACKEND mechanism from the postmaster).
2245 */
2246 Assert(WalSndCtl != NULL);
2247 Assert(MyWalSnd == NULL);
2248
2249 /*
2250 * Find a free walsender slot and reserve it. If this fails, we must be
2251 * out of WalSnd structures.
2252 */
2253 for (i = 0; i < max_wal_senders; i++)
2254 {
2255 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2256
2257 SpinLockAcquire(&walsnd->mutex);
2258
2259 if (walsnd->pid != 0)
2260 {
2261 SpinLockRelease(&walsnd->mutex);
2262 continue;
2263 }
2264 else
2265 {
2266 /*
2267 * Found a free slot. Reserve it for us.
2268 */
2269 walsnd->pid = MyProcPid;
2270 walsnd->state = WALSNDSTATE_STARTUP;
2271 walsnd->sentPtr = InvalidXLogRecPtr;
2272 walsnd->needreload = false;
2273 walsnd->write = InvalidXLogRecPtr;
2274 walsnd->flush = InvalidXLogRecPtr;
2275 walsnd->apply = InvalidXLogRecPtr;
2276 walsnd->writeLag = -1;
2277 walsnd->flushLag = -1;
2278 walsnd->applyLag = -1;
2279 walsnd->sync_standby_priority = 0;
2280 walsnd->latch = &MyProc->procLatch;
2281 SpinLockRelease(&walsnd->mutex);
2282 /* don't need the lock anymore */
2283 MyWalSnd = (WalSnd *) walsnd;
2284
2285 break;
2286 }
2287 }
2288 if (MyWalSnd == NULL)
2289 ereport(FATAL,
2290 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2291 errmsg("number of requested standby connections "
2292 "exceeds max_wal_senders (currently %d)",
2293 max_wal_senders)));
2294
2295 /* Arrange to clean up at walsender exit */
2296 on_shmem_exit(WalSndKill, 0);
2297 }
2298
2299 /* Destroy the per-walsender data structure for this walsender process */
2300 static void
WalSndKill(int code,Datum arg)2301 WalSndKill(int code, Datum arg)
2302 {
2303 WalSnd *walsnd = MyWalSnd;
2304
2305 Assert(walsnd != NULL);
2306
2307 MyWalSnd = NULL;
2308
2309 SpinLockAcquire(&walsnd->mutex);
2310 /* clear latch while holding the spinlock, so it can safely be read */
2311 walsnd->latch = NULL;
2312 /* Mark WalSnd struct as no longer being in use. */
2313 walsnd->pid = 0;
2314 SpinLockRelease(&walsnd->mutex);
2315 }
2316
2317 /*
2318 * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2319 *
2320 * XXX probably this should be improved to suck data directly from the
2321 * WAL buffers when possible.
2322 *
2323 * Will open, and keep open, one WAL segment stored in the global file
2324 * descriptor sendFile. This means if XLogRead is used once, there will
2325 * always be one descriptor left open until the process ends, but never
2326 * more than one.
2327 */
2328 static void
XLogRead(char * buf,XLogRecPtr startptr,Size count)2329 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2330 {
2331 char *p;
2332 XLogRecPtr recptr;
2333 Size nbytes;
2334 XLogSegNo segno;
2335
2336 retry:
2337 p = buf;
2338 recptr = startptr;
2339 nbytes = count;
2340
2341 while (nbytes > 0)
2342 {
2343 uint32 startoff;
2344 int segbytes;
2345 int readbytes;
2346
2347 startoff = XLogSegmentOffset(recptr, wal_segment_size);
2348
2349 if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
2350 {
2351 char path[MAXPGPATH];
2352
2353 /* Switch to another logfile segment */
2354 if (sendFile >= 0)
2355 close(sendFile);
2356
2357 XLByteToSeg(recptr, sendSegNo, wal_segment_size);
2358
2359 /*-------
2360 * When reading from a historic timeline, and there is a timeline
2361 * switch within this segment, read from the WAL segment belonging
2362 * to the new timeline.
2363 *
2364 * For example, imagine that this server is currently on timeline
2365 * 5, and we're streaming timeline 4. The switch from timeline 4
2366 * to 5 happened at 0/13002088. In pg_wal, we have these files:
2367 *
2368 * ...
2369 * 000000040000000000000012
2370 * 000000040000000000000013
2371 * 000000050000000000000013
2372 * 000000050000000000000014
2373 * ...
2374 *
2375 * In this situation, when requested to send the WAL from
2376 * segment 0x13, on timeline 4, we read the WAL from file
2377 * 000000050000000000000013. Archive recovery prefers files from
2378 * newer timelines, so if the segment was restored from the
2379 * archive on this server, the file belonging to the old timeline,
2380 * 000000040000000000000013, might not exist. Their contents are
2381 * equal up to the switchpoint, because at a timeline switch, the
2382 * used portion of the old segment is copied to the new file.
2383 *-------
2384 */
2385 curFileTimeLine = sendTimeLine;
2386 if (sendTimeLineIsHistoric)
2387 {
2388 XLogSegNo endSegNo;
2389
2390 XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
2391 if (sendSegNo == endSegNo)
2392 curFileTimeLine = sendTimeLineNextTLI;
2393 }
2394
2395 XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
2396
2397 sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2398 if (sendFile < 0)
2399 {
2400 /*
2401 * If the file is not found, assume it's because the standby
2402 * asked for a too old WAL segment that has already been
2403 * removed or recycled.
2404 */
2405 if (errno == ENOENT)
2406 ereport(ERROR,
2407 (errcode_for_file_access(),
2408 errmsg("requested WAL segment %s has already been removed",
2409 XLogFileNameP(curFileTimeLine, sendSegNo))));
2410 else
2411 ereport(ERROR,
2412 (errcode_for_file_access(),
2413 errmsg("could not open file \"%s\": %m",
2414 path)));
2415 }
2416 sendOff = 0;
2417 }
2418
2419 /* Need to seek in the file? */
2420 if (sendOff != startoff)
2421 {
2422 if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2423 ereport(ERROR,
2424 (errcode_for_file_access(),
2425 errmsg("could not seek in log segment %s to offset %u: %m",
2426 XLogFileNameP(curFileTimeLine, sendSegNo),
2427 startoff)));
2428 sendOff = startoff;
2429 }
2430
2431 /* How many bytes are within this segment? */
2432 if (nbytes > (wal_segment_size - startoff))
2433 segbytes = wal_segment_size - startoff;
2434 else
2435 segbytes = nbytes;
2436
2437 pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
2438 readbytes = read(sendFile, p, segbytes);
2439 pgstat_report_wait_end();
2440 if (readbytes <= 0)
2441 {
2442 ereport(ERROR,
2443 (errcode_for_file_access(),
2444 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2445 XLogFileNameP(curFileTimeLine, sendSegNo),
2446 sendOff, (unsigned long) segbytes)));
2447 }
2448
2449 /* Update state for read */
2450 recptr += readbytes;
2451
2452 sendOff += readbytes;
2453 nbytes -= readbytes;
2454 p += readbytes;
2455 }
2456
2457 /*
2458 * After reading into the buffer, check that what we read was valid. We do
2459 * this after reading, because even though the segment was present when we
2460 * opened it, it might get recycled or removed while we read it. The
2461 * read() succeeds in that case, but the data we tried to read might
2462 * already have been overwritten with new WAL records.
2463 */
2464 XLByteToSeg(startptr, segno, wal_segment_size);
2465 CheckXLogRemoved(segno, ThisTimeLineID);
2466
2467 /*
2468 * During recovery, the currently-open WAL file might be replaced with the
2469 * file of the same name retrieved from archive. So we always need to
2470 * check what we read was valid after reading into the buffer. If it's
2471 * invalid, we try to open and read the file again.
2472 */
2473 if (am_cascading_walsender)
2474 {
2475 WalSnd *walsnd = MyWalSnd;
2476 bool reload;
2477
2478 SpinLockAcquire(&walsnd->mutex);
2479 reload = walsnd->needreload;
2480 walsnd->needreload = false;
2481 SpinLockRelease(&walsnd->mutex);
2482
2483 if (reload && sendFile >= 0)
2484 {
2485 close(sendFile);
2486 sendFile = -1;
2487
2488 goto retry;
2489 }
2490 }
2491 }
2492
/*
 * Send out the WAL in its normal physical/stored form.
 *
 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
 * but not yet sent to the client, and buffer it in the libpq output
 * buffer.
 *
 * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
 * otherwise WalSndCaughtUp is set to false.
 *
 * On a historic timeline, reaching the switch point queues a CopyDone
 * message and sets streamingDoneSending instead of sending more WAL.
 */
static void
XLogSendPhysical(void)
{
	XLogRecPtr	SendRqstPtr;
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;
	Size		nbytes;

	/* If requested switch the WAL sender to the stopping state. */
	if (got_STOPPING)
		WalSndSetState(WALSNDSTATE_STOPPING);

	/* Once CopyDone has been sent there is nothing further to stream. */
	if (streamingDoneSending)
	{
		WalSndCaughtUp = true;
		return;
	}

	/* Figure out how far we can safely send the WAL. */
	if (sendTimeLineIsHistoric)
	{
		/*
		 * Streaming an old timeline that's in this server's history, but is
		 * not the one we're currently inserting or replaying. It can be
		 * streamed up to the point where we switched off that timeline.
		 */
		SendRqstPtr = sendTimeLineValidUpto;
	}
	else if (am_cascading_walsender)
	{
		/*
		 * Streaming the latest timeline on a standby.
		 *
		 * Attempt to send all WAL that has already been replayed, so that we
		 * know it's valid. If we're receiving WAL through streaming
		 * replication, it's also OK to send any WAL that has been received
		 * but not replayed.
		 *
		 * The timeline we're recovering from can change, or we can be
		 * promoted. In either case, the current timeline becomes historic. We
		 * need to detect that so that we don't try to stream past the point
		 * where we switched to another timeline. We check for promotion or
		 * timeline switch after calculating FlushPtr, to avoid a race
		 * condition: if the timeline becomes historic just after we checked
		 * that it was still current, it's still OK to stream it up to the
		 * FlushPtr that was calculated before it became historic.
		 */
		bool		becameHistoric = false;

		SendRqstPtr = GetStandbyFlushRecPtr();

		if (!RecoveryInProgress())
		{
			/*
			 * We have been promoted. RecoveryInProgress() updated
			 * ThisTimeLineID to the new current timeline.
			 */
			am_cascading_walsender = false;
			becameHistoric = true;
		}
		else
		{
			/*
			 * Still a cascading standby. But is the timeline we're sending
			 * still the one recovery is recovering from? ThisTimeLineID was
			 * updated by the GetStandbyFlushRecPtr() call above.
			 */
			if (sendTimeLine != ThisTimeLineID)
				becameHistoric = true;
		}

		if (becameHistoric)
		{
			/*
			 * The timeline we were sending has become historic. Read the
			 * timeline history file of the new timeline to see where exactly
			 * we forked off from the timeline we were sending.
			 */
			List	   *history;

			history = readTimeLineHistory(ThisTimeLineID);
			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);

			Assert(sendTimeLine < sendTimeLineNextTLI);
			list_free_deep(history);

			sendTimeLineIsHistoric = true;

			/* From now on we may only send up to the switch point. */
			SendRqstPtr = sendTimeLineValidUpto;
		}
	}
	else
	{
		/*
		 * Streaming the current timeline on a master.
		 *
		 * Attempt to send all data that's already been written out and
		 * fsync'd to disk.  We cannot go further than what's been written out
		 * given the current implementation of XLogRead().  And in any case
		 * it's unsafe to send WAL that is not securely down to disk on the
		 * master: if the master subsequently crashes and restarts, standbys
		 * must not have applied any WAL that got lost on the master.
		 */
		SendRqstPtr = GetFlushRecPtr();
	}

	/*
	 * Record the current system time as an approximation of the time at which
	 * this WAL location was written for the purposes of lag tracking.
	 *
	 * In theory we could make XLogFlush() record a time in shmem whenever WAL
	 * is flushed and we could get that time as well as the LSN when we call
	 * GetFlushRecPtr() above (and likewise for the cascading standby
	 * equivalent), but rather than putting any new code into the hot WAL path
	 * it seems good enough to capture the time here.  We should reach this
	 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
	 * may take some time, we read the WAL flush pointer and take the time
	 * very close to together here so that we'll get a later position if it is
	 * still moving.
	 *
	 * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
	 * this gives us a cheap approximation for the WAL flush time for this
	 * LSN.
	 *
	 * Note that the LSN is not necessarily the LSN for the data contained in
	 * the present message; it's the end of the WAL, which might be further
	 * ahead.  All the lag tracking machinery cares about is finding out when
	 * that arbitrary LSN is eventually reported as written, flushed and
	 * applied, so that it can measure the elapsed time.
	 */
	LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());

	/*
	 * If this is a historic timeline and we've reached the point where we
	 * forked to the next timeline, stop streaming.
	 *
	 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
	 * startup process will normally replay all WAL that has been received
	 * from the master, before promoting, but if the WAL streaming is
	 * terminated at a WAL page boundary, the valid portion of the timeline
	 * might end in the middle of a WAL record. We might've already sent the
	 * first half of that partial WAL record to the cascading standby, so that
	 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
	 * replay the partial WAL record either, so it can still follow our
	 * timeline switch.
	 */
	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
	{
		/* close the current file. */
		if (sendFile >= 0)
			close(sendFile);
		sendFile = -1;

		/* Send CopyDone */
		pq_putmessage_noblock('c', NULL, 0);
		streamingDoneSending = true;

		WalSndCaughtUp = true;

		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		return;
	}

	/* Do we have any work to do? */
	Assert(sentPtr <= SendRqstPtr);
	if (SendRqstPtr <= sentPtr)
	{
		WalSndCaughtUp = true;
		return;
	}

	/*
	 * Figure out how much to send in one message. If there's no more than
	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
	 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
	 *
	 * The rounding is not only for performance reasons. Walreceiver relies on
	 * the fact that we never split a WAL record across two messages. Since a
	 * long WAL record is split at page boundary into continuation records,
	 * page boundary is always a safe cut-off point. We also assume that
	 * SendRqstPtr never points to the middle of a WAL record.
	 */
	startptr = sentPtr;
	endptr = startptr;
	endptr += MAX_SEND_SIZE;

	/* if we went beyond SendRqstPtr, back off */
	if (SendRqstPtr <= endptr)
	{
		endptr = SendRqstPtr;

		/*
		 * On a historic timeline we are not marked as caught up here; the
		 * end-of-timeline branch above does that on a later call, once
		 * sentPtr has reached the switch point.
		 */
		if (sendTimeLineIsHistoric)
			WalSndCaughtUp = false;
		else
			WalSndCaughtUp = true;
	}
	else
	{
		/* round down to page boundary. */
		endptr -= (endptr % XLOG_BLCKSZ);
		WalSndCaughtUp = false;
	}

	nbytes = endptr - startptr;
	Assert(nbytes <= MAX_SEND_SIZE);

	/*
	 * OK to read and send the slice.
	 */
	resetStringInfo(&output_message);
	pq_sendbyte(&output_message, 'w');

	pq_sendint64(&output_message, startptr);	/* dataStart */
	pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
	pq_sendint64(&output_message, 0);	/* sendtime, filled in last */

	/*
	 * Read the log directly into the output buffer to avoid extra memcpy
	 * calls.
	 */
	enlargeStringInfo(&output_message, nbytes);
	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
	output_message.len += nbytes;
	output_message.data[output_message.len] = '\0';

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * It overwrites the placeholder written above, which sits after the
	 * one-byte message type and the two preceding int64 fields.
	 */
	resetStringInfo(&tmpbuf);
	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
	memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	pq_putmessage_noblock('d', output_message.data, output_message.len);

	sentPtr = endptr;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}

	/* Report progress of XLOG streaming in PS display */
	if (update_process_title)
	{
		char		activitymsg[50];

		snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
				 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		set_ps_display(activitymsg, false);
	}

	return;
}
2762
/*
 * Stream out logically decoded data.
 *
 * Reads one WAL record through the logical decoding context's reader and,
 * if one was obtained, hands it to LogicalDecodingProcessRecord() and
 * advances sentPtr to the end of that record.  WalSndCaughtUp is updated to
 * reflect whether the reader has reached the current flush point, and the
 * new sentPtr is published in shared memory.  An invalid record is reported
 * with elog(ERROR).
 */
static void
XLogSendLogical(void)
{
	XLogRecord *record;
	char	   *errm;
	XLogRecPtr	flushPtr;

	/*
	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
	 * true in WalSndWaitForWal, if we're actually waiting. We also set to
	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
	 * didn't wait - i.e. when we're shutting down.
	 */
	WalSndCaughtUp = false;

	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
	/* The explicit start position is only passed once; reset it afterwards. */
	logical_startptr = InvalidXLogRecPtr;

	/* xlog record was invalid */
	if (errm != NULL)
		elog(ERROR, "%s", errm);

	/*
	 * We'll use the current flush point to determine whether we've caught up.
	 */
	flushPtr = GetFlushRecPtr();

	if (record != NULL)
	{
		/*
		 * Note the lack of any call to LagTrackerWrite() which is handled by
		 * WalSndUpdateProgress which is called by output plugin through
		 * logical decoding write api.
		 */
		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);

		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
	}

	/* Set flag if we're caught up. */
	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
		WalSndCaughtUp = true;

	/*
	 * If we're caught up and have been requested to stop, have WalSndLoop()
	 * terminate the connection in an orderly manner, after writing out all
	 * the pending data.
	 *
	 * Note this tests WalSndCaughtUp rather than repeating the flush-point
	 * comparison: per the comment at the top, the flag may also have been
	 * set while reading the record.
	 */
	if (WalSndCaughtUp && got_STOPPING)
		got_SIGUSR2 = true;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}
}
2826
2827 /*
2828 * Shutdown if the sender is caught up.
2829 *
2830 * NB: This should only be called when the shutdown signal has been received
2831 * from postmaster.
2832 *
2833 * Note that if we determine that there's still more data to send, this
2834 * function will return control to the caller.
2835 */
2836 static void
WalSndDone(WalSndSendDataCallback send_data)2837 WalSndDone(WalSndSendDataCallback send_data)
2838 {
2839 XLogRecPtr replicatedPtr;
2840
2841 /* ... let's just be real sure we're caught up ... */
2842 send_data();
2843
2844 /*
2845 * To figure out whether all WAL has successfully been replicated, check
2846 * flush location if valid, write otherwise. Tools like pg_receivewal will
2847 * usually (unless in synchronous mode) return an invalid flush location.
2848 */
2849 replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2850 MyWalSnd->write : MyWalSnd->flush;
2851
2852 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2853 !pq_is_send_pending())
2854 {
2855 /* Inform the standby that XLOG streaming is done */
2856 EndCommand("COPY 0", DestRemote);
2857 pq_flush();
2858
2859 proc_exit(0);
2860 }
2861 if (!waiting_for_ping_response)
2862 WalSndKeepalive(true);
2863 }
2864
2865 /*
2866 * Returns the latest point in WAL that has been safely flushed to disk, and
2867 * can be sent to the standby. This should only be called when in recovery,
2868 * ie. we're streaming to a cascaded standby.
2869 *
2870 * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2871 * replayed WAL record.
2872 */
2873 static XLogRecPtr
GetStandbyFlushRecPtr(void)2874 GetStandbyFlushRecPtr(void)
2875 {
2876 XLogRecPtr replayPtr;
2877 TimeLineID replayTLI;
2878 XLogRecPtr receivePtr;
2879 TimeLineID receiveTLI;
2880 XLogRecPtr result;
2881
2882 /*
2883 * We can safely send what's already been replayed. Also, if walreceiver
2884 * is streaming WAL from the same timeline, we can send anything that it
2885 * has streamed, but hasn't been replayed yet.
2886 */
2887
2888 receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2889 replayPtr = GetXLogReplayRecPtr(&replayTLI);
2890
2891 ThisTimeLineID = replayTLI;
2892
2893 result = replayPtr;
2894 if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2895 result = receivePtr;
2896
2897 return result;
2898 }
2899
2900 /*
2901 * Request walsenders to reload the currently-open WAL file
2902 */
2903 void
WalSndRqstFileReload(void)2904 WalSndRqstFileReload(void)
2905 {
2906 int i;
2907
2908 for (i = 0; i < max_wal_senders; i++)
2909 {
2910 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2911
2912 SpinLockAcquire(&walsnd->mutex);
2913 if (walsnd->pid == 0)
2914 {
2915 SpinLockRelease(&walsnd->mutex);
2916 continue;
2917 }
2918 walsnd->needreload = true;
2919 SpinLockRelease(&walsnd->mutex);
2920 }
2921 }
2922
2923 /*
2924 * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2925 */
2926 void
HandleWalSndInitStopping(void)2927 HandleWalSndInitStopping(void)
2928 {
2929 Assert(am_walsender);
2930
2931 /*
2932 * If replication has not yet started, die like with SIGTERM. If
2933 * replication is active, only set a flag and wake up the main loop. It
2934 * will send any outstanding WAL, wait for it to be replicated to the
2935 * standby, and then exit gracefully.
2936 */
2937 if (!replication_active)
2938 kill(MyProcPid, SIGTERM);
2939 else
2940 got_STOPPING = true;
2941 }
2942
2943 /*
2944 * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2945 * sender should already have been switched to WALSNDSTATE_STOPPING at
2946 * this point.
2947 */
2948 static void
WalSndLastCycleHandler(SIGNAL_ARGS)2949 WalSndLastCycleHandler(SIGNAL_ARGS)
2950 {
2951 int save_errno = errno;
2952
2953 got_SIGUSR2 = true;
2954 SetLatch(MyLatch);
2955
2956 errno = save_errno;
2957 }
2958
/*
 * Set up signal handlers for a walsender process.
 *
 * Installs the handlers listed below and restores the default disposition
 * for a few signals that the postmaster accepts but a walsender does not.
 */
void
WalSndSignals(void)
{
	/* Set up signal handlers */
	pqsignal(SIGHUP, PostgresSigHupHandler);	/* set flag to read config
												 * file */
	pqsignal(SIGINT, StatementCancelHandler);	/* query cancel */
	pqsignal(SIGTERM, die);		/* request shutdown */
	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
	InitializeTimeouts();		/* establishes SIGALRM handler */
	pqsignal(SIGPIPE, SIG_IGN);	/* ignored */
	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
												 * shutdown */

	/* Reset some signals that are accepted by postmaster but not here */
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);
}
2982
2983 /* Report shared-memory space needed by WalSndShmemInit */
2984 Size
WalSndShmemSize(void)2985 WalSndShmemSize(void)
2986 {
2987 Size size = 0;
2988
2989 size = offsetof(WalSndCtlData, walsnds);
2990 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2991
2992 return size;
2993 }
2994
2995 /* Allocate and initialize walsender-related shared memory */
2996 void
WalSndShmemInit(void)2997 WalSndShmemInit(void)
2998 {
2999 bool found;
3000 int i;
3001
3002 WalSndCtl = (WalSndCtlData *)
3003 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3004
3005 if (!found)
3006 {
3007 /* First time through, so initialize */
3008 MemSet(WalSndCtl, 0, WalSndShmemSize());
3009
3010 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3011 SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3012
3013 for (i = 0; i < max_wal_senders; i++)
3014 {
3015 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3016
3017 SpinLockInit(&walsnd->mutex);
3018 }
3019 }
3020 }
3021
3022 /*
3023 * Wake up all walsenders
3024 *
3025 * This will be called inside critical sections, so throwing an error is not
3026 * advisable.
3027 */
3028 void
WalSndWakeup(void)3029 WalSndWakeup(void)
3030 {
3031 int i;
3032
3033 for (i = 0; i < max_wal_senders; i++)
3034 {
3035 Latch *latch;
3036 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3037
3038 /*
3039 * Get latch pointer with spinlock held, for the unlikely case that
3040 * pointer reads aren't atomic (as they're 8 bytes).
3041 */
3042 SpinLockAcquire(&walsnd->mutex);
3043 latch = walsnd->latch;
3044 SpinLockRelease(&walsnd->mutex);
3045
3046 if (latch != NULL)
3047 SetLatch(latch);
3048 }
3049 }
3050
3051 /*
3052 * Signal all walsenders to move to stopping state.
3053 *
3054 * This will trigger walsenders to move to a state where no further WAL can be
3055 * generated. See this file's header for details.
3056 */
3057 void
WalSndInitStopping(void)3058 WalSndInitStopping(void)
3059 {
3060 int i;
3061
3062 for (i = 0; i < max_wal_senders; i++)
3063 {
3064 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3065 pid_t pid;
3066
3067 SpinLockAcquire(&walsnd->mutex);
3068 pid = walsnd->pid;
3069 SpinLockRelease(&walsnd->mutex);
3070
3071 if (pid == 0)
3072 continue;
3073
3074 SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3075 }
3076 }
3077
3078 /*
3079 * Wait that all the WAL senders have quit or reached the stopping state. This
3080 * is used by the checkpointer to control when the shutdown checkpoint can
3081 * safely be performed.
3082 */
3083 void
WalSndWaitStopping(void)3084 WalSndWaitStopping(void)
3085 {
3086 for (;;)
3087 {
3088 int i;
3089 bool all_stopped = true;
3090
3091 for (i = 0; i < max_wal_senders; i++)
3092 {
3093 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3094
3095 SpinLockAcquire(&walsnd->mutex);
3096
3097 if (walsnd->pid == 0)
3098 {
3099 SpinLockRelease(&walsnd->mutex);
3100 continue;
3101 }
3102
3103 if (walsnd->state != WALSNDSTATE_STOPPING)
3104 {
3105 all_stopped = false;
3106 SpinLockRelease(&walsnd->mutex);
3107 break;
3108 }
3109 SpinLockRelease(&walsnd->mutex);
3110 }
3111
3112 /* safe to leave if confirmation is done for all WAL senders */
3113 if (all_stopped)
3114 return;
3115
3116 pg_usleep(10000L); /* wait for 10 msec */
3117 }
3118 }
3119
3120 /* Set state for current walsender (only called in walsender) */
3121 void
WalSndSetState(WalSndState state)3122 WalSndSetState(WalSndState state)
3123 {
3124 WalSnd *walsnd = MyWalSnd;
3125
3126 Assert(am_walsender);
3127
3128 if (walsnd->state == state)
3129 return;
3130
3131 SpinLockAcquire(&walsnd->mutex);
3132 walsnd->state = state;
3133 SpinLockRelease(&walsnd->mutex);
3134 }
3135
3136 /*
3137 * Return a string constant representing the state. This is used
3138 * in system views, and should *not* be translated.
3139 */
3140 static const char *
WalSndGetStateString(WalSndState state)3141 WalSndGetStateString(WalSndState state)
3142 {
3143 switch (state)
3144 {
3145 case WALSNDSTATE_STARTUP:
3146 return "startup";
3147 case WALSNDSTATE_BACKUP:
3148 return "backup";
3149 case WALSNDSTATE_CATCHUP:
3150 return "catchup";
3151 case WALSNDSTATE_STREAMING:
3152 return "streaming";
3153 case WALSNDSTATE_STOPPING:
3154 return "stopping";
3155 }
3156 return "UNKNOWN";
3157 }
3158
3159 static Interval *
offset_to_interval(TimeOffset offset)3160 offset_to_interval(TimeOffset offset)
3161 {
3162 Interval *result = palloc(sizeof(Interval));
3163
3164 result->month = 0;
3165 result->day = 0;
3166 result->time = offset;
3167
3168 return result;
3169 }
3170
3171 /*
3172 * Returns activity of walsenders, including pids and xlog locations sent to
3173 * standby servers.
3174 */
3175 Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)3176 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3177 {
3178 #define PG_STAT_GET_WAL_SENDERS_COLS 11
3179 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3180 TupleDesc tupdesc;
3181 Tuplestorestate *tupstore;
3182 MemoryContext per_query_ctx;
3183 MemoryContext oldcontext;
3184 SyncRepStandbyData *sync_standbys;
3185 int num_standbys;
3186 int i;
3187
3188 /* check to see if caller supports us returning a tuplestore */
3189 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3190 ereport(ERROR,
3191 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3192 errmsg("set-valued function called in context that cannot accept a set")));
3193 if (!(rsinfo->allowedModes & SFRM_Materialize))
3194 ereport(ERROR,
3195 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3196 errmsg("materialize mode required, but it is not " \
3197 "allowed in this context")));
3198
3199 /* Build a tuple descriptor for our result type */
3200 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3201 elog(ERROR, "return type must be a row type");
3202
3203 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3204 oldcontext = MemoryContextSwitchTo(per_query_ctx);
3205
3206 tupstore = tuplestore_begin_heap(true, false, work_mem);
3207 rsinfo->returnMode = SFRM_Materialize;
3208 rsinfo->setResult = tupstore;
3209 rsinfo->setDesc = tupdesc;
3210
3211 MemoryContextSwitchTo(oldcontext);
3212
3213 /*
3214 * Get the currently active synchronous standbys. This could be out of
3215 * date before we're done, but we'll use the data anyway.
3216 */
3217 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3218
3219 for (i = 0; i < max_wal_senders; i++)
3220 {
3221 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3222 XLogRecPtr sentPtr;
3223 XLogRecPtr write;
3224 XLogRecPtr flush;
3225 XLogRecPtr apply;
3226 TimeOffset writeLag;
3227 TimeOffset flushLag;
3228 TimeOffset applyLag;
3229 int priority;
3230 int pid;
3231 WalSndState state;
3232 bool is_sync_standby;
3233 Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3234 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3235 int j;
3236
3237 /* Collect data from shared memory */
3238 SpinLockAcquire(&walsnd->mutex);
3239 if (walsnd->pid == 0)
3240 {
3241 SpinLockRelease(&walsnd->mutex);
3242 continue;
3243 }
3244 pid = walsnd->pid;
3245 sentPtr = walsnd->sentPtr;
3246 state = walsnd->state;
3247 write = walsnd->write;
3248 flush = walsnd->flush;
3249 apply = walsnd->apply;
3250 writeLag = walsnd->writeLag;
3251 flushLag = walsnd->flushLag;
3252 applyLag = walsnd->applyLag;
3253 priority = walsnd->sync_standby_priority;
3254 SpinLockRelease(&walsnd->mutex);
3255
3256 /*
3257 * Detect whether walsender is/was considered synchronous. We can
3258 * provide some protection against stale data by checking the PID
3259 * along with walsnd_index.
3260 */
3261 is_sync_standby = false;
3262 for (j = 0; j < num_standbys; j++)
3263 {
3264 if (sync_standbys[j].walsnd_index == i &&
3265 sync_standbys[j].pid == pid)
3266 {
3267 is_sync_standby = true;
3268 break;
3269 }
3270 }
3271
3272 memset(nulls, 0, sizeof(nulls));
3273 values[0] = Int32GetDatum(pid);
3274
3275 if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3276 {
3277 /*
3278 * Only superusers and members of pg_read_all_stats can see
3279 * details. Other users only get the pid value to know it's a
3280 * walsender, but no details.
3281 */
3282 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3283 }
3284 else
3285 {
3286 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3287
3288 if (XLogRecPtrIsInvalid(sentPtr))
3289 nulls[2] = true;
3290 values[2] = LSNGetDatum(sentPtr);
3291
3292 if (XLogRecPtrIsInvalid(write))
3293 nulls[3] = true;
3294 values[3] = LSNGetDatum(write);
3295
3296 if (XLogRecPtrIsInvalid(flush))
3297 nulls[4] = true;
3298 values[4] = LSNGetDatum(flush);
3299
3300 if (XLogRecPtrIsInvalid(apply))
3301 nulls[5] = true;
3302 values[5] = LSNGetDatum(apply);
3303
3304 /*
3305 * Treat a standby such as a pg_basebackup background process
3306 * which always returns an invalid flush location, as an
3307 * asynchronous standby.
3308 */
3309 priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3310
3311 if (writeLag < 0)
3312 nulls[6] = true;
3313 else
3314 values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3315
3316 if (flushLag < 0)
3317 nulls[7] = true;
3318 else
3319 values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3320
3321 if (applyLag < 0)
3322 nulls[8] = true;
3323 else
3324 values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3325
3326 values[9] = Int32GetDatum(priority);
3327
3328 /*
3329 * More easily understood version of standby state. This is purely
3330 * informational.
3331 *
3332 * In quorum-based sync replication, the role of each standby
3333 * listed in synchronous_standby_names can be changing very
3334 * frequently. Any standbys considered as "sync" at one moment can
3335 * be switched to "potential" ones at the next moment. So, it's
3336 * basically useless to report "sync" or "potential" as their sync
3337 * states. We report just "quorum" for them.
3338 */
3339 if (priority == 0)
3340 values[10] = CStringGetTextDatum("async");
3341 else if (is_sync_standby)
3342 values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3343 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3344 else
3345 values[10] = CStringGetTextDatum("potential");
3346 }
3347
3348 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3349 }
3350
3351 /* clean up and return the tuplestore */
3352 tuplestore_donestoring(tupstore);
3353
3354 return (Datum) 0;
3355 }
3356
3357 /*
3358 * Send a keepalive message to standby.
3359 *
3360 * If requestReply is set, the message requests the other party to send
3361 * a message back to us, for heartbeat purposes. We also set a flag to
3362 * let nearby code that we're waiting for that response, to avoid
3363 * repeated requests.
3364 */
3365 static void
WalSndKeepalive(bool requestReply)3366 WalSndKeepalive(bool requestReply)
3367 {
3368 elog(DEBUG2, "sending replication keepalive");
3369
3370 /* construct the message... */
3371 resetStringInfo(&output_message);
3372 pq_sendbyte(&output_message, 'k');
3373 pq_sendint64(&output_message, sentPtr);
3374 pq_sendint64(&output_message, GetCurrentTimestamp());
3375 pq_sendbyte(&output_message, requestReply ? 1 : 0);
3376
3377 /* ... and send it wrapped in CopyData */
3378 pq_putmessage_noblock('d', output_message.data, output_message.len);
3379
3380 /* Set local flag */
3381 if (requestReply)
3382 waiting_for_ping_response = true;
3383 }
3384
3385 /*
3386 * Send keepalive message if too much time has elapsed.
3387 */
3388 static void
WalSndKeepaliveIfNecessary(void)3389 WalSndKeepaliveIfNecessary(void)
3390 {
3391 TimestampTz ping_time;
3392
3393 /*
3394 * Don't send keepalive messages if timeouts are globally disabled or
3395 * we're doing something not partaking in timeouts.
3396 */
3397 if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3398 return;
3399
3400 if (waiting_for_ping_response)
3401 return;
3402
3403 /*
3404 * If half of wal_sender_timeout has lapsed without receiving any reply
3405 * from the standby, send a keep-alive message to the standby requesting
3406 * an immediate reply.
3407 */
3408 ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3409 wal_sender_timeout / 2);
3410 if (last_processing >= ping_time)
3411 {
3412 WalSndKeepalive(true);
3413
3414 /* Try to flush pending output to the client */
3415 if (pq_flush_if_writable() != 0)
3416 WalSndShutdown();
3417 }
3418 }
3419
3420 /*
3421 * Record the end of the WAL and the time it was flushed locally, so that
3422 * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3423 * eventually reported to have been written, flushed and applied by the
3424 * standby in a reply message.
3425 */
3426 static void
LagTrackerWrite(XLogRecPtr lsn,TimestampTz local_flush_time)3427 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3428 {
3429 bool buffer_full;
3430 int new_write_head;
3431 int i;
3432
3433 if (!am_walsender)
3434 return;
3435
3436 /*
3437 * If the lsn hasn't advanced since last time, then do nothing. This way
3438 * we only record a new sample when new WAL has been written.
3439 */
3440 if (LagTracker.last_lsn == lsn)
3441 return;
3442 LagTracker.last_lsn = lsn;
3443
3444 /*
3445 * If advancing the write head of the circular buffer would crash into any
3446 * of the read heads, then the buffer is full. In other words, the
3447 * slowest reader (presumably apply) is the one that controls the release
3448 * of space.
3449 */
3450 new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3451 buffer_full = false;
3452 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3453 {
3454 if (new_write_head == LagTracker.read_heads[i])
3455 buffer_full = true;
3456 }
3457
3458 /*
3459 * If the buffer is full, for now we just rewind by one slot and overwrite
3460 * the last sample, as a simple (if somewhat uneven) way to lower the
3461 * sampling rate. There may be better adaptive compaction algorithms.
3462 */
3463 if (buffer_full)
3464 {
3465 new_write_head = LagTracker.write_head;
3466 if (LagTracker.write_head > 0)
3467 LagTracker.write_head--;
3468 else
3469 LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3470 }
3471
3472 /* Store a sample at the current write head position. */
3473 LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3474 LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3475 LagTracker.write_head = new_write_head;
3476 }
3477
3478 /*
3479 * Find out how much time has elapsed between the moment WAL location 'lsn'
3480 * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3481 * We have a separate read head for each of the reported LSN locations we
3482 * receive in replies from standby; 'head' controls which read head is
3483 * used. Whenever a read head crosses an LSN which was written into the
3484 * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3485 * find out the time this LSN (or an earlier one) was flushed locally, and
3486 * therefore compute the lag.
3487 *
3488 * Return -1 if no new sample data is available, and otherwise the elapsed
3489 * time in microseconds.
3490 */
3491 static TimeOffset
LagTrackerRead(int head,XLogRecPtr lsn,TimestampTz now)3492 LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
3493 {
3494 TimestampTz time = 0;
3495
3496 /* Read all unread samples up to this LSN or end of buffer. */
3497 while (LagTracker.read_heads[head] != LagTracker.write_head &&
3498 LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3499 {
3500 time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3501 LagTracker.last_read[head] =
3502 LagTracker.buffer[LagTracker.read_heads[head]];
3503 LagTracker.read_heads[head] =
3504 (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3505 }
3506
3507 /*
3508 * If the lag tracker is empty, that means the standby has processed
3509 * everything we've ever sent so we should now clear 'last_read'. If we
3510 * didn't do that, we'd risk using a stale and irrelevant sample for
3511 * interpolation at the beginning of the next burst of WAL after a period
3512 * of idleness.
3513 */
3514 if (LagTracker.read_heads[head] == LagTracker.write_head)
3515 LagTracker.last_read[head].time = 0;
3516
3517 if (time > now)
3518 {
3519 /* If the clock somehow went backwards, treat as not found. */
3520 return -1;
3521 }
3522 else if (time == 0)
3523 {
3524 /*
3525 * We didn't cross a time. If there is a future sample that we
3526 * haven't reached yet, and we've already reached at least one sample,
3527 * let's interpolate the local flushed time. This is mainly useful
3528 * for reporting a completely stuck apply position as having
3529 * increasing lag, since otherwise we'd have to wait for it to
3530 * eventually start moving again and cross one of our samples before
3531 * we can show the lag increasing.
3532 */
3533 if (LagTracker.read_heads[head] == LagTracker.write_head)
3534 {
3535 /* There are no future samples, so we can't interpolate. */
3536 return -1;
3537 }
3538 else if (LagTracker.last_read[head].time != 0)
3539 {
3540 /* We can interpolate between last_read and the next sample. */
3541 double fraction;
3542 WalTimeSample prev = LagTracker.last_read[head];
3543 WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3544
3545 if (lsn < prev.lsn)
3546 {
3547 /*
3548 * Reported LSNs shouldn't normally go backwards, but it's
3549 * possible when there is a timeline change. Treat as not
3550 * found.
3551 */
3552 return -1;
3553 }
3554
3555 Assert(prev.lsn < next.lsn);
3556
3557 if (prev.time > next.time)
3558 {
3559 /* If the clock somehow went backwards, treat as not found. */
3560 return -1;
3561 }
3562
3563 /* See how far we are between the previous and next samples. */
3564 fraction =
3565 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3566
3567 /* Scale the local flush time proportionally. */
3568 time = (TimestampTz)
3569 ((double) prev.time + (next.time - prev.time) * fraction);
3570 }
3571 else
3572 {
3573 /*
3574 * We have only a future sample, implying that we were entirely
3575 * caught up but and now there is a new burst of WAL and the
3576 * standby hasn't processed the first sample yet. Until the
3577 * standby reaches the future sample the best we can do is report
3578 * the hypothetical lag if that sample were to be replayed now.
3579 */
3580 time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3581 }
3582 }
3583
3584 /* Return the elapsed time since local flush time in microseconds. */
3585 Assert(time != 0);
3586 return now - time;
3587 }
3588