1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited.  If
 * the walsender is idle or is running an SQL query, this causes it to shut
 * down; if logical replication is in progress, all existing WAL records
 * are processed first and then it shuts down.  Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  *	  src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58 
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "replication/basebackup.h"
70 #include "replication/decode.h"
71 #include "replication/logical.h"
72 #include "replication/logicalfuncs.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
76 #include "replication/walreceiver.h"
77 #include "replication/walsender.h"
78 #include "replication/walsender_private.h"
79 #include "storage/condition_variable.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/builtins.h"
88 #include "utils/guc.h"
89 #include "utils/memutils.h"
90 #include "utils/pg_lsn.h"
91 #include "utils/portal.h"
92 #include "utils/ps_status.h"
93 #include "utils/resowner.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96 
/*
 * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
 *
 * We don't have a good idea of what a good value would be; there's some
 * overhead per message in both walsender and walreceiver, but on the other
 * hand sending large batches makes walsender less responsive to signals
 * because signals are checked only between messages.  128kB (with
 * default 8k blocks) seems like a reasonable guess for now.
 */
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)

/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;

/* My slot in the shared memory array */
WalSnd	   *MyWalSnd = NULL;

/* Global state */
bool		am_walsender = false;	/* Am I a walsender process? */
bool		am_cascading_walsender = false; /* Am I cascading WAL to another
											 * standby? */
bool		am_db_walsender = false;	/* Connected to a database? */

/*
 * User-settable parameters for walsender.
 *
 * NOTE(review): wal_sender_timeout appears to be in milliseconds, judging by
 * its 60 * 1000 default -- confirm against the GUC definition.
 */
int			max_wal_senders = 0;	/* the maximum number of concurrent
									 * walsenders */
int			wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
											 * data message */
bool		log_replication_commands = false;	/* presumably: log each
												 * replication command */

/*
 * State for WalSndWakeupRequest
 */
bool		wake_wal_senders = false;

/*
 * These variables are used similarly to openLogFile/SegNo/Off,
 * but for walsender to read the XLOG.
 *
 * sendFile is -1 whenever no WAL segment file is open; error cleanup closes
 * the file and resets it to -1.
 */
static int	sendFile = -1;
static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;

/* Timeline ID of the currently open file */
static TimeLineID curFileTimeLine = 0;

/*
 * These variables keep track of the state of the timeline we're currently
 * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
 * the timeline is not the latest timeline on this server, and the server's
 * history forked off from that timeline at sendTimeLineValidUpto.
 * sendTimeLineNextTLI is the timeline that follows sendTimeLine in that case;
 * START_REPLICATION reports it to the client as "next_tli".
 */
static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;

/*
 * How far have we sent WAL already? This is also advertised in
 * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
 */
static XLogRecPtr sentPtr = InvalidXLogRecPtr;

/* Buffers for constructing outgoing messages and processing reply messages. */
static StringInfoData output_message;
static StringInfoData reply_message;
static StringInfoData tmpbuf;

/* Timestamp of last ProcessRepliesIfAny(). */
static TimestampTz last_processing = 0;

/*
 * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
 * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
 */
static TimestampTz last_reply_timestamp = 0;

/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool waiting_for_ping_response = false;

/*
 * While streaming WAL in Copy mode, streamingDoneSending is set to true
 * after we have sent CopyDone. We should not send any more CopyData messages
 * after that. streamingDoneReceiving is set to true when we receive CopyDone
 * from the other end. When both become true, it's time to exit Copy mode.
 */
static bool streamingDoneSending;
static bool streamingDoneReceiving;

/* Are we there yet? */
static bool WalSndCaughtUp = false;

/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGUSR2 = false;
static volatile sig_atomic_t got_STOPPING = false;

/*
 * This is set while we are streaming. When not set
 * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
 * the main loop is responsible for checking got_STOPPING and terminating when
 * it's set (after streaming any remaining WAL).
 */
static volatile sig_atomic_t replication_active = false;

/*
 * Logical decoding state for this walsender.  Both start out unset
 * (NULL / InvalidXLogRecPtr); where they are populated is outside this
 * section of the file -- TODO confirm.
 */
static LogicalDecodingContext *logical_decoding_ctx = NULL;
static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
203 
/* A sample associating a WAL location with the time it was written. */
typedef struct
{
	XLogRecPtr	lsn;			/* WAL location of the sample */
	TimestampTz time;			/* time associated with that location */
} WalTimeSample;

/* The size of our buffer of time samples. */
#define LAG_TRACKER_BUFFER_SIZE 8192

/*
 * A mechanism for tracking replication lag.
 *
 * InitWalSender() zeroes the whole structure.  NOTE(review): the field roles
 * below are inferred from their names and from the LagTrackerWrite() /
 * LagTrackerRead() prototypes; the implementation is not in this section --
 * confirm against those functions.
 */
static struct
{
	XLogRecPtr	last_lsn;		/* presumably the last LSN recorded */
	WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];	/* sample storage,
													 * assumed circular */
	int			write_head;		/* presumably next slot to write */
	int			read_heads[NUM_SYNC_REP_WAIT_MODE]; /* one read position per
													 * sync-rep wait mode */
	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];	/* one saved sample per
														 * sync-rep wait mode */
}			LagTracker;
223 
/* Signal handlers */
static void WalSndLastCycleHandler(SIGNAL_ARGS);

/*
 * Prototypes for private functions.
 *
 * WalSndSendDataCallback is the routine WalSndLoop() invokes to push data to
 * the client; StartReplication() passes XLogSendPhysical, for example.
 */
typedef void (*WalSndSendDataCallback) (void);
static void WalSndLoop(WalSndSendDataCallback send_data);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
static void WalSndShutdown(void) pg_attribute_noreturn();
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
static void StartReplication(StartReplicationCmd *cmd);
static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void WalSndKeepalive(bool requestReply);
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);

/* Read 'count' bytes of WAL starting at 'startptr' into 'buf'. */
static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
259 
260 
/*
 * Initialize walsender process before entering the main command loop.
 *
 * Records whether we are a cascading walsender (the server is still in
 * recovery), sets up our per-walsender shared memory structure, creates the
 * top-level resource owner, declares this process a WAL sender to the
 * postmaster, and clears the lag-tracking state.
 */
void
InitWalSender(void)
{
	/* We are cascading if this server is itself still in recovery. */
	am_cascading_walsender = RecoveryInProgress();

	/* Create a per-walsender data structure in shared memory */
	InitWalSenderSlot();

	/* Set up resource owner */
	CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");

	/*
	 * Let postmaster know that we're a WAL sender. Once we've declared us as
	 * a WAL sender process, postmaster will let us outlive the bgwriter and
	 * kill us last in the shutdown sequence, so we get a chance to stream all
	 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
	 * there's no going back, and we mustn't write any WAL records after this.
	 */
	MarkPostmasterChildWalSender();
	SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);

	/* Initialize empty timestamp buffer for lag tracking. */
	memset(&LagTracker, 0, sizeof(LagTracker));
}
286 
287 /*
288  * Clean up after an error.
289  *
290  * WAL sender processes don't use transactions like regular backends do.
291  * This function does any cleanup required after an error in a WAL sender
292  * process, similar to what transaction abort does in a regular backend.
293  */
294 void
WalSndErrorCleanup(void)295 WalSndErrorCleanup(void)
296 {
297 	LWLockReleaseAll();
298 	ConditionVariableCancelSleep();
299 	pgstat_report_wait_end();
300 
301 	if (sendFile >= 0)
302 	{
303 		close(sendFile);
304 		sendFile = -1;
305 	}
306 
307 	if (MyReplicationSlot != NULL)
308 		ReplicationSlotRelease();
309 
310 	ReplicationSlotCleanup();
311 
312 	replication_active = false;
313 
314 	if (got_STOPPING || got_SIGUSR2)
315 		proc_exit(0);
316 
317 	/* Revert back to startup state */
318 	WalSndSetState(WALSNDSTATE_STARTUP);
319 }
320 
321 /*
322  * Handle a client's connection abort in an orderly manner.
323  */
324 static void
WalSndShutdown(void)325 WalSndShutdown(void)
326 {
327 	/*
328 	 * Reset whereToSendOutput to prevent ereport from attempting to send any
329 	 * more messages to the standby.
330 	 */
331 	if (whereToSendOutput == DestRemote)
332 		whereToSendOutput = DestNone;
333 
334 	proc_exit(0);
335 	abort();					/* keep the compiler quiet */
336 }
337 
338 /*
339  * Handle the IDENTIFY_SYSTEM command.
340  */
341 static void
IdentifySystem(void)342 IdentifySystem(void)
343 {
344 	char		sysid[32];
345 	char		xloc[MAXFNAMELEN];
346 	XLogRecPtr	logptr;
347 	char	   *dbname = NULL;
348 	DestReceiver *dest;
349 	TupOutputState *tstate;
350 	TupleDesc	tupdesc;
351 	Datum		values[4];
352 	bool		nulls[4];
353 
354 	/*
355 	 * Reply with a result set with one row, four columns. First col is system
356 	 * ID, second is timeline ID, third is current xlog location and the
357 	 * fourth contains the database name if we are connected to one.
358 	 */
359 
360 	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
361 			 GetSystemIdentifier());
362 
363 	am_cascading_walsender = RecoveryInProgress();
364 	if (am_cascading_walsender)
365 	{
366 		/* this also updates ThisTimeLineID */
367 		logptr = GetStandbyFlushRecPtr();
368 	}
369 	else
370 		logptr = GetFlushRecPtr();
371 
372 	snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
373 
374 	if (MyDatabaseId != InvalidOid)
375 	{
376 		MemoryContext cur = CurrentMemoryContext;
377 
378 		/* syscache access needs a transaction env. */
379 		StartTransactionCommand();
380 		/* make dbname live outside TX context */
381 		MemoryContextSwitchTo(cur);
382 		dbname = get_database_name(MyDatabaseId);
383 		CommitTransactionCommand();
384 		/* CommitTransactionCommand switches to TopMemoryContext */
385 		MemoryContextSwitchTo(cur);
386 	}
387 
388 	dest = CreateDestReceiver(DestRemoteSimple);
389 	MemSet(nulls, false, sizeof(nulls));
390 
391 	/* need a tuple descriptor representing four columns */
392 	tupdesc = CreateTemplateTupleDesc(4, false);
393 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
394 							  TEXTOID, -1, 0);
395 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
396 							  INT4OID, -1, 0);
397 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
398 							  TEXTOID, -1, 0);
399 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
400 							  TEXTOID, -1, 0);
401 
402 	/* prepare for projection of tuples */
403 	tstate = begin_tup_output_tupdesc(dest, tupdesc);
404 
405 	/* column 1: system identifier */
406 	values[0] = CStringGetTextDatum(sysid);
407 
408 	/* column 2: timeline */
409 	values[1] = Int32GetDatum(ThisTimeLineID);
410 
411 	/* column 3: wal location */
412 	values[2] = CStringGetTextDatum(xloc);
413 
414 	/* column 4: database name, or NULL if none */
415 	if (dbname)
416 		values[3] = CStringGetTextDatum(dbname);
417 	else
418 		nulls[3] = true;
419 
420 	/* send it to dest */
421 	do_tup_output(tstate, values, nulls);
422 
423 	end_tup_output(tstate);
424 }
425 
426 
427 /*
428  * Handle TIMELINE_HISTORY command.
429  */
430 static void
SendTimeLineHistory(TimeLineHistoryCmd * cmd)431 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
432 {
433 	StringInfoData buf;
434 	char		histfname[MAXFNAMELEN];
435 	char		path[MAXPGPATH];
436 	int			fd;
437 	off_t		histfilelen;
438 	off_t		bytesleft;
439 	Size		len;
440 
441 	/*
442 	 * Reply with a result set with one row, and two columns. The first col is
443 	 * the name of the history file, 2nd is the contents.
444 	 */
445 
446 	TLHistoryFileName(histfname, cmd->timeline);
447 	TLHistoryFilePath(path, cmd->timeline);
448 
449 	/* Send a RowDescription message */
450 	pq_beginmessage(&buf, 'T');
451 	pq_sendint16(&buf, 2);		/* 2 fields */
452 
453 	/* first field */
454 	pq_sendstring(&buf, "filename");	/* col name */
455 	pq_sendint32(&buf, 0);		/* table oid */
456 	pq_sendint16(&buf, 0);		/* attnum */
457 	pq_sendint32(&buf, TEXTOID);	/* type oid */
458 	pq_sendint16(&buf, -1);		/* typlen */
459 	pq_sendint32(&buf, 0);		/* typmod */
460 	pq_sendint16(&buf, 0);		/* format code */
461 
462 	/* second field */
463 	pq_sendstring(&buf, "content"); /* col name */
464 	pq_sendint32(&buf, 0);		/* table oid */
465 	pq_sendint16(&buf, 0);		/* attnum */
466 	/*
467 	 * While this is labeled as BYTEAOID, it is the same output format
468 	 * as TEXTOID above.
469 	 */
470 	pq_sendint32(&buf, BYTEAOID);	/* type oid */
471 	pq_sendint16(&buf, -1);		/* typlen */
472 	pq_sendint32(&buf, 0);		/* typmod */
473 	pq_sendint16(&buf, 0);		/* format code */
474 	pq_endmessage(&buf);
475 
476 	/* Send a DataRow message */
477 	pq_beginmessage(&buf, 'D');
478 	pq_sendint16(&buf, 2);		/* # of columns */
479 	len = strlen(histfname);
480 	pq_sendint32(&buf, len);	/* col1 len */
481 	pq_sendbytes(&buf, histfname, len);
482 
483 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
484 	if (fd < 0)
485 		ereport(ERROR,
486 				(errcode_for_file_access(),
487 				 errmsg("could not open file \"%s\": %m", path)));
488 
489 	/* Determine file length and send it to client */
490 	histfilelen = lseek(fd, 0, SEEK_END);
491 	if (histfilelen < 0)
492 		ereport(ERROR,
493 				(errcode_for_file_access(),
494 				 errmsg("could not seek to end of file \"%s\": %m", path)));
495 	if (lseek(fd, 0, SEEK_SET) != 0)
496 		ereport(ERROR,
497 				(errcode_for_file_access(),
498 				 errmsg("could not seek to beginning of file \"%s\": %m", path)));
499 
500 	pq_sendint32(&buf, histfilelen);	/* col2 len */
501 
502 	bytesleft = histfilelen;
503 	while (bytesleft > 0)
504 	{
505 		PGAlignedBlock rbuf;
506 		int			nread;
507 
508 		pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
509 		nread = read(fd, rbuf.data, sizeof(rbuf));
510 		pgstat_report_wait_end();
511 		if (nread <= 0)
512 			ereport(ERROR,
513 					(errcode_for_file_access(),
514 					 errmsg("could not read file \"%s\": %m",
515 							path)));
516 		pq_sendbytes(&buf, rbuf.data, nread);
517 		bytesleft -= nread;
518 	}
519 	CloseTransientFile(fd);
520 
521 	pq_endmessage(&buf);
522 }
523 
/*
 * Handle START_REPLICATION command (physical replication).
 *
 * IDENTIFY_SYSTEM must have been run first, so that ThisTimeLineID is set.
 * If a slot name was given, the slot is acquired for the duration of the
 * command and must be a physical slot.  The timeline to stream is the one
 * requested by the client, or ThisTimeLineID if none was requested.
 *
 * If there is WAL to stream, a CopyBothResponse is sent and WalSndLoop()
 * streams WAL from cmd->startpoint until both sides have left COPY mode.  If
 * a shutdown was requested meanwhile (got_STOPPING), the process exits
 * instead of returning.  After streaming a historic timeline, a one-row
 * result set (next_tli, next_tli_startpos) describing the following timeline
 * is sent.  Finally, CommandComplete is sent and the function returns.
 *
 * Errors are raised with ereport(ERROR), which takes us back to the main
 * loop.
 */
static void
StartReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;
	XLogRecPtr	FlushPtr;

	/* ThisTimeLineID is still unset unless IDENTIFY_SYSTEM ran first. */
	if (ThisTimeLineID == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));

	/*
	 * We assume here that we're logging enough information in the WAL for
	 * log-shipping, since this is checked in PostmasterMain().
	 *
	 * NOTE: wal_level can only change at shutdown, so in most cases it is
	 * difficult for there to be WAL data that we can still see that was
	 * written at wal_level='minimal'.
	 */

	if (cmd->slotname)
	{
		ReplicationSlotAcquire(cmd->slotname, true);
		if (SlotIsLogical(MyReplicationSlot))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 (errmsg("cannot use a logical replication slot for physical replication"))));
	}

	/*
	 * Select the timeline. If it was given explicitly by the client, use
	 * that. Otherwise use the timeline of the last replayed record, which is
	 * kept in ThisTimeLineID.
	 */
	if (am_cascading_walsender)
	{
		/* this also updates ThisTimeLineID */
		FlushPtr = GetStandbyFlushRecPtr();
	}
	else
		FlushPtr = GetFlushRecPtr();

	if (cmd->timeline != 0)
	{
		XLogRecPtr	switchpoint;

		sendTimeLine = cmd->timeline;
		if (sendTimeLine == ThisTimeLineID)
		{
			sendTimeLineIsHistoric = false;
			sendTimeLineValidUpto = InvalidXLogRecPtr;
		}
		else
		{
			List	   *timeLineHistory;

			sendTimeLineIsHistoric = true;

			/*
			 * Check that the timeline the client requested exists, and the
			 * requested start location is on that timeline.
			 */
			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
										 &sendTimeLineNextTLI);
			list_free_deep(timeLineHistory);

			/*
			 * Found the requested timeline in the history. Check that
			 * requested startpoint is on that timeline in our history.
			 *
			 * This is quite loose on purpose. We only check that we didn't
			 * fork off the requested timeline before the switchpoint. We
			 * don't check that we switched *to* it before the requested
			 * starting point. This is because the client can legitimately
			 * request to start replication from the beginning of the WAL
			 * segment that contains switchpoint, but on the new timeline, so
			 * that it doesn't end up with a partial segment. If you ask for
			 * too old a starting point, you'll get an error later when we
			 * fail to find the requested WAL segment in pg_wal.
			 *
			 * XXX: we could be more strict here and only allow a startpoint
			 * that's older than the switchpoint, if it's still in the same
			 * WAL segment.
			 */
			if (!XLogRecPtrIsInvalid(switchpoint) &&
				switchpoint < cmd->startpoint)
			{
				ereport(ERROR,
						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
								(uint32) (cmd->startpoint >> 32),
								(uint32) (cmd->startpoint),
								cmd->timeline),
						 errdetail("This server's history forked from timeline %u at %X/%X.",
								   cmd->timeline,
								   (uint32) (switchpoint >> 32),
								   (uint32) (switchpoint))));
			}
			sendTimeLineValidUpto = switchpoint;
		}
	}
	else
	{
		sendTimeLine = ThisTimeLineID;
		sendTimeLineValidUpto = InvalidXLogRecPtr;
		sendTimeLineIsHistoric = false;
	}

	streamingDoneSending = streamingDoneReceiving = false;

	/*
	 * If there is nothing to stream, don't even enter COPY mode.  That is
	 * the case only for a historic timeline whose switchpoint is at or
	 * before the requested start.
	 */
	if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
	{
		/*
		 * When we first start replication the standby will be behind the
		 * primary. For some applications, for example synchronous
		 * replication, it is important to have a clear state for this initial
		 * catchup mode, so we can trigger actions when we change streaming
		 * state later. We may stay in this state for a long time, which is
		 * exactly why we want to be able to monitor whether or not we are
		 * still here.
		 */
		WalSndSetState(WALSNDSTATE_CATCHUP);

		/* Send a CopyBothResponse message, and start streaming */
		pq_beginmessage(&buf, 'W');
		pq_sendbyte(&buf, 0);
		pq_sendint16(&buf, 0);
		pq_endmessage(&buf);
		pq_flush();

		/*
		 * Don't allow a request to stream from a future point in WAL that
		 * hasn't been flushed to disk in this server yet.
		 *
		 * NOTE(review): this check runs after CopyBothResponse has already
		 * been flushed, so the client is in COPY mode when it gets the error.
		 */
		if (FlushPtr < cmd->startpoint)
		{
			ereport(ERROR,
					(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
							(uint32) (cmd->startpoint >> 32),
							(uint32) (cmd->startpoint),
							(uint32) (FlushPtr >> 32),
							(uint32) (FlushPtr))));
		}

		/* Start streaming from the requested point */
		sentPtr = cmd->startpoint;

		/* Initialize shared memory status, too */
		SpinLockAcquire(&MyWalSnd->mutex);
		MyWalSnd->sentPtr = sentPtr;
		SpinLockRelease(&MyWalSnd->mutex);

		SyncRepInitConfig();

		/* Main loop of walsender */
		replication_active = true;

		WalSndLoop(XLogSendPhysical);

		/* Streaming ended; exit if a shutdown was requested meanwhile. */
		replication_active = false;
		if (got_STOPPING)
			proc_exit(0);
		WalSndSetState(WALSNDSTATE_STARTUP);

		Assert(streamingDoneSending && streamingDoneReceiving);
	}

	if (cmd->slotname)
		ReplicationSlotRelease();

	/*
	 * Copy is finished now. Send a single-row result set indicating the next
	 * timeline.
	 */
	if (sendTimeLineIsHistoric)
	{
		char		startpos_str[8 + 1 + 8 + 1];	/* "%X/%X" of two uint32s */
		DestReceiver *dest;
		TupOutputState *tstate;
		TupleDesc	tupdesc;
		Datum		values[2];
		bool		nulls[2];

		snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
				 (uint32) (sendTimeLineValidUpto >> 32),
				 (uint32) sendTimeLineValidUpto);

		dest = CreateDestReceiver(DestRemoteSimple);
		MemSet(nulls, false, sizeof(nulls));

		/*
		 * Need a tuple descriptor representing two columns. int8 may seem
		 * like a surprising data type for this, but in theory int4 would not
		 * be wide enough for this, as TimeLineID is unsigned.
		 */
		tupdesc = CreateTemplateTupleDesc(2, false);
		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
								  INT8OID, -1, 0);
		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
								  TEXTOID, -1, 0);

		/* prepare for projection of tuple */
		tstate = begin_tup_output_tupdesc(dest, tupdesc);

		values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
		values[1] = CStringGetTextDatum(startpos_str);

		/* send it to dest */
		do_tup_output(tstate, values, nulls);

		end_tup_output(tstate);
	}

	/* Send CommandComplete message */
	pq_puttextmessage('C', "START_STREAMING");
}
747 
748 /*
749  * read_page callback for logical decoding contexts, as a walsender process.
750  *
751  * Inside the walsender we can do better than logical_read_local_xlog_page,
752  * which has to do a plain sleep/busy loop, because the walsender's latch gets
753  * set every time WAL is flushed.
754  */
755 static int
logical_read_xlog_page(XLogReaderState * state,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char * cur_page,TimeLineID * pageTLI)756 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
757 					   XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
758 {
759 	XLogRecPtr	flushptr;
760 	int			count;
761 
762 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
763 	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
764 	sendTimeLine = state->currTLI;
765 	sendTimeLineValidUpto = state->currTLIValidUntil;
766 	sendTimeLineNextTLI = state->nextTLI;
767 
768 	/* make sure we have enough WAL available */
769 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
770 
771 	/* fail if not (implies we are going to shut down) */
772 	if (flushptr < targetPagePtr + reqLen)
773 		return -1;
774 
775 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
776 		count = XLOG_BLCKSZ;	/* more than one block available */
777 	else
778 		count = flushptr - targetPagePtr;	/* part of the page available */
779 
780 	/* now actually read the data, we know it's there */
781 	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
782 
783 	return count;
784 }
785 
786 /*
787  * Process extra options given to CREATE_REPLICATION_SLOT.
788  */
789 static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd * cmd,bool * reserve_wal,CRSSnapshotAction * snapshot_action)790 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
791 						   bool *reserve_wal,
792 						   CRSSnapshotAction *snapshot_action)
793 {
794 	ListCell   *lc;
795 	bool		snapshot_action_given = false;
796 	bool		reserve_wal_given = false;
797 
798 	/* Parse options */
799 	foreach(lc, cmd->options)
800 	{
801 		DefElem    *defel = (DefElem *) lfirst(lc);
802 
803 		if (strcmp(defel->defname, "export_snapshot") == 0)
804 		{
805 			if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
806 				ereport(ERROR,
807 						(errcode(ERRCODE_SYNTAX_ERROR),
808 						 errmsg("conflicting or redundant options")));
809 
810 			snapshot_action_given = true;
811 			*snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
812 				CRS_NOEXPORT_SNAPSHOT;
813 		}
814 		else if (strcmp(defel->defname, "use_snapshot") == 0)
815 		{
816 			if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
817 				ereport(ERROR,
818 						(errcode(ERRCODE_SYNTAX_ERROR),
819 						 errmsg("conflicting or redundant options")));
820 
821 			snapshot_action_given = true;
822 			*snapshot_action = CRS_USE_SNAPSHOT;
823 		}
824 		else if (strcmp(defel->defname, "reserve_wal") == 0)
825 		{
826 			if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
827 				ereport(ERROR,
828 						(errcode(ERRCODE_SYNTAX_ERROR),
829 						 errmsg("conflicting or redundant options")));
830 
831 			reserve_wal_given = true;
832 			*reserve_wal = true;
833 		}
834 		else
835 			elog(ERROR, "unrecognized option: %s", defel->defname);
836 	}
837 }
838 
839 /*
840  * Create a new replication slot.
841  */
842 static void
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
	/*
	 * Handle CREATE_REPLICATION_SLOT: create a physical or logical slot named
	 * cmd->slotname, then send the client a single result row with the
	 * columns slot_name, consistent_point, snapshot_name and output_plugin.
	 * The slot is released (not dropped) before returning.
	 */
	const char *snapshot_name = NULL;
	char		xloc[MAXFNAMELEN];
	char	   *slot_name;
	bool		reserve_wal = false;
	CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
	DestReceiver *dest;
	TupOutputState *tstate;
	TupleDesc	tupdesc;
	Datum		values[4];
	bool		nulls[4];

	Assert(!MyReplicationSlot);

	/*
	 * Presumably overrides the defaults above (no WAL reservation, export
	 * snapshot) from the command's option list.
	 */
	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);

	/* setup state for XLogReadPage */
	sendTimeLineIsHistoric = false;
	sendTimeLine = ThisTimeLineID;

	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
	{
		/* Physical slots are created with their final persistency right away. */
		ReplicationSlotCreate(cmd->slotname, false,
							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
	}
	else
	{
		CheckLogicalDecodingRequirements();

		/*
		 * Initially create persistent slot as ephemeral - that allows us to
		 * nicely handle errors during initialization because it'll get
		 * dropped if this transaction fails. We'll make it persistent at the
		 * end. Temporary slots can be created as temporary from beginning as
		 * they get dropped on error as well.
		 */
		ReplicationSlotCreate(cmd->slotname, true,
							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
	}

	if (cmd->kind == REPLICATION_KIND_LOGICAL)
	{
		LogicalDecodingContext *ctx;
		bool		need_full_snapshot = false;

		/*
		 * Do options check early so that we can bail before calling the
		 * DecodingContextFindStartpoint which can take long time.
		 */
		if (snapshot_action == CRS_EXPORT_SNAPSHOT)
		{
			if (IsTransactionBlock())
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
								"must not be called inside a transaction")));

			need_full_snapshot = true;
		}
		else if (snapshot_action == CRS_USE_SNAPSHOT)
		{
			/*
			 * USE_SNAPSHOT installs the slot's snapshot into the caller's
			 * transaction, so that transaction must be a fresh, top-level
			 * REPEATABLE READ transaction that has not taken a snapshot yet.
			 */
			if (!IsTransactionBlock())
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
								"must be called inside a transaction")));

			if (XactIsoLevel != XACT_REPEATABLE_READ)
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
								"must be called in REPEATABLE READ isolation mode transaction")));

			if (FirstSnapshotSet)
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
								"must be called before any query")));

			if (IsSubTransaction())
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
								"must not be called in a subtransaction")));

			need_full_snapshot = true;
		}

		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
										logical_read_xlog_page,
										WalSndPrepareWrite, WalSndWriteData,
										WalSndUpdateProgress);

		/*
		 * Signal that we don't need the timeout mechanism. We're just
		 * creating the replication slot and don't yet accept feedback
		 * messages or send keepalives. As we possibly need to wait for
		 * further WAL the walsender would otherwise possibly be killed too
		 * soon.
		 */
		last_reply_timestamp = 0;

		/* build initial snapshot, might take a while */
		DecodingContextFindStartpoint(ctx);

		/*
		 * Export or use the snapshot if we've been asked to do so.
		 *
		 * NB. We will convert the snapbuild.c kind of snapshot to normal
		 * snapshot when doing this.
		 */
		if (snapshot_action == CRS_EXPORT_SNAPSHOT)
		{
			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
		}
		else if (snapshot_action == CRS_USE_SNAPSHOT)
		{
			Snapshot	snap;

			snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
			RestoreTransactionSnapshot(snap, MyProc);
		}

		/* don't need the decoding context anymore */
		FreeDecodingContext(ctx);

		/* Initialization succeeded: turn the ephemeral slot into a persistent one. */
		if (!cmd->temporary)
			ReplicationSlotPersist();
	}
	else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
	{
		/* RESERVE_WAL was requested: reserve WAL for the slot immediately. */
		ReplicationSlotReserveWal();

		ReplicationSlotMarkDirty();

		/* Write this slot to disk if it's a permanent one. */
		if (!cmd->temporary)
			ReplicationSlotSave();
	}

	/*
	 * The consistent point reported to the client is the slot's
	 * confirmed_flush, formatted as %X/%X.  NOTE(review): nothing in this
	 * function sets confirmed_flush for physical slots, so they presumably
	 * report 0/0 here -- confirm against slot.c.
	 */
	snprintf(xloc, sizeof(xloc), "%X/%X",
			 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
			 (uint32) MyReplicationSlot->data.confirmed_flush);

	dest = CreateDestReceiver(DestRemoteSimple);
	MemSet(nulls, false, sizeof(nulls));

	/*----------
	 * Need a tuple descriptor representing four columns:
	 * - first field: the slot name
	 * - second field: LSN at which we became consistent
	 * - third field: exported snapshot's name
	 * - fourth field: output plugin
	 *----------
	 */
	tupdesc = CreateTemplateTupleDesc(4, false);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
							  TEXTOID, -1, 0);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
							  TEXTOID, -1, 0);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
							  TEXTOID, -1, 0);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
							  TEXTOID, -1, 0);

	/* prepare for projection of tuples */
	tstate = begin_tup_output_tupdesc(dest, tupdesc);

	/* slot_name */
	slot_name = NameStr(MyReplicationSlot->data.name);
	values[0] = CStringGetTextDatum(slot_name);

	/* consistent wal location */
	values[1] = CStringGetTextDatum(xloc);

	/* snapshot name, or NULL if none */
	if (snapshot_name != NULL)
		values[2] = CStringGetTextDatum(snapshot_name);
	else
		nulls[2] = true;

	/* plugin, or NULL if none */
	if (cmd->plugin != NULL)
		values[3] = CStringGetTextDatum(cmd->plugin);
	else
		nulls[3] = true;

	/* send it to dest */
	do_tup_output(tstate, values, nulls);
	end_tup_output(tstate);

	/* The slot stays in existence; we just stop owning it. */
	ReplicationSlotRelease();
}
1032 
1033 /*
1034  * Get rid of a replication slot that is no longer wanted.
1035  */
1036 static void
DropReplicationSlot(DropReplicationSlotCmd * cmd)1037 DropReplicationSlot(DropReplicationSlotCmd *cmd)
1038 {
1039 	ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1040 	EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1041 }
1042 
/*
 * Handle START_REPLICATION ... LOGICAL: load a previously created logical
 * slot and stream decoded changes to the client (via WalSndLoop).
 *
 * The slot is acquired, a decoding context is created from cmd->startpoint
 * and cmd->options, a CopyBothResponse is sent, and WalSndLoop runs with
 * XLogSendLogical until streaming ends.  Afterwards the decoding context is
 * freed and the slot released.  If got_STOPPING is set at that point the
 * process exits via proc_exit(0); otherwise the walsender goes back to the
 * STARTUP state and sends CommandComplete to leave COPY mode.
 */
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;

	/* make sure that our requirements are still fulfilled */
	CheckLogicalDecodingRequirements();

	Assert(!MyReplicationSlot);

	ReplicationSlotAcquire(cmd->slotname, true);

	/*
	 * Force a disconnect, so that the decoding code doesn't need to care
	 * about an eventual switch from running in recovery, to running in a
	 * normal environment. Client code is expected to handle reconnects.
	 */
	if (am_cascading_walsender && !RecoveryInProgress())
	{
		ereport(LOG,
				(errmsg("terminating walsender process after promotion")));
		got_STOPPING = true;
	}

	/*
	 * Create our decoding context, making it start at the previously ack'ed
	 * position.
	 *
	 * Do this before sending CopyBoth, so that any errors are reported early.
	 */
	logical_decoding_ctx =
		CreateDecodingContext(cmd->startpoint, cmd->options, false,
							  logical_read_xlog_page,
							  WalSndPrepareWrite, WalSndWriteData,
							  WalSndUpdateProgress);


	WalSndSetState(WALSNDSTATE_CATCHUP);

	/*
	 * Send a CopyBothResponse message, and start streaming.  The format byte
	 * and the column count are both zero.
	 */
	pq_beginmessage(&buf, 'W');
	pq_sendbyte(&buf, 0);
	pq_sendint16(&buf, 0);
	pq_endmessage(&buf);
	pq_flush();


	/* Start reading WAL from the oldest required WAL. */
	logical_startptr = MyReplicationSlot->data.restart_lsn;

	/*
	 * Report the location after which we'll send out further commits as the
	 * current sentPtr.
	 */
	sentPtr = MyReplicationSlot->data.confirmed_flush;

	/*
	 * Also update the sent position status in shared memory.  Note that the
	 * shared value is seeded from restart_lsn, not from the confirmed_flush
	 * just stored in the local sentPtr.
	 */
	SpinLockAcquire(&MyWalSnd->mutex);
	MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
	SpinLockRelease(&MyWalSnd->mutex);

	replication_active = true;

	SyncRepInitConfig();

	/* Main loop of walsender */
	WalSndLoop(XLogSendLogical);

	FreeDecodingContext(logical_decoding_ctx);
	ReplicationSlotRelease();

	replication_active = false;

	/* Shutdown was requested (or we were promoted): exit instead of idling. */
	if (got_STOPPING)
		proc_exit(0);
	WalSndSetState(WALSNDSTATE_STARTUP);

	/* Get out of COPY mode (CommandComplete). */
	EndCommand("COPY 0", DestRemote);
}
1126 
1127 /*
1128  * LogicalDecodingContext 'prepare_write' callback.
1129  *
1130  * Prepare a write into a StringInfo.
1131  *
1132  * Don't do anything lasting in here, it's quite possible that nothing will be done
1133  * with the data.
1134  */
1135 static void
WalSndPrepareWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1136 WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1137 {
1138 	/* can't have sync rep confused by sending the same LSN several times */
1139 	if (!last_write)
1140 		lsn = InvalidXLogRecPtr;
1141 
1142 	resetStringInfo(ctx->out);
1143 
1144 	pq_sendbyte(ctx->out, 'w');
1145 	pq_sendint64(ctx->out, lsn);	/* dataStart */
1146 	pq_sendint64(ctx->out, lsn);	/* walEnd */
1147 
1148 	/*
1149 	 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1150 	 * reserve space here.
1151 	 */
1152 	pq_sendint64(ctx->out, 0);	/* sendtime */
1153 }
1154 
/*
 * LogicalDecodingContext 'write' callback.
 *
 * Actually write out data previously prepared by WalSndPrepareWrite out to
 * the network. Take as long as needed, but process replies from the other
 * side and check timeouts during that.
 *
 * The send-timestamp placeholder reserved by WalSndPrepareWrite is patched in
 * place, the message is queued as CopyData ('d') without blocking, and one
 * flush attempt is made.  The function returns immediately when nothing is
 * left pending and less than half of wal_sender_timeout has elapsed since the
 * last reply.  Otherwise it loops, processing replies, checking the timeout
 * and sending keepalives, until the output buffer is drained.  A failed flush
 * calls WalSndShutdown(); postmaster death ends the process with exit(1).
 */
static void
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
				bool last_write)
{
	TimestampTz now;

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * This is somewhat ugly, but the protocol is set as it's already used for
	 * several releases by streaming physical replication.
	 *
	 * The timestamp is encoded via pq_sendint64 into tmpbuf and copied over
	 * the placeholder, which follows the 'w' type byte and the two int64
	 * fields (dataStart, walEnd) written by WalSndPrepareWrite.
	 */
	resetStringInfo(&tmpbuf);
	now = GetCurrentTimestamp();
	pq_sendint64(&tmpbuf, now);
	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	/* output previously gathered data in a CopyData packet */
	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);

	CHECK_FOR_INTERRUPTS();

	/* Try to flush pending output to the client */
	if (pq_flush_if_writable() != 0)
		WalSndShutdown();

	/* Try taking fast path unless we get too close to walsender timeout. */
	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
										  wal_sender_timeout / 2) &&
		!pq_is_send_pending())
	{
		return;
	}

	/*
	 * Slow path: output is still pending, or the timeout is getting close.
	 * Keep servicing the connection until everything has been sent.
	 */
	for (;;)
	{
		int			wakeEvents;
		long		sleeptime;

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		if (!pq_is_send_pending())
			break;

		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;

		/* Sleep until something happens or we time out */
		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime,
						  WAIT_EVENT_WAL_SENDER_WRITE_DATA);

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();
	}

	/* reactivate latch so WalSndLoop knows to continue */
	SetLatch(MyLatch);
}
1252 
1253 /*
1254  * LogicalDecodingContext 'update_progress' callback.
1255  *
1256  * Write the current position to the lag tracker (see XLogSendPhysical).
1257  */
1258 static void
WalSndUpdateProgress(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid)1259 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
1260 {
1261 	static TimestampTz sendTime = 0;
1262 	TimestampTz now = GetCurrentTimestamp();
1263 
1264 	/*
1265 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1266 	 * avoid flooding the lag tracker when we commit frequently.
1267 	 */
1268 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
1269 	if (!TimestampDifferenceExceeds(sendTime, now,
1270 									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1271 		return;
1272 
1273 	LagTrackerWrite(lsn, now);
1274 	sendTime = now;
1275 }
1276 
/*
 * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
 *
 * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
 * if we detect a shutdown request (either from postmaster or client)
 * we will return early, so caller must always check.
 *
 * While in recovery (cascading walsender) the replay position is used in
 * place of the flush position.  The loop ends when loc <= RecentFlushPtr,
 * when got_STOPPING is set, or when CopyDone has been both received and sent
 * and no output is pending.  While waiting, WalSndCaughtUp is set to true.
 */
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
	int			wakeEvents;

	/*
	 * Static, so the last known flush position survives across calls and
	 * lets the fast path below answer without fetching a new one.
	 */
	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;

	/*
	 * Fast path to avoid acquiring the spinlock in case we already know we
	 * have enough WAL available. This is particularly interesting if we're
	 * far behind.
	 */
	if (RecentFlushPtr != InvalidXLogRecPtr &&
		loc <= RecentFlushPtr)
		return RecentFlushPtr;

	/* Get a more recent flush pointer. */
	if (!RecoveryInProgress())
		RecentFlushPtr = GetFlushRecPtr();
	else
		RecentFlushPtr = GetXLogReplayRecPtr(NULL);

	for (;;)
	{
		long		sleeptime;

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we're shutting down, trigger pending WAL to be written out,
		 * otherwise we'd possibly end up waiting for WAL that never gets
		 * written, because walwriter has shut down already.
		 */
		if (got_STOPPING)
			XLogBackgroundFlush();

		/* Update our idea of the currently flushed position. */
		if (!RecoveryInProgress())
			RecentFlushPtr = GetFlushRecPtr();
		else
			RecentFlushPtr = GetXLogReplayRecPtr(NULL);

		/*
		 * If postmaster asked us to stop, don't wait anymore.
		 *
		 * It's important to do this check after the recomputation of
		 * RecentFlushPtr, so we can send all remaining data before shutting
		 * down.
		 */
		if (got_STOPPING)
			break;

		/*
		 * We only send regular messages to the client for full decoded
		 * transactions, but a synchronous replication and walsender shutdown
		 * possibly are waiting for a later location. So, before sleeping, we
		 * send a ping containing the flush location. If the receiver is
		 * otherwise idle, this keepalive will trigger a reply. Processing the
		 * reply will update these MyWalSnd locations.
		 */
		if (MyWalSnd->flush < sentPtr &&
			MyWalSnd->write < sentPtr &&
			!waiting_for_ping_response)
			WalSndKeepalive(false);

		/* check whether we're done */
		if (loc <= RecentFlushPtr)
			break;

		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
		WalSndCaughtUp = true;

		/*
		 * Try to flush any pending output to the client.
		 */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming, so fail the current WAL fetch request.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * Sleep until something happens or we time out.  Also wait for the
		 * socket becoming writable, if there's still pending output.
		 * Otherwise we might sit on sendable output data while waiting for
		 * new WAL to be generated.  (But if we have nothing to send, we don't
		 * want to wake on socket-writable.)
		 */
		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_READABLE | WL_TIMEOUT;

		if (pq_is_send_pending())
			wakeEvents |= WL_SOCKET_WRITEABLE;

		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime,
						  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
	}

	/* reactivate latch so WalSndLoop knows to continue */
	SetLatch(MyLatch);
	return RecentFlushPtr;
}
1421 
1422 /*
1423  * Execute an incoming replication command.
1424  *
1425  * Returns true if the cmd_string was recognized as WalSender command, false
1426  * if not.
1427  */
1428 bool
exec_replication_command(const char * cmd_string)1429 exec_replication_command(const char *cmd_string)
1430 {
1431 	int			parse_rc;
1432 	Node	   *cmd_node;
1433 	MemoryContext cmd_context;
1434 	MemoryContext old_context;
1435 
1436 	/*
1437 	 * If WAL sender has been told that shutdown is getting close, switch its
1438 	 * status accordingly to handle the next replication commands correctly.
1439 	 */
1440 	if (got_STOPPING)
1441 		WalSndSetState(WALSNDSTATE_STOPPING);
1442 
1443 	/*
1444 	 * Throw error if in stopping mode.  We need prevent commands that could
1445 	 * generate WAL while the shutdown checkpoint is being written.  To be
1446 	 * safe, we just prohibit all new commands.
1447 	 */
1448 	if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1449 		ereport(ERROR,
1450 				(errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1451 
1452 	/*
1453 	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1454 	 * command arrives. Clean up the old stuff if there's anything.
1455 	 */
1456 	SnapBuildClearExportedSnapshot();
1457 
1458 	CHECK_FOR_INTERRUPTS();
1459 
1460 	/*
1461 	 * Parse the command.
1462 	 */
1463 	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1464 										"Replication command context",
1465 										ALLOCSET_DEFAULT_SIZES);
1466 	old_context = MemoryContextSwitchTo(cmd_context);
1467 
1468 	replication_scanner_init(cmd_string);
1469 	parse_rc = replication_yyparse();
1470 	if (parse_rc != 0)
1471 		ereport(ERROR,
1472 				(errcode(ERRCODE_SYNTAX_ERROR),
1473 				 errmsg_internal("replication command parser returned %d",
1474 								 parse_rc)));
1475 	replication_scanner_finish();
1476 
1477 	cmd_node = replication_parse_result;
1478 
1479 	/*
1480 	 * If it's a SQL command, just clean up our mess and return false; the
1481 	 * caller will take care of executing it.
1482 	 */
1483 	if (IsA(cmd_node, SQLCmd))
1484 	{
1485 		if (MyDatabaseId == InvalidOid)
1486 			ereport(ERROR,
1487 					(errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1488 
1489 		MemoryContextSwitchTo(old_context);
1490 		MemoryContextDelete(cmd_context);
1491 
1492 		/* Tell the caller that this wasn't a WalSender command. */
1493 		return false;
1494 	}
1495 
1496 	/*
1497 	 * Report query to various monitoring facilities.  For this purpose, we
1498 	 * report replication commands just like SQL commands.
1499 	 */
1500 	debug_query_string = cmd_string;
1501 
1502 	pgstat_report_activity(STATE_RUNNING, cmd_string);
1503 
1504 	/*
1505 	 * Log replication command if log_replication_commands is enabled. Even
1506 	 * when it's disabled, log the command with DEBUG1 level for backward
1507 	 * compatibility.
1508 	 */
1509 	ereport(log_replication_commands ? LOG : DEBUG1,
1510 			(errmsg("received replication command: %s", cmd_string)));
1511 
1512 	/*
1513 	 * Disallow replication commands in aborted transaction blocks.
1514 	 */
1515 	if (IsAbortedTransactionBlockState())
1516 		ereport(ERROR,
1517 				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1518 				 errmsg("current transaction is aborted, "
1519 						"commands ignored until end of transaction block")));
1520 
1521 	CHECK_FOR_INTERRUPTS();
1522 
1523 	/*
1524 	 * Allocate buffers that will be used for each outgoing and incoming
1525 	 * message.  We do this just once per command to reduce palloc overhead.
1526 	 */
1527 	initStringInfo(&output_message);
1528 	initStringInfo(&reply_message);
1529 	initStringInfo(&tmpbuf);
1530 
1531 	switch (cmd_node->type)
1532 	{
1533 		case T_IdentifySystemCmd:
1534 			IdentifySystem();
1535 			break;
1536 
1537 		case T_BaseBackupCmd:
1538 			PreventInTransactionBlock(true, "BASE_BACKUP");
1539 			SendBaseBackup((BaseBackupCmd *) cmd_node);
1540 			break;
1541 
1542 		case T_CreateReplicationSlotCmd:
1543 			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1544 			break;
1545 
1546 		case T_DropReplicationSlotCmd:
1547 			DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1548 			break;
1549 
1550 		case T_StartReplicationCmd:
1551 			{
1552 				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1553 
1554 				PreventInTransactionBlock(true, "START_REPLICATION");
1555 
1556 				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1557 					StartReplication(cmd);
1558 				else
1559 					StartLogicalReplication(cmd);
1560 				break;
1561 			}
1562 
1563 		case T_TimeLineHistoryCmd:
1564 			PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1565 			SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1566 			break;
1567 
1568 		case T_VariableShowStmt:
1569 			{
1570 				DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1571 				VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1572 
1573 				/* syscache access needs a transaction environment */
1574 				StartTransactionCommand();
1575 				GetPGVariable(n->name, dest);
1576 				CommitTransactionCommand();
1577 			}
1578 			break;
1579 
1580 		default:
1581 			elog(ERROR, "unrecognized replication command node tag: %u",
1582 				 cmd_node->type);
1583 	}
1584 
1585 	/* done */
1586 	MemoryContextSwitchTo(old_context);
1587 	MemoryContextDelete(cmd_context);
1588 
1589 	/* Send CommandComplete message */
1590 	EndCommand("SELECT", DestRemote);
1591 
1592 	/* Report to pgstat that this process is now idle */
1593 	pgstat_report_activity(STATE_IDLE, NULL);
1594 	debug_query_string = NULL;
1595 
1596 	return true;
1597 }
1598 
/*
 * Process any incoming messages while streaming. Also checks if the remote
 * end has closed the connection.
 *
 * Reads messages without blocking until none is available or a CopyDone has
 * been received.  Handled message types are CopyData ('d', dispatched to
 * ProcessStandbyMessage), CopyDone ('c', answered with our own CopyDone if
 * not yet sent) and Terminate ('X', exits the process).  Any other type is a
 * FATAL protocol violation; EOF or a read error logs a COMMERROR and exits
 * via proc_exit(0).  If at least one message was received, the last reply
 * timestamp is updated and waiting_for_ping_response is cleared.
 */
static void
ProcessRepliesIfAny(void)
{
	unsigned char firstchar;
	int			r;
	bool		received = false;

	/* Taken once up front; becomes last_reply_timestamp if anything arrives. */
	last_processing = GetCurrentTimestamp();

	/*
	 * If we already received a CopyDone from the frontend, any subsequent
	 * message is the beginning of a new command, and should be processed in
	 * the main processing loop.
	 */
	while (!streamingDoneReceiving)
	{
		pq_startmsgread();
		r = pq_getbyte_if_available(&firstchar);
		if (r < 0)
		{
			/* unexpected error or EOF */
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("unexpected EOF on standby connection")));
			proc_exit(0);
		}
		if (r == 0)
		{
			/* no data available without blocking */
			pq_endmsgread();
			break;
		}

		/* Read the message contents */
		resetStringInfo(&reply_message);
		if (pq_getmessage(&reply_message, 0))
		{
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("unexpected EOF on standby connection")));
			proc_exit(0);
		}

		/* Handle the very limited subset of commands expected in this phase */
		switch (firstchar)
		{
				/*
				 * 'd' means a standby reply wrapped in a CopyData packet.
				 */
			case 'd':
				ProcessStandbyMessage();
				received = true;
				break;

				/*
				 * CopyDone means the standby requested to finish streaming.
				 * Reply with CopyDone, if we had not sent that already.
				 */
			case 'c':
				if (!streamingDoneSending)
				{
					pq_putmessage_noblock('c', NULL, 0);
					streamingDoneSending = true;
				}

				streamingDoneReceiving = true;
				received = true;
				break;

				/*
				 * 'X' means that the standby is closing down the socket.
				 */
			case 'X':
				proc_exit(0);

			default:
				ereport(FATAL,
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg("invalid standby message type \"%c\"",
								firstchar)));
		}
	}

	/*
	 * Save the last reply timestamp if we've received at least one reply.
	 */
	if (received)
	{
		last_reply_timestamp = last_processing;
		waiting_for_ping_response = false;
	}
}
1695 
1696 /*
1697  * Process a status update message received from standby.
1698  */
1699 static void
ProcessStandbyMessage(void)1700 ProcessStandbyMessage(void)
1701 {
1702 	char		msgtype;
1703 
1704 	/*
1705 	 * Check message type from the first byte.
1706 	 */
1707 	msgtype = pq_getmsgbyte(&reply_message);
1708 
1709 	switch (msgtype)
1710 	{
1711 		case 'r':
1712 			ProcessStandbyReplyMessage();
1713 			break;
1714 
1715 		case 'h':
1716 			ProcessStandbyHSFeedbackMessage();
1717 			break;
1718 
1719 		default:
1720 			ereport(COMMERROR,
1721 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
1722 					 errmsg("unexpected message type \"%c\"", msgtype)));
1723 			proc_exit(0);
1724 	}
1725 }
1726 
1727 /*
1728  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1729  */
1730 static void
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)1731 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1732 {
1733 	bool		changed = false;
1734 	ReplicationSlot *slot = MyReplicationSlot;
1735 
1736 	Assert(lsn != InvalidXLogRecPtr);
1737 	SpinLockAcquire(&slot->mutex);
1738 	if (slot->data.restart_lsn != lsn)
1739 	{
1740 		changed = true;
1741 		slot->data.restart_lsn = lsn;
1742 	}
1743 	SpinLockRelease(&slot->mutex);
1744 
1745 	if (changed)
1746 	{
1747 		ReplicationSlotMarkDirty();
1748 		ReplicationSlotsComputeRequiredLSN();
1749 	}
1750 
1751 	/*
1752 	 * One could argue that the slot should be saved to disk now, but that'd
1753 	 * be energy wasted - the worst lost information can do here is give us
1754 	 * wrong information in a statistics view - we'll just potentially be more
1755 	 * conservative in removing files.
1756 	 */
1757 }
1758 
1759 /*
1760  * Regular reply from standby advising of WAL locations on standby server.
1761  */
1762 static void
ProcessStandbyReplyMessage(void)1763 ProcessStandbyReplyMessage(void)
1764 {
1765 	XLogRecPtr	writePtr,
1766 				flushPtr,
1767 				applyPtr;
1768 	bool		replyRequested;
1769 	TimeOffset	writeLag,
1770 				flushLag,
1771 				applyLag;
1772 	bool		clearLagTimes;
1773 	TimestampTz now;
1774 
1775 	static bool fullyAppliedLastTime = false;
1776 
1777 	/* the caller already consumed the msgtype byte */
1778 	writePtr = pq_getmsgint64(&reply_message);
1779 	flushPtr = pq_getmsgint64(&reply_message);
1780 	applyPtr = pq_getmsgint64(&reply_message);
1781 	(void) pq_getmsgint64(&reply_message);	/* sendTime; not used ATM */
1782 	replyRequested = pq_getmsgbyte(&reply_message);
1783 
1784 	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1785 		 (uint32) (writePtr >> 32), (uint32) writePtr,
1786 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1787 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1788 		 replyRequested ? " (reply requested)" : "");
1789 
1790 	/* See if we can compute the round-trip lag for these positions. */
1791 	now = GetCurrentTimestamp();
1792 	writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1793 	flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1794 	applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1795 
1796 	/*
1797 	 * If the standby reports that it has fully replayed the WAL in two
1798 	 * consecutive reply messages, then the second such message must result
1799 	 * from wal_receiver_status_interval expiring on the standby.  This is a
1800 	 * convenient time to forget the lag times measured when it last
1801 	 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1802 	 * until more WAL traffic arrives.
1803 	 */
1804 	clearLagTimes = false;
1805 	if (applyPtr == sentPtr)
1806 	{
1807 		if (fullyAppliedLastTime)
1808 			clearLagTimes = true;
1809 		fullyAppliedLastTime = true;
1810 	}
1811 	else
1812 		fullyAppliedLastTime = false;
1813 
1814 	/* Send a reply if the standby requested one. */
1815 	if (replyRequested)
1816 		WalSndKeepalive(false);
1817 
1818 	/*
1819 	 * Update shared state for this WalSender process based on reply data from
1820 	 * standby.
1821 	 */
1822 	{
1823 		WalSnd	   *walsnd = MyWalSnd;
1824 
1825 		SpinLockAcquire(&walsnd->mutex);
1826 		walsnd->write = writePtr;
1827 		walsnd->flush = flushPtr;
1828 		walsnd->apply = applyPtr;
1829 		if (writeLag != -1 || clearLagTimes)
1830 			walsnd->writeLag = writeLag;
1831 		if (flushLag != -1 || clearLagTimes)
1832 			walsnd->flushLag = flushLag;
1833 		if (applyLag != -1 || clearLagTimes)
1834 			walsnd->applyLag = applyLag;
1835 		SpinLockRelease(&walsnd->mutex);
1836 	}
1837 
1838 	if (!am_cascading_walsender)
1839 		SyncRepReleaseWaiters();
1840 
1841 	/*
1842 	 * Advance our local xmin horizon when the client confirmed a flush.
1843 	 */
1844 	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1845 	{
1846 		if (SlotIsLogical(MyReplicationSlot))
1847 			LogicalConfirmReceivedLocation(flushPtr);
1848 		else
1849 			PhysicalConfirmReceivedLocation(flushPtr);
1850 	}
1851 }
1852 
1853 /* compute new replication slot xmin horizon if needed */
1854 static void
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin,TransactionId feedbackCatalogXmin)1855 PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
1856 {
1857 	bool		changed = false;
1858 	ReplicationSlot *slot = MyReplicationSlot;
1859 
1860 	SpinLockAcquire(&slot->mutex);
1861 	MyPgXact->xmin = InvalidTransactionId;
1862 
1863 	/*
1864 	 * For physical replication we don't need the interlock provided by xmin
1865 	 * and effective_xmin since the consequences of a missed increase are
1866 	 * limited to query cancellations, so set both at once.
1867 	 */
1868 	if (!TransactionIdIsNormal(slot->data.xmin) ||
1869 		!TransactionIdIsNormal(feedbackXmin) ||
1870 		TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1871 	{
1872 		changed = true;
1873 		slot->data.xmin = feedbackXmin;
1874 		slot->effective_xmin = feedbackXmin;
1875 	}
1876 	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1877 		!TransactionIdIsNormal(feedbackCatalogXmin) ||
1878 		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1879 	{
1880 		changed = true;
1881 		slot->data.catalog_xmin = feedbackCatalogXmin;
1882 		slot->effective_catalog_xmin = feedbackCatalogXmin;
1883 	}
1884 	SpinLockRelease(&slot->mutex);
1885 
1886 	if (changed)
1887 	{
1888 		ReplicationSlotMarkDirty();
1889 		ReplicationSlotsComputeRequiredXmin(false);
1890 	}
1891 }
1892 
1893 /*
1894  * Check that the provided xmin/epoch are sane, that is, not in the future
1895  * and not so far back as to be already wrapped around.
1896  *
1897  * Epoch of nextXid should be same as standby, or if the counter has
1898  * wrapped, then one greater than standby.
1899  *
1900  * This check doesn't care about whether clog exists for these xids
1901  * at all.
1902  */
1903 static bool
TransactionIdInRecentPast(TransactionId xid,uint32 epoch)1904 TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
1905 {
1906 	TransactionId nextXid;
1907 	uint32		nextEpoch;
1908 
1909 	GetNextXidAndEpoch(&nextXid, &nextEpoch);
1910 
1911 	if (xid <= nextXid)
1912 	{
1913 		if (epoch != nextEpoch)
1914 			return false;
1915 	}
1916 	else
1917 	{
1918 		if (epoch + 1 != nextEpoch)
1919 			return false;
1920 	}
1921 
1922 	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1923 		return false;			/* epoch OK, but it's wrapped around */
1924 
1925 	return true;
1926 }
1927 
1928 /*
1929  * Hot Standby feedback
1930  */
1931 static void
ProcessStandbyHSFeedbackMessage(void)1932 ProcessStandbyHSFeedbackMessage(void)
1933 {
1934 	TransactionId feedbackXmin;
1935 	uint32		feedbackEpoch;
1936 	TransactionId feedbackCatalogXmin;
1937 	uint32		feedbackCatalogEpoch;
1938 
1939 	/*
1940 	 * Decipher the reply message. The caller already consumed the msgtype
1941 	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1942 	 * of this message.
1943 	 */
1944 	(void) pq_getmsgint64(&reply_message);	/* sendTime; not used ATM */
1945 	feedbackXmin = pq_getmsgint(&reply_message, 4);
1946 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
1947 	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1948 	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1949 
1950 	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1951 		 feedbackXmin,
1952 		 feedbackEpoch,
1953 		 feedbackCatalogXmin,
1954 		 feedbackCatalogEpoch);
1955 
1956 	/*
1957 	 * Unset WalSender's xmins if the feedback message values are invalid.
1958 	 * This happens when the downstream turned hot_standby_feedback off.
1959 	 */
1960 	if (!TransactionIdIsNormal(feedbackXmin)
1961 		&& !TransactionIdIsNormal(feedbackCatalogXmin))
1962 	{
1963 		MyPgXact->xmin = InvalidTransactionId;
1964 		if (MyReplicationSlot != NULL)
1965 			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1966 		return;
1967 	}
1968 
1969 	/*
1970 	 * Check that the provided xmin/epoch are sane, that is, not in the future
1971 	 * and not so far back as to be already wrapped around.  Ignore if not.
1972 	 */
1973 	if (TransactionIdIsNormal(feedbackXmin) &&
1974 		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1975 		return;
1976 
1977 	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1978 		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1979 		return;
1980 
1981 	/*
1982 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
1983 	 * the xmin will be taken into account by GetOldestXmin.  This will hold
1984 	 * back the removal of dead rows and thereby prevent the generation of
1985 	 * cleanup conflicts on the standby server.
1986 	 *
1987 	 * There is a small window for a race condition here: although we just
1988 	 * checked that feedbackXmin precedes nextXid, the nextXid could have
1989 	 * gotten advanced between our fetching it and applying the xmin below,
1990 	 * perhaps far enough to make feedbackXmin wrap around.  In that case the
1991 	 * xmin we set here would be "in the future" and have no effect.  No point
1992 	 * in worrying about this since it's too late to save the desired data
1993 	 * anyway.  Assuming that the standby sends us an increasing sequence of
1994 	 * xmins, this could only happen during the first reply cycle, else our
1995 	 * own xmin would prevent nextXid from advancing so far.
1996 	 *
1997 	 * We don't bother taking the ProcArrayLock here.  Setting the xmin field
1998 	 * is assumed atomic, and there's no real need to prevent a concurrent
1999 	 * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
2000 	 * safe, and if we're moving it backwards, well, the data is at risk
2001 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
2002 	 *
2003 	 * If we're using a replication slot we reserve the xmin via that,
2004 	 * otherwise via the walsender's PGXACT entry. We can only track the
2005 	 * catalog xmin separately when using a slot, so we store the least of the
2006 	 * two provided when not using a slot.
2007 	 *
2008 	 * XXX: It might make sense to generalize the ephemeral slot concept and
2009 	 * always use the slot mechanism to handle the feedback xmin.
2010 	 */
2011 	if (MyReplicationSlot != NULL)	/* XXX: persistency configurable? */
2012 		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2013 	else
2014 	{
2015 		if (TransactionIdIsNormal(feedbackCatalogXmin)
2016 			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2017 			MyPgXact->xmin = feedbackCatalogXmin;
2018 		else
2019 			MyPgXact->xmin = feedbackXmin;
2020 	}
2021 }
2022 
2023 /*
2024  * Compute how long send/receive loops should sleep.
2025  *
2026  * If wal_sender_timeout is enabled we want to wake up in time to send
2027  * keepalives and to abort the connection if wal_sender_timeout has been
2028  * reached.
2029  */
2030 static long
WalSndComputeSleeptime(TimestampTz now)2031 WalSndComputeSleeptime(TimestampTz now)
2032 {
2033 	long		sleeptime = 10000;	/* 10 s */
2034 
2035 	if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2036 	{
2037 		TimestampTz wakeup_time;
2038 
2039 		/*
2040 		 * At the latest stop sleeping once wal_sender_timeout has been
2041 		 * reached.
2042 		 */
2043 		wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2044 												  wal_sender_timeout);
2045 
2046 		/*
2047 		 * If no ping has been sent yet, wakeup when it's time to do so.
2048 		 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2049 		 * the timeout passed without a response.
2050 		 */
2051 		if (!waiting_for_ping_response)
2052 			wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2053 													  wal_sender_timeout / 2);
2054 
2055 		/* Compute relative time until wakeup. */
2056 		sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2057 	}
2058 
2059 	return sleeptime;
2060 }
2061 
2062 /*
2063  * Check whether there have been responses by the client within
2064  * wal_sender_timeout and shutdown if not.  Using last_processing as the
2065  * reference point avoids counting server-side stalls against the client.
2066  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2067  * postdate last_processing by more than wal_sender_timeout.  If that happens,
2068  * the client must reply almost immediately to avoid a timeout.  This rarely
2069  * affects the default configuration, under which clients spontaneously send a
2070  * message every standby_message_timeout = wal_sender_timeout/6 = 10s.  We
2071  * could eliminate that problem by recognizing timeout expiration at
2072  * wal_sender_timeout/2 after the keepalive.
2073  */
2074 static void
WalSndCheckTimeOut(void)2075 WalSndCheckTimeOut(void)
2076 {
2077 	TimestampTz timeout;
2078 
2079 	/* don't bail out if we're doing something that doesn't require timeouts */
2080 	if (last_reply_timestamp <= 0)
2081 		return;
2082 
2083 	timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2084 										  wal_sender_timeout);
2085 
2086 	if (wal_sender_timeout > 0 && last_processing >= timeout)
2087 	{
2088 		/*
2089 		 * Since typically expiration of replication timeout means
2090 		 * communication problem, we don't send the error message to the
2091 		 * standby.
2092 		 */
2093 		ereport(COMMERROR,
2094 				(errmsg("terminating walsender process due to replication timeout")));
2095 
2096 		WalSndShutdown();
2097 	}
2098 }
2099 
/*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * send_data produces the next chunk of output.  Presumably it is
 * XLogSendPhysical or XLogSendLogical; confirm at the call sites.
 *
 * Each iteration does the following:
 * - handles interrupts and config reloads, and processes client replies;
 * - sends and flushes data;
 * - checks the replication timeout and sends keepalives;
 * - sleeps on the latch/socket when there is nothing to send right away.
 *
 * The loop returns once CopyDone has been both received and sent and the
 * output buffer is empty.  It may also end the process directly, through
 * exit(1), WalSndShutdown() or WalSndDone().
 */
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
	/*
	 * Initialize the last reply timestamp. That enables timeout processing
	 * from hereon.
	 */
	last_reply_timestamp = GetCurrentTimestamp();
	waiting_for_ping_response = false;

	/*
	 * Loop until we reach the end of this timeline or the client requests to
	 * stop streaming.
	 */
	for (;;)
	{
		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/*
		 * If we don't have any pending data in the output buffer, try to send
		 * some more.  If there is some, we don't bother to call send_data
		 * again until we've flushed it ... but we'd better assume we are not
		 * caught up.
		 */
		if (!pq_is_send_pending())
			send_data();
		else
			WalSndCaughtUp = false;

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/* If nothing remains to be sent right now ... */
		if (WalSndCaughtUp && !pq_is_send_pending())
		{
			/*
			 * If we're in catchup state, move to streaming.  This is an
			 * important state change for users to know about, since before
			 * this point data loss might occur if the primary dies and we
			 * need to failover to the standby. The state change is also
			 * important for synchronous replication, since commits that
			 * started to wait at that point might wait for some time.
			 */
			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
			{
				ereport(DEBUG1,
						(errmsg("\"%s\" has now caught up with upstream server",
								application_name)));
				WalSndSetState(WALSNDSTATE_STREAMING);
			}

			/*
			 * When SIGUSR2 arrives, we send any outstanding logs up to the
			 * shutdown checkpoint record (i.e., the latest record), wait for
			 * them to be replicated to the standby, and exit. This may be a
			 * normal termination at shutdown, or a promotion, the walsender
			 * is not sure which.  WalSndDone() returns here if there is still
			 * data left to send or replicate.
			 */
			if (got_SIGUSR2)
				WalSndDone(send_data);
		}

		/* Check for replication timeout. */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * We don't block if not caught up, unless there is unsent data
		 * pending in which case we'd better block until the socket is
		 * write-ready.  This test is only needed for the case where the
		 * send_data callback handled a subset of the available data but then
		 * pq_flush_if_writable flushed it all --- we should immediately try
		 * to send more.
		 */
		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
		{
			long		sleeptime;
			int			wakeEvents;

			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;

			/* Only watch for client input until the client sent CopyDone */
			if (!streamingDoneReceiving)
				wakeEvents |= WL_SOCKET_READABLE;

			/*
			 * Use fresh timestamp, not last_processing, to reduce the chance
			 * of reaching wal_sender_timeout before sending a keepalive.
			 */
			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

			/* With unsent output, also wake up as soon as we can write */
			if (pq_is_send_pending())
				wakeEvents |= WL_SOCKET_WRITEABLE;

			/* Sleep until something happens or we time out */
			WaitLatchOrSocket(MyLatch, wakeEvents,
							  MyProcPort->sock, sleeptime,
							  WAIT_EVENT_WAL_SENDER_MAIN);
		}
	}
	return;
}
2235 
2236 /* Initialize a per-walsender data structure for this walsender process */
2237 static void
InitWalSenderSlot(void)2238 InitWalSenderSlot(void)
2239 {
2240 	int			i;
2241 
2242 	/*
2243 	 * WalSndCtl should be set up already (we inherit this by fork() or
2244 	 * EXEC_BACKEND mechanism from the postmaster).
2245 	 */
2246 	Assert(WalSndCtl != NULL);
2247 	Assert(MyWalSnd == NULL);
2248 
2249 	/*
2250 	 * Find a free walsender slot and reserve it. If this fails, we must be
2251 	 * out of WalSnd structures.
2252 	 */
2253 	for (i = 0; i < max_wal_senders; i++)
2254 	{
2255 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2256 
2257 		SpinLockAcquire(&walsnd->mutex);
2258 
2259 		if (walsnd->pid != 0)
2260 		{
2261 			SpinLockRelease(&walsnd->mutex);
2262 			continue;
2263 		}
2264 		else
2265 		{
2266 			/*
2267 			 * Found a free slot. Reserve it for us.
2268 			 */
2269 			walsnd->pid = MyProcPid;
2270 			walsnd->state = WALSNDSTATE_STARTUP;
2271 			walsnd->sentPtr = InvalidXLogRecPtr;
2272 			walsnd->needreload = false;
2273 			walsnd->write = InvalidXLogRecPtr;
2274 			walsnd->flush = InvalidXLogRecPtr;
2275 			walsnd->apply = InvalidXLogRecPtr;
2276 			walsnd->writeLag = -1;
2277 			walsnd->flushLag = -1;
2278 			walsnd->applyLag = -1;
2279 			walsnd->sync_standby_priority = 0;
2280 			walsnd->latch = &MyProc->procLatch;
2281 			SpinLockRelease(&walsnd->mutex);
2282 			/* don't need the lock anymore */
2283 			MyWalSnd = (WalSnd *) walsnd;
2284 
2285 			break;
2286 		}
2287 	}
2288 	if (MyWalSnd == NULL)
2289 		ereport(FATAL,
2290 				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2291 				 errmsg("number of requested standby connections "
2292 						"exceeds max_wal_senders (currently %d)",
2293 						max_wal_senders)));
2294 
2295 	/* Arrange to clean up at walsender exit */
2296 	on_shmem_exit(WalSndKill, 0);
2297 }
2298 
2299 /* Destroy the per-walsender data structure for this walsender process */
2300 static void
WalSndKill(int code,Datum arg)2301 WalSndKill(int code, Datum arg)
2302 {
2303 	WalSnd	   *walsnd = MyWalSnd;
2304 
2305 	Assert(walsnd != NULL);
2306 
2307 	MyWalSnd = NULL;
2308 
2309 	SpinLockAcquire(&walsnd->mutex);
2310 	/* clear latch while holding the spinlock, so it can safely be read */
2311 	walsnd->latch = NULL;
2312 	/* Mark WalSnd struct as no longer being in use. */
2313 	walsnd->pid = 0;
2314 	SpinLockRelease(&walsnd->mutex);
2315 }
2316 
2317 /*
2318  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2319  *
2320  * XXX probably this should be improved to suck data directly from the
2321  * WAL buffers when possible.
2322  *
2323  * Will open, and keep open, one WAL segment stored in the global file
2324  * descriptor sendFile. This means if XLogRead is used once, there will
2325  * always be one descriptor left open until the process ends, but never
2326  * more than one.
2327  */
2328 static void
XLogRead(char * buf,XLogRecPtr startptr,Size count)2329 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2330 {
2331 	char	   *p;
2332 	XLogRecPtr	recptr;
2333 	Size		nbytes;
2334 	XLogSegNo	segno;
2335 
2336 retry:
2337 	p = buf;
2338 	recptr = startptr;
2339 	nbytes = count;
2340 
2341 	while (nbytes > 0)
2342 	{
2343 		uint32		startoff;
2344 		int			segbytes;
2345 		int			readbytes;
2346 
2347 		startoff = XLogSegmentOffset(recptr, wal_segment_size);
2348 
2349 		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
2350 		{
2351 			char		path[MAXPGPATH];
2352 
2353 			/* Switch to another logfile segment */
2354 			if (sendFile >= 0)
2355 				close(sendFile);
2356 
2357 			XLByteToSeg(recptr, sendSegNo, wal_segment_size);
2358 
2359 			/*-------
2360 			 * When reading from a historic timeline, and there is a timeline
2361 			 * switch within this segment, read from the WAL segment belonging
2362 			 * to the new timeline.
2363 			 *
2364 			 * For example, imagine that this server is currently on timeline
2365 			 * 5, and we're streaming timeline 4. The switch from timeline 4
2366 			 * to 5 happened at 0/13002088. In pg_wal, we have these files:
2367 			 *
2368 			 * ...
2369 			 * 000000040000000000000012
2370 			 * 000000040000000000000013
2371 			 * 000000050000000000000013
2372 			 * 000000050000000000000014
2373 			 * ...
2374 			 *
2375 			 * In this situation, when requested to send the WAL from
2376 			 * segment 0x13, on timeline 4, we read the WAL from file
2377 			 * 000000050000000000000013. Archive recovery prefers files from
2378 			 * newer timelines, so if the segment was restored from the
2379 			 * archive on this server, the file belonging to the old timeline,
2380 			 * 000000040000000000000013, might not exist. Their contents are
2381 			 * equal up to the switchpoint, because at a timeline switch, the
2382 			 * used portion of the old segment is copied to the new file.
2383 			 *-------
2384 			 */
2385 			curFileTimeLine = sendTimeLine;
2386 			if (sendTimeLineIsHistoric)
2387 			{
2388 				XLogSegNo	endSegNo;
2389 
2390 				XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
2391 				if (sendSegNo == endSegNo)
2392 					curFileTimeLine = sendTimeLineNextTLI;
2393 			}
2394 
2395 			XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
2396 
2397 			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2398 			if (sendFile < 0)
2399 			{
2400 				/*
2401 				 * If the file is not found, assume it's because the standby
2402 				 * asked for a too old WAL segment that has already been
2403 				 * removed or recycled.
2404 				 */
2405 				if (errno == ENOENT)
2406 					ereport(ERROR,
2407 							(errcode_for_file_access(),
2408 							 errmsg("requested WAL segment %s has already been removed",
2409 									XLogFileNameP(curFileTimeLine, sendSegNo))));
2410 				else
2411 					ereport(ERROR,
2412 							(errcode_for_file_access(),
2413 							 errmsg("could not open file \"%s\": %m",
2414 									path)));
2415 			}
2416 			sendOff = 0;
2417 		}
2418 
2419 		/* Need to seek in the file? */
2420 		if (sendOff != startoff)
2421 		{
2422 			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2423 				ereport(ERROR,
2424 						(errcode_for_file_access(),
2425 						 errmsg("could not seek in log segment %s to offset %u: %m",
2426 								XLogFileNameP(curFileTimeLine, sendSegNo),
2427 								startoff)));
2428 			sendOff = startoff;
2429 		}
2430 
2431 		/* How many bytes are within this segment? */
2432 		if (nbytes > (wal_segment_size - startoff))
2433 			segbytes = wal_segment_size - startoff;
2434 		else
2435 			segbytes = nbytes;
2436 
2437 		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
2438 		readbytes = read(sendFile, p, segbytes);
2439 		pgstat_report_wait_end();
2440 		if (readbytes <= 0)
2441 		{
2442 			ereport(ERROR,
2443 					(errcode_for_file_access(),
2444 					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2445 							XLogFileNameP(curFileTimeLine, sendSegNo),
2446 							sendOff, (unsigned long) segbytes)));
2447 		}
2448 
2449 		/* Update state for read */
2450 		recptr += readbytes;
2451 
2452 		sendOff += readbytes;
2453 		nbytes -= readbytes;
2454 		p += readbytes;
2455 	}
2456 
2457 	/*
2458 	 * After reading into the buffer, check that what we read was valid. We do
2459 	 * this after reading, because even though the segment was present when we
2460 	 * opened it, it might get recycled or removed while we read it. The
2461 	 * read() succeeds in that case, but the data we tried to read might
2462 	 * already have been overwritten with new WAL records.
2463 	 */
2464 	XLByteToSeg(startptr, segno, wal_segment_size);
2465 	CheckXLogRemoved(segno, ThisTimeLineID);
2466 
2467 	/*
2468 	 * During recovery, the currently-open WAL file might be replaced with the
2469 	 * file of the same name retrieved from archive. So we always need to
2470 	 * check what we read was valid after reading into the buffer. If it's
2471 	 * invalid, we try to open and read the file again.
2472 	 */
2473 	if (am_cascading_walsender)
2474 	{
2475 		WalSnd	   *walsnd = MyWalSnd;
2476 		bool		reload;
2477 
2478 		SpinLockAcquire(&walsnd->mutex);
2479 		reload = walsnd->needreload;
2480 		walsnd->needreload = false;
2481 		SpinLockRelease(&walsnd->mutex);
2482 
2483 		if (reload && sendFile >= 0)
2484 		{
2485 			close(sendFile);
2486 			sendFile = -1;
2487 
2488 			goto retry;
2489 		}
2490 	}
2491 }
2492 
/*
 * Send out the WAL in its normal physical/stored form.
 *
 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
 * but not yet sent to the client, and buffer it in the libpq output
 * buffer.
 *
 * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
 * otherwise WalSndCaughtUp is set to false.
 *
 * When streaming a historic timeline and sentPtr has reached the point
 * where the timeline was switched away from, CopyDone is sent instead and
 * streamingDoneSending is set.  On each call, got_STOPPING moves the
 * walsender to the stopping state.  sentPtr is advanced and published in
 * shared memory after each message.
 */
static void
XLogSendPhysical(void)
{
	XLogRecPtr	SendRqstPtr;
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;
	Size		nbytes;

	/* If requested switch the WAL sender to the stopping state. */
	if (got_STOPPING)
		WalSndSetState(WALSNDSTATE_STOPPING);

	/* CopyDone already sent: nothing more to stream on this timeline */
	if (streamingDoneSending)
	{
		WalSndCaughtUp = true;
		return;
	}

	/* Figure out how far we can safely send the WAL. */
	if (sendTimeLineIsHistoric)
	{
		/*
		 * Streaming an old timeline that's in this server's history, but is
		 * not the one we're currently inserting or replaying. It can be
		 * streamed up to the point where we switched off that timeline.
		 */
		SendRqstPtr = sendTimeLineValidUpto;
	}
	else if (am_cascading_walsender)
	{
		/*
		 * Streaming the latest timeline on a standby.
		 *
		 * Attempt to send all WAL that has already been replayed, so that we
		 * know it's valid. If we're receiving WAL through streaming
		 * replication, it's also OK to send any WAL that has been received
		 * but not replayed.
		 *
		 * The timeline we're recovering from can change, or we can be
		 * promoted. In either case, the current timeline becomes historic. We
		 * need to detect that so that we don't try to stream past the point
		 * where we switched to another timeline. We check for promotion or
		 * timeline switch after calculating FlushPtr, to avoid a race
		 * condition: if the timeline becomes historic just after we checked
		 * that it was still current, it's still OK to stream it up to the
		 * FlushPtr that was calculated before it became historic.
		 */
		bool		becameHistoric = false;

		SendRqstPtr = GetStandbyFlushRecPtr();

		if (!RecoveryInProgress())
		{
			/*
			 * We have been promoted. RecoveryInProgress() updated
			 * ThisTimeLineID to the new current timeline.
			 */
			am_cascading_walsender = false;
			becameHistoric = true;
		}
		else
		{
			/*
			 * Still a cascading standby. But is the timeline we're sending
			 * still the one recovery is recovering from? ThisTimeLineID was
			 * updated by the GetStandbyFlushRecPtr() call above.
			 */
			if (sendTimeLine != ThisTimeLineID)
				becameHistoric = true;
		}

		if (becameHistoric)
		{
			/*
			 * The timeline we were sending has become historic. Read the
			 * timeline history file of the new timeline to see where exactly
			 * we forked off from the timeline we were sending.
			 */
			List	   *history;

			history = readTimeLineHistory(ThisTimeLineID);
			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);

			Assert(sendTimeLine < sendTimeLineNextTLI);
			list_free_deep(history);

			sendTimeLineIsHistoric = true;

			SendRqstPtr = sendTimeLineValidUpto;
		}
	}
	else
	{
		/*
		 * Streaming the current timeline on a master.
		 *
		 * Attempt to send all data that's already been written out and
		 * fsync'd to disk.  We cannot go further than what's been written out
		 * given the current implementation of XLogRead().  And in any case
		 * it's unsafe to send WAL that is not securely down to disk on the
		 * master: if the master subsequently crashes and restarts, standbys
		 * must not have applied any WAL that got lost on the master.
		 */
		SendRqstPtr = GetFlushRecPtr();
	}

	/*
	 * Record the current system time as an approximation of the time at which
	 * this WAL location was written for the purposes of lag tracking.
	 *
	 * In theory we could make XLogFlush() record a time in shmem whenever WAL
	 * is flushed and we could get that time as well as the LSN when we call
	 * GetFlushRecPtr() above (and likewise for the cascading standby
	 * equivalent), but rather than putting any new code into the hot WAL path
	 * it seems good enough to capture the time here.  We should reach this
	 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
	 * may take some time, we read the WAL flush pointer and take the time
	 * very close together here so that we'll get a later position if it is
	 * still moving.
	 *
	 * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
	 * this gives us a cheap approximation for the WAL flush time for this
	 * LSN.
	 *
	 * Note that the LSN is not necessarily the LSN for the data contained in
	 * the present message; it's the end of the WAL, which might be further
	 * ahead.  All the lag tracking machinery cares about is finding out when
	 * that arbitrary LSN is eventually reported as written, flushed and
	 * applied, so that it can measure the elapsed time.
	 */
	LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());

	/*
	 * If this is a historic timeline and we've reached the point where we
	 * forked to the next timeline, stop streaming.
	 *
	 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
	 * startup process will normally replay all WAL that has been received
	 * from the master, before promoting, but if the WAL streaming is
	 * terminated at a WAL page boundary, the valid portion of the timeline
	 * might end in the middle of a WAL record. We might've already sent the
	 * first half of that partial WAL record to the cascading standby, so that
	 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
	 * replay the partial WAL record either, so it can still follow our
	 * timeline switch.
	 */
	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
	{
		/* close the current file. */
		if (sendFile >= 0)
			close(sendFile);
		sendFile = -1;

		/* Send CopyDone */
		pq_putmessage_noblock('c', NULL, 0);
		streamingDoneSending = true;

		WalSndCaughtUp = true;

		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		return;
	}

	/* Do we have any work to do? */
	Assert(sentPtr <= SendRqstPtr);
	if (SendRqstPtr <= sentPtr)
	{
		WalSndCaughtUp = true;
		return;
	}

	/*
	 * Figure out how much to send in one message. If there's no more than
	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
	 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
	 *
	 * The rounding is not only for performance reasons. Walreceiver relies on
	 * the fact that we never split a WAL record across two messages. Since a
	 * long WAL record is split at page boundary into continuation records,
	 * page boundary is always a safe cut-off point. We also assume that
	 * SendRqstPtr never points to the middle of a WAL record.
	 */
	startptr = sentPtr;
	endptr = startptr;
	endptr += MAX_SEND_SIZE;

	/* if we went beyond SendRqstPtr, back off */
	if (SendRqstPtr <= endptr)
	{
		endptr = SendRqstPtr;

		/*
		 * On a historic timeline, report "not caught up" so that we are
		 * called again right away and reach the end-of-timeline (CopyDone)
		 * branch above.
		 */
		if (sendTimeLineIsHistoric)
			WalSndCaughtUp = false;
		else
			WalSndCaughtUp = true;
	}
	else
	{
		/* round down to page boundary. */
		endptr -= (endptr % XLOG_BLCKSZ);
		WalSndCaughtUp = false;
	}

	nbytes = endptr - startptr;
	Assert(nbytes <= MAX_SEND_SIZE);

	/*
	 * OK to read and send the slice.
	 */
	resetStringInfo(&output_message);
	pq_sendbyte(&output_message, 'w');

	pq_sendint64(&output_message, startptr);	/* dataStart */
	pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
	pq_sendint64(&output_message, 0);	/* sendtime, filled in last */

	/*
	 * Read the log directly into the output buffer to avoid extra memcpy
	 * calls.
	 */
	enlargeStringInfo(&output_message, nbytes);
	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
	output_message.len += nbytes;
	output_message.data[output_message.len] = '\0';

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * It overwrites the placeholder written after the 'w' byte and the two
	 * int64 LSN fields.
	 */
	resetStringInfo(&tmpbuf);
	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
	memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	pq_putmessage_noblock('d', output_message.data, output_message.len);

	sentPtr = endptr;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}

	/* Report progress of XLOG streaming in PS display */
	if (update_process_title)
	{
		char		activitymsg[50];

		snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
				 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		set_ps_display(activitymsg, false);
	}

	return;
}
2762 
2763 /*
2764  * Stream out logically decoded data.
2765  */
2766 static void
XLogSendLogical(void)2767 XLogSendLogical(void)
2768 {
2769 	XLogRecord *record;
2770 	char	   *errm;
2771 	XLogRecPtr	flushPtr;
2772 
2773 	/*
2774 	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2775 	 * true in WalSndWaitForWal, if we're actually waiting. We also set to
2776 	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2777 	 * didn't wait - i.e. when we're shutting down.
2778 	 */
2779 	WalSndCaughtUp = false;
2780 
2781 	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2782 	logical_startptr = InvalidXLogRecPtr;
2783 
2784 	/* xlog record was invalid */
2785 	if (errm != NULL)
2786 		elog(ERROR, "%s", errm);
2787 
2788 	/*
2789 	 * We'll use the current flush point to determine whether we've caught up.
2790 	 */
2791 	flushPtr = GetFlushRecPtr();
2792 
2793 	if (record != NULL)
2794 	{
2795 		/*
2796 		 * Note the lack of any call to LagTrackerWrite() which is handled by
2797 		 * WalSndUpdateProgress which is called by output plugin through
2798 		 * logical decoding write api.
2799 		 */
2800 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2801 
2802 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2803 	}
2804 
2805 	/* Set flag if we're caught up. */
2806 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2807 		WalSndCaughtUp = true;
2808 
2809 	/*
2810 	 * If we're caught up and have been requested to stop, have WalSndLoop()
2811 	 * terminate the connection in an orderly manner, after writing out all
2812 	 * the pending data.
2813 	 */
2814 	if (WalSndCaughtUp && got_STOPPING)
2815 		got_SIGUSR2 = true;
2816 
2817 	/* Update shared memory status */
2818 	{
2819 		WalSnd	   *walsnd = MyWalSnd;
2820 
2821 		SpinLockAcquire(&walsnd->mutex);
2822 		walsnd->sentPtr = sentPtr;
2823 		SpinLockRelease(&walsnd->mutex);
2824 	}
2825 }
2826 
2827 /*
2828  * Shutdown if the sender is caught up.
2829  *
2830  * NB: This should only be called when the shutdown signal has been received
2831  * from postmaster.
2832  *
2833  * Note that if we determine that there's still more data to send, this
2834  * function will return control to the caller.
2835  */
2836 static void
WalSndDone(WalSndSendDataCallback send_data)2837 WalSndDone(WalSndSendDataCallback send_data)
2838 {
2839 	XLogRecPtr	replicatedPtr;
2840 
2841 	/* ... let's just be real sure we're caught up ... */
2842 	send_data();
2843 
2844 	/*
2845 	 * To figure out whether all WAL has successfully been replicated, check
2846 	 * flush location if valid, write otherwise. Tools like pg_receivewal will
2847 	 * usually (unless in synchronous mode) return an invalid flush location.
2848 	 */
2849 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2850 		MyWalSnd->write : MyWalSnd->flush;
2851 
2852 	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2853 		!pq_is_send_pending())
2854 	{
2855 		/* Inform the standby that XLOG streaming is done */
2856 		EndCommand("COPY 0", DestRemote);
2857 		pq_flush();
2858 
2859 		proc_exit(0);
2860 	}
2861 	if (!waiting_for_ping_response)
2862 		WalSndKeepalive(true);
2863 }
2864 
2865 /*
2866  * Returns the latest point in WAL that has been safely flushed to disk, and
2867  * can be sent to the standby. This should only be called when in recovery,
2868  * ie. we're streaming to a cascaded standby.
2869  *
2870  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2871  * replayed WAL record.
2872  */
2873 static XLogRecPtr
GetStandbyFlushRecPtr(void)2874 GetStandbyFlushRecPtr(void)
2875 {
2876 	XLogRecPtr	replayPtr;
2877 	TimeLineID	replayTLI;
2878 	XLogRecPtr	receivePtr;
2879 	TimeLineID	receiveTLI;
2880 	XLogRecPtr	result;
2881 
2882 	/*
2883 	 * We can safely send what's already been replayed. Also, if walreceiver
2884 	 * is streaming WAL from the same timeline, we can send anything that it
2885 	 * has streamed, but hasn't been replayed yet.
2886 	 */
2887 
2888 	receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2889 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
2890 
2891 	ThisTimeLineID = replayTLI;
2892 
2893 	result = replayPtr;
2894 	if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2895 		result = receivePtr;
2896 
2897 	return result;
2898 }
2899 
2900 /*
2901  * Request walsenders to reload the currently-open WAL file
2902  */
2903 void
WalSndRqstFileReload(void)2904 WalSndRqstFileReload(void)
2905 {
2906 	int			i;
2907 
2908 	for (i = 0; i < max_wal_senders; i++)
2909 	{
2910 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2911 
2912 		SpinLockAcquire(&walsnd->mutex);
2913 		if (walsnd->pid == 0)
2914 		{
2915 			SpinLockRelease(&walsnd->mutex);
2916 			continue;
2917 		}
2918 		walsnd->needreload = true;
2919 		SpinLockRelease(&walsnd->mutex);
2920 	}
2921 }
2922 
2923 /*
2924  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2925  */
2926 void
HandleWalSndInitStopping(void)2927 HandleWalSndInitStopping(void)
2928 {
2929 	Assert(am_walsender);
2930 
2931 	/*
2932 	 * If replication has not yet started, die like with SIGTERM. If
2933 	 * replication is active, only set a flag and wake up the main loop. It
2934 	 * will send any outstanding WAL, wait for it to be replicated to the
2935 	 * standby, and then exit gracefully.
2936 	 */
2937 	if (!replication_active)
2938 		kill(MyProcPid, SIGTERM);
2939 	else
2940 		got_STOPPING = true;
2941 }
2942 
2943 /*
2944  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2945  * sender should already have been switched to WALSNDSTATE_STOPPING at
2946  * this point.
2947  */
2948 static void
WalSndLastCycleHandler(SIGNAL_ARGS)2949 WalSndLastCycleHandler(SIGNAL_ARGS)
2950 {
2951 	int			save_errno = errno;
2952 
2953 	got_SIGUSR2 = true;
2954 	SetLatch(MyLatch);
2955 
2956 	errno = save_errno;
2957 }
2958 
2959 /* Set up signal handlers */
2960 void
WalSndSignals(void)2961 WalSndSignals(void)
2962 {
2963 	/* Set up signal handlers */
2964 	pqsignal(SIGHUP, PostgresSigHupHandler);	/* set flag to read config
2965 												 * file */
2966 	pqsignal(SIGINT, StatementCancelHandler);	/* query cancel */
2967 	pqsignal(SIGTERM, die);		/* request shutdown */
2968 	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
2969 	InitializeTimeouts();		/* establishes SIGALRM handler */
2970 	pqsignal(SIGPIPE, SIG_IGN);
2971 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
2972 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
2973 												 * shutdown */
2974 
2975 	/* Reset some signals that are accepted by postmaster but not here */
2976 	pqsignal(SIGCHLD, SIG_DFL);
2977 	pqsignal(SIGTTIN, SIG_DFL);
2978 	pqsignal(SIGTTOU, SIG_DFL);
2979 	pqsignal(SIGCONT, SIG_DFL);
2980 	pqsignal(SIGWINCH, SIG_DFL);
2981 }
2982 
2983 /* Report shared-memory space needed by WalSndShmemInit */
2984 Size
WalSndShmemSize(void)2985 WalSndShmemSize(void)
2986 {
2987 	Size		size = 0;
2988 
2989 	size = offsetof(WalSndCtlData, walsnds);
2990 	size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2991 
2992 	return size;
2993 }
2994 
2995 /* Allocate and initialize walsender-related shared memory */
2996 void
WalSndShmemInit(void)2997 WalSndShmemInit(void)
2998 {
2999 	bool		found;
3000 	int			i;
3001 
3002 	WalSndCtl = (WalSndCtlData *)
3003 		ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3004 
3005 	if (!found)
3006 	{
3007 		/* First time through, so initialize */
3008 		MemSet(WalSndCtl, 0, WalSndShmemSize());
3009 
3010 		for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3011 			SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3012 
3013 		for (i = 0; i < max_wal_senders; i++)
3014 		{
3015 			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3016 
3017 			SpinLockInit(&walsnd->mutex);
3018 		}
3019 	}
3020 }
3021 
3022 /*
3023  * Wake up all walsenders
3024  *
3025  * This will be called inside critical sections, so throwing an error is not
3026  * advisable.
3027  */
3028 void
WalSndWakeup(void)3029 WalSndWakeup(void)
3030 {
3031 	int			i;
3032 
3033 	for (i = 0; i < max_wal_senders; i++)
3034 	{
3035 		Latch	   *latch;
3036 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3037 
3038 		/*
3039 		 * Get latch pointer with spinlock held, for the unlikely case that
3040 		 * pointer reads aren't atomic (as they're 8 bytes).
3041 		 */
3042 		SpinLockAcquire(&walsnd->mutex);
3043 		latch = walsnd->latch;
3044 		SpinLockRelease(&walsnd->mutex);
3045 
3046 		if (latch != NULL)
3047 			SetLatch(latch);
3048 	}
3049 }
3050 
3051 /*
3052  * Signal all walsenders to move to stopping state.
3053  *
3054  * This will trigger walsenders to move to a state where no further WAL can be
3055  * generated. See this file's header for details.
3056  */
3057 void
WalSndInitStopping(void)3058 WalSndInitStopping(void)
3059 {
3060 	int			i;
3061 
3062 	for (i = 0; i < max_wal_senders; i++)
3063 	{
3064 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3065 		pid_t		pid;
3066 
3067 		SpinLockAcquire(&walsnd->mutex);
3068 		pid = walsnd->pid;
3069 		SpinLockRelease(&walsnd->mutex);
3070 
3071 		if (pid == 0)
3072 			continue;
3073 
3074 		SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3075 	}
3076 }
3077 
3078 /*
3079  * Wait that all the WAL senders have quit or reached the stopping state. This
3080  * is used by the checkpointer to control when the shutdown checkpoint can
3081  * safely be performed.
3082  */
3083 void
WalSndWaitStopping(void)3084 WalSndWaitStopping(void)
3085 {
3086 	for (;;)
3087 	{
3088 		int			i;
3089 		bool		all_stopped = true;
3090 
3091 		for (i = 0; i < max_wal_senders; i++)
3092 		{
3093 			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3094 
3095 			SpinLockAcquire(&walsnd->mutex);
3096 
3097 			if (walsnd->pid == 0)
3098 			{
3099 				SpinLockRelease(&walsnd->mutex);
3100 				continue;
3101 			}
3102 
3103 			if (walsnd->state != WALSNDSTATE_STOPPING)
3104 			{
3105 				all_stopped = false;
3106 				SpinLockRelease(&walsnd->mutex);
3107 				break;
3108 			}
3109 			SpinLockRelease(&walsnd->mutex);
3110 		}
3111 
3112 		/* safe to leave if confirmation is done for all WAL senders */
3113 		if (all_stopped)
3114 			return;
3115 
3116 		pg_usleep(10000L);		/* wait for 10 msec */
3117 	}
3118 }
3119 
3120 /* Set state for current walsender (only called in walsender) */
3121 void
WalSndSetState(WalSndState state)3122 WalSndSetState(WalSndState state)
3123 {
3124 	WalSnd	   *walsnd = MyWalSnd;
3125 
3126 	Assert(am_walsender);
3127 
3128 	if (walsnd->state == state)
3129 		return;
3130 
3131 	SpinLockAcquire(&walsnd->mutex);
3132 	walsnd->state = state;
3133 	SpinLockRelease(&walsnd->mutex);
3134 }
3135 
3136 /*
3137  * Return a string constant representing the state. This is used
3138  * in system views, and should *not* be translated.
3139  */
3140 static const char *
WalSndGetStateString(WalSndState state)3141 WalSndGetStateString(WalSndState state)
3142 {
3143 	switch (state)
3144 	{
3145 		case WALSNDSTATE_STARTUP:
3146 			return "startup";
3147 		case WALSNDSTATE_BACKUP:
3148 			return "backup";
3149 		case WALSNDSTATE_CATCHUP:
3150 			return "catchup";
3151 		case WALSNDSTATE_STREAMING:
3152 			return "streaming";
3153 		case WALSNDSTATE_STOPPING:
3154 			return "stopping";
3155 	}
3156 	return "UNKNOWN";
3157 }
3158 
3159 static Interval *
offset_to_interval(TimeOffset offset)3160 offset_to_interval(TimeOffset offset)
3161 {
3162 	Interval   *result = palloc(sizeof(Interval));
3163 
3164 	result->month = 0;
3165 	result->day = 0;
3166 	result->time = offset;
3167 
3168 	return result;
3169 }
3170 
3171 /*
3172  * Returns activity of walsenders, including pids and xlog locations sent to
3173  * standby servers.
3174  */
3175 Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)3176 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3177 {
3178 #define PG_STAT_GET_WAL_SENDERS_COLS	11
3179 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3180 	TupleDesc	tupdesc;
3181 	Tuplestorestate *tupstore;
3182 	MemoryContext per_query_ctx;
3183 	MemoryContext oldcontext;
3184 	SyncRepStandbyData *sync_standbys;
3185 	int			num_standbys;
3186 	int			i;
3187 
3188 	/* check to see if caller supports us returning a tuplestore */
3189 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3190 		ereport(ERROR,
3191 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3192 				 errmsg("set-valued function called in context that cannot accept a set")));
3193 	if (!(rsinfo->allowedModes & SFRM_Materialize))
3194 		ereport(ERROR,
3195 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3196 				 errmsg("materialize mode required, but it is not " \
3197 						"allowed in this context")));
3198 
3199 	/* Build a tuple descriptor for our result type */
3200 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3201 		elog(ERROR, "return type must be a row type");
3202 
3203 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3204 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
3205 
3206 	tupstore = tuplestore_begin_heap(true, false, work_mem);
3207 	rsinfo->returnMode = SFRM_Materialize;
3208 	rsinfo->setResult = tupstore;
3209 	rsinfo->setDesc = tupdesc;
3210 
3211 	MemoryContextSwitchTo(oldcontext);
3212 
3213 	/*
3214 	 * Get the currently active synchronous standbys.  This could be out of
3215 	 * date before we're done, but we'll use the data anyway.
3216 	 */
3217 	num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3218 
3219 	for (i = 0; i < max_wal_senders; i++)
3220 	{
3221 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3222 		XLogRecPtr	sentPtr;
3223 		XLogRecPtr	write;
3224 		XLogRecPtr	flush;
3225 		XLogRecPtr	apply;
3226 		TimeOffset	writeLag;
3227 		TimeOffset	flushLag;
3228 		TimeOffset	applyLag;
3229 		int			priority;
3230 		int			pid;
3231 		WalSndState state;
3232 		bool		is_sync_standby;
3233 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
3234 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3235 		int			j;
3236 
3237 		/* Collect data from shared memory */
3238 		SpinLockAcquire(&walsnd->mutex);
3239 		if (walsnd->pid == 0)
3240 		{
3241 			SpinLockRelease(&walsnd->mutex);
3242 			continue;
3243 		}
3244 		pid = walsnd->pid;
3245 		sentPtr = walsnd->sentPtr;
3246 		state = walsnd->state;
3247 		write = walsnd->write;
3248 		flush = walsnd->flush;
3249 		apply = walsnd->apply;
3250 		writeLag = walsnd->writeLag;
3251 		flushLag = walsnd->flushLag;
3252 		applyLag = walsnd->applyLag;
3253 		priority = walsnd->sync_standby_priority;
3254 		SpinLockRelease(&walsnd->mutex);
3255 
3256 		/*
3257 		 * Detect whether walsender is/was considered synchronous.  We can
3258 		 * provide some protection against stale data by checking the PID
3259 		 * along with walsnd_index.
3260 		 */
3261 		is_sync_standby = false;
3262 		for (j = 0; j < num_standbys; j++)
3263 		{
3264 			if (sync_standbys[j].walsnd_index == i &&
3265 				sync_standbys[j].pid == pid)
3266 			{
3267 				is_sync_standby = true;
3268 				break;
3269 			}
3270 		}
3271 
3272 		memset(nulls, 0, sizeof(nulls));
3273 		values[0] = Int32GetDatum(pid);
3274 
3275 		if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3276 		{
3277 			/*
3278 			 * Only superusers and members of pg_read_all_stats can see
3279 			 * details. Other users only get the pid value to know it's a
3280 			 * walsender, but no details.
3281 			 */
3282 			MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3283 		}
3284 		else
3285 		{
3286 			values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3287 
3288 			if (XLogRecPtrIsInvalid(sentPtr))
3289 				nulls[2] = true;
3290 			values[2] = LSNGetDatum(sentPtr);
3291 
3292 			if (XLogRecPtrIsInvalid(write))
3293 				nulls[3] = true;
3294 			values[3] = LSNGetDatum(write);
3295 
3296 			if (XLogRecPtrIsInvalid(flush))
3297 				nulls[4] = true;
3298 			values[4] = LSNGetDatum(flush);
3299 
3300 			if (XLogRecPtrIsInvalid(apply))
3301 				nulls[5] = true;
3302 			values[5] = LSNGetDatum(apply);
3303 
3304 			/*
3305 			 * Treat a standby such as a pg_basebackup background process
3306 			 * which always returns an invalid flush location, as an
3307 			 * asynchronous standby.
3308 			 */
3309 			priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3310 
3311 			if (writeLag < 0)
3312 				nulls[6] = true;
3313 			else
3314 				values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3315 
3316 			if (flushLag < 0)
3317 				nulls[7] = true;
3318 			else
3319 				values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3320 
3321 			if (applyLag < 0)
3322 				nulls[8] = true;
3323 			else
3324 				values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3325 
3326 			values[9] = Int32GetDatum(priority);
3327 
3328 			/*
3329 			 * More easily understood version of standby state. This is purely
3330 			 * informational.
3331 			 *
3332 			 * In quorum-based sync replication, the role of each standby
3333 			 * listed in synchronous_standby_names can be changing very
3334 			 * frequently. Any standbys considered as "sync" at one moment can
3335 			 * be switched to "potential" ones at the next moment. So, it's
3336 			 * basically useless to report "sync" or "potential" as their sync
3337 			 * states. We report just "quorum" for them.
3338 			 */
3339 			if (priority == 0)
3340 				values[10] = CStringGetTextDatum("async");
3341 			else if (is_sync_standby)
3342 				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3343 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3344 			else
3345 				values[10] = CStringGetTextDatum("potential");
3346 		}
3347 
3348 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3349 	}
3350 
3351 	/* clean up and return the tuplestore */
3352 	tuplestore_donestoring(tupstore);
3353 
3354 	return (Datum) 0;
3355 }
3356 
3357 /*
3358  * Send a keepalive message to standby.
3359  *
3360  * If requestReply is set, the message requests the other party to send
3361  * a message back to us, for heartbeat purposes.  We also set a flag to
3362  * let nearby code that we're waiting for that response, to avoid
3363  * repeated requests.
3364  */
3365 static void
WalSndKeepalive(bool requestReply)3366 WalSndKeepalive(bool requestReply)
3367 {
3368 	elog(DEBUG2, "sending replication keepalive");
3369 
3370 	/* construct the message... */
3371 	resetStringInfo(&output_message);
3372 	pq_sendbyte(&output_message, 'k');
3373 	pq_sendint64(&output_message, sentPtr);
3374 	pq_sendint64(&output_message, GetCurrentTimestamp());
3375 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
3376 
3377 	/* ... and send it wrapped in CopyData */
3378 	pq_putmessage_noblock('d', output_message.data, output_message.len);
3379 
3380 	/* Set local flag */
3381 	if (requestReply)
3382 		waiting_for_ping_response = true;
3383 }
3384 
3385 /*
3386  * Send keepalive message if too much time has elapsed.
3387  */
3388 static void
WalSndKeepaliveIfNecessary(void)3389 WalSndKeepaliveIfNecessary(void)
3390 {
3391 	TimestampTz ping_time;
3392 
3393 	/*
3394 	 * Don't send keepalive messages if timeouts are globally disabled or
3395 	 * we're doing something not partaking in timeouts.
3396 	 */
3397 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3398 		return;
3399 
3400 	if (waiting_for_ping_response)
3401 		return;
3402 
3403 	/*
3404 	 * If half of wal_sender_timeout has lapsed without receiving any reply
3405 	 * from the standby, send a keep-alive message to the standby requesting
3406 	 * an immediate reply.
3407 	 */
3408 	ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3409 											wal_sender_timeout / 2);
3410 	if (last_processing >= ping_time)
3411 	{
3412 		WalSndKeepalive(true);
3413 
3414 		/* Try to flush pending output to the client */
3415 		if (pq_flush_if_writable() != 0)
3416 			WalSndShutdown();
3417 	}
3418 }
3419 
3420 /*
3421  * Record the end of the WAL and the time it was flushed locally, so that
3422  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3423  * eventually reported to have been written, flushed and applied by the
3424  * standby in a reply message.
3425  */
3426 static void
LagTrackerWrite(XLogRecPtr lsn,TimestampTz local_flush_time)3427 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3428 {
3429 	bool		buffer_full;
3430 	int			new_write_head;
3431 	int			i;
3432 
3433 	if (!am_walsender)
3434 		return;
3435 
3436 	/*
3437 	 * If the lsn hasn't advanced since last time, then do nothing.  This way
3438 	 * we only record a new sample when new WAL has been written.
3439 	 */
3440 	if (LagTracker.last_lsn == lsn)
3441 		return;
3442 	LagTracker.last_lsn = lsn;
3443 
3444 	/*
3445 	 * If advancing the write head of the circular buffer would crash into any
3446 	 * of the read heads, then the buffer is full.  In other words, the
3447 	 * slowest reader (presumably apply) is the one that controls the release
3448 	 * of space.
3449 	 */
3450 	new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3451 	buffer_full = false;
3452 	for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3453 	{
3454 		if (new_write_head == LagTracker.read_heads[i])
3455 			buffer_full = true;
3456 	}
3457 
3458 	/*
3459 	 * If the buffer is full, for now we just rewind by one slot and overwrite
3460 	 * the last sample, as a simple (if somewhat uneven) way to lower the
3461 	 * sampling rate.  There may be better adaptive compaction algorithms.
3462 	 */
3463 	if (buffer_full)
3464 	{
3465 		new_write_head = LagTracker.write_head;
3466 		if (LagTracker.write_head > 0)
3467 			LagTracker.write_head--;
3468 		else
3469 			LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3470 	}
3471 
3472 	/* Store a sample at the current write head position. */
3473 	LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3474 	LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3475 	LagTracker.write_head = new_write_head;
3476 }
3477 
3478 /*
3479  * Find out how much time has elapsed between the moment WAL location 'lsn'
3480  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3481  * We have a separate read head for each of the reported LSN locations we
3482  * receive in replies from standby; 'head' controls which read head is
3483  * used.  Whenever a read head crosses an LSN which was written into the
3484  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3485  * find out the time this LSN (or an earlier one) was flushed locally, and
3486  * therefore compute the lag.
3487  *
3488  * Return -1 if no new sample data is available, and otherwise the elapsed
3489  * time in microseconds.
3490  */
3491 static TimeOffset
LagTrackerRead(int head,XLogRecPtr lsn,TimestampTz now)3492 LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
3493 {
3494 	TimestampTz time = 0;
3495 
3496 	/* Read all unread samples up to this LSN or end of buffer. */
3497 	while (LagTracker.read_heads[head] != LagTracker.write_head &&
3498 		   LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3499 	{
3500 		time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3501 		LagTracker.last_read[head] =
3502 			LagTracker.buffer[LagTracker.read_heads[head]];
3503 		LagTracker.read_heads[head] =
3504 			(LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3505 	}
3506 
3507 	/*
3508 	 * If the lag tracker is empty, that means the standby has processed
3509 	 * everything we've ever sent so we should now clear 'last_read'.  If we
3510 	 * didn't do that, we'd risk using a stale and irrelevant sample for
3511 	 * interpolation at the beginning of the next burst of WAL after a period
3512 	 * of idleness.
3513 	 */
3514 	if (LagTracker.read_heads[head] == LagTracker.write_head)
3515 		LagTracker.last_read[head].time = 0;
3516 
3517 	if (time > now)
3518 	{
3519 		/* If the clock somehow went backwards, treat as not found. */
3520 		return -1;
3521 	}
3522 	else if (time == 0)
3523 	{
3524 		/*
3525 		 * We didn't cross a time.  If there is a future sample that we
3526 		 * haven't reached yet, and we've already reached at least one sample,
3527 		 * let's interpolate the local flushed time.  This is mainly useful
3528 		 * for reporting a completely stuck apply position as having
3529 		 * increasing lag, since otherwise we'd have to wait for it to
3530 		 * eventually start moving again and cross one of our samples before
3531 		 * we can show the lag increasing.
3532 		 */
3533 		if (LagTracker.read_heads[head] == LagTracker.write_head)
3534 		{
3535 			/* There are no future samples, so we can't interpolate. */
3536 			return -1;
3537 		}
3538 		else if (LagTracker.last_read[head].time != 0)
3539 		{
3540 			/* We can interpolate between last_read and the next sample. */
3541 			double		fraction;
3542 			WalTimeSample prev = LagTracker.last_read[head];
3543 			WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3544 
3545 			if (lsn < prev.lsn)
3546 			{
3547 				/*
3548 				 * Reported LSNs shouldn't normally go backwards, but it's
3549 				 * possible when there is a timeline change.  Treat as not
3550 				 * found.
3551 				 */
3552 				return -1;
3553 			}
3554 
3555 			Assert(prev.lsn < next.lsn);
3556 
3557 			if (prev.time > next.time)
3558 			{
3559 				/* If the clock somehow went backwards, treat as not found. */
3560 				return -1;
3561 			}
3562 
3563 			/* See how far we are between the previous and next samples. */
3564 			fraction =
3565 				(double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3566 
3567 			/* Scale the local flush time proportionally. */
3568 			time = (TimestampTz)
3569 				((double) prev.time + (next.time - prev.time) * fraction);
3570 		}
3571 		else
3572 		{
3573 			/*
3574 			 * We have only a future sample, implying that we were entirely
3575 			 * caught up but and now there is a new burst of WAL and the
3576 			 * standby hasn't processed the first sample yet.  Until the
3577 			 * standby reaches the future sample the best we can do is report
3578 			 * the hypothetical lag if that sample were to be replayed now.
3579 			 */
3580 			time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3581 		}
3582 	}
3583 
3584 	/* Return the elapsed time since local flush time in microseconds. */
3585 	Assert(time != 0);
3586 	return now - time;
3587 }
3588