1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited.  If
29  * the backend is idle or runs an SQL query this causes the backend to
30  * shutdown, if logical replication is in progress all existing WAL records
31  * are processed followed by a shutdown.  Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  *	  src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogutils.h"
58 
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "replication/basebackup.h"
70 #include "replication/decode.h"
71 #include "replication/logical.h"
72 #include "replication/logicalfuncs.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
76 #include "replication/walreceiver.h"
77 #include "replication/walsender.h"
78 #include "replication/walsender_private.h"
79 #include "storage/condition_variable.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/builtins.h"
88 #include "utils/guc.h"
89 #include "utils/memutils.h"
90 #include "utils/pg_lsn.h"
91 #include "utils/portal.h"
92 #include "utils/ps_status.h"
93 #include "utils/resowner.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96 
/*
 * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
 *
 * We don't have a good idea of what a good value would be; there's some
 * overhead per message in both walsender and walreceiver, but on the other
 * hand sending large batches makes walsender less responsive to signals
 * because signals are checked only between messages.  128kB (with
 * default 8k blocks) seems like a reasonable guess for now.
 *
 * The value is expressed as a multiple of XLOG_BLCKSZ (16 blocks), so the
 * ">= XLOG_BLCKSZ" requirement holds for any configured WAL block size.
 */
#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107 
/* Array of WalSnds in shared memory */
WalSndCtlData *WalSndCtl = NULL;

/* My slot in the shared memory array */
WalSnd	   *MyWalSnd = NULL;

/* Global state */
bool		am_walsender = false;	/* Am I a walsender process? */
bool		am_cascading_walsender = false; /* Am I cascading WAL to another
											 * standby?  Recomputed from
											 * RecoveryInProgress() */
bool		am_db_walsender = false;	/* Connected to a database? */

/* User-settable parameters for walsender */
int			max_wal_senders = 0;	/* the maximum number of concurrent
									 * walsenders */
int			wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
											 * data message (presumably in
											 * milliseconds, judging by the
											 * 60 * 1000 default) */
bool		log_replication_commands = false;	/* NOTE(review): presumably
												 * enables logging of each
												 * replication command */

/*
 * State for WalSndWakeupRequest
 */
bool		wake_wal_senders = false;

/*
 * These variables are used similarly to openLogFile/SegNo/Off,
 * but for walsender to read the XLOG.  sendFile is -1 while no segment
 * file is open; WalSndErrorCleanup() closes it and resets it to -1.
 */
static int	sendFile = -1;
static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;

/* Timeline ID of the currently open file */
static TimeLineID curFileTimeLine = 0;

/*
 * These variables keep track of the state of the timeline we're currently
 * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
 * the timeline is not the latest timeline on this server, and the server's
 * history forked off from that timeline at sendTimeLineValidUpto.
 * sendTimeLineNextTLI is the timeline that follows sendTimeLine in that
 * history; StartReplication reports it to the client as "next_tli".
 */
static TimeLineID sendTimeLine = 0;
static TimeLineID sendTimeLineNextTLI = 0;
static bool sendTimeLineIsHistoric = false;
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;

/*
 * How far have we sent WAL already? This is also advertised in
 * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
 */
static XLogRecPtr sentPtr = InvalidXLogRecPtr;

/* Buffers for constructing outgoing messages and processing reply messages. */
static StringInfoData output_message;
static StringInfoData reply_message;
static StringInfoData tmpbuf;

/* Timestamp of last ProcessRepliesIfAny(). */
static TimestampTz last_processing = 0;

/*
 * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
 * standby. Set to 0 if wal_sender_timeout doesn't need to be active
 * (CreateReplicationSlot does this while building its initial snapshot).
 */
static TimestampTz last_reply_timestamp = 0;

/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool waiting_for_ping_response = false;

/*
 * While streaming WAL in Copy mode, streamingDoneSending is set to true
 * after we have sent CopyDone. We should not send any more CopyData messages
 * after that. streamingDoneReceiving is set to true when we receive CopyDone
 * from the other end. When both become true, it's time to exit Copy mode.
 * StartReplication resets both to false before it starts streaming.
 */
static bool streamingDoneSending;
static bool streamingDoneReceiving;

/* Are we there yet?  (NOTE(review): presumably "caught up with WAL".) */
static bool WalSndCaughtUp = false;

/*
 * Flags set by signal handlers for later service in main loop.  If either
 * one is set when WalSndErrorCleanup() runs, the process exits instead of
 * returning to the startup state.
 */
static volatile sig_atomic_t got_SIGUSR2 = false;
static volatile sig_atomic_t got_STOPPING = false;

/*
 * This is set while we are streaming. When not set
 * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
 * the main loop is responsible for checking got_STOPPING and terminating when
 * it's set (after streaming any remaining WAL).
 */
static volatile sig_atomic_t replication_active = false;

/*
 * NOTE(review): logical-decoding state for this walsender; neither variable
 * is referenced in the visible part of the file, so confirm their exact
 * meaning against StartLogicalReplication/XLogSendLogical.
 */
static LogicalDecodingContext *logical_decoding_ctx = NULL;
static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
203 
/* A sample associating a WAL location with the time it was written. */
typedef struct
{
	XLogRecPtr	lsn;			/* WAL location of the sample */
	TimestampTz time;			/* when that location was written */
} WalTimeSample;

/* The size of our buffer of time samples. */
#define LAG_TRACKER_BUFFER_SIZE 8192

/*
 * A mechanism for tracking replication lag.
 *
 * The whole struct is zeroed by InitWalSender().  It holds one write head
 * and one read head (plus last-read sample) per synchronous-replication
 * wait mode.  NOTE(review): buffer looks like a ring buffer driven by
 * LagTrackerWrite/LagTrackerRead, which are not visible here; confirm.
 */
static struct
{
	XLogRecPtr	last_lsn;		/* NOTE(review): presumably the last LSN
								 * recorded by LagTrackerWrite */
	WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
	int			write_head;		/* write position within buffer */
	int			read_heads[NUM_SYNC_REP_WAIT_MODE]; /* one per wait mode */
	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];	/* one per wait mode */
}			LagTracker;
223 
/* Signal handlers */
static void WalSndLastCycleHandler(SIGNAL_ARGS);

/* Prototypes for private functions */

/*
 * Data-sending callback passed to WalSndLoop() and WalSndDone();
 * StartReplication passes XLogSendPhysical.
 */
typedef void (*WalSndSendDataCallback) (void);
static void WalSndLoop(WalSndSendDataCallback send_data);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
static void WalSndShutdown(void) pg_attribute_noreturn();	/* never returns */
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void);
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
static void StartReplication(StartReplicationCmd *cmd);
static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void WalSndKeepalive(bool requestReply);
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);

/*
 * Reads WAL starting at startptr into buf; judging by its use in
 * logical_read_xlog_page, count is the number of bytes to read.
 */
static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
259 
260 
261 /* Initialize walsender process before entering the main command loop */
262 void
InitWalSender(void)263 InitWalSender(void)
264 {
265 	am_cascading_walsender = RecoveryInProgress();
266 
267 	/* Create a per-walsender data structure in shared memory */
268 	InitWalSenderSlot();
269 
270 	/* Set up resource owner */
271 	CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
272 
273 	/*
274 	 * Let postmaster know that we're a WAL sender. Once we've declared us as
275 	 * a WAL sender process, postmaster will let us outlive the bgwriter and
276 	 * kill us last in the shutdown sequence, so we get a chance to stream all
277 	 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
278 	 * there's no going back, and we mustn't write any WAL records after this.
279 	 */
280 	MarkPostmasterChildWalSender();
281 	SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
282 
283 	/* Initialize empty timestamp buffer for lag tracking. */
284 	memset(&LagTracker, 0, sizeof(LagTracker));
285 }
286 
287 /*
288  * Clean up after an error.
289  *
290  * WAL sender processes don't use transactions like regular backends do.
291  * This function does any cleanup required after an error in a WAL sender
292  * process, similar to what transaction abort does in a regular backend.
293  */
294 void
WalSndErrorCleanup(void)295 WalSndErrorCleanup(void)
296 {
297 	LWLockReleaseAll();
298 	ConditionVariableCancelSleep();
299 	pgstat_report_wait_end();
300 
301 	if (sendFile >= 0)
302 	{
303 		close(sendFile);
304 		sendFile = -1;
305 	}
306 
307 	if (MyReplicationSlot != NULL)
308 		ReplicationSlotRelease();
309 
310 	ReplicationSlotCleanup();
311 
312 	replication_active = false;
313 
314 	if (got_STOPPING || got_SIGUSR2)
315 		proc_exit(0);
316 
317 	/* Revert back to startup state */
318 	WalSndSetState(WALSNDSTATE_STARTUP);
319 }
320 
321 /*
322  * Handle a client's connection abort in an orderly manner.
323  */
324 static void
WalSndShutdown(void)325 WalSndShutdown(void)
326 {
327 	/*
328 	 * Reset whereToSendOutput to prevent ereport from attempting to send any
329 	 * more messages to the standby.
330 	 */
331 	if (whereToSendOutput == DestRemote)
332 		whereToSendOutput = DestNone;
333 
334 	proc_exit(0);
335 	abort();					/* keep the compiler quiet */
336 }
337 
338 /*
339  * Handle the IDENTIFY_SYSTEM command.
340  */
341 static void
IdentifySystem(void)342 IdentifySystem(void)
343 {
344 	char		sysid[32];
345 	char		xloc[MAXFNAMELEN];
346 	XLogRecPtr	logptr;
347 	char	   *dbname = NULL;
348 	DestReceiver *dest;
349 	TupOutputState *tstate;
350 	TupleDesc	tupdesc;
351 	Datum		values[4];
352 	bool		nulls[4];
353 
354 	/*
355 	 * Reply with a result set with one row, four columns. First col is system
356 	 * ID, second is timeline ID, third is current xlog location and the
357 	 * fourth contains the database name if we are connected to one.
358 	 */
359 
360 	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
361 			 GetSystemIdentifier());
362 
363 	am_cascading_walsender = RecoveryInProgress();
364 	if (am_cascading_walsender)
365 	{
366 		/* this also updates ThisTimeLineID */
367 		logptr = GetStandbyFlushRecPtr();
368 	}
369 	else
370 		logptr = GetFlushRecPtr();
371 
372 	snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
373 
374 	if (MyDatabaseId != InvalidOid)
375 	{
376 		MemoryContext cur = CurrentMemoryContext;
377 
378 		/* syscache access needs a transaction env. */
379 		StartTransactionCommand();
380 		/* make dbname live outside TX context */
381 		MemoryContextSwitchTo(cur);
382 		dbname = get_database_name(MyDatabaseId);
383 		CommitTransactionCommand();
384 		/* CommitTransactionCommand switches to TopMemoryContext */
385 		MemoryContextSwitchTo(cur);
386 	}
387 
388 	dest = CreateDestReceiver(DestRemoteSimple);
389 	MemSet(nulls, false, sizeof(nulls));
390 
391 	/* need a tuple descriptor representing four columns */
392 	tupdesc = CreateTemplateTupleDesc(4, false);
393 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
394 							  TEXTOID, -1, 0);
395 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
396 							  INT4OID, -1, 0);
397 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
398 							  TEXTOID, -1, 0);
399 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
400 							  TEXTOID, -1, 0);
401 
402 	/* prepare for projection of tuples */
403 	tstate = begin_tup_output_tupdesc(dest, tupdesc);
404 
405 	/* column 1: system identifier */
406 	values[0] = CStringGetTextDatum(sysid);
407 
408 	/* column 2: timeline */
409 	values[1] = Int32GetDatum(ThisTimeLineID);
410 
411 	/* column 3: wal location */
412 	values[2] = CStringGetTextDatum(xloc);
413 
414 	/* column 4: database name, or NULL if none */
415 	if (dbname)
416 		values[3] = CStringGetTextDatum(dbname);
417 	else
418 		nulls[3] = true;
419 
420 	/* send it to dest */
421 	do_tup_output(tstate, values, nulls);
422 
423 	end_tup_output(tstate);
424 }
425 
426 
427 /*
428  * Handle TIMELINE_HISTORY command.
429  */
430 static void
SendTimeLineHistory(TimeLineHistoryCmd * cmd)431 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
432 {
433 	StringInfoData buf;
434 	char		histfname[MAXFNAMELEN];
435 	char		path[MAXPGPATH];
436 	int			fd;
437 	off_t		histfilelen;
438 	off_t		bytesleft;
439 	Size		len;
440 
441 	/*
442 	 * Reply with a result set with one row, and two columns. The first col is
443 	 * the name of the history file, 2nd is the contents.
444 	 */
445 
446 	TLHistoryFileName(histfname, cmd->timeline);
447 	TLHistoryFilePath(path, cmd->timeline);
448 
449 	/* Send a RowDescription message */
450 	pq_beginmessage(&buf, 'T');
451 	pq_sendint(&buf, 2, 2);		/* 2 fields */
452 
453 	/* first field */
454 	pq_sendstring(&buf, "filename");	/* col name */
455 	pq_sendint(&buf, 0, 4);		/* table oid */
456 	pq_sendint(&buf, 0, 2);		/* attnum */
457 	pq_sendint(&buf, TEXTOID, 4);	/* type oid */
458 	pq_sendint(&buf, -1, 2);	/* typlen */
459 	pq_sendint(&buf, 0, 4);		/* typmod */
460 	pq_sendint(&buf, 0, 2);		/* format code */
461 
462 	/* second field */
463 	pq_sendstring(&buf, "content"); /* col name */
464 	pq_sendint(&buf, 0, 4);		/* table oid */
465 	pq_sendint(&buf, 0, 2);		/* attnum */
466 	/*
467 	 * While this is labeled as BYTEAOID, it is the same output format
468 	 * as TEXTOID above.
469 	 */
470 	pq_sendint(&buf, BYTEAOID, 4);	/* type oid */
471 	pq_sendint(&buf, -1, 2);	/* typlen */
472 	pq_sendint(&buf, 0, 4);		/* typmod */
473 	pq_sendint(&buf, 0, 2);		/* format code */
474 	pq_endmessage(&buf);
475 
476 	/* Send a DataRow message */
477 	pq_beginmessage(&buf, 'D');
478 	pq_sendint(&buf, 2, 2);		/* # of columns */
479 	len = strlen(histfname);
480 	pq_sendint(&buf, len, 4);	/* col1 len */
481 	pq_sendbytes(&buf, histfname, len);
482 
483 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
484 	if (fd < 0)
485 		ereport(ERROR,
486 				(errcode_for_file_access(),
487 				 errmsg("could not open file \"%s\": %m", path)));
488 
489 	/* Determine file length and send it to client */
490 	histfilelen = lseek(fd, 0, SEEK_END);
491 	if (histfilelen < 0)
492 		ereport(ERROR,
493 				(errcode_for_file_access(),
494 				 errmsg("could not seek to end of file \"%s\": %m", path)));
495 	if (lseek(fd, 0, SEEK_SET) != 0)
496 		ereport(ERROR,
497 				(errcode_for_file_access(),
498 				 errmsg("could not seek to beginning of file \"%s\": %m", path)));
499 
500 	pq_sendint(&buf, histfilelen, 4);	/* col2 len */
501 
502 	bytesleft = histfilelen;
503 	while (bytesleft > 0)
504 	{
505 		PGAlignedBlock rbuf;
506 		int			nread;
507 
508 		pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
509 		nread = read(fd, rbuf.data, sizeof(rbuf));
510 		pgstat_report_wait_end();
511 		if (nread <= 0)
512 			ereport(ERROR,
513 					(errcode_for_file_access(),
514 					 errmsg("could not read file \"%s\": %m",
515 							path)));
516 		pq_sendbytes(&buf, rbuf.data, nread);
517 		bytesleft -= nread;
518 	}
519 	CloseTransientFile(fd);
520 
521 	pq_endmessage(&buf);
522 }
523 
524 /*
525  * Handle START_REPLICATION command.
526  *
527  * At the moment, this never returns, but an ereport(ERROR) will take us back
528  * to the main loop.
529  */
530 static void
StartReplication(StartReplicationCmd * cmd)531 StartReplication(StartReplicationCmd *cmd)
532 {
533 	StringInfoData buf;
534 	XLogRecPtr	FlushPtr;
535 
536 	if (ThisTimeLineID == 0)
537 		ereport(ERROR,
538 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
539 				 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
540 
541 	/*
542 	 * We assume here that we're logging enough information in the WAL for
543 	 * log-shipping, since this is checked in PostmasterMain().
544 	 *
545 	 * NOTE: wal_level can only change at shutdown, so in most cases it is
546 	 * difficult for there to be WAL data that we can still see that was
547 	 * written at wal_level='minimal'.
548 	 */
549 
550 	if (cmd->slotname)
551 	{
552 		ReplicationSlotAcquire(cmd->slotname, true);
553 		if (SlotIsLogical(MyReplicationSlot))
554 			ereport(ERROR,
555 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
556 					 (errmsg("cannot use a logical replication slot for physical replication"))));
557 	}
558 
559 	/*
560 	 * Select the timeline. If it was given explicitly by the client, use
561 	 * that. Otherwise use the timeline of the last replayed record, which is
562 	 * kept in ThisTimeLineID.
563 	 */
564 	if (am_cascading_walsender)
565 	{
566 		/* this also updates ThisTimeLineID */
567 		FlushPtr = GetStandbyFlushRecPtr();
568 	}
569 	else
570 		FlushPtr = GetFlushRecPtr();
571 
572 	if (cmd->timeline != 0)
573 	{
574 		XLogRecPtr	switchpoint;
575 
576 		sendTimeLine = cmd->timeline;
577 		if (sendTimeLine == ThisTimeLineID)
578 		{
579 			sendTimeLineIsHistoric = false;
580 			sendTimeLineValidUpto = InvalidXLogRecPtr;
581 		}
582 		else
583 		{
584 			List	   *timeLineHistory;
585 
586 			sendTimeLineIsHistoric = true;
587 
588 			/*
589 			 * Check that the timeline the client requested exists, and the
590 			 * requested start location is on that timeline.
591 			 */
592 			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
593 			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
594 										 &sendTimeLineNextTLI);
595 			list_free_deep(timeLineHistory);
596 
597 			/*
598 			 * Found the requested timeline in the history. Check that
599 			 * requested startpoint is on that timeline in our history.
600 			 *
601 			 * This is quite loose on purpose. We only check that we didn't
602 			 * fork off the requested timeline before the switchpoint. We
603 			 * don't check that we switched *to* it before the requested
604 			 * starting point. This is because the client can legitimately
605 			 * request to start replication from the beginning of the WAL
606 			 * segment that contains switchpoint, but on the new timeline, so
607 			 * that it doesn't end up with a partial segment. If you ask for
608 			 * too old a starting point, you'll get an error later when we
609 			 * fail to find the requested WAL segment in pg_wal.
610 			 *
611 			 * XXX: we could be more strict here and only allow a startpoint
612 			 * that's older than the switchpoint, if it's still in the same
613 			 * WAL segment.
614 			 */
615 			if (!XLogRecPtrIsInvalid(switchpoint) &&
616 				switchpoint < cmd->startpoint)
617 			{
618 				ereport(ERROR,
619 						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
620 								(uint32) (cmd->startpoint >> 32),
621 								(uint32) (cmd->startpoint),
622 								cmd->timeline),
623 						 errdetail("This server's history forked from timeline %u at %X/%X.",
624 								   cmd->timeline,
625 								   (uint32) (switchpoint >> 32),
626 								   (uint32) (switchpoint))));
627 			}
628 			sendTimeLineValidUpto = switchpoint;
629 		}
630 	}
631 	else
632 	{
633 		sendTimeLine = ThisTimeLineID;
634 		sendTimeLineValidUpto = InvalidXLogRecPtr;
635 		sendTimeLineIsHistoric = false;
636 	}
637 
638 	streamingDoneSending = streamingDoneReceiving = false;
639 
640 	/* If there is nothing to stream, don't even enter COPY mode */
641 	if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
642 	{
643 		/*
644 		 * When we first start replication the standby will be behind the
645 		 * primary. For some applications, for example synchronous
646 		 * replication, it is important to have a clear state for this initial
647 		 * catchup mode, so we can trigger actions when we change streaming
648 		 * state later. We may stay in this state for a long time, which is
649 		 * exactly why we want to be able to monitor whether or not we are
650 		 * still here.
651 		 */
652 		WalSndSetState(WALSNDSTATE_CATCHUP);
653 
654 		/* Send a CopyBothResponse message, and start streaming */
655 		pq_beginmessage(&buf, 'W');
656 		pq_sendbyte(&buf, 0);
657 		pq_sendint(&buf, 0, 2);
658 		pq_endmessage(&buf);
659 		pq_flush();
660 
661 		/*
662 		 * Don't allow a request to stream from a future point in WAL that
663 		 * hasn't been flushed to disk in this server yet.
664 		 */
665 		if (FlushPtr < cmd->startpoint)
666 		{
667 			ereport(ERROR,
668 					(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
669 							(uint32) (cmd->startpoint >> 32),
670 							(uint32) (cmd->startpoint),
671 							(uint32) (FlushPtr >> 32),
672 							(uint32) (FlushPtr))));
673 		}
674 
675 		/* Start streaming from the requested point */
676 		sentPtr = cmd->startpoint;
677 
678 		/* Initialize shared memory status, too */
679 		SpinLockAcquire(&MyWalSnd->mutex);
680 		MyWalSnd->sentPtr = sentPtr;
681 		SpinLockRelease(&MyWalSnd->mutex);
682 
683 		SyncRepInitConfig();
684 
685 		/* Main loop of walsender */
686 		replication_active = true;
687 
688 		WalSndLoop(XLogSendPhysical);
689 
690 		replication_active = false;
691 		if (got_STOPPING)
692 			proc_exit(0);
693 		WalSndSetState(WALSNDSTATE_STARTUP);
694 
695 		Assert(streamingDoneSending && streamingDoneReceiving);
696 	}
697 
698 	if (cmd->slotname)
699 		ReplicationSlotRelease();
700 
701 	/*
702 	 * Copy is finished now. Send a single-row result set indicating the next
703 	 * timeline.
704 	 */
705 	if (sendTimeLineIsHistoric)
706 	{
707 		char		startpos_str[8 + 1 + 8 + 1];
708 		DestReceiver *dest;
709 		TupOutputState *tstate;
710 		TupleDesc	tupdesc;
711 		Datum		values[2];
712 		bool		nulls[2];
713 
714 		snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
715 				 (uint32) (sendTimeLineValidUpto >> 32),
716 				 (uint32) sendTimeLineValidUpto);
717 
718 		dest = CreateDestReceiver(DestRemoteSimple);
719 		MemSet(nulls, false, sizeof(nulls));
720 
721 		/*
722 		 * Need a tuple descriptor representing two columns. int8 may seem
723 		 * like a surprising data type for this, but in theory int4 would not
724 		 * be wide enough for this, as TimeLineID is unsigned.
725 		 */
726 		tupdesc = CreateTemplateTupleDesc(2, false);
727 		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
728 								  INT8OID, -1, 0);
729 		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
730 								  TEXTOID, -1, 0);
731 
732 		/* prepare for projection of tuple */
733 		tstate = begin_tup_output_tupdesc(dest, tupdesc);
734 
735 		values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
736 		values[1] = CStringGetTextDatum(startpos_str);
737 
738 		/* send it to dest */
739 		do_tup_output(tstate, values, nulls);
740 
741 		end_tup_output(tstate);
742 	}
743 
744 	/* Send CommandComplete message */
745 	pq_puttextmessage('C', "START_STREAMING");
746 }
747 
748 /*
749  * read_page callback for logical decoding contexts, as a walsender process.
750  *
751  * Inside the walsender we can do better than logical_read_local_xlog_page,
752  * which has to do a plain sleep/busy loop, because the walsender's latch gets
753  * set every time WAL is flushed.
754  */
755 static int
logical_read_xlog_page(XLogReaderState * state,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char * cur_page,TimeLineID * pageTLI)756 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
757 					   XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
758 {
759 	XLogRecPtr	flushptr;
760 	int			count;
761 
762 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
763 	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
764 	sendTimeLine = state->currTLI;
765 	sendTimeLineValidUpto = state->currTLIValidUntil;
766 	sendTimeLineNextTLI = state->nextTLI;
767 
768 	/* make sure we have enough WAL available */
769 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
770 
771 	/* fail if not (implies we are going to shut down) */
772 	if (flushptr < targetPagePtr + reqLen)
773 		return -1;
774 
775 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
776 		count = XLOG_BLCKSZ;	/* more than one block available */
777 	else
778 		count = flushptr - targetPagePtr;	/* part of the page available */
779 
780 	/* now actually read the data, we know it's there */
781 	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
782 
783 	return count;
784 }
785 
786 /*
787  * Process extra options given to CREATE_REPLICATION_SLOT.
788  */
789 static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd * cmd,bool * reserve_wal,CRSSnapshotAction * snapshot_action)790 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
791 						   bool *reserve_wal,
792 						   CRSSnapshotAction *snapshot_action)
793 {
794 	ListCell   *lc;
795 	bool		snapshot_action_given = false;
796 	bool		reserve_wal_given = false;
797 
798 	/* Parse options */
799 	foreach(lc, cmd->options)
800 	{
801 		DefElem    *defel = (DefElem *) lfirst(lc);
802 
803 		if (strcmp(defel->defname, "export_snapshot") == 0)
804 		{
805 			if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
806 				ereport(ERROR,
807 						(errcode(ERRCODE_SYNTAX_ERROR),
808 						 errmsg("conflicting or redundant options")));
809 
810 			snapshot_action_given = true;
811 			*snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
812 				CRS_NOEXPORT_SNAPSHOT;
813 		}
814 		else if (strcmp(defel->defname, "use_snapshot") == 0)
815 		{
816 			if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
817 				ereport(ERROR,
818 						(errcode(ERRCODE_SYNTAX_ERROR),
819 						 errmsg("conflicting or redundant options")));
820 
821 			snapshot_action_given = true;
822 			*snapshot_action = CRS_USE_SNAPSHOT;
823 		}
824 		else if (strcmp(defel->defname, "reserve_wal") == 0)
825 		{
826 			if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
827 				ereport(ERROR,
828 						(errcode(ERRCODE_SYNTAX_ERROR),
829 						 errmsg("conflicting or redundant options")));
830 
831 			reserve_wal_given = true;
832 			*reserve_wal = true;
833 		}
834 		else
835 			elog(ERROR, "unrecognized option: %s", defel->defname);
836 	}
837 }
838 
839 /*
840  * Create a new replication slot.
841  */
static void
CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
	const char *snapshot_name = NULL;
	char		xloc[MAXFNAMELEN];
	char	   *slot_name;
	bool		reserve_wal = false;
	CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
	DestReceiver *dest;
	TupOutputState *tstate;
	TupleDesc	tupdesc;
	Datum		values[4];
	bool		nulls[4];

	/*
	 * Handles CREATE_REPLICATION_SLOT: creates a physical or logical slot,
	 * performs the kind-specific initialization, sends the client a single
	 * result row (slot_name, consistent_point, snapshot_name, output_plugin)
	 * and releases the slot again before returning.  The caller must not
	 * already hold a replication slot.
	 */
	Assert(!MyReplicationSlot);

	/* Fill reserve_wal and snapshot_action from the command's options. */
	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);

	/* setup state for XLogReadPage */
	sendTimeLineIsHistoric = false;
	sendTimeLine = ThisTimeLineID;

	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
	{
		/*
		 * Physical slots are created directly as temporary or persistent;
		 * unlike logical slots there is no ephemeral stage to promote later.
		 */
		ReplicationSlotCreate(cmd->slotname, false,
							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
	}
	else
	{
		CheckLogicalDecodingRequirements();

		/*
		 * Initially create persistent slot as ephemeral - that allows us to
		 * nicely handle errors during initialization because it'll get
		 * dropped if this transaction fails. We'll make it persistent at the
		 * end. Temporary slots can be created as temporary from beginning as
		 * they get dropped on error as well.
		 */
		ReplicationSlotCreate(cmd->slotname, true,
							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
	}

	if (cmd->kind == REPLICATION_KIND_LOGICAL)
	{
		LogicalDecodingContext *ctx;
		bool		need_full_snapshot = false;

		/*
		 * Do options check early so that we can bail before calling the
		 * DecodingContextFindStartpoint which can take long time.
		 *
		 * Both snapshot-producing actions (export and use) request a full
		 * snapshot from the decoding context; any other snapshot action
		 * leaves need_full_snapshot false.
		 */
		if (snapshot_action == CRS_EXPORT_SNAPSHOT)
		{
			if (IsTransactionBlock())
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT "
								"must not be called inside a transaction")));

			need_full_snapshot = true;
		}
		else if (snapshot_action == CRS_USE_SNAPSHOT)
		{
			/*
			 * USE_SNAPSHOT installs the snapshot into the current transaction,
			 * so that transaction must exist, be REPEATABLE READ, not have
			 * taken a snapshot yet, and not be a subtransaction.
			 */
			if (!IsTransactionBlock())
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
								"must be called inside a transaction")));

			if (XactIsoLevel != XACT_REPEATABLE_READ)
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
								"must be called in REPEATABLE READ isolation mode transaction")));

			if (FirstSnapshotSet)
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
								"must be called before any query")));

			if (IsSubTransaction())
				ereport(ERROR,
						(errmsg("CREATE_REPLICATION_SLOT ... USE_SNAPSHOT "
								"must not be called in a subtransaction")));

			need_full_snapshot = true;
		}

		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
										logical_read_xlog_page,
										WalSndPrepareWrite, WalSndWriteData,
										WalSndUpdateProgress);

		/*
		 * Signal that we don't need the timeout mechanism. We're just
		 * creating the replication slot and don't yet accept feedback
		 * messages or send keepalives. As we possibly need to wait for
		 * further WAL the walsender would otherwise possibly be killed too
		 * soon.
		 */
		last_reply_timestamp = 0;

		/* build initial snapshot, might take a while */
		DecodingContextFindStartpoint(ctx);

		/*
		 * Export or use the snapshot if we've been asked to do so.
		 *
		 * NB. We will convert the snapbuild.c kind of snapshot to normal
		 * snapshot when doing this.
		 */
		if (snapshot_action == CRS_EXPORT_SNAPSHOT)
		{
			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
		}
		else if (snapshot_action == CRS_USE_SNAPSHOT)
		{
			Snapshot	snap;

			/* Make the slot's initial snapshot this transaction's snapshot. */
			snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
			RestoreTransactionSnapshot(snap, MyProc);
		}

		/* don't need the decoding context anymore */
		FreeDecodingContext(ctx);

		/* Promote the ephemeral slot now that initialization succeeded. */
		if (!cmd->temporary)
			ReplicationSlotPersist();
	}
	else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
	{
		/* RESERVE_WAL was requested: reserve WAL for the new physical slot. */
		ReplicationSlotReserveWal();

		ReplicationSlotMarkDirty();

		/* Write this slot to disk if it's a permanent one. */
		if (!cmd->temporary)
			ReplicationSlotSave();
	}

	/*
	 * The reported consistent point is the slot's confirmed_flush LSN,
	 * printed as the usual "%X/%X" pair of high and low 32-bit halves.
	 * NOTE(review): nothing above sets confirmed_flush for a physical slot,
	 * so for those this is presumably 0/0 -- confirm against slot.c.
	 */
	snprintf(xloc, sizeof(xloc), "%X/%X",
			 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
			 (uint32) MyReplicationSlot->data.confirmed_flush);

	/* Result goes to the client through a DestRemoteSimple receiver. */
	dest = CreateDestReceiver(DestRemoteSimple);
	/* start with every column non-NULL; optional ones are flagged below */
	MemSet(nulls, false, sizeof(nulls));

	/*----------
	 * Need a tuple descriptor representing four columns:
	 * - first field: the slot name
	 * - second field: LSN at which we became consistent
	 * - third field: exported snapshot's name
	 * - fourth field: output plugin
	 *----------
	 */
	tupdesc = CreateTemplateTupleDesc(4, false);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
							  TEXTOID, -1, 0);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
							  TEXTOID, -1, 0);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
							  TEXTOID, -1, 0);
	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
							  TEXTOID, -1, 0);

	/* prepare for projection of tuples */
	tstate = begin_tup_output_tupdesc(dest, tupdesc);

	/* slot_name */
	slot_name = NameStr(MyReplicationSlot->data.name);
	values[0] = CStringGetTextDatum(slot_name);

	/* consistent wal location */
	values[1] = CStringGetTextDatum(xloc);

	/* snapshot name, or NULL if none */
	if (snapshot_name != NULL)
		values[2] = CStringGetTextDatum(snapshot_name);
	else
		nulls[2] = true;

	/* plugin, or NULL if none */
	if (cmd->plugin != NULL)
		values[3] = CStringGetTextDatum(cmd->plugin);
	else
		nulls[3] = true;

	/* send it to dest */
	do_tup_output(tstate, values, nulls);
	end_tup_output(tstate);

	/* The result has been sent; stop holding the slot. */
	ReplicationSlotRelease();
}
1032 
1033 /*
1034  * Get rid of a replication slot that is no longer wanted.
1035  */
1036 static void
DropReplicationSlot(DropReplicationSlotCmd * cmd)1037 DropReplicationSlot(DropReplicationSlotCmd *cmd)
1038 {
1039 	ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1040 	EndCommand("DROP_REPLICATION_SLOT", DestRemote);
1041 }
1042 
/*
 * Load previously initiated logical slot and prepare for sending data (via
 * WalSndLoop).
 *
 * Handles START_REPLICATION ... LOGICAL: acquires the named slot, switches
 * the connection into CopyBoth mode, creates a decoding context starting at
 * cmd->startpoint, and streams decoded changes through WalSndLoop() until
 * streaming ends.  Afterwards the decoding context and the slot are
 * released.  If a stop was requested (got_STOPPING) the process exits;
 * otherwise the walsender returns to the STARTUP state and ends the COPY
 * with a "COPY 0" command tag.
 */
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;

	/* make sure that our requirements are still fulfilled */
	CheckLogicalDecodingRequirements();

	Assert(!MyReplicationSlot);

	ReplicationSlotAcquire(cmd->slotname, true);

	/*
	 * Force a disconnect, so that the decoding code doesn't need to care
	 * about an eventual switch from running in recovery, to running in a
	 * normal environment. Client code is expected to handle reconnects.
	 */
	if (am_cascading_walsender && !RecoveryInProgress())
	{
		ereport(LOG,
				(errmsg("terminating walsender process after promotion")));
		got_STOPPING = true;
	}

	WalSndSetState(WALSNDSTATE_CATCHUP);

	/*
	 * Send a CopyBothResponse message, and start streaming.  The body is a
	 * format byte of 0 followed by a 16-bit column count of 0.
	 */
	pq_beginmessage(&buf, 'W');
	pq_sendbyte(&buf, 0);
	pq_sendint(&buf, 0, 2);
	pq_endmessage(&buf);
	pq_flush();

	/*
	 * Initialize position to the last ack'ed one, then the xlog records begin
	 * to be shipped from that position.
	 */
	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
												 logical_read_xlog_page,
												 WalSndPrepareWrite,
												 WalSndWriteData,
												 WalSndUpdateProgress);

	/* Start reading WAL from the oldest required WAL. */
	logical_startptr = MyReplicationSlot->data.restart_lsn;

	/*
	 * Report the location after which we'll send out further commits as the
	 * current sentPtr.
	 */
	sentPtr = MyReplicationSlot->data.confirmed_flush;

	/*
	 * Also update the sent position status in shared memory.
	 *
	 * NOTE(review): the shared value is taken from restart_lsn while the
	 * local sentPtr above comes from confirmed_flush, so the two can differ
	 * until the first send updates them.  Confirm this is intended before
	 * relying on the shared sentPtr right after startup.
	 */
	SpinLockAcquire(&MyWalSnd->mutex);
	MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
	SpinLockRelease(&MyWalSnd->mutex);

	replication_active = true;

	SyncRepInitConfig();

	/* Main loop of walsender */
	WalSndLoop(XLogSendLogical);

	/* Streaming has ended: tear down decoding state and give up the slot. */
	FreeDecodingContext(logical_decoding_ctx);
	ReplicationSlotRelease();

	replication_active = false;
	/* a pending stop request ends the process instead of returning */
	if (got_STOPPING)
		proc_exit(0);
	WalSndSetState(WALSNDSTATE_STARTUP);

	/* Get out of COPY mode (CommandComplete). */
	EndCommand("COPY 0", DestRemote);
}
1122 
1123 /*
1124  * LogicalDecodingContext 'prepare_write' callback.
1125  *
1126  * Prepare a write into a StringInfo.
1127  *
1128  * Don't do anything lasting in here, it's quite possible that nothing will be done
1129  * with the data.
1130  */
1131 static void
WalSndPrepareWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1132 WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1133 {
1134 	/* can't have sync rep confused by sending the same LSN several times */
1135 	if (!last_write)
1136 		lsn = InvalidXLogRecPtr;
1137 
1138 	resetStringInfo(ctx->out);
1139 
1140 	pq_sendbyte(ctx->out, 'w');
1141 	pq_sendint64(ctx->out, lsn);	/* dataStart */
1142 	pq_sendint64(ctx->out, lsn);	/* walEnd */
1143 
1144 	/*
1145 	 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1146 	 * reserve space here.
1147 	 */
1148 	pq_sendint64(ctx->out, 0);	/* sendtime */
1149 }
1150 
/*
 * LogicalDecodingContext 'write' callback.
 *
 * Actually write out data previously prepared by WalSndPrepareWrite out to
 * the network. Take as long as needed, but process replies from the other
 * side and check timeouts during that.
 *
 * The message in ctx->out is expected to have the layout built by
 * WalSndPrepareWrite: a type byte followed by three int64 fields, the last
 * of which (sendtime, at byte offset 1 + 2 * sizeof(int64)) is overwritten
 * here with the current time.  The lsn, xid and last_write arguments are not
 * used by this callback.
 *
 * Returns right away if the output could be flushed completely and we are
 * not yet halfway to wal_sender_timeout since the last reply; otherwise it
 * loops, sleeping on the latch and socket, until no output is pending.
 * Calls WalSndShutdown() if flushing to the client fails.
 */
static void
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
				bool last_write)
{
	TimestampTz	now;

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * This is somewhat ugly, but the protocol is set as it's already used for
	 * several releases by streaming physical replication.
	 */
	resetStringInfo(&tmpbuf);
	now = GetCurrentTimestamp();
	pq_sendint64(&tmpbuf, now);
	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	/* output previously gathered data in a CopyData packet */
	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);

	CHECK_FOR_INTERRUPTS();

	/* Try to flush pending output to the client */
	if (pq_flush_if_writable() != 0)
		WalSndShutdown();

	/*
	 * Try taking fast path unless we get too close to walsender timeout.
	 * "Too close" means half of wal_sender_timeout has elapsed since the
	 * last reply, at which point we need the slow path's reply processing
	 * and keepalive logic.
	 */
	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
										  wal_sender_timeout / 2) &&
		!pq_is_send_pending())
	{
		return;
	}

	/* If we have pending write here, go to slow path */
	for (;;)
	{
		int			wakeEvents;
		long		sleeptime;

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/* the only normal exit: all output has been handed to the socket */
		if (!pq_is_send_pending())
			break;

		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;

		/* Sleep until something happens or we time out */
		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime,
						  WAIT_EVENT_WAL_SENDER_WRITE_DATA);

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();
	}

	/* reactivate latch so WalSndLoop knows to continue */
	SetLatch(MyLatch);
}
1248 
1249 /*
1250  * LogicalDecodingContext 'progress_update' callback.
1251  *
1252  * Write the current position to the log tracker (see XLogSendPhysical).
1253  */
1254 static void
WalSndUpdateProgress(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid)1255 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
1256 {
1257 	static TimestampTz sendTime = 0;
1258 	TimestampTz now = GetCurrentTimestamp();
1259 
1260 	/*
1261 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1262 	 * avoid flooding the lag tracker when we commit frequently.
1263 	 */
1264 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
1265 	if (!TimestampDifferenceExceeds(sendTime, now,
1266 									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1267 		return;
1268 
1269 	LagTrackerWrite(lsn, now);
1270 	sendTime = now;
1271 }
1272 
/*
 * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
 *
 * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
 * if we detect a shutdown request (either from postmaster or client)
 * we will return early, so caller must always check.
 *
 * RecentFlushPtr is function-static, so the last position learned survives
 * across calls; this is what allows the fast path below.  While recovery is
 * in progress the replay position is used in place of the flush position.
 * Sets WalSndCaughtUp whenever it actually has to wait, and calls
 * WalSndShutdown() if flushing output to the client fails.
 */
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
	int			wakeEvents;
	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;

	/*
	 * Fast path to avoid acquiring the spinlock in case we already know we
	 * have enough WAL available. This is particularly interesting if we're
	 * far behind.
	 */
	if (RecentFlushPtr != InvalidXLogRecPtr &&
		loc <= RecentFlushPtr)
		return RecentFlushPtr;

	/* Get a more recent flush pointer. */
	if (!RecoveryInProgress())
		RecentFlushPtr = GetFlushRecPtr();
	else
		RecentFlushPtr = GetXLogReplayRecPtr(NULL);

	for (;;)
	{
		long		sleeptime;

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we're shutting down, trigger pending WAL to be written out,
		 * otherwise we'd possibly end up waiting for WAL that never gets
		 * written, because walwriter has shut down already.
		 */
		if (got_STOPPING)
			XLogBackgroundFlush();

		/* Update our idea of the currently flushed position. */
		if (!RecoveryInProgress())
			RecentFlushPtr = GetFlushRecPtr();
		else
			RecentFlushPtr = GetXLogReplayRecPtr(NULL);

		/*
		 * If postmaster asked us to stop, don't wait anymore.
		 *
		 * It's important to do this check after the recomputation of
		 * RecentFlushPtr, so we can send all remaining data before shutting
		 * down.
		 */
		if (got_STOPPING)
			break;

		/*
		 * We only send regular messages to the client for full decoded
		 * transactions, but a synchronous replication and walsender shutdown
		 * possibly are waiting for a later location. So, before sleeping, we
		 * send a ping containing the flush location. If the receiver is
		 * otherwise idle, this keepalive will trigger a reply. Processing the
		 * reply will update these MyWalSnd locations.
		 */
		if (MyWalSnd->flush < sentPtr &&
			MyWalSnd->write < sentPtr &&
			!waiting_for_ping_response)
			WalSndKeepalive(false);

		/* check whether we're done */
		if (loc <= RecentFlushPtr)
			break;

		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
		WalSndCaughtUp = true;

		/*
		 * Try to flush any pending output to the client.
		 */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming, so fail the current WAL fetch request.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * Sleep until something happens or we time out.  Also wait for the
		 * socket becoming writable, if there's still pending output.
		 * Otherwise we might sit on sendable output data while waiting for
		 * new WAL to be generated.  (But if we have nothing to send, we don't
		 * want to wake on socket-writable.)
		 */
		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_READABLE | WL_TIMEOUT;

		if (pq_is_send_pending())
			wakeEvents |= WL_SOCKET_WRITEABLE;

		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime,
						  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
	}

	/* reactivate latch so WalSndLoop knows to continue */
	SetLatch(MyLatch);
	return RecentFlushPtr;
}
1417 
/*
 * Execute an incoming replication command.
 *
 * Returns true if the cmd_string was recognized as WalSender command, false
 * if not.
 *
 * A plain SQL command is only parsed here and left to the caller to execute;
 * it is rejected outright when the walsender has no database connection
 * (MyDatabaseId is invalid, i.e. physical replication).  Replication
 * commands are parsed and run inside a short-lived memory context that is
 * deleted afterwards, after which a "SELECT" CommandComplete is sent and the
 * backend is reported idle to pgstat.
 *
 * Raises ERROR if the walsender is in stopping mode, if the command does not
 * parse, or if the current transaction block is aborted.
 */
bool
exec_replication_command(const char *cmd_string)
{
	int			parse_rc;
	Node	   *cmd_node;
	MemoryContext cmd_context;
	MemoryContext old_context;

	/*
	 * If WAL sender has been told that shutdown is getting close, switch its
	 * status accordingly to handle the next replication commands correctly.
	 */
	if (got_STOPPING)
		WalSndSetState(WALSNDSTATE_STOPPING);

	/*
	 * Throw error if in stopping mode.  We need to prevent commands that could
	 * generate WAL while the shutdown checkpoint is being written.  To be
	 * safe, we just prohibit all new commands.
	 */
	if (MyWalSnd->state == WALSNDSTATE_STOPPING)
		ereport(ERROR,
				(errmsg("cannot execute new commands while WAL sender is in stopping mode")));

	/*
	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
	 * command arrives. Clean up the old stuff if there's anything.
	 */
	SnapBuildClearExportedSnapshot();

	CHECK_FOR_INTERRUPTS();

	/*
	 * Parse the command.  Everything allocated from here on lives in
	 * cmd_context, which is deleted once the command is done.
	 */
	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
										"Replication command context",
										ALLOCSET_DEFAULT_SIZES);
	old_context = MemoryContextSwitchTo(cmd_context);

	replication_scanner_init(cmd_string);
	parse_rc = replication_yyparse();
	if (parse_rc != 0)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg_internal("replication command parser returned %d",
								 parse_rc)));
	replication_scanner_finish();

	cmd_node = replication_parse_result;

	/*
	 * If it's a SQL command, just clean up our mess and return false; the
	 * caller will take care of executing it.
	 */
	if (IsA(cmd_node, SQLCmd))
	{
		if (MyDatabaseId == InvalidOid)
			ereport(ERROR,
					(errmsg("cannot execute SQL commands in WAL sender for physical replication")));

		MemoryContextSwitchTo(old_context);
		MemoryContextDelete(cmd_context);

		/* Tell the caller that this wasn't a WalSender command. */
		return false;
	}

	/*
	 * Report query to various monitoring facilities.  For this purpose, we
	 * report replication commands just like SQL commands.
	 */
	debug_query_string = cmd_string;

	pgstat_report_activity(STATE_RUNNING, cmd_string);

	/*
	 * Log replication command if log_replication_commands is enabled. Even
	 * when it's disabled, log the command with DEBUG1 level for backward
	 * compatibility.
	 */
	ereport(log_replication_commands ? LOG : DEBUG1,
			(errmsg("received replication command: %s", cmd_string)));

	/*
	 * Disallow replication commands in aborted transaction blocks.
	 */
	if (IsAbortedTransactionBlockState())
		ereport(ERROR,
				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
				 errmsg("current transaction is aborted, "
						"commands ignored until end of transaction block")));

	CHECK_FOR_INTERRUPTS();

	/*
	 * Allocate buffers that will be used for each outgoing and incoming
	 * message.  We do this just once per command to reduce palloc overhead.
	 */
	initStringInfo(&output_message);
	initStringInfo(&reply_message);
	initStringInfo(&tmpbuf);

	/*
	 * Dispatch on the parsed node.  Commands that must not run inside a
	 * transaction block are guarded with PreventTransactionChain().
	 */
	switch (cmd_node->type)
	{
		case T_IdentifySystemCmd:
			IdentifySystem();
			break;

		case T_BaseBackupCmd:
			PreventTransactionChain(true, "BASE_BACKUP");
			SendBaseBackup((BaseBackupCmd *) cmd_node);
			break;

		case T_CreateReplicationSlotCmd:
			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
			break;

		case T_DropReplicationSlotCmd:
			DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
			break;

		case T_StartReplicationCmd:
			{
				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;

				PreventTransactionChain(true, "START_REPLICATION");

				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
					StartReplication(cmd);
				else
					StartLogicalReplication(cmd);
				break;
			}

		case T_TimeLineHistoryCmd:
			PreventTransactionChain(true, "TIMELINE_HISTORY");
			SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
			break;

		case T_VariableShowStmt:
			{
				DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
				VariableShowStmt *n = (VariableShowStmt *) cmd_node;

				/* syscache access needs a transaction environment */
				StartTransactionCommand();
				GetPGVariable(n->name, dest);
				CommitTransactionCommand();
			}
			break;

		default:
			elog(ERROR, "unrecognized replication command node tag: %u",
				 cmd_node->type);
	}

	/* done */
	MemoryContextSwitchTo(old_context);
	MemoryContextDelete(cmd_context);

	/* Send CommandComplete message */
	EndCommand("SELECT", DestRemote);

	/*
	 * Report to pgstat that this process is now idle, and stop pointing
	 * debug_query_string at the caller's command text.
	 */
	pgstat_report_activity(STATE_IDLE, NULL);
	debug_query_string = NULL;

	return true;
}
1594 
/*
 * Process any incoming messages while streaming. Also checks if the remote
 * end has closed the connection.
 *
 * Reads messages without blocking until none are available or a CopyDone
 * has been received.  CopyData ('d') payloads go to ProcessStandbyMessage();
 * CopyDone ('c') is answered with our own CopyDone if we have not sent one
 * yet; 'X' (standby closing the socket) or EOF makes the process exit; any
 * other message type is a FATAL protocol violation.
 *
 * The time of this call is stored in last_processing.  If at least one
 * message was received it is also copied to last_reply_timestamp, and
 * waiting_for_ping_response is cleared.
 */
static void
ProcessRepliesIfAny(void)
{
	unsigned char firstchar;
	int			r;
	bool		received = false;

	last_processing = GetCurrentTimestamp();

	/*
	 * If we already received a CopyDone from the frontend, any subsequent
	 * message is the beginning of a new command, and should be processed in
	 * the main processing loop.
	 */
	while (!streamingDoneReceiving)
	{
		pq_startmsgread();
		r = pq_getbyte_if_available(&firstchar);
		if (r < 0)
		{
			/* unexpected error or EOF */
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("unexpected EOF on standby connection")));
			proc_exit(0);
		}
		if (r == 0)
		{
			/* no data available without blocking */
			pq_endmsgread();
			break;
		}

		/* Read the message contents */
		resetStringInfo(&reply_message);
		if (pq_getmessage(&reply_message, 0))
		{
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("unexpected EOF on standby connection")));
			proc_exit(0);
		}

		/* Handle the very limited subset of commands expected in this phase */
		switch (firstchar)
		{
				/*
				 * 'd' means a standby reply wrapped in a CopyData packet.
				 */
			case 'd':
				ProcessStandbyMessage();
				received = true;
				break;

				/*
				 * CopyDone means the standby requested to finish streaming.
				 * Reply with CopyDone, if we had not sent that already.
				 * Setting streamingDoneReceiving also ends this loop.
				 */
			case 'c':
				if (!streamingDoneSending)
				{
					pq_putmessage_noblock('c', NULL, 0);
					streamingDoneSending = true;
				}

				streamingDoneReceiving = true;
				received = true;
				break;

				/*
				 * 'X' means that the standby is closing down the socket.
				 * proc_exit() does not return, so no break is needed.
				 */
			case 'X':
				proc_exit(0);

			default:
				ereport(FATAL,
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg("invalid standby message type \"%c\"",
								firstchar)));
		}
	}

	/*
	 * Save the last reply timestamp if we've received at least one reply.
	 */
	if (received)
	{
		last_reply_timestamp = last_processing;
		waiting_for_ping_response = false;
	}
}
1691 
1692 /*
1693  * Process a status update message received from standby.
1694  */
1695 static void
ProcessStandbyMessage(void)1696 ProcessStandbyMessage(void)
1697 {
1698 	char		msgtype;
1699 
1700 	/*
1701 	 * Check message type from the first byte.
1702 	 */
1703 	msgtype = pq_getmsgbyte(&reply_message);
1704 
1705 	switch (msgtype)
1706 	{
1707 		case 'r':
1708 			ProcessStandbyReplyMessage();
1709 			break;
1710 
1711 		case 'h':
1712 			ProcessStandbyHSFeedbackMessage();
1713 			break;
1714 
1715 		default:
1716 			ereport(COMMERROR,
1717 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
1718 					 errmsg("unexpected message type \"%c\"", msgtype)));
1719 			proc_exit(0);
1720 	}
1721 }
1722 
1723 /*
1724  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1725  */
1726 static void
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)1727 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1728 {
1729 	bool		changed = false;
1730 	ReplicationSlot *slot = MyReplicationSlot;
1731 
1732 	Assert(lsn != InvalidXLogRecPtr);
1733 	SpinLockAcquire(&slot->mutex);
1734 	if (slot->data.restart_lsn != lsn)
1735 	{
1736 		changed = true;
1737 		slot->data.restart_lsn = lsn;
1738 	}
1739 	SpinLockRelease(&slot->mutex);
1740 
1741 	if (changed)
1742 	{
1743 		ReplicationSlotMarkDirty();
1744 		ReplicationSlotsComputeRequiredLSN();
1745 	}
1746 
1747 	/*
1748 	 * One could argue that the slot should be saved to disk now, but that'd
1749 	 * be energy wasted - the worst lost information can do here is give us
1750 	 * wrong information in a statistics view - we'll just potentially be more
1751 	 * conservative in removing files.
1752 	 */
1753 }
1754 
1755 /*
1756  * Regular reply from standby advising of WAL locations on standby server.
1757  */
1758 static void
ProcessStandbyReplyMessage(void)1759 ProcessStandbyReplyMessage(void)
1760 {
1761 	XLogRecPtr	writePtr,
1762 				flushPtr,
1763 				applyPtr;
1764 	bool		replyRequested;
1765 	TimeOffset	writeLag,
1766 				flushLag,
1767 				applyLag;
1768 	bool		clearLagTimes;
1769 	TimestampTz now;
1770 
1771 	static bool fullyAppliedLastTime = false;
1772 
1773 	/* the caller already consumed the msgtype byte */
1774 	writePtr = pq_getmsgint64(&reply_message);
1775 	flushPtr = pq_getmsgint64(&reply_message);
1776 	applyPtr = pq_getmsgint64(&reply_message);
1777 	(void) pq_getmsgint64(&reply_message);	/* sendTime; not used ATM */
1778 	replyRequested = pq_getmsgbyte(&reply_message);
1779 
1780 	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1781 		 (uint32) (writePtr >> 32), (uint32) writePtr,
1782 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1783 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1784 		 replyRequested ? " (reply requested)" : "");
1785 
1786 	/* See if we can compute the round-trip lag for these positions. */
1787 	now = GetCurrentTimestamp();
1788 	writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1789 	flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1790 	applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1791 
1792 	/*
1793 	 * If the standby reports that it has fully replayed the WAL in two
1794 	 * consecutive reply messages, then the second such message must result
1795 	 * from wal_receiver_status_interval expiring on the standby.  This is a
1796 	 * convenient time to forget the lag times measured when it last
1797 	 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1798 	 * until more WAL traffic arrives.
1799 	 */
1800 	clearLagTimes = false;
1801 	if (applyPtr == sentPtr)
1802 	{
1803 		if (fullyAppliedLastTime)
1804 			clearLagTimes = true;
1805 		fullyAppliedLastTime = true;
1806 	}
1807 	else
1808 		fullyAppliedLastTime = false;
1809 
1810 	/* Send a reply if the standby requested one. */
1811 	if (replyRequested)
1812 		WalSndKeepalive(false);
1813 
1814 	/*
1815 	 * Update shared state for this WalSender process based on reply data from
1816 	 * standby.
1817 	 */
1818 	{
1819 		WalSnd	   *walsnd = MyWalSnd;
1820 
1821 		SpinLockAcquire(&walsnd->mutex);
1822 		walsnd->write = writePtr;
1823 		walsnd->flush = flushPtr;
1824 		walsnd->apply = applyPtr;
1825 		if (writeLag != -1 || clearLagTimes)
1826 			walsnd->writeLag = writeLag;
1827 		if (flushLag != -1 || clearLagTimes)
1828 			walsnd->flushLag = flushLag;
1829 		if (applyLag != -1 || clearLagTimes)
1830 			walsnd->applyLag = applyLag;
1831 		SpinLockRelease(&walsnd->mutex);
1832 	}
1833 
1834 	if (!am_cascading_walsender)
1835 		SyncRepReleaseWaiters();
1836 
1837 	/*
1838 	 * Advance our local xmin horizon when the client confirmed a flush.
1839 	 */
1840 	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1841 	{
1842 		if (SlotIsLogical(MyReplicationSlot))
1843 			LogicalConfirmReceivedLocation(flushPtr);
1844 		else
1845 			PhysicalConfirmReceivedLocation(flushPtr);
1846 	}
1847 }
1848 
1849 /* compute new replication slot xmin horizon if needed */
1850 static void
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin,TransactionId feedbackCatalogXmin)1851 PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
1852 {
1853 	bool		changed = false;
1854 	ReplicationSlot *slot = MyReplicationSlot;
1855 
1856 	SpinLockAcquire(&slot->mutex);
1857 	MyPgXact->xmin = InvalidTransactionId;
1858 
1859 	/*
1860 	 * For physical replication we don't need the interlock provided by xmin
1861 	 * and effective_xmin since the consequences of a missed increase are
1862 	 * limited to query cancellations, so set both at once.
1863 	 */
1864 	if (!TransactionIdIsNormal(slot->data.xmin) ||
1865 		!TransactionIdIsNormal(feedbackXmin) ||
1866 		TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1867 	{
1868 		changed = true;
1869 		slot->data.xmin = feedbackXmin;
1870 		slot->effective_xmin = feedbackXmin;
1871 	}
1872 	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
1873 		!TransactionIdIsNormal(feedbackCatalogXmin) ||
1874 		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
1875 	{
1876 		changed = true;
1877 		slot->data.catalog_xmin = feedbackCatalogXmin;
1878 		slot->effective_catalog_xmin = feedbackCatalogXmin;
1879 	}
1880 	SpinLockRelease(&slot->mutex);
1881 
1882 	if (changed)
1883 	{
1884 		ReplicationSlotMarkDirty();
1885 		ReplicationSlotsComputeRequiredXmin(false);
1886 	}
1887 }
1888 
1889 /*
1890  * Check that the provided xmin/epoch are sane, that is, not in the future
1891  * and not so far back as to be already wrapped around.
1892  *
1893  * Epoch of nextXid should be same as standby, or if the counter has
1894  * wrapped, then one greater than standby.
1895  *
1896  * This check doesn't care about whether clog exists for these xids
1897  * at all.
1898  */
1899 static bool
TransactionIdInRecentPast(TransactionId xid,uint32 epoch)1900 TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
1901 {
1902 	TransactionId nextXid;
1903 	uint32		nextEpoch;
1904 
1905 	GetNextXidAndEpoch(&nextXid, &nextEpoch);
1906 
1907 	if (xid <= nextXid)
1908 	{
1909 		if (epoch != nextEpoch)
1910 			return false;
1911 	}
1912 	else
1913 	{
1914 		if (epoch + 1 != nextEpoch)
1915 			return false;
1916 	}
1917 
1918 	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
1919 		return false;			/* epoch OK, but it's wrapped around */
1920 
1921 	return true;
1922 }
1923 
1924 /*
1925  * Hot Standby feedback
1926  */
1927 static void
ProcessStandbyHSFeedbackMessage(void)1928 ProcessStandbyHSFeedbackMessage(void)
1929 {
1930 	TransactionId feedbackXmin;
1931 	uint32		feedbackEpoch;
1932 	TransactionId feedbackCatalogXmin;
1933 	uint32		feedbackCatalogEpoch;
1934 
1935 	/*
1936 	 * Decipher the reply message. The caller already consumed the msgtype
1937 	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
1938 	 * of this message.
1939 	 */
1940 	(void) pq_getmsgint64(&reply_message);	/* sendTime; not used ATM */
1941 	feedbackXmin = pq_getmsgint(&reply_message, 4);
1942 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
1943 	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
1944 	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
1945 
1946 	elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
1947 		 feedbackXmin,
1948 		 feedbackEpoch,
1949 		 feedbackCatalogXmin,
1950 		 feedbackCatalogEpoch);
1951 
1952 	/*
1953 	 * Unset WalSender's xmins if the feedback message values are invalid.
1954 	 * This happens when the downstream turned hot_standby_feedback off.
1955 	 */
1956 	if (!TransactionIdIsNormal(feedbackXmin)
1957 		&& !TransactionIdIsNormal(feedbackCatalogXmin))
1958 	{
1959 		MyPgXact->xmin = InvalidTransactionId;
1960 		if (MyReplicationSlot != NULL)
1961 			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
1962 		return;
1963 	}
1964 
1965 	/*
1966 	 * Check that the provided xmin/epoch are sane, that is, not in the future
1967 	 * and not so far back as to be already wrapped around.  Ignore if not.
1968 	 */
1969 	if (TransactionIdIsNormal(feedbackXmin) &&
1970 		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
1971 		return;
1972 
1973 	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
1974 		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
1975 		return;
1976 
1977 	/*
1978 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
1979 	 * the xmin will be taken into account by GetOldestXmin.  This will hold
1980 	 * back the removal of dead rows and thereby prevent the generation of
1981 	 * cleanup conflicts on the standby server.
1982 	 *
1983 	 * There is a small window for a race condition here: although we just
1984 	 * checked that feedbackXmin precedes nextXid, the nextXid could have
1985 	 * gotten advanced between our fetching it and applying the xmin below,
1986 	 * perhaps far enough to make feedbackXmin wrap around.  In that case the
1987 	 * xmin we set here would be "in the future" and have no effect.  No point
1988 	 * in worrying about this since it's too late to save the desired data
1989 	 * anyway.  Assuming that the standby sends us an increasing sequence of
1990 	 * xmins, this could only happen during the first reply cycle, else our
1991 	 * own xmin would prevent nextXid from advancing so far.
1992 	 *
1993 	 * We don't bother taking the ProcArrayLock here.  Setting the xmin field
1994 	 * is assumed atomic, and there's no real need to prevent a concurrent
1995 	 * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
1996 	 * safe, and if we're moving it backwards, well, the data is at risk
1997 	 * already since a VACUUM could have just finished calling GetOldestXmin.)
1998 	 *
1999 	 * If we're using a replication slot we reserve the xmin via that,
2000 	 * otherwise via the walsender's PGXACT entry. We can only track the
2001 	 * catalog xmin separately when using a slot, so we store the least of the
2002 	 * two provided when not using a slot.
2003 	 *
2004 	 * XXX: It might make sense to generalize the ephemeral slot concept and
2005 	 * always use the slot mechanism to handle the feedback xmin.
2006 	 */
2007 	if (MyReplicationSlot != NULL)	/* XXX: persistency configurable? */
2008 		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2009 	else
2010 	{
2011 		if (TransactionIdIsNormal(feedbackCatalogXmin)
2012 			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2013 			MyPgXact->xmin = feedbackCatalogXmin;
2014 		else
2015 			MyPgXact->xmin = feedbackXmin;
2016 	}
2017 }
2018 
2019 /*
2020  * Compute how long send/receive loops should sleep.
2021  *
2022  * If wal_sender_timeout is enabled we want to wake up in time to send
2023  * keepalives and to abort the connection if wal_sender_timeout has been
2024  * reached.
2025  */
2026 static long
WalSndComputeSleeptime(TimestampTz now)2027 WalSndComputeSleeptime(TimestampTz now)
2028 {
2029 	long		sleeptime = 10000;	/* 10 s */
2030 
2031 	if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2032 	{
2033 		TimestampTz wakeup_time;
2034 
2035 		/*
2036 		 * At the latest stop sleeping once wal_sender_timeout has been
2037 		 * reached.
2038 		 */
2039 		wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2040 												  wal_sender_timeout);
2041 
2042 		/*
2043 		 * If no ping has been sent yet, wakeup when it's time to do so.
2044 		 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2045 		 * the timeout passed without a response.
2046 		 */
2047 		if (!waiting_for_ping_response)
2048 			wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2049 													  wal_sender_timeout / 2);
2050 
2051 		/* Compute relative time until wakeup. */
2052 		sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2053 	}
2054 
2055 	return sleeptime;
2056 }
2057 
2058 /*
2059  * Check whether there have been responses by the client within
2060  * wal_sender_timeout and shutdown if not.  Using last_processing as the
2061  * reference point avoids counting server-side stalls against the client.
2062  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2063  * postdate last_processing by more than wal_sender_timeout.  If that happens,
2064  * the client must reply almost immediately to avoid a timeout.  This rarely
2065  * affects the default configuration, under which clients spontaneously send a
2066  * message every standby_message_timeout = wal_sender_timeout/6 = 10s.  We
2067  * could eliminate that problem by recognizing timeout expiration at
2068  * wal_sender_timeout/2 after the keepalive.
2069  */
2070 static void
WalSndCheckTimeOut(void)2071 WalSndCheckTimeOut(void)
2072 {
2073 	TimestampTz timeout;
2074 
2075 	/* don't bail out if we're doing something that doesn't require timeouts */
2076 	if (last_reply_timestamp <= 0)
2077 		return;
2078 
2079 	timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2080 										  wal_sender_timeout);
2081 
2082 	if (wal_sender_timeout > 0 && last_processing >= timeout)
2083 	{
2084 		/*
2085 		 * Since typically expiration of replication timeout means
2086 		 * communication problem, we don't send the error message to the
2087 		 * standby.
2088 		 */
2089 		ereport(COMMERROR,
2090 				(errmsg("terminating walsender process due to replication timeout")));
2091 
2092 		WalSndShutdown();
2093 	}
2094 }
2095 
/*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * Each iteration handles interrupts, config reloads and client replies.  It
 * calls send_data to queue more WAL whenever the output buffer is empty,
 * then flushes output, checks wal_sender_timeout and sends keepalives.  It
 * sleeps only when caught up or when unsent output is pending.
 *
 * The loop returns once CopyDone has been both received and sent and all
 * output is flushed.  The process may also exit from inside the loop, e.g.
 * via WalSndShutdown(), WalSndDone() or the postmaster-death check.
 */
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
	/*
	 * Initialize the last reply timestamp. That enables timeout processing
	 * from hereon.
	 */
	last_reply_timestamp = GetCurrentTimestamp();
	waiting_for_ping_response = false;

	/*
	 * Loop until we reach the end of this timeline or the client requests to
	 * stop streaming.
	 */
	for (;;)
	{
		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/*
		 * If we don't have any pending data in the output buffer, try to send
		 * some more.  If there is some, we don't bother to call send_data
		 * again until we've flushed it ... but we'd better assume we are not
		 * caught up.
		 */
		if (!pq_is_send_pending())
			send_data();
		else
			WalSndCaughtUp = false;

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/* If nothing remains to be sent right now ... */
		if (WalSndCaughtUp && !pq_is_send_pending())
		{
			/*
			 * If we're in catchup state, move to streaming.  This is an
			 * important state change for users to know about, since before
			 * this point data loss might occur if the primary dies and we
			 * need to failover to the standby. The state change is also
			 * important for synchronous replication, since commits that
			 * started to wait at that point might wait for some time.
			 */
			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
			{
				ereport(DEBUG1,
						(errmsg("\"%s\" has now caught up with upstream server",
								application_name)));
				WalSndSetState(WALSNDSTATE_STREAMING);
			}

			/*
			 * When SIGUSR2 arrives, we send any outstanding logs up to the
			 * shutdown checkpoint record (i.e., the latest record), wait for
			 * them to be replicated to the standby, and exit. This may be a
			 * normal termination at shutdown, or a promotion, the walsender
			 * is not sure which.
			 */
			if (got_SIGUSR2)
				WalSndDone(send_data);
		}

		/* Check for replication timeout. */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * We don't block if not caught up, unless there is unsent data
		 * pending in which case we'd better block until the socket is
		 * write-ready.  This test is only needed for the case where the
		 * send_data callback handled a subset of the available data but then
		 * pq_flush_if_writable flushed it all --- we should immediately try
		 * to send more.
		 */
		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
		{
			long		sleeptime;
			int			wakeEvents;

			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;

			/* Only watch the socket for input until the client sent CopyDone. */
			if (!streamingDoneReceiving)
				wakeEvents |= WL_SOCKET_READABLE;

			/*
			 * Use fresh timestamp, not last_processing, to reduce the chance
			 * of reaching wal_sender_timeout before sending a keepalive.
			 */
			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

			/* With unsent output, also wake up once the socket is writable. */
			if (pq_is_send_pending())
				wakeEvents |= WL_SOCKET_WRITEABLE;

			/* Sleep until something happens or we time out */
			WaitLatchOrSocket(MyLatch, wakeEvents,
							  MyProcPort->sock, sleeptime,
							  WAIT_EVENT_WAL_SENDER_MAIN);
		}
	}
	return;
}
2231 
2232 /* Initialize a per-walsender data structure for this walsender process */
2233 static void
InitWalSenderSlot(void)2234 InitWalSenderSlot(void)
2235 {
2236 	int			i;
2237 
2238 	/*
2239 	 * WalSndCtl should be set up already (we inherit this by fork() or
2240 	 * EXEC_BACKEND mechanism from the postmaster).
2241 	 */
2242 	Assert(WalSndCtl != NULL);
2243 	Assert(MyWalSnd == NULL);
2244 
2245 	/*
2246 	 * Find a free walsender slot and reserve it. If this fails, we must be
2247 	 * out of WalSnd structures.
2248 	 */
2249 	for (i = 0; i < max_wal_senders; i++)
2250 	{
2251 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2252 
2253 		SpinLockAcquire(&walsnd->mutex);
2254 
2255 		if (walsnd->pid != 0)
2256 		{
2257 			SpinLockRelease(&walsnd->mutex);
2258 			continue;
2259 		}
2260 		else
2261 		{
2262 			/*
2263 			 * Found a free slot. Reserve it for us.
2264 			 */
2265 			walsnd->pid = MyProcPid;
2266 			walsnd->state = WALSNDSTATE_STARTUP;
2267 			walsnd->sentPtr = InvalidXLogRecPtr;
2268 			walsnd->needreload = false;
2269 			walsnd->write = InvalidXLogRecPtr;
2270 			walsnd->flush = InvalidXLogRecPtr;
2271 			walsnd->apply = InvalidXLogRecPtr;
2272 			walsnd->writeLag = -1;
2273 			walsnd->flushLag = -1;
2274 			walsnd->applyLag = -1;
2275 			walsnd->sync_standby_priority = 0;
2276 			walsnd->latch = &MyProc->procLatch;
2277 			SpinLockRelease(&walsnd->mutex);
2278 			/* don't need the lock anymore */
2279 			MyWalSnd = (WalSnd *) walsnd;
2280 
2281 			break;
2282 		}
2283 	}
2284 	if (MyWalSnd == NULL)
2285 		ereport(FATAL,
2286 				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2287 				 errmsg("number of requested standby connections "
2288 						"exceeds max_wal_senders (currently %d)",
2289 						max_wal_senders)));
2290 
2291 	/* Arrange to clean up at walsender exit */
2292 	on_shmem_exit(WalSndKill, 0);
2293 }
2294 
2295 /* Destroy the per-walsender data structure for this walsender process */
2296 static void
WalSndKill(int code,Datum arg)2297 WalSndKill(int code, Datum arg)
2298 {
2299 	WalSnd	   *walsnd = MyWalSnd;
2300 
2301 	Assert(walsnd != NULL);
2302 
2303 	MyWalSnd = NULL;
2304 
2305 	SpinLockAcquire(&walsnd->mutex);
2306 	/* clear latch while holding the spinlock, so it can safely be read */
2307 	walsnd->latch = NULL;
2308 	/* Mark WalSnd struct as no longer being in use. */
2309 	walsnd->pid = 0;
2310 	SpinLockRelease(&walsnd->mutex);
2311 }
2312 
2313 /*
2314  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2315  *
2316  * XXX probably this should be improved to suck data directly from the
2317  * WAL buffers when possible.
2318  *
2319  * Will open, and keep open, one WAL segment stored in the global file
2320  * descriptor sendFile. This means if XLogRead is used once, there will
2321  * always be one descriptor left open until the process ends, but never
2322  * more than one.
2323  */
2324 static void
XLogRead(char * buf,XLogRecPtr startptr,Size count)2325 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2326 {
2327 	char	   *p;
2328 	XLogRecPtr	recptr;
2329 	Size		nbytes;
2330 	XLogSegNo	segno;
2331 
2332 retry:
2333 	p = buf;
2334 	recptr = startptr;
2335 	nbytes = count;
2336 
2337 	while (nbytes > 0)
2338 	{
2339 		uint32		startoff;
2340 		int			segbytes;
2341 		int			readbytes;
2342 
2343 		startoff = recptr % XLogSegSize;
2344 
2345 		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2346 		{
2347 			char		path[MAXPGPATH];
2348 
2349 			/* Switch to another logfile segment */
2350 			if (sendFile >= 0)
2351 				close(sendFile);
2352 
2353 			XLByteToSeg(recptr, sendSegNo);
2354 
2355 			/*-------
2356 			 * When reading from a historic timeline, and there is a timeline
2357 			 * switch within this segment, read from the WAL segment belonging
2358 			 * to the new timeline.
2359 			 *
2360 			 * For example, imagine that this server is currently on timeline
2361 			 * 5, and we're streaming timeline 4. The switch from timeline 4
2362 			 * to 5 happened at 0/13002088. In pg_wal, we have these files:
2363 			 *
2364 			 * ...
2365 			 * 000000040000000000000012
2366 			 * 000000040000000000000013
2367 			 * 000000050000000000000013
2368 			 * 000000050000000000000014
2369 			 * ...
2370 			 *
2371 			 * In this situation, when requested to send the WAL from
2372 			 * segment 0x13, on timeline 4, we read the WAL from file
2373 			 * 000000050000000000000013. Archive recovery prefers files from
2374 			 * newer timelines, so if the segment was restored from the
2375 			 * archive on this server, the file belonging to the old timeline,
2376 			 * 000000040000000000000013, might not exist. Their contents are
2377 			 * equal up to the switchpoint, because at a timeline switch, the
2378 			 * used portion of the old segment is copied to the new file.
2379 			 *-------
2380 			 */
2381 			curFileTimeLine = sendTimeLine;
2382 			if (sendTimeLineIsHistoric)
2383 			{
2384 				XLogSegNo	endSegNo;
2385 
2386 				XLByteToSeg(sendTimeLineValidUpto, endSegNo);
2387 				if (sendSegNo == endSegNo)
2388 					curFileTimeLine = sendTimeLineNextTLI;
2389 			}
2390 
2391 			XLogFilePath(path, curFileTimeLine, sendSegNo);
2392 
2393 			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2394 			if (sendFile < 0)
2395 			{
2396 				/*
2397 				 * If the file is not found, assume it's because the standby
2398 				 * asked for a too old WAL segment that has already been
2399 				 * removed or recycled.
2400 				 */
2401 				if (errno == ENOENT)
2402 					ereport(ERROR,
2403 							(errcode_for_file_access(),
2404 							 errmsg("requested WAL segment %s has already been removed",
2405 									XLogFileNameP(curFileTimeLine, sendSegNo))));
2406 				else
2407 					ereport(ERROR,
2408 							(errcode_for_file_access(),
2409 							 errmsg("could not open file \"%s\": %m",
2410 									path)));
2411 			}
2412 			sendOff = 0;
2413 		}
2414 
2415 		/* Need to seek in the file? */
2416 		if (sendOff != startoff)
2417 		{
2418 			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2419 				ereport(ERROR,
2420 						(errcode_for_file_access(),
2421 						 errmsg("could not seek in log segment %s to offset %u: %m",
2422 								XLogFileNameP(curFileTimeLine, sendSegNo),
2423 								startoff)));
2424 			sendOff = startoff;
2425 		}
2426 
2427 		/* How many bytes are within this segment? */
2428 		if (nbytes > (XLogSegSize - startoff))
2429 			segbytes = XLogSegSize - startoff;
2430 		else
2431 			segbytes = nbytes;
2432 
2433 		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
2434 		readbytes = read(sendFile, p, segbytes);
2435 		pgstat_report_wait_end();
2436 		if (readbytes <= 0)
2437 		{
2438 			ereport(ERROR,
2439 					(errcode_for_file_access(),
2440 					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2441 							XLogFileNameP(curFileTimeLine, sendSegNo),
2442 							sendOff, (unsigned long) segbytes)));
2443 		}
2444 
2445 		/* Update state for read */
2446 		recptr += readbytes;
2447 
2448 		sendOff += readbytes;
2449 		nbytes -= readbytes;
2450 		p += readbytes;
2451 	}
2452 
2453 	/*
2454 	 * After reading into the buffer, check that what we read was valid. We do
2455 	 * this after reading, because even though the segment was present when we
2456 	 * opened it, it might get recycled or removed while we read it. The
2457 	 * read() succeeds in that case, but the data we tried to read might
2458 	 * already have been overwritten with new WAL records.
2459 	 */
2460 	XLByteToSeg(startptr, segno);
2461 	CheckXLogRemoved(segno, ThisTimeLineID);
2462 
2463 	/*
2464 	 * During recovery, the currently-open WAL file might be replaced with the
2465 	 * file of the same name retrieved from archive. So we always need to
2466 	 * check what we read was valid after reading into the buffer. If it's
2467 	 * invalid, we try to open and read the file again.
2468 	 */
2469 	if (am_cascading_walsender)
2470 	{
2471 		WalSnd	   *walsnd = MyWalSnd;
2472 		bool		reload;
2473 
2474 		SpinLockAcquire(&walsnd->mutex);
2475 		reload = walsnd->needreload;
2476 		walsnd->needreload = false;
2477 		SpinLockRelease(&walsnd->mutex);
2478 
2479 		if (reload && sendFile >= 0)
2480 		{
2481 			close(sendFile);
2482 			sendFile = -1;
2483 
2484 			goto retry;
2485 		}
2486 	}
2487 }
2488 
/*
 * Send out the WAL in its normal physical/stored form.
 *
 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
 * but not yet sent to the client, and buffer it in the libpq output
 * buffer.
 *
 * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
 * otherwise WalSndCaughtUp is set to false.  Exception: on a historic
 * timeline the flag stays false even after the final chunk is queued,
 * presumably so that WalSndLoop() calls again promptly and reaches the
 * end-of-timeline branch, which queues CopyDone and sets
 * streamingDoneSending.
 */
static void
XLogSendPhysical(void)
{
	XLogRecPtr	SendRqstPtr;
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;
	Size		nbytes;

	/* If requested switch the WAL sender to the stopping state. */
	if (got_STOPPING)
		WalSndSetState(WALSNDSTATE_STOPPING);

	/* CopyDone already sent: nothing more to stream. */
	if (streamingDoneSending)
	{
		WalSndCaughtUp = true;
		return;
	}

	/* Figure out how far we can safely send the WAL. */
	if (sendTimeLineIsHistoric)
	{
		/*
		 * Streaming an old timeline that's in this server's history, but is
		 * not the one we're currently inserting or replaying. It can be
		 * streamed up to the point where we switched off that timeline.
		 */
		SendRqstPtr = sendTimeLineValidUpto;
	}
	else if (am_cascading_walsender)
	{
		/*
		 * Streaming the latest timeline on a standby.
		 *
		 * Attempt to send all WAL that has already been replayed, so that we
		 * know it's valid. If we're receiving WAL through streaming
		 * replication, it's also OK to send any WAL that has been received
		 * but not replayed.
		 *
		 * The timeline we're recovering from can change, or we can be
		 * promoted. In either case, the current timeline becomes historic. We
		 * need to detect that so that we don't try to stream past the point
		 * where we switched to another timeline. We check for promotion or
		 * timeline switch after calculating the flush pointer (SendRqstPtr),
		 * to avoid a race condition: if the timeline becomes historic just
		 * after we checked that it was still current, it's still OK to stream
		 * it up to the flush pointer that was calculated before it became
		 * historic.
		 */
		bool		becameHistoric = false;

		SendRqstPtr = GetStandbyFlushRecPtr();

		if (!RecoveryInProgress())
		{
			/*
			 * We have been promoted. RecoveryInProgress() updated
			 * ThisTimeLineID to the new current timeline.
			 */
			am_cascading_walsender = false;
			becameHistoric = true;
		}
		else
		{
			/*
			 * Still a cascading standby. But is the timeline we're sending
			 * still the one recovery is recovering from? ThisTimeLineID was
			 * updated by the GetStandbyFlushRecPtr() call above.
			 */
			if (sendTimeLine != ThisTimeLineID)
				becameHistoric = true;
		}

		if (becameHistoric)
		{
			/*
			 * The timeline we were sending has become historic. Read the
			 * timeline history file of the new timeline to see where exactly
			 * we forked off from the timeline we were sending.
			 */
			List	   *history;

			history = readTimeLineHistory(ThisTimeLineID);
			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);

			Assert(sendTimeLine < sendTimeLineNextTLI);
			list_free_deep(history);

			sendTimeLineIsHistoric = true;

			SendRqstPtr = sendTimeLineValidUpto;
		}
	}
	else
	{
		/*
		 * Streaming the current timeline on a master.
		 *
		 * Attempt to send all data that's already been written out and
		 * fsync'd to disk.  We cannot go further than what's been written out
		 * given the current implementation of XLogRead().  And in any case
		 * it's unsafe to send WAL that is not securely down to disk on the
		 * master: if the master subsequently crashes and restarts, standbys
		 * must not have applied any WAL that got lost on the master.
		 */
		SendRqstPtr = GetFlushRecPtr();
	}

	/*
	 * Record the current system time as an approximation of the time at which
	 * this WAL location was written for the purposes of lag tracking.
	 *
	 * In theory we could make XLogFlush() record a time in shmem whenever WAL
	 * is flushed and we could get that time as well as the LSN when we call
	 * GetFlushRecPtr() above (and likewise for the cascading standby
	 * equivalent), but rather than putting any new code into the hot WAL path
	 * it seems good enough to capture the time here.  We should reach this
	 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
	 * may take some time, we read the WAL flush pointer and take the time
	 * very close together here so that we'll get a later position if it is
	 * still moving.
	 *
	 * Because LagTrackerWriter ignores samples when the LSN hasn't advanced,
	 * this gives us a cheap approximation for the WAL flush time for this
	 * LSN.
	 *
	 * Note that the LSN is not necessarily the LSN for the data contained in
	 * the present message; it's the end of the WAL, which might be further
	 * ahead.  All the lag tracking machinery cares about is finding out when
	 * that arbitrary LSN is eventually reported as written, flushed and
	 * applied, so that it can measure the elapsed time.
	 */
	LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());

	/*
	 * If this is a historic timeline and we've reached the point where we
	 * forked to the next timeline, stop streaming.
	 *
	 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
	 * startup process will normally replay all WAL that has been received
	 * from the master, before promoting, but if the WAL streaming is
	 * terminated at a WAL page boundary, the valid portion of the timeline
	 * might end in the middle of a WAL record. We might've already sent the
	 * first half of that partial WAL record to the cascading standby, so that
	 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
	 * replay the partial WAL record either, so it can still follow our
	 * timeline switch.
	 */
	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
	{
		/* close the current file. */
		if (sendFile >= 0)
			close(sendFile);
		sendFile = -1;

		/* Send CopyDone */
		pq_putmessage_noblock('c', NULL, 0);
		streamingDoneSending = true;

		WalSndCaughtUp = true;

		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		return;
	}

	/* Do we have any work to do? */
	Assert(sentPtr <= SendRqstPtr);
	if (SendRqstPtr <= sentPtr)
	{
		WalSndCaughtUp = true;
		return;
	}

	/*
	 * Figure out how much to send in one message. If there's no more than
	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
	 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
	 *
	 * The rounding is not only for performance reasons. Walreceiver relies on
	 * the fact that we never split a WAL record across two messages. Since a
	 * long WAL record is split at page boundary into continuation records,
	 * page boundary is always a safe cut-off point. We also assume that
	 * SendRqstPtr never points to the middle of a WAL record.
	 */
	startptr = sentPtr;
	endptr = startptr;
	endptr += MAX_SEND_SIZE;

	/* if we went beyond SendRqstPtr, back off */
	if (SendRqstPtr <= endptr)
	{
		endptr = SendRqstPtr;
		if (sendTimeLineIsHistoric)
			WalSndCaughtUp = false;
		else
			WalSndCaughtUp = true;
	}
	else
	{
		/* round down to page boundary. */
		endptr -= (endptr % XLOG_BLCKSZ);
		WalSndCaughtUp = false;
	}

	nbytes = endptr - startptr;
	Assert(nbytes <= MAX_SEND_SIZE);

	/*
	 * OK to read and send the slice.  The XLogData message is: 'w', then
	 * dataStart, walEnd and sendtime (int64 each), then the WAL bytes.
	 */
	resetStringInfo(&output_message);
	pq_sendbyte(&output_message, 'w');

	pq_sendint64(&output_message, startptr);	/* dataStart */
	pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
	pq_sendint64(&output_message, 0);	/* sendtime, filled in last */

	/*
	 * Read the log directly into the output buffer to avoid extra memcpy
	 * calls.
	 */
	enlargeStringInfo(&output_message, nbytes);
	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
	output_message.len += nbytes;
	output_message.data[output_message.len] = '\0';

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * The offset skips the 'w' byte plus the dataStart and walEnd fields.
	 */
	resetStringInfo(&tmpbuf);
	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
	memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	pq_putmessage_noblock('d', output_message.data, output_message.len);

	sentPtr = endptr;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}

	/* Report progress of XLOG streaming in PS display */
	if (update_process_title)
	{
		char		activitymsg[50];

		snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
				 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		set_ps_display(activitymsg, false);
	}

	return;
}
2758 
2759 /*
2760  * Stream out logically decoded data.
2761  */
2762 static void
XLogSendLogical(void)2763 XLogSendLogical(void)
2764 {
2765 	XLogRecord *record;
2766 	char	   *errm;
2767 	XLogRecPtr	flushPtr;
2768 
2769 	/*
2770 	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2771 	 * true in WalSndWaitForWal, if we're actually waiting. We also set to
2772 	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2773 	 * didn't wait - i.e. when we're shutting down.
2774 	 */
2775 	WalSndCaughtUp = false;
2776 
2777 	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2778 	logical_startptr = InvalidXLogRecPtr;
2779 
2780 	/* xlog record was invalid */
2781 	if (errm != NULL)
2782 		elog(ERROR, "%s", errm);
2783 
2784 	/*
2785 	 * We'll use the current flush point to determine whether we've caught up.
2786 	 */
2787 	flushPtr = GetFlushRecPtr();
2788 
2789 	if (record != NULL)
2790 	{
2791 		/*
2792 		 * Note the lack of any call to LagTrackerWrite() which is handled by
2793 		 * WalSndUpdateProgress which is called by output plugin through
2794 		 * logical decoding write api.
2795 		 */
2796 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2797 
2798 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2799 	}
2800 
2801 	/* Set flag if we're caught up. */
2802 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2803 		WalSndCaughtUp = true;
2804 
2805 	/*
2806 	 * If we're caught up and have been requested to stop, have WalSndLoop()
2807 	 * terminate the connection in an orderly manner, after writing out all
2808 	 * the pending data.
2809 	 */
2810 	if (WalSndCaughtUp && got_STOPPING)
2811 		got_SIGUSR2 = true;
2812 
2813 	/* Update shared memory status */
2814 	{
2815 		WalSnd	   *walsnd = MyWalSnd;
2816 
2817 		SpinLockAcquire(&walsnd->mutex);
2818 		walsnd->sentPtr = sentPtr;
2819 		SpinLockRelease(&walsnd->mutex);
2820 	}
2821 }
2822 
2823 /*
2824  * Shutdown if the sender is caught up.
2825  *
2826  * NB: This should only be called when the shutdown signal has been received
2827  * from postmaster.
2828  *
2829  * Note that if we determine that there's still more data to send, this
2830  * function will return control to the caller.
2831  */
2832 static void
WalSndDone(WalSndSendDataCallback send_data)2833 WalSndDone(WalSndSendDataCallback send_data)
2834 {
2835 	XLogRecPtr	replicatedPtr;
2836 
2837 	/* ... let's just be real sure we're caught up ... */
2838 	send_data();
2839 
2840 	/*
2841 	 * To figure out whether all WAL has successfully been replicated, check
2842 	 * flush location if valid, write otherwise. Tools like pg_receivewal will
2843 	 * usually (unless in synchronous mode) return an invalid flush location.
2844 	 */
2845 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2846 		MyWalSnd->write : MyWalSnd->flush;
2847 
2848 	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2849 		!pq_is_send_pending())
2850 	{
2851 		/* Inform the standby that XLOG streaming is done */
2852 		EndCommand("COPY 0", DestRemote);
2853 		pq_flush();
2854 
2855 		proc_exit(0);
2856 	}
2857 	if (!waiting_for_ping_response)
2858 		WalSndKeepalive(true);
2859 }
2860 
2861 /*
2862  * Returns the latest point in WAL that has been safely flushed to disk, and
2863  * can be sent to the standby. This should only be called when in recovery,
2864  * ie. we're streaming to a cascaded standby.
2865  *
2866  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2867  * replayed WAL record.
2868  */
2869 static XLogRecPtr
GetStandbyFlushRecPtr(void)2870 GetStandbyFlushRecPtr(void)
2871 {
2872 	XLogRecPtr	replayPtr;
2873 	TimeLineID	replayTLI;
2874 	XLogRecPtr	receivePtr;
2875 	TimeLineID	receiveTLI;
2876 	XLogRecPtr	result;
2877 
2878 	/*
2879 	 * We can safely send what's already been replayed. Also, if walreceiver
2880 	 * is streaming WAL from the same timeline, we can send anything that it
2881 	 * has streamed, but hasn't been replayed yet.
2882 	 */
2883 
2884 	receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2885 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
2886 
2887 	ThisTimeLineID = replayTLI;
2888 
2889 	result = replayPtr;
2890 	if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2891 		result = receivePtr;
2892 
2893 	return result;
2894 }
2895 
2896 /*
2897  * Request walsenders to reload the currently-open WAL file
2898  */
2899 void
WalSndRqstFileReload(void)2900 WalSndRqstFileReload(void)
2901 {
2902 	int			i;
2903 
2904 	for (i = 0; i < max_wal_senders; i++)
2905 	{
2906 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2907 
2908 		SpinLockAcquire(&walsnd->mutex);
2909 		if (walsnd->pid == 0)
2910 		{
2911 			SpinLockRelease(&walsnd->mutex);
2912 			continue;
2913 		}
2914 		walsnd->needreload = true;
2915 		SpinLockRelease(&walsnd->mutex);
2916 	}
2917 }
2918 
2919 /*
2920  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2921  */
2922 void
HandleWalSndInitStopping(void)2923 HandleWalSndInitStopping(void)
2924 {
2925 	Assert(am_walsender);
2926 
2927 	/*
2928 	 * If replication has not yet started, die like with SIGTERM. If
2929 	 * replication is active, only set a flag and wake up the main loop. It
2930 	 * will send any outstanding WAL, wait for it to be replicated to the
2931 	 * standby, and then exit gracefully.
2932 	 */
2933 	if (!replication_active)
2934 		kill(MyProcPid, SIGTERM);
2935 	else
2936 		got_STOPPING = true;
2937 }
2938 
2939 /*
2940  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2941  * sender should already have been switched to WALSNDSTATE_STOPPING at
2942  * this point.
2943  */
2944 static void
WalSndLastCycleHandler(SIGNAL_ARGS)2945 WalSndLastCycleHandler(SIGNAL_ARGS)
2946 {
2947 	int			save_errno = errno;
2948 
2949 	got_SIGUSR2 = true;
2950 	SetLatch(MyLatch);
2951 
2952 	errno = save_errno;
2953 }
2954 
2955 /* Set up signal handlers */
2956 void
WalSndSignals(void)2957 WalSndSignals(void)
2958 {
2959 	/* Set up signal handlers */
2960 	pqsignal(SIGHUP, PostgresSigHupHandler);	/* set flag to read config
2961 												 * file */
2962 	pqsignal(SIGINT, StatementCancelHandler);	/* query cancel */
2963 	pqsignal(SIGTERM, die);		/* request shutdown */
2964 	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
2965 	InitializeTimeouts();		/* establishes SIGALRM handler */
2966 	pqsignal(SIGPIPE, SIG_IGN);
2967 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
2968 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
2969 												 * shutdown */
2970 
2971 	/* Reset some signals that are accepted by postmaster but not here */
2972 	pqsignal(SIGCHLD, SIG_DFL);
2973 	pqsignal(SIGTTIN, SIG_DFL);
2974 	pqsignal(SIGTTOU, SIG_DFL);
2975 	pqsignal(SIGCONT, SIG_DFL);
2976 	pqsignal(SIGWINCH, SIG_DFL);
2977 }
2978 
2979 /* Report shared-memory space needed by WalSndShmemInit */
2980 Size
WalSndShmemSize(void)2981 WalSndShmemSize(void)
2982 {
2983 	Size		size = 0;
2984 
2985 	size = offsetof(WalSndCtlData, walsnds);
2986 	size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2987 
2988 	return size;
2989 }
2990 
2991 /* Allocate and initialize walsender-related shared memory */
2992 void
WalSndShmemInit(void)2993 WalSndShmemInit(void)
2994 {
2995 	bool		found;
2996 	int			i;
2997 
2998 	WalSndCtl = (WalSndCtlData *)
2999 		ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3000 
3001 	if (!found)
3002 	{
3003 		/* First time through, so initialize */
3004 		MemSet(WalSndCtl, 0, WalSndShmemSize());
3005 
3006 		for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3007 			SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3008 
3009 		for (i = 0; i < max_wal_senders; i++)
3010 		{
3011 			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3012 
3013 			SpinLockInit(&walsnd->mutex);
3014 		}
3015 	}
3016 }
3017 
3018 /*
3019  * Wake up all walsenders
3020  *
3021  * This will be called inside critical sections, so throwing an error is not
3022  * advisable.
3023  */
3024 void
WalSndWakeup(void)3025 WalSndWakeup(void)
3026 {
3027 	int			i;
3028 
3029 	for (i = 0; i < max_wal_senders; i++)
3030 	{
3031 		Latch	   *latch;
3032 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3033 
3034 		/*
3035 		 * Get latch pointer with spinlock held, for the unlikely case that
3036 		 * pointer reads aren't atomic (as they're 8 bytes).
3037 		 */
3038 		SpinLockAcquire(&walsnd->mutex);
3039 		latch = walsnd->latch;
3040 		SpinLockRelease(&walsnd->mutex);
3041 
3042 		if (latch != NULL)
3043 			SetLatch(latch);
3044 	}
3045 }
3046 
3047 /*
3048  * Signal all walsenders to move to stopping state.
3049  *
3050  * This will trigger walsenders to move to a state where no further WAL can be
3051  * generated. See this file's header for details.
3052  */
3053 void
WalSndInitStopping(void)3054 WalSndInitStopping(void)
3055 {
3056 	int			i;
3057 
3058 	for (i = 0; i < max_wal_senders; i++)
3059 	{
3060 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3061 		pid_t		pid;
3062 
3063 		SpinLockAcquire(&walsnd->mutex);
3064 		pid = walsnd->pid;
3065 		SpinLockRelease(&walsnd->mutex);
3066 
3067 		if (pid == 0)
3068 			continue;
3069 
3070 		SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3071 	}
3072 }
3073 
3074 /*
3075  * Wait that all the WAL senders have quit or reached the stopping state. This
3076  * is used by the checkpointer to control when the shutdown checkpoint can
3077  * safely be performed.
3078  */
3079 void
WalSndWaitStopping(void)3080 WalSndWaitStopping(void)
3081 {
3082 	for (;;)
3083 	{
3084 		int			i;
3085 		bool		all_stopped = true;
3086 
3087 		for (i = 0; i < max_wal_senders; i++)
3088 		{
3089 			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3090 
3091 			SpinLockAcquire(&walsnd->mutex);
3092 
3093 			if (walsnd->pid == 0)
3094 			{
3095 				SpinLockRelease(&walsnd->mutex);
3096 				continue;
3097 			}
3098 
3099 			if (walsnd->state != WALSNDSTATE_STOPPING)
3100 			{
3101 				all_stopped = false;
3102 				SpinLockRelease(&walsnd->mutex);
3103 				break;
3104 			}
3105 			SpinLockRelease(&walsnd->mutex);
3106 		}
3107 
3108 		/* safe to leave if confirmation is done for all WAL senders */
3109 		if (all_stopped)
3110 			return;
3111 
3112 		pg_usleep(10000L);		/* wait for 10 msec */
3113 	}
3114 }
3115 
3116 /* Set state for current walsender (only called in walsender) */
3117 void
WalSndSetState(WalSndState state)3118 WalSndSetState(WalSndState state)
3119 {
3120 	WalSnd	   *walsnd = MyWalSnd;
3121 
3122 	Assert(am_walsender);
3123 
3124 	if (walsnd->state == state)
3125 		return;
3126 
3127 	SpinLockAcquire(&walsnd->mutex);
3128 	walsnd->state = state;
3129 	SpinLockRelease(&walsnd->mutex);
3130 }
3131 
3132 /*
3133  * Return a string constant representing the state. This is used
3134  * in system views, and should *not* be translated.
3135  */
3136 static const char *
WalSndGetStateString(WalSndState state)3137 WalSndGetStateString(WalSndState state)
3138 {
3139 	switch (state)
3140 	{
3141 		case WALSNDSTATE_STARTUP:
3142 			return "startup";
3143 		case WALSNDSTATE_BACKUP:
3144 			return "backup";
3145 		case WALSNDSTATE_CATCHUP:
3146 			return "catchup";
3147 		case WALSNDSTATE_STREAMING:
3148 			return "streaming";
3149 		case WALSNDSTATE_STOPPING:
3150 			return "stopping";
3151 	}
3152 	return "UNKNOWN";
3153 }
3154 
3155 static Interval *
offset_to_interval(TimeOffset offset)3156 offset_to_interval(TimeOffset offset)
3157 {
3158 	Interval   *result = palloc(sizeof(Interval));
3159 
3160 	result->month = 0;
3161 	result->day = 0;
3162 	result->time = offset;
3163 
3164 	return result;
3165 }
3166 
3167 /*
3168  * Returns activity of walsenders, including pids and xlog locations sent to
3169  * standby servers.
3170  */
3171 Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)3172 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3173 {
3174 #define PG_STAT_GET_WAL_SENDERS_COLS	11
3175 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3176 	TupleDesc	tupdesc;
3177 	Tuplestorestate *tupstore;
3178 	MemoryContext per_query_ctx;
3179 	MemoryContext oldcontext;
3180 	SyncRepStandbyData *sync_standbys;
3181 	int			num_standbys;
3182 	int			i;
3183 
3184 	/* check to see if caller supports us returning a tuplestore */
3185 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3186 		ereport(ERROR,
3187 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3188 				 errmsg("set-valued function called in context that cannot accept a set")));
3189 	if (!(rsinfo->allowedModes & SFRM_Materialize))
3190 		ereport(ERROR,
3191 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3192 				 errmsg("materialize mode required, but it is not " \
3193 						"allowed in this context")));
3194 
3195 	/* Build a tuple descriptor for our result type */
3196 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3197 		elog(ERROR, "return type must be a row type");
3198 
3199 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3200 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
3201 
3202 	tupstore = tuplestore_begin_heap(true, false, work_mem);
3203 	rsinfo->returnMode = SFRM_Materialize;
3204 	rsinfo->setResult = tupstore;
3205 	rsinfo->setDesc = tupdesc;
3206 
3207 	MemoryContextSwitchTo(oldcontext);
3208 
3209 	/*
3210 	 * Get the currently active synchronous standbys.  This could be out of
3211 	 * date before we're done, but we'll use the data anyway.
3212 	 */
3213 	num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3214 
3215 	for (i = 0; i < max_wal_senders; i++)
3216 	{
3217 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3218 		XLogRecPtr	sentPtr;
3219 		XLogRecPtr	write;
3220 		XLogRecPtr	flush;
3221 		XLogRecPtr	apply;
3222 		TimeOffset	writeLag;
3223 		TimeOffset	flushLag;
3224 		TimeOffset	applyLag;
3225 		int			priority;
3226 		int			pid;
3227 		WalSndState state;
3228 		bool		is_sync_standby;
3229 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
3230 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3231 		int			j;
3232 
3233 		/* Collect data from shared memory */
3234 		SpinLockAcquire(&walsnd->mutex);
3235 		if (walsnd->pid == 0)
3236 		{
3237 			SpinLockRelease(&walsnd->mutex);
3238 			continue;
3239 		}
3240 		pid = walsnd->pid;
3241 		sentPtr = walsnd->sentPtr;
3242 		state = walsnd->state;
3243 		write = walsnd->write;
3244 		flush = walsnd->flush;
3245 		apply = walsnd->apply;
3246 		writeLag = walsnd->writeLag;
3247 		flushLag = walsnd->flushLag;
3248 		applyLag = walsnd->applyLag;
3249 		priority = walsnd->sync_standby_priority;
3250 		SpinLockRelease(&walsnd->mutex);
3251 
3252 		/*
3253 		 * Detect whether walsender is/was considered synchronous.  We can
3254 		 * provide some protection against stale data by checking the PID
3255 		 * along with walsnd_index.
3256 		 */
3257 		is_sync_standby = false;
3258 		for (j = 0; j < num_standbys; j++)
3259 		{
3260 			if (sync_standbys[j].walsnd_index == i &&
3261 				sync_standbys[j].pid == pid)
3262 			{
3263 				is_sync_standby = true;
3264 				break;
3265 			}
3266 		}
3267 
3268 		memset(nulls, 0, sizeof(nulls));
3269 		values[0] = Int32GetDatum(pid);
3270 
3271 		if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
3272 		{
3273 			/*
3274 			 * Only superusers and members of pg_read_all_stats can see details.
3275 			 * Other users only get the pid value to know it's a walsender,
3276 			 * but no details.
3277 			 */
3278 			MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3279 		}
3280 		else
3281 		{
3282 			values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3283 
3284 			if (XLogRecPtrIsInvalid(sentPtr))
3285 				nulls[2] = true;
3286 			values[2] = LSNGetDatum(sentPtr);
3287 
3288 			if (XLogRecPtrIsInvalid(write))
3289 				nulls[3] = true;
3290 			values[3] = LSNGetDatum(write);
3291 
3292 			if (XLogRecPtrIsInvalid(flush))
3293 				nulls[4] = true;
3294 			values[4] = LSNGetDatum(flush);
3295 
3296 			if (XLogRecPtrIsInvalid(apply))
3297 				nulls[5] = true;
3298 			values[5] = LSNGetDatum(apply);
3299 
3300 			/*
3301 			 * Treat a standby such as a pg_basebackup background process
3302 			 * which always returns an invalid flush location, as an
3303 			 * asynchronous standby.
3304 			 */
3305 			priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3306 
3307 			if (writeLag < 0)
3308 				nulls[6] = true;
3309 			else
3310 				values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3311 
3312 			if (flushLag < 0)
3313 				nulls[7] = true;
3314 			else
3315 				values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3316 
3317 			if (applyLag < 0)
3318 				nulls[8] = true;
3319 			else
3320 				values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3321 
3322 			values[9] = Int32GetDatum(priority);
3323 
3324 			/*
3325 			 * More easily understood version of standby state. This is purely
3326 			 * informational.
3327 			 *
3328 			 * In quorum-based sync replication, the role of each standby
3329 			 * listed in synchronous_standby_names can be changing very
3330 			 * frequently. Any standbys considered as "sync" at one moment can
3331 			 * be switched to "potential" ones at the next moment. So, it's
3332 			 * basically useless to report "sync" or "potential" as their sync
3333 			 * states. We report just "quorum" for them.
3334 			 */
3335 			if (priority == 0)
3336 				values[10] = CStringGetTextDatum("async");
3337 			else if (is_sync_standby)
3338 				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3339 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3340 			else
3341 				values[10] = CStringGetTextDatum("potential");
3342 		}
3343 
3344 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3345 	}
3346 
3347 	/* clean up and return the tuplestore */
3348 	tuplestore_donestoring(tupstore);
3349 
3350 	return (Datum) 0;
3351 }
3352 
3353 /*
3354  * Send a keepalive message to standby.
3355  *
3356  * If requestReply is set, the message requests the other party to send
3357  * a message back to us, for heartbeat purposes.  We also set a flag to
3358  * let nearby code that we're waiting for that response, to avoid
3359  * repeated requests.
3360  */
3361 static void
WalSndKeepalive(bool requestReply)3362 WalSndKeepalive(bool requestReply)
3363 {
3364 	elog(DEBUG2, "sending replication keepalive");
3365 
3366 	/* construct the message... */
3367 	resetStringInfo(&output_message);
3368 	pq_sendbyte(&output_message, 'k');
3369 	pq_sendint64(&output_message, sentPtr);
3370 	pq_sendint64(&output_message, GetCurrentTimestamp());
3371 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
3372 
3373 	/* ... and send it wrapped in CopyData */
3374 	pq_putmessage_noblock('d', output_message.data, output_message.len);
3375 
3376 	/* Set local flag */
3377 	if (requestReply)
3378 		waiting_for_ping_response = true;
3379 }
3380 
3381 /*
3382  * Send keepalive message if too much time has elapsed.
3383  */
3384 static void
WalSndKeepaliveIfNecessary(void)3385 WalSndKeepaliveIfNecessary(void)
3386 {
3387 	TimestampTz ping_time;
3388 
3389 	/*
3390 	 * Don't send keepalive messages if timeouts are globally disabled or
3391 	 * we're doing something not partaking in timeouts.
3392 	 */
3393 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3394 		return;
3395 
3396 	if (waiting_for_ping_response)
3397 		return;
3398 
3399 	/*
3400 	 * If half of wal_sender_timeout has lapsed without receiving any reply
3401 	 * from the standby, send a keep-alive message to the standby requesting
3402 	 * an immediate reply.
3403 	 */
3404 	ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3405 											wal_sender_timeout / 2);
3406 	if (last_processing >= ping_time)
3407 	{
3408 		WalSndKeepalive(true);
3409 
3410 		/* Try to flush pending output to the client */
3411 		if (pq_flush_if_writable() != 0)
3412 			WalSndShutdown();
3413 	}
3414 }
3415 
3416 /*
3417  * Record the end of the WAL and the time it was flushed locally, so that
3418  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3419  * eventually reported to have been written, flushed and applied by the
3420  * standby in a reply message.
3421  */
3422 static void
LagTrackerWrite(XLogRecPtr lsn,TimestampTz local_flush_time)3423 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3424 {
3425 	bool		buffer_full;
3426 	int			new_write_head;
3427 	int			i;
3428 
3429 	if (!am_walsender)
3430 		return;
3431 
3432 	/*
3433 	 * If the lsn hasn't advanced since last time, then do nothing.  This way
3434 	 * we only record a new sample when new WAL has been written.
3435 	 */
3436 	if (LagTracker.last_lsn == lsn)
3437 		return;
3438 	LagTracker.last_lsn = lsn;
3439 
3440 	/*
3441 	 * If advancing the write head of the circular buffer would crash into any
3442 	 * of the read heads, then the buffer is full.  In other words, the
3443 	 * slowest reader (presumably apply) is the one that controls the release
3444 	 * of space.
3445 	 */
3446 	new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3447 	buffer_full = false;
3448 	for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3449 	{
3450 		if (new_write_head == LagTracker.read_heads[i])
3451 			buffer_full = true;
3452 	}
3453 
3454 	/*
3455 	 * If the buffer is full, for now we just rewind by one slot and overwrite
3456 	 * the last sample, as a simple (if somewhat uneven) way to lower the
3457 	 * sampling rate.  There may be better adaptive compaction algorithms.
3458 	 */
3459 	if (buffer_full)
3460 	{
3461 		new_write_head = LagTracker.write_head;
3462 		if (LagTracker.write_head > 0)
3463 			LagTracker.write_head--;
3464 		else
3465 			LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3466 	}
3467 
3468 	/* Store a sample at the current write head position. */
3469 	LagTracker.buffer[LagTracker.write_head].lsn = lsn;
3470 	LagTracker.buffer[LagTracker.write_head].time = local_flush_time;
3471 	LagTracker.write_head = new_write_head;
3472 }
3473 
3474 /*
3475  * Find out how much time has elapsed between the moment WAL location 'lsn'
3476  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3477  * We have a separate read head for each of the reported LSN locations we
3478  * receive in replies from standby; 'head' controls which read head is
3479  * used.  Whenever a read head crosses an LSN which was written into the
3480  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3481  * find out the time this LSN (or an earlier one) was flushed locally, and
3482  * therefore compute the lag.
3483  *
3484  * Return -1 if no new sample data is available, and otherwise the elapsed
3485  * time in microseconds.
3486  */
3487 static TimeOffset
LagTrackerRead(int head,XLogRecPtr lsn,TimestampTz now)3488 LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
3489 {
3490 	TimestampTz time = 0;
3491 
3492 	/* Read all unread samples up to this LSN or end of buffer. */
3493 	while (LagTracker.read_heads[head] != LagTracker.write_head &&
3494 		   LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn)
3495 	{
3496 		time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3497 		LagTracker.last_read[head] =
3498 			LagTracker.buffer[LagTracker.read_heads[head]];
3499 		LagTracker.read_heads[head] =
3500 			(LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3501 	}
3502 
3503 	/*
3504 	 * If the lag tracker is empty, that means the standby has processed
3505 	 * everything we've ever sent so we should now clear 'last_read'.  If we
3506 	 * didn't do that, we'd risk using a stale and irrelevant sample for
3507 	 * interpolation at the beginning of the next burst of WAL after a period
3508 	 * of idleness.
3509 	 */
3510 	if (LagTracker.read_heads[head] == LagTracker.write_head)
3511 		LagTracker.last_read[head].time = 0;
3512 
3513 	if (time > now)
3514 	{
3515 		/* If the clock somehow went backwards, treat as not found. */
3516 		return -1;
3517 	}
3518 	else if (time == 0)
3519 	{
3520 		/*
3521 		 * We didn't cross a time.  If there is a future sample that we
3522 		 * haven't reached yet, and we've already reached at least one sample,
3523 		 * let's interpolate the local flushed time.  This is mainly useful
3524 		 * for reporting a completely stuck apply position as having
3525 		 * increasing lag, since otherwise we'd have to wait for it to
3526 		 * eventually start moving again and cross one of our samples before
3527 		 * we can show the lag increasing.
3528 		 */
3529 		if (LagTracker.read_heads[head] == LagTracker.write_head)
3530 		{
3531 			/* There are no future samples, so we can't interpolate. */
3532 			return -1;
3533 		}
3534 		else if (LagTracker.last_read[head].time != 0)
3535 		{
3536 			/* We can interpolate between last_read and the next sample. */
3537 			double		fraction;
3538 			WalTimeSample prev = LagTracker.last_read[head];
3539 			WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]];
3540 
3541 			if (lsn < prev.lsn)
3542 			{
3543 				/*
3544 				 * Reported LSNs shouldn't normally go backwards, but it's
3545 				 * possible when there is a timeline change.  Treat as not
3546 				 * found.
3547 				 */
3548 				return -1;
3549 			}
3550 
3551 			Assert(prev.lsn < next.lsn);
3552 
3553 			if (prev.time > next.time)
3554 			{
3555 				/* If the clock somehow went backwards, treat as not found. */
3556 				return -1;
3557 			}
3558 
3559 			/* See how far we are between the previous and next samples. */
3560 			fraction =
3561 				(double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3562 
3563 			/* Scale the local flush time proportionally. */
3564 			time = (TimestampTz)
3565 				((double) prev.time + (next.time - prev.time) * fraction);
3566 		}
3567 		else
3568 		{
3569 			/*
3570 			 * We have only a future sample, implying that we were entirely
3571 			 * caught up but and now there is a new burst of WAL and the
3572 			 * standby hasn't processed the first sample yet.  Until the
3573 			 * standby reaches the future sample the best we can do is report
3574 			 * the hypothetical lag if that sample were to be replayed now.
3575 			 */
3576 			time = LagTracker.buffer[LagTracker.read_heads[head]].time;
3577 		}
3578 	}
3579 
3580 	/* Return the elapsed time since local flush time in microseconds. */
3581 	Assert(time != 0);
3582 	return now - time;
3583 }
3584