1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at the next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited.  If
29  * the backend is idle or runs an SQL query this causes the backend to
30  * shutdown, if logical replication is in progress all existing WAL records
31  * are processed followed by a shutdown.  Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  *	  src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogutils.h"
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "postmaster/interrupt.h"
70 #include "replication/basebackup.h"
71 #include "replication/decode.h"
72 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
76 #include "replication/walreceiver.h"
77 #include "replication/walsender.h"
78 #include "replication/walsender_private.h"
79 #include "storage/condition_variable.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/acl.h"
88 #include "utils/builtins.h"
89 #include "utils/guc.h"
90 #include "utils/memutils.h"
91 #include "utils/pg_lsn.h"
92 #include "utils/portal.h"
93 #include "utils/ps_status.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96 
97 /*
98  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
99  *
100  * We don't have a good idea of what a good value would be; there's some
101  * overhead per message in both walsender and walreceiver, but on the other
102  * hand sending large batches makes walsender less responsive to signals
103  * because signals are checked only between messages.  128kB (with
104  * default 8k blocks) seems like a reasonable guess for now.
105  */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107 
108 /* Array of WalSnds in shared memory */
109 WalSndCtlData *WalSndCtl = NULL;
110 
111 /* My slot in the shared memory array */
112 WalSnd	   *MyWalSnd = NULL;
113 
114 /* Global state */
115 bool		am_walsender = false;	/* Am I a walsender process? */
116 bool		am_cascading_walsender = false; /* Am I cascading WAL to another
117 											 * standby? */
118 bool		am_db_walsender = false;	/* Connected to a database? */
119 
120 /* User-settable parameters for walsender */
121 int			max_wal_senders = 0;	/* the maximum number of concurrent
122 									 * walsenders */
123 int			wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124 											 * data message */
125 bool		log_replication_commands = false;
126 
127 /*
128  * State for WalSndWakeupRequest
129  */
130 bool		wake_wal_senders = false;
131 
132 /*
133  * xlogreader used for replication.  Note that a WAL sender doing physical
134  * replication does not need xlogreader to read WAL, but it needs one to
135  * keep a state of its work.
136  */
137 static XLogReaderState *xlogreader = NULL;
138 
139 /*
140  * These variables keep track of the state of the timeline we're currently
141  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
142  * the timeline is not the latest timeline on this server, and the server's
143  * history forked off from that timeline at sendTimeLineValidUpto.
144  */
145 static TimeLineID sendTimeLine = 0;
146 static TimeLineID sendTimeLineNextTLI = 0;
147 static bool sendTimeLineIsHistoric = false;
148 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
149 
150 /*
151  * How far have we sent WAL already? This is also advertised in
152  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
153  */
154 static XLogRecPtr sentPtr = InvalidXLogRecPtr;
155 
156 /* Buffers for constructing outgoing messages and processing reply messages. */
157 static StringInfoData output_message;
158 static StringInfoData reply_message;
159 static StringInfoData tmpbuf;
160 
161 /* Timestamp of last ProcessRepliesIfAny(). */
162 static TimestampTz last_processing = 0;
163 
164 /*
165  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
166  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
167  */
168 static TimestampTz last_reply_timestamp = 0;
169 
170 /* Have we sent a heartbeat message asking for reply, since last reply? */
171 static bool waiting_for_ping_response = false;
172 
173 /*
174  * While streaming WAL in Copy mode, streamingDoneSending is set to true
175  * after we have sent CopyDone. We should not send any more CopyData messages
176  * after that. streamingDoneReceiving is set to true when we receive CopyDone
177  * from the other end. When both become true, it's time to exit Copy mode.
178  */
179 static bool streamingDoneSending;
180 static bool streamingDoneReceiving;
181 
182 /* Are we there yet? */
183 static bool WalSndCaughtUp = false;
184 
185 /* Flags set by signal handlers for later service in main loop */
186 static volatile sig_atomic_t got_SIGUSR2 = false;
187 static volatile sig_atomic_t got_STOPPING = false;
188 
189 /*
190  * This is set while we are streaming. When not set
191  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
192  * the main loop is responsible for checking got_STOPPING and terminating when
193  * it's set (after streaming any remaining WAL).
194  */
195 static volatile sig_atomic_t replication_active = false;
196 
197 static LogicalDecodingContext *logical_decoding_ctx = NULL;
198 
199 /* A sample associating a WAL location with the time it was written. */
200 typedef struct
201 {
202 	XLogRecPtr	lsn;
203 	TimestampTz time;
204 } WalTimeSample;
205 
206 /* The size of our buffer of time samples. */
207 #define LAG_TRACKER_BUFFER_SIZE 8192
208 
209 /* A mechanism for tracking replication lag. */
210 typedef struct
211 {
212 	XLogRecPtr	last_lsn;
213 	WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
214 	int			write_head;
215 	int			read_heads[NUM_SYNC_REP_WAIT_MODE];
216 	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
217 } LagTracker;
218 
219 static LagTracker *lag_tracker;
220 
221 /* Signal handlers */
222 static void WalSndLastCycleHandler(SIGNAL_ARGS);
223 
224 /* Prototypes for private functions */
225 typedef void (*WalSndSendDataCallback) (void);
226 static void WalSndLoop(WalSndSendDataCallback send_data);
227 static void InitWalSenderSlot(void);
228 static void WalSndKill(int code, Datum arg);
229 static void WalSndShutdown(void) pg_attribute_noreturn();
230 static void XLogSendPhysical(void);
231 static void XLogSendLogical(void);
232 static void WalSndDone(WalSndSendDataCallback send_data);
233 static XLogRecPtr GetStandbyFlushRecPtr(void);
234 static void IdentifySystem(void);
235 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
236 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
237 static void StartReplication(StartReplicationCmd *cmd);
238 static void StartLogicalReplication(StartReplicationCmd *cmd);
239 static void ProcessStandbyMessage(void);
240 static void ProcessStandbyReplyMessage(void);
241 static void ProcessStandbyHSFeedbackMessage(void);
242 static void ProcessRepliesIfAny(void);
243 static void WalSndKeepalive(bool requestReply);
244 static void WalSndKeepaliveIfNecessary(void);
245 static void WalSndCheckTimeOut(void);
246 static long WalSndComputeSleeptime(TimestampTz now);
247 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
248 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
249 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
250 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
251 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
252 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
253 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
254 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
255 
256 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
257 							  TimeLineID *tli_p);
258 
259 
260 /* Initialize walsender process before entering the main command loop */
261 void
InitWalSender(void)262 InitWalSender(void)
263 {
264 	am_cascading_walsender = RecoveryInProgress();
265 
266 	/* Create a per-walsender data structure in shared memory */
267 	InitWalSenderSlot();
268 
269 	/*
270 	 * We don't currently need any ResourceOwner in a walsender process, but
271 	 * if we did, we could call CreateAuxProcessResourceOwner here.
272 	 */
273 
274 	/*
275 	 * Let postmaster know that we're a WAL sender. Once we've declared us as
276 	 * a WAL sender process, postmaster will let us outlive the bgwriter and
277 	 * kill us last in the shutdown sequence, so we get a chance to stream all
278 	 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
279 	 * there's no going back, and we mustn't write any WAL records after this.
280 	 */
281 	MarkPostmasterChildWalSender();
282 	SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
283 
284 	/* Initialize empty timestamp buffer for lag tracking. */
285 	lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
286 }
287 
288 /*
289  * Clean up after an error.
290  *
291  * WAL sender processes don't use transactions like regular backends do.
292  * This function does any cleanup required after an error in a WAL sender
293  * process, similar to what transaction abort does in a regular backend.
294  */
295 void
WalSndErrorCleanup(void)296 WalSndErrorCleanup(void)
297 {
298 	LWLockReleaseAll();
299 	ConditionVariableCancelSleep();
300 	pgstat_report_wait_end();
301 
302 	if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
303 		wal_segment_close(xlogreader);
304 
305 	if (MyReplicationSlot != NULL)
306 		ReplicationSlotRelease();
307 
308 	ReplicationSlotCleanup();
309 
310 	replication_active = false;
311 
312 	/*
313 	 * If there is a transaction in progress, it will clean up our
314 	 * ResourceOwner, but if a replication command set up a resource owner
315 	 * without a transaction, we've got to clean that up now.
316 	 */
317 	if (!IsTransactionOrTransactionBlock())
318 		WalSndResourceCleanup(false);
319 
320 	if (got_STOPPING || got_SIGUSR2)
321 		proc_exit(0);
322 
323 	/* Revert back to startup state */
324 	WalSndSetState(WALSNDSTATE_STARTUP);
325 }
326 
327 /*
328  * Clean up any ResourceOwner we created.
329  */
330 void
WalSndResourceCleanup(bool isCommit)331 WalSndResourceCleanup(bool isCommit)
332 {
333 	ResourceOwner resowner;
334 
335 	if (CurrentResourceOwner == NULL)
336 		return;
337 
338 	/*
339 	 * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
340 	 * in a local variable and clear it first.
341 	 */
342 	resowner = CurrentResourceOwner;
343 	CurrentResourceOwner = NULL;
344 
345 	/* Now we can release resources and delete it. */
346 	ResourceOwnerRelease(resowner,
347 						 RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
348 	ResourceOwnerRelease(resowner,
349 						 RESOURCE_RELEASE_LOCKS, isCommit, true);
350 	ResourceOwnerRelease(resowner,
351 						 RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
352 	ResourceOwnerDelete(resowner);
353 }
354 
355 /*
356  * Handle a client's connection abort in an orderly manner.
357  */
358 static void
WalSndShutdown(void)359 WalSndShutdown(void)
360 {
361 	/*
362 	 * Reset whereToSendOutput to prevent ereport from attempting to send any
363 	 * more messages to the standby.
364 	 */
365 	if (whereToSendOutput == DestRemote)
366 		whereToSendOutput = DestNone;
367 
368 	proc_exit(0);
369 	abort();					/* keep the compiler quiet */
370 }
371 
372 /*
373  * Handle the IDENTIFY_SYSTEM command.
374  */
375 static void
IdentifySystem(void)376 IdentifySystem(void)
377 {
378 	char		sysid[32];
379 	char		xloc[MAXFNAMELEN];
380 	XLogRecPtr	logptr;
381 	char	   *dbname = NULL;
382 	DestReceiver *dest;
383 	TupOutputState *tstate;
384 	TupleDesc	tupdesc;
385 	Datum		values[4];
386 	bool		nulls[4];
387 
388 	/*
389 	 * Reply with a result set with one row, four columns. First col is system
390 	 * ID, second is timeline ID, third is current xlog location and the
391 	 * fourth contains the database name if we are connected to one.
392 	 */
393 
394 	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
395 			 GetSystemIdentifier());
396 
397 	am_cascading_walsender = RecoveryInProgress();
398 	if (am_cascading_walsender)
399 	{
400 		/* this also updates ThisTimeLineID */
401 		logptr = GetStandbyFlushRecPtr();
402 	}
403 	else
404 		logptr = GetFlushRecPtr();
405 
406 	snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
407 
408 	if (MyDatabaseId != InvalidOid)
409 	{
410 		MemoryContext cur = CurrentMemoryContext;
411 
412 		/* syscache access needs a transaction env. */
413 		StartTransactionCommand();
414 		/* make dbname live outside TX context */
415 		MemoryContextSwitchTo(cur);
416 		dbname = get_database_name(MyDatabaseId);
417 		CommitTransactionCommand();
418 		/* CommitTransactionCommand switches to TopMemoryContext */
419 		MemoryContextSwitchTo(cur);
420 	}
421 
422 	dest = CreateDestReceiver(DestRemoteSimple);
423 	MemSet(nulls, false, sizeof(nulls));
424 
425 	/* need a tuple descriptor representing four columns */
426 	tupdesc = CreateTemplateTupleDesc(4);
427 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
428 							  TEXTOID, -1, 0);
429 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
430 							  INT4OID, -1, 0);
431 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
432 							  TEXTOID, -1, 0);
433 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
434 							  TEXTOID, -1, 0);
435 
436 	/* prepare for projection of tuples */
437 	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
438 
439 	/* column 1: system identifier */
440 	values[0] = CStringGetTextDatum(sysid);
441 
442 	/* column 2: timeline */
443 	values[1] = Int32GetDatum(ThisTimeLineID);
444 
445 	/* column 3: wal location */
446 	values[2] = CStringGetTextDatum(xloc);
447 
448 	/* column 4: database name, or NULL if none */
449 	if (dbname)
450 		values[3] = CStringGetTextDatum(dbname);
451 	else
452 		nulls[3] = true;
453 
454 	/* send it to dest */
455 	do_tup_output(tstate, values, nulls);
456 
457 	end_tup_output(tstate);
458 }
459 
460 
461 /*
462  * Handle TIMELINE_HISTORY command.
463  */
464 static void
SendTimeLineHistory(TimeLineHistoryCmd * cmd)465 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
466 {
467 	StringInfoData buf;
468 	char		histfname[MAXFNAMELEN];
469 	char		path[MAXPGPATH];
470 	int			fd;
471 	off_t		histfilelen;
472 	off_t		bytesleft;
473 	Size		len;
474 
475 	/*
476 	 * Reply with a result set with one row, and two columns. The first col is
477 	 * the name of the history file, 2nd is the contents.
478 	 */
479 
480 	TLHistoryFileName(histfname, cmd->timeline);
481 	TLHistoryFilePath(path, cmd->timeline);
482 
483 	/* Send a RowDescription message */
484 	pq_beginmessage(&buf, 'T');
485 	pq_sendint16(&buf, 2);		/* 2 fields */
486 
487 	/* first field */
488 	pq_sendstring(&buf, "filename");	/* col name */
489 	pq_sendint32(&buf, 0);		/* table oid */
490 	pq_sendint16(&buf, 0);		/* attnum */
491 	pq_sendint32(&buf, TEXTOID);	/* type oid */
492 	pq_sendint16(&buf, -1);		/* typlen */
493 	pq_sendint32(&buf, 0);		/* typmod */
494 	pq_sendint16(&buf, 0);		/* format code */
495 
496 	/* second field */
497 	pq_sendstring(&buf, "content"); /* col name */
498 	pq_sendint32(&buf, 0);		/* table oid */
499 	pq_sendint16(&buf, 0);		/* attnum */
500 	pq_sendint32(&buf, TEXTOID);	/* type oid */
501 	pq_sendint16(&buf, -1);		/* typlen */
502 	pq_sendint32(&buf, 0);		/* typmod */
503 	pq_sendint16(&buf, 0);		/* format code */
504 	pq_endmessage(&buf);
505 
506 	/* Send a DataRow message */
507 	pq_beginmessage(&buf, 'D');
508 	pq_sendint16(&buf, 2);		/* # of columns */
509 	len = strlen(histfname);
510 	pq_sendint32(&buf, len);	/* col1 len */
511 	pq_sendbytes(&buf, histfname, len);
512 
513 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
514 	if (fd < 0)
515 		ereport(ERROR,
516 				(errcode_for_file_access(),
517 				 errmsg("could not open file \"%s\": %m", path)));
518 
519 	/* Determine file length and send it to client */
520 	histfilelen = lseek(fd, 0, SEEK_END);
521 	if (histfilelen < 0)
522 		ereport(ERROR,
523 				(errcode_for_file_access(),
524 				 errmsg("could not seek to end of file \"%s\": %m", path)));
525 	if (lseek(fd, 0, SEEK_SET) != 0)
526 		ereport(ERROR,
527 				(errcode_for_file_access(),
528 				 errmsg("could not seek to beginning of file \"%s\": %m", path)));
529 
530 	pq_sendint32(&buf, histfilelen);	/* col2 len */
531 
532 	bytesleft = histfilelen;
533 	while (bytesleft > 0)
534 	{
535 		PGAlignedBlock rbuf;
536 		int			nread;
537 
538 		pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
539 		nread = read(fd, rbuf.data, sizeof(rbuf));
540 		pgstat_report_wait_end();
541 		if (nread < 0)
542 			ereport(ERROR,
543 					(errcode_for_file_access(),
544 					 errmsg("could not read file \"%s\": %m",
545 							path)));
546 		else if (nread == 0)
547 			ereport(ERROR,
548 					(errcode(ERRCODE_DATA_CORRUPTED),
549 					 errmsg("could not read file \"%s\": read %d of %zu",
550 							path, nread, (Size) bytesleft)));
551 
552 		pq_sendbytes(&buf, rbuf.data, nread);
553 		bytesleft -= nread;
554 	}
555 
556 	if (CloseTransientFile(fd) != 0)
557 		ereport(ERROR,
558 				(errcode_for_file_access(),
559 				 errmsg("could not close file \"%s\": %m", path)));
560 
561 	pq_endmessage(&buf);
562 }
563 
564 /*
565  * Handle START_REPLICATION command.
566  *
567  * At the moment, this never returns, but an ereport(ERROR) will take us back
568  * to the main loop.
569  */
570 static void
StartReplication(StartReplicationCmd * cmd)571 StartReplication(StartReplicationCmd *cmd)
572 {
573 	StringInfoData buf;
574 	XLogRecPtr	FlushPtr;
575 
576 	if (ThisTimeLineID == 0)
577 		ereport(ERROR,
578 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
579 				 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
580 
581 	/* create xlogreader for physical replication */
582 	xlogreader =
583 		XLogReaderAllocate(wal_segment_size, NULL,
584 						   XL_ROUTINE(.segment_open = WalSndSegmentOpen,
585 									  .segment_close = wal_segment_close),
586 						   NULL);
587 
588 	if (!xlogreader)
589 		ereport(ERROR,
590 				(errcode(ERRCODE_OUT_OF_MEMORY),
591 				 errmsg("out of memory")));
592 
593 	/*
594 	 * We assume here that we're logging enough information in the WAL for
595 	 * log-shipping, since this is checked in PostmasterMain().
596 	 *
597 	 * NOTE: wal_level can only change at shutdown, so in most cases it is
598 	 * difficult for there to be WAL data that we can still see that was
599 	 * written at wal_level='minimal'.
600 	 */
601 
602 	if (cmd->slotname)
603 	{
604 		ReplicationSlotAcquire(cmd->slotname, true);
605 		if (SlotIsLogical(MyReplicationSlot))
606 			ereport(ERROR,
607 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
608 					 errmsg("cannot use a logical replication slot for physical replication")));
609 
610 		/*
611 		 * We don't need to verify the slot's restart_lsn here; instead we
612 		 * rely on the caller requesting the starting point to use.  If the
613 		 * WAL segment doesn't exist, we'll fail later.
614 		 */
615 	}
616 
617 	/*
618 	 * Select the timeline. If it was given explicitly by the client, use
619 	 * that. Otherwise use the timeline of the last replayed record, which is
620 	 * kept in ThisTimeLineID.
621 	 */
622 	if (am_cascading_walsender)
623 	{
624 		/* this also updates ThisTimeLineID */
625 		FlushPtr = GetStandbyFlushRecPtr();
626 	}
627 	else
628 		FlushPtr = GetFlushRecPtr();
629 
630 	if (cmd->timeline != 0)
631 	{
632 		XLogRecPtr	switchpoint;
633 
634 		sendTimeLine = cmd->timeline;
635 		if (sendTimeLine == ThisTimeLineID)
636 		{
637 			sendTimeLineIsHistoric = false;
638 			sendTimeLineValidUpto = InvalidXLogRecPtr;
639 		}
640 		else
641 		{
642 			List	   *timeLineHistory;
643 
644 			sendTimeLineIsHistoric = true;
645 
646 			/*
647 			 * Check that the timeline the client requested exists, and the
648 			 * requested start location is on that timeline.
649 			 */
650 			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
651 			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
652 										 &sendTimeLineNextTLI);
653 			list_free_deep(timeLineHistory);
654 
655 			/*
656 			 * Found the requested timeline in the history. Check that
657 			 * requested startpoint is on that timeline in our history.
658 			 *
659 			 * This is quite loose on purpose. We only check that we didn't
660 			 * fork off the requested timeline before the switchpoint. We
661 			 * don't check that we switched *to* it before the requested
662 			 * starting point. This is because the client can legitimately
663 			 * request to start replication from the beginning of the WAL
664 			 * segment that contains switchpoint, but on the new timeline, so
665 			 * that it doesn't end up with a partial segment. If you ask for
666 			 * too old a starting point, you'll get an error later when we
667 			 * fail to find the requested WAL segment in pg_wal.
668 			 *
669 			 * XXX: we could be more strict here and only allow a startpoint
670 			 * that's older than the switchpoint, if it's still in the same
671 			 * WAL segment.
672 			 */
673 			if (!XLogRecPtrIsInvalid(switchpoint) &&
674 				switchpoint < cmd->startpoint)
675 			{
676 				ereport(ERROR,
677 						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
678 								LSN_FORMAT_ARGS(cmd->startpoint),
679 								cmd->timeline),
680 						 errdetail("This server's history forked from timeline %u at %X/%X.",
681 								   cmd->timeline,
682 								   LSN_FORMAT_ARGS(switchpoint))));
683 			}
684 			sendTimeLineValidUpto = switchpoint;
685 		}
686 	}
687 	else
688 	{
689 		sendTimeLine = ThisTimeLineID;
690 		sendTimeLineValidUpto = InvalidXLogRecPtr;
691 		sendTimeLineIsHistoric = false;
692 	}
693 
694 	streamingDoneSending = streamingDoneReceiving = false;
695 
696 	/* If there is nothing to stream, don't even enter COPY mode */
697 	if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
698 	{
699 		/*
700 		 * When we first start replication the standby will be behind the
701 		 * primary. For some applications, for example synchronous
702 		 * replication, it is important to have a clear state for this initial
703 		 * catchup mode, so we can trigger actions when we change streaming
704 		 * state later. We may stay in this state for a long time, which is
705 		 * exactly why we want to be able to monitor whether or not we are
706 		 * still here.
707 		 */
708 		WalSndSetState(WALSNDSTATE_CATCHUP);
709 
710 		/* Send a CopyBothResponse message, and start streaming */
711 		pq_beginmessage(&buf, 'W');
712 		pq_sendbyte(&buf, 0);
713 		pq_sendint16(&buf, 0);
714 		pq_endmessage(&buf);
715 		pq_flush();
716 
717 		/*
718 		 * Don't allow a request to stream from a future point in WAL that
719 		 * hasn't been flushed to disk in this server yet.
720 		 */
721 		if (FlushPtr < cmd->startpoint)
722 		{
723 			ereport(ERROR,
724 					(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
725 							LSN_FORMAT_ARGS(cmd->startpoint),
726 							LSN_FORMAT_ARGS(FlushPtr))));
727 		}
728 
729 		/* Start streaming from the requested point */
730 		sentPtr = cmd->startpoint;
731 
732 		/* Initialize shared memory status, too */
733 		SpinLockAcquire(&MyWalSnd->mutex);
734 		MyWalSnd->sentPtr = sentPtr;
735 		SpinLockRelease(&MyWalSnd->mutex);
736 
737 		SyncRepInitConfig();
738 
739 		/* Main loop of walsender */
740 		replication_active = true;
741 
742 		WalSndLoop(XLogSendPhysical);
743 
744 		replication_active = false;
745 		if (got_STOPPING)
746 			proc_exit(0);
747 		WalSndSetState(WALSNDSTATE_STARTUP);
748 
749 		Assert(streamingDoneSending && streamingDoneReceiving);
750 	}
751 
752 	if (cmd->slotname)
753 		ReplicationSlotRelease();
754 
755 	/*
756 	 * Copy is finished now. Send a single-row result set indicating the next
757 	 * timeline.
758 	 */
759 	if (sendTimeLineIsHistoric)
760 	{
761 		char		startpos_str[8 + 1 + 8 + 1];
762 		DestReceiver *dest;
763 		TupOutputState *tstate;
764 		TupleDesc	tupdesc;
765 		Datum		values[2];
766 		bool		nulls[2];
767 
768 		snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
769 				 LSN_FORMAT_ARGS(sendTimeLineValidUpto));
770 
771 		dest = CreateDestReceiver(DestRemoteSimple);
772 		MemSet(nulls, false, sizeof(nulls));
773 
774 		/*
775 		 * Need a tuple descriptor representing two columns. int8 may seem
776 		 * like a surprising data type for this, but in theory int4 would not
777 		 * be wide enough for this, as TimeLineID is unsigned.
778 		 */
779 		tupdesc = CreateTemplateTupleDesc(2);
780 		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
781 								  INT8OID, -1, 0);
782 		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
783 								  TEXTOID, -1, 0);
784 
785 		/* prepare for projection of tuple */
786 		tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
787 
788 		values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
789 		values[1] = CStringGetTextDatum(startpos_str);
790 
791 		/* send it to dest */
792 		do_tup_output(tstate, values, nulls);
793 
794 		end_tup_output(tstate);
795 	}
796 
797 	/* Send CommandComplete message */
798 	EndReplicationCommand("START_STREAMING");
799 }
800 
801 /*
802  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
803  * walsender process.
804  *
805  * Inside the walsender we can do better than read_local_xlog_page,
806  * which has to do a plain sleep/busy loop, because the walsender's latch gets
807  * set every time WAL is flushed.
808  */
809 static int
logical_read_xlog_page(XLogReaderState * state,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char * cur_page)810 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
811 					   XLogRecPtr targetRecPtr, char *cur_page)
812 {
813 	XLogRecPtr	flushptr;
814 	int			count;
815 	WALReadError errinfo;
816 	XLogSegNo	segno;
817 
818 	XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
819 	sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
820 	sendTimeLine = state->currTLI;
821 	sendTimeLineValidUpto = state->currTLIValidUntil;
822 	sendTimeLineNextTLI = state->nextTLI;
823 
824 	/* make sure we have enough WAL available */
825 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
826 
827 	/* fail if not (implies we are going to shut down) */
828 	if (flushptr < targetPagePtr + reqLen)
829 		return -1;
830 
831 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
832 		count = XLOG_BLCKSZ;	/* more than one block available */
833 	else
834 		count = flushptr - targetPagePtr;	/* part of the page available */
835 
836 	/* now actually read the data, we know it's there */
837 	if (!WALRead(state,
838 				 cur_page,
839 				 targetPagePtr,
840 				 XLOG_BLCKSZ,
841 				 state->seg.ws_tli, /* Pass the current TLI because only
842 									 * WalSndSegmentOpen controls whether new
843 									 * TLI is needed. */
844 				 &errinfo))
845 		WALReadRaiseError(&errinfo);
846 
847 	/*
848 	 * After reading into the buffer, check that what we read was valid. We do
849 	 * this after reading, because even though the segment was present when we
850 	 * opened it, it might get recycled or removed while we read it. The
851 	 * read() succeeds in that case, but the data we tried to read might
852 	 * already have been overwritten with new WAL records.
853 	 */
854 	XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
855 	CheckXLogRemoved(segno, state->seg.ws_tli);
856 
857 	return count;
858 }
859 
860 /*
861  * Process extra options given to CREATE_REPLICATION_SLOT.
862  */
863 static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd * cmd,bool * reserve_wal,CRSSnapshotAction * snapshot_action)864 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
865 						   bool *reserve_wal,
866 						   CRSSnapshotAction *snapshot_action)
867 {
868 	ListCell   *lc;
869 	bool		snapshot_action_given = false;
870 	bool		reserve_wal_given = false;
871 
872 	/* Parse options */
873 	foreach(lc, cmd->options)
874 	{
875 		DefElem    *defel = (DefElem *) lfirst(lc);
876 
877 		if (strcmp(defel->defname, "export_snapshot") == 0)
878 		{
879 			if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
880 				ereport(ERROR,
881 						(errcode(ERRCODE_SYNTAX_ERROR),
882 						 errmsg("conflicting or redundant options")));
883 
884 			snapshot_action_given = true;
885 			*snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
886 				CRS_NOEXPORT_SNAPSHOT;
887 		}
888 		else if (strcmp(defel->defname, "use_snapshot") == 0)
889 		{
890 			if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
891 				ereport(ERROR,
892 						(errcode(ERRCODE_SYNTAX_ERROR),
893 						 errmsg("conflicting or redundant options")));
894 
895 			snapshot_action_given = true;
896 			*snapshot_action = CRS_USE_SNAPSHOT;
897 		}
898 		else if (strcmp(defel->defname, "reserve_wal") == 0)
899 		{
900 			if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
901 				ereport(ERROR,
902 						(errcode(ERRCODE_SYNTAX_ERROR),
903 						 errmsg("conflicting or redundant options")));
904 
905 			reserve_wal_given = true;
906 			*reserve_wal = true;
907 		}
908 		else
909 			elog(ERROR, "unrecognized option: %s", defel->defname);
910 	}
911 }
912 
913 /*
914  * Create a new replication slot.
915  */
916 static void
CreateReplicationSlot(CreateReplicationSlotCmd * cmd)917 CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
918 {
919 	const char *snapshot_name = NULL;
920 	char		xloc[MAXFNAMELEN];
921 	char	   *slot_name;
922 	bool		reserve_wal = false;
923 	CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
924 	DestReceiver *dest;
925 	TupOutputState *tstate;
926 	TupleDesc	tupdesc;
927 	Datum		values[4];
928 	bool		nulls[4];
929 
930 	Assert(!MyReplicationSlot);
931 
932 	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
933 
934 	/* setup state for WalSndSegmentOpen */
935 	sendTimeLineIsHistoric = false;
936 	sendTimeLine = ThisTimeLineID;
937 
938 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
939 	{
940 		ReplicationSlotCreate(cmd->slotname, false,
941 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
942 							  false);
943 	}
944 	else
945 	{
946 		CheckLogicalDecodingRequirements();
947 
948 		/*
949 		 * Initially create persistent slot as ephemeral - that allows us to
950 		 * nicely handle errors during initialization because it'll get
951 		 * dropped if this transaction fails. We'll make it persistent at the
952 		 * end. Temporary slots can be created as temporary from beginning as
953 		 * they get dropped on error as well.
954 		 */
955 		ReplicationSlotCreate(cmd->slotname, true,
956 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
957 							  false);
958 	}
959 
960 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
961 	{
962 		LogicalDecodingContext *ctx;
963 		bool		need_full_snapshot = false;
964 
965 		/*
966 		 * Do options check early so that we can bail before calling the
967 		 * DecodingContextFindStartpoint which can take long time.
968 		 */
969 		if (snapshot_action == CRS_EXPORT_SNAPSHOT)
970 		{
971 			if (IsTransactionBlock())
972 				ereport(ERROR,
973 				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
974 						(errmsg("%s must not be called inside a transaction",
975 								"CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
976 
977 			need_full_snapshot = true;
978 		}
979 		else if (snapshot_action == CRS_USE_SNAPSHOT)
980 		{
981 			if (!IsTransactionBlock())
982 				ereport(ERROR,
983 				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
984 						(errmsg("%s must be called inside a transaction",
985 								"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
986 
987 			if (XactIsoLevel != XACT_REPEATABLE_READ)
988 				ereport(ERROR,
989 				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
990 						(errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
991 								"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
992 
993 			if (FirstSnapshotSet)
994 				ereport(ERROR,
995 				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
996 						(errmsg("%s must be called before any query",
997 								"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
998 
999 			if (IsSubTransaction())
1000 				ereport(ERROR,
1001 				/*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1002 						(errmsg("%s must not be called in a subtransaction",
1003 								"CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1004 
1005 			need_full_snapshot = true;
1006 		}
1007 
1008 		ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1009 										InvalidXLogRecPtr,
1010 										XL_ROUTINE(.page_read = logical_read_xlog_page,
1011 												   .segment_open = WalSndSegmentOpen,
1012 												   .segment_close = wal_segment_close),
1013 										WalSndPrepareWrite, WalSndWriteData,
1014 										WalSndUpdateProgress);
1015 
1016 		/*
1017 		 * Signal that we don't need the timeout mechanism. We're just
1018 		 * creating the replication slot and don't yet accept feedback
1019 		 * messages or send keepalives. As we possibly need to wait for
1020 		 * further WAL the walsender would otherwise possibly be killed too
1021 		 * soon.
1022 		 */
1023 		last_reply_timestamp = 0;
1024 
1025 		/* build initial snapshot, might take a while */
1026 		DecodingContextFindStartpoint(ctx);
1027 
1028 		/*
1029 		 * Export or use the snapshot if we've been asked to do so.
1030 		 *
1031 		 * NB. We will convert the snapbuild.c kind of snapshot to normal
1032 		 * snapshot when doing this.
1033 		 */
1034 		if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1035 		{
1036 			snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1037 		}
1038 		else if (snapshot_action == CRS_USE_SNAPSHOT)
1039 		{
1040 			Snapshot	snap;
1041 
1042 			snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1043 			RestoreTransactionSnapshot(snap, MyProc);
1044 		}
1045 
1046 		/* don't need the decoding context anymore */
1047 		FreeDecodingContext(ctx);
1048 
1049 		if (!cmd->temporary)
1050 			ReplicationSlotPersist();
1051 	}
1052 	else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1053 	{
1054 		ReplicationSlotReserveWal();
1055 
1056 		ReplicationSlotMarkDirty();
1057 
1058 		/* Write this slot to disk if it's a permanent one. */
1059 		if (!cmd->temporary)
1060 			ReplicationSlotSave();
1061 	}
1062 
1063 	snprintf(xloc, sizeof(xloc), "%X/%X",
1064 			 LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1065 
1066 	dest = CreateDestReceiver(DestRemoteSimple);
1067 	MemSet(nulls, false, sizeof(nulls));
1068 
1069 	/*----------
1070 	 * Need a tuple descriptor representing four columns:
1071 	 * - first field: the slot name
1072 	 * - second field: LSN at which we became consistent
1073 	 * - third field: exported snapshot's name
1074 	 * - fourth field: output plugin
1075 	 *----------
1076 	 */
1077 	tupdesc = CreateTemplateTupleDesc(4);
1078 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1079 							  TEXTOID, -1, 0);
1080 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1081 							  TEXTOID, -1, 0);
1082 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1083 							  TEXTOID, -1, 0);
1084 	TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1085 							  TEXTOID, -1, 0);
1086 
1087 	/* prepare for projection of tuples */
1088 	tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1089 
1090 	/* slot_name */
1091 	slot_name = NameStr(MyReplicationSlot->data.name);
1092 	values[0] = CStringGetTextDatum(slot_name);
1093 
1094 	/* consistent wal location */
1095 	values[1] = CStringGetTextDatum(xloc);
1096 
1097 	/* snapshot name, or NULL if none */
1098 	if (snapshot_name != NULL)
1099 		values[2] = CStringGetTextDatum(snapshot_name);
1100 	else
1101 		nulls[2] = true;
1102 
1103 	/* plugin, or NULL if none */
1104 	if (cmd->plugin != NULL)
1105 		values[3] = CStringGetTextDatum(cmd->plugin);
1106 	else
1107 		nulls[3] = true;
1108 
1109 	/* send it to dest */
1110 	do_tup_output(tstate, values, nulls);
1111 	end_tup_output(tstate);
1112 
1113 	ReplicationSlotRelease();
1114 }
1115 
1116 /*
1117  * Get rid of a replication slot that is no longer wanted.
1118  */
1119 static void
DropReplicationSlot(DropReplicationSlotCmd * cmd)1120 DropReplicationSlot(DropReplicationSlotCmd *cmd)
1121 {
1122 	ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1123 }
1124 
1125 /*
1126  * Load previously initiated logical slot and prepare for sending data (via
1127  * WalSndLoop).
1128  */
1129 static void
StartLogicalReplication(StartReplicationCmd * cmd)1130 StartLogicalReplication(StartReplicationCmd *cmd)
1131 {
1132 	StringInfoData buf;
1133 	QueryCompletion qc;
1134 
1135 	/* make sure that our requirements are still fulfilled */
1136 	CheckLogicalDecodingRequirements();
1137 
1138 	Assert(!MyReplicationSlot);
1139 
1140 	ReplicationSlotAcquire(cmd->slotname, true);
1141 
1142 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
1143 		ereport(ERROR,
1144 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1145 				 errmsg("cannot read from logical replication slot \"%s\"",
1146 						cmd->slotname),
1147 				 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1148 
1149 	/*
1150 	 * Force a disconnect, so that the decoding code doesn't need to care
1151 	 * about an eventual switch from running in recovery, to running in a
1152 	 * normal environment. Client code is expected to handle reconnects.
1153 	 */
1154 	if (am_cascading_walsender && !RecoveryInProgress())
1155 	{
1156 		ereport(LOG,
1157 				(errmsg("terminating walsender process after promotion")));
1158 		got_STOPPING = true;
1159 	}
1160 
1161 	/*
1162 	 * Create our decoding context, making it start at the previously ack'ed
1163 	 * position.
1164 	 *
1165 	 * Do this before sending a CopyBothResponse message, so that any errors
1166 	 * are reported early.
1167 	 */
1168 	logical_decoding_ctx =
1169 		CreateDecodingContext(cmd->startpoint, cmd->options, false,
1170 							  XL_ROUTINE(.page_read = logical_read_xlog_page,
1171 										 .segment_open = WalSndSegmentOpen,
1172 										 .segment_close = wal_segment_close),
1173 							  WalSndPrepareWrite, WalSndWriteData,
1174 							  WalSndUpdateProgress);
1175 	xlogreader = logical_decoding_ctx->reader;
1176 
1177 	WalSndSetState(WALSNDSTATE_CATCHUP);
1178 
1179 	/* Send a CopyBothResponse message, and start streaming */
1180 	pq_beginmessage(&buf, 'W');
1181 	pq_sendbyte(&buf, 0);
1182 	pq_sendint16(&buf, 0);
1183 	pq_endmessage(&buf);
1184 	pq_flush();
1185 
1186 	/* Start reading WAL from the oldest required WAL. */
1187 	XLogBeginRead(logical_decoding_ctx->reader,
1188 				  MyReplicationSlot->data.restart_lsn);
1189 
1190 	/*
1191 	 * Report the location after which we'll send out further commits as the
1192 	 * current sentPtr.
1193 	 */
1194 	sentPtr = MyReplicationSlot->data.confirmed_flush;
1195 
1196 	/* Also update the sent position status in shared memory */
1197 	SpinLockAcquire(&MyWalSnd->mutex);
1198 	MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1199 	SpinLockRelease(&MyWalSnd->mutex);
1200 
1201 	replication_active = true;
1202 
1203 	SyncRepInitConfig();
1204 
1205 	/* Main loop of walsender */
1206 	WalSndLoop(XLogSendLogical);
1207 
1208 	FreeDecodingContext(logical_decoding_ctx);
1209 	ReplicationSlotRelease();
1210 
1211 	replication_active = false;
1212 	if (got_STOPPING)
1213 		proc_exit(0);
1214 	WalSndSetState(WALSNDSTATE_STARTUP);
1215 
1216 	/* Get out of COPY mode (CommandComplete). */
1217 	SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1218 	EndCommand(&qc, DestRemote, false);
1219 }
1220 
1221 /*
1222  * LogicalDecodingContext 'prepare_write' callback.
1223  *
1224  * Prepare a write into a StringInfo.
1225  *
1226  * Don't do anything lasting in here, it's quite possible that nothing will be done
1227  * with the data.
1228  */
1229 static void
WalSndPrepareWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1230 WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1231 {
1232 	/* can't have sync rep confused by sending the same LSN several times */
1233 	if (!last_write)
1234 		lsn = InvalidXLogRecPtr;
1235 
1236 	resetStringInfo(ctx->out);
1237 
1238 	pq_sendbyte(ctx->out, 'w');
1239 	pq_sendint64(ctx->out, lsn);	/* dataStart */
1240 	pq_sendint64(ctx->out, lsn);	/* walEnd */
1241 
1242 	/*
1243 	 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1244 	 * reserve space here.
1245 	 */
1246 	pq_sendint64(ctx->out, 0);	/* sendtime */
1247 }
1248 
1249 /*
1250  * LogicalDecodingContext 'write' callback.
1251  *
1252  * Actually write out data previously prepared by WalSndPrepareWrite out to
1253  * the network. Take as long as needed, but process replies from the other
1254  * side and check timeouts during that.
1255  */
1256 static void
WalSndWriteData(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1257 WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1258 				bool last_write)
1259 {
1260 	TimestampTz now;
1261 
1262 	/*
1263 	 * Fill the send timestamp last, so that it is taken as late as possible.
1264 	 * This is somewhat ugly, but the protocol is set as it's already used for
1265 	 * several releases by streaming physical replication.
1266 	 */
1267 	resetStringInfo(&tmpbuf);
1268 	now = GetCurrentTimestamp();
1269 	pq_sendint64(&tmpbuf, now);
1270 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1271 		   tmpbuf.data, sizeof(int64));
1272 
1273 	/* output previously gathered data in a CopyData packet */
1274 	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1275 
1276 	CHECK_FOR_INTERRUPTS();
1277 
1278 	/* Try to flush pending output to the client */
1279 	if (pq_flush_if_writable() != 0)
1280 		WalSndShutdown();
1281 
1282 	/* Try taking fast path unless we get too close to walsender timeout. */
1283 	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1284 										  wal_sender_timeout / 2) &&
1285 		!pq_is_send_pending())
1286 	{
1287 		return;
1288 	}
1289 
1290 	/* If we have pending write here, go to slow path */
1291 	for (;;)
1292 	{
1293 		long		sleeptime;
1294 
1295 		/* Check for input from the client */
1296 		ProcessRepliesIfAny();
1297 
1298 		/* die if timeout was reached */
1299 		WalSndCheckTimeOut();
1300 
1301 		/* Send keepalive if the time has come */
1302 		WalSndKeepaliveIfNecessary();
1303 
1304 		if (!pq_is_send_pending())
1305 			break;
1306 
1307 		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1308 
1309 		/* Sleep until something happens or we time out */
1310 		WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
1311 				   WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1312 
1313 		/* Clear any already-pending wakeups */
1314 		ResetLatch(MyLatch);
1315 
1316 		CHECK_FOR_INTERRUPTS();
1317 
1318 		/* Process any requests or signals received recently */
1319 		if (ConfigReloadPending)
1320 		{
1321 			ConfigReloadPending = false;
1322 			ProcessConfigFile(PGC_SIGHUP);
1323 			SyncRepInitConfig();
1324 		}
1325 
1326 		/* Try to flush pending output to the client */
1327 		if (pq_flush_if_writable() != 0)
1328 			WalSndShutdown();
1329 	}
1330 
1331 	/* reactivate latch so WalSndLoop knows to continue */
1332 	SetLatch(MyLatch);
1333 }
1334 
1335 /*
1336  * LogicalDecodingContext 'update_progress' callback.
1337  *
1338  * Write the current position to the lag tracker (see XLogSendPhysical).
1339  */
1340 static void
WalSndUpdateProgress(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid)1341 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
1342 {
1343 	static TimestampTz sendTime = 0;
1344 	TimestampTz now = GetCurrentTimestamp();
1345 
1346 	/*
1347 	 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1348 	 * avoid flooding the lag tracker when we commit frequently.
1349 	 */
1350 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS	1000
1351 	if (!TimestampDifferenceExceeds(sendTime, now,
1352 									WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1353 		return;
1354 
1355 	LagTrackerWrite(lsn, now);
1356 	sendTime = now;
1357 }
1358 
1359 /*
1360  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1361  *
1362  * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
1363  * if we detect a shutdown request (either from postmaster or client)
1364  * we will return early, so caller must always check.
1365  */
1366 static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)1367 WalSndWaitForWal(XLogRecPtr loc)
1368 {
1369 	int			wakeEvents;
1370 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1371 
1372 	/*
1373 	 * Fast path to avoid acquiring the spinlock in case we already know we
1374 	 * have enough WAL available. This is particularly interesting if we're
1375 	 * far behind.
1376 	 */
1377 	if (RecentFlushPtr != InvalidXLogRecPtr &&
1378 		loc <= RecentFlushPtr)
1379 		return RecentFlushPtr;
1380 
1381 	/* Get a more recent flush pointer. */
1382 	if (!RecoveryInProgress())
1383 		RecentFlushPtr = GetFlushRecPtr();
1384 	else
1385 		RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1386 
1387 	for (;;)
1388 	{
1389 		long		sleeptime;
1390 
1391 		/* Clear any already-pending wakeups */
1392 		ResetLatch(MyLatch);
1393 
1394 		CHECK_FOR_INTERRUPTS();
1395 
1396 		/* Process any requests or signals received recently */
1397 		if (ConfigReloadPending)
1398 		{
1399 			ConfigReloadPending = false;
1400 			ProcessConfigFile(PGC_SIGHUP);
1401 			SyncRepInitConfig();
1402 		}
1403 
1404 		/* Check for input from the client */
1405 		ProcessRepliesIfAny();
1406 
1407 		/*
1408 		 * If we're shutting down, trigger pending WAL to be written out,
1409 		 * otherwise we'd possibly end up waiting for WAL that never gets
1410 		 * written, because walwriter has shut down already.
1411 		 */
1412 		if (got_STOPPING)
1413 			XLogBackgroundFlush();
1414 
1415 		/* Update our idea of the currently flushed position. */
1416 		if (!RecoveryInProgress())
1417 			RecentFlushPtr = GetFlushRecPtr();
1418 		else
1419 			RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1420 
1421 		/*
1422 		 * If postmaster asked us to stop, don't wait anymore.
1423 		 *
1424 		 * It's important to do this check after the recomputation of
1425 		 * RecentFlushPtr, so we can send all remaining data before shutting
1426 		 * down.
1427 		 */
1428 		if (got_STOPPING)
1429 			break;
1430 
1431 		/*
1432 		 * We only send regular messages to the client for full decoded
1433 		 * transactions, but a synchronous replication and walsender shutdown
1434 		 * possibly are waiting for a later location. So, before sleeping, we
1435 		 * send a ping containing the flush location. If the receiver is
1436 		 * otherwise idle, this keepalive will trigger a reply. Processing the
1437 		 * reply will update these MyWalSnd locations.
1438 		 */
1439 		if (MyWalSnd->flush < sentPtr &&
1440 			MyWalSnd->write < sentPtr &&
1441 			!waiting_for_ping_response)
1442 			WalSndKeepalive(false);
1443 
1444 		/* check whether we're done */
1445 		if (loc <= RecentFlushPtr)
1446 			break;
1447 
1448 		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
1449 		WalSndCaughtUp = true;
1450 
1451 		/*
1452 		 * Try to flush any pending output to the client.
1453 		 */
1454 		if (pq_flush_if_writable() != 0)
1455 			WalSndShutdown();
1456 
1457 		/*
1458 		 * If we have received CopyDone from the client, sent CopyDone
1459 		 * ourselves, and the output buffer is empty, it's time to exit
1460 		 * streaming, so fail the current WAL fetch request.
1461 		 */
1462 		if (streamingDoneReceiving && streamingDoneSending &&
1463 			!pq_is_send_pending())
1464 			break;
1465 
1466 		/* die if timeout was reached */
1467 		WalSndCheckTimeOut();
1468 
1469 		/* Send keepalive if the time has come */
1470 		WalSndKeepaliveIfNecessary();
1471 
1472 		/*
1473 		 * Sleep until something happens or we time out.  Also wait for the
1474 		 * socket becoming writable, if there's still pending output.
1475 		 * Otherwise we might sit on sendable output data while waiting for
1476 		 * new WAL to be generated.  (But if we have nothing to send, we don't
1477 		 * want to wake on socket-writable.)
1478 		 */
1479 		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1480 
1481 		wakeEvents = WL_SOCKET_READABLE;
1482 
1483 		if (pq_is_send_pending())
1484 			wakeEvents |= WL_SOCKET_WRITEABLE;
1485 
1486 		WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1487 	}
1488 
1489 	/* reactivate latch so WalSndLoop knows to continue */
1490 	SetLatch(MyLatch);
1491 	return RecentFlushPtr;
1492 }
1493 
1494 /*
1495  * Execute an incoming replication command.
1496  *
1497  * Returns true if the cmd_string was recognized as WalSender command, false
1498  * if not.
1499  */
1500 bool
exec_replication_command(const char * cmd_string)1501 exec_replication_command(const char *cmd_string)
1502 {
1503 	int			parse_rc;
1504 	Node	   *cmd_node;
1505 	const char *cmdtag;
1506 	MemoryContext cmd_context;
1507 	MemoryContext old_context;
1508 
1509 	/*
1510 	 * If WAL sender has been told that shutdown is getting close, switch its
1511 	 * status accordingly to handle the next replication commands correctly.
1512 	 */
1513 	if (got_STOPPING)
1514 		WalSndSetState(WALSNDSTATE_STOPPING);
1515 
1516 	/*
1517 	 * Throw error if in stopping mode.  We need prevent commands that could
1518 	 * generate WAL while the shutdown checkpoint is being written.  To be
1519 	 * safe, we just prohibit all new commands.
1520 	 */
1521 	if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1522 		ereport(ERROR,
1523 				(errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1524 
1525 	/*
1526 	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1527 	 * command arrives. Clean up the old stuff if there's anything.
1528 	 */
1529 	SnapBuildClearExportedSnapshot();
1530 
1531 	CHECK_FOR_INTERRUPTS();
1532 
1533 	/*
1534 	 * Parse the command.
1535 	 */
1536 	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1537 										"Replication command context",
1538 										ALLOCSET_DEFAULT_SIZES);
1539 	old_context = MemoryContextSwitchTo(cmd_context);
1540 
1541 	replication_scanner_init(cmd_string);
1542 	parse_rc = replication_yyparse();
1543 	if (parse_rc != 0)
1544 		ereport(ERROR,
1545 				(errcode(ERRCODE_SYNTAX_ERROR),
1546 				 errmsg_internal("replication command parser returned %d",
1547 								 parse_rc)));
1548 	replication_scanner_finish();
1549 
1550 	cmd_node = replication_parse_result;
1551 
1552 	/*
1553 	 * If it's a SQL command, just clean up our mess and return false; the
1554 	 * caller will take care of executing it.
1555 	 */
1556 	if (IsA(cmd_node, SQLCmd))
1557 	{
1558 		if (MyDatabaseId == InvalidOid)
1559 			ereport(ERROR,
1560 					(errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1561 
1562 		MemoryContextSwitchTo(old_context);
1563 		MemoryContextDelete(cmd_context);
1564 
1565 		/* Tell the caller that this wasn't a WalSender command. */
1566 		return false;
1567 	}
1568 
1569 	/*
1570 	 * Report query to various monitoring facilities.  For this purpose, we
1571 	 * report replication commands just like SQL commands.
1572 	 */
1573 	debug_query_string = cmd_string;
1574 
1575 	pgstat_report_activity(STATE_RUNNING, cmd_string);
1576 
1577 	/*
1578 	 * Log replication command if log_replication_commands is enabled. Even
1579 	 * when it's disabled, log the command with DEBUG1 level for backward
1580 	 * compatibility.
1581 	 */
1582 	ereport(log_replication_commands ? LOG : DEBUG1,
1583 			(errmsg("received replication command: %s", cmd_string)));
1584 
1585 	/*
1586 	 * Disallow replication commands in aborted transaction blocks.
1587 	 */
1588 	if (IsAbortedTransactionBlockState())
1589 		ereport(ERROR,
1590 				(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1591 				 errmsg("current transaction is aborted, "
1592 						"commands ignored until end of transaction block")));
1593 
1594 	CHECK_FOR_INTERRUPTS();
1595 
1596 	/*
1597 	 * Allocate buffers that will be used for each outgoing and incoming
1598 	 * message.  We do this just once per command to reduce palloc overhead.
1599 	 */
1600 	initStringInfo(&output_message);
1601 	initStringInfo(&reply_message);
1602 	initStringInfo(&tmpbuf);
1603 
1604 	switch (cmd_node->type)
1605 	{
1606 		case T_IdentifySystemCmd:
1607 			cmdtag = "IDENTIFY_SYSTEM";
1608 			set_ps_display(cmdtag);
1609 			IdentifySystem();
1610 			EndReplicationCommand(cmdtag);
1611 			break;
1612 
1613 		case T_BaseBackupCmd:
1614 			cmdtag = "BASE_BACKUP";
1615 			set_ps_display(cmdtag);
1616 			PreventInTransactionBlock(true, cmdtag);
1617 			SendBaseBackup((BaseBackupCmd *) cmd_node);
1618 			EndReplicationCommand(cmdtag);
1619 			break;
1620 
1621 		case T_CreateReplicationSlotCmd:
1622 			cmdtag = "CREATE_REPLICATION_SLOT";
1623 			set_ps_display(cmdtag);
1624 			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1625 			EndReplicationCommand(cmdtag);
1626 			break;
1627 
1628 		case T_DropReplicationSlotCmd:
1629 			cmdtag = "DROP_REPLICATION_SLOT";
1630 			set_ps_display(cmdtag);
1631 			DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1632 			EndReplicationCommand(cmdtag);
1633 			break;
1634 
1635 		case T_StartReplicationCmd:
1636 			{
1637 				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1638 
1639 				cmdtag = "START_REPLICATION";
1640 				set_ps_display(cmdtag);
1641 				PreventInTransactionBlock(true, cmdtag);
1642 
1643 				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1644 					StartReplication(cmd);
1645 				else
1646 					StartLogicalReplication(cmd);
1647 
1648 				/* dupe, but necessary per libpqrcv_endstreaming */
1649 				EndReplicationCommand(cmdtag);
1650 
1651 				Assert(xlogreader != NULL);
1652 				break;
1653 			}
1654 
1655 		case T_TimeLineHistoryCmd:
1656 			cmdtag = "TIMELINE_HISTORY";
1657 			set_ps_display(cmdtag);
1658 			PreventInTransactionBlock(true, cmdtag);
1659 			SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1660 			EndReplicationCommand(cmdtag);
1661 			break;
1662 
1663 		case T_VariableShowStmt:
1664 			{
1665 				DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1666 				VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1667 
1668 				cmdtag = "SHOW";
1669 				set_ps_display(cmdtag);
1670 
1671 				/* syscache access needs a transaction environment */
1672 				StartTransactionCommand();
1673 				GetPGVariable(n->name, dest);
1674 				CommitTransactionCommand();
1675 				EndReplicationCommand(cmdtag);
1676 			}
1677 			break;
1678 
1679 		default:
1680 			elog(ERROR, "unrecognized replication command node tag: %u",
1681 				 cmd_node->type);
1682 	}
1683 
1684 	/* done */
1685 	MemoryContextSwitchTo(old_context);
1686 	MemoryContextDelete(cmd_context);
1687 
1688 	/*
1689 	 * We need not update ps display or pg_stat_activity, because PostgresMain
1690 	 * will reset those to "idle".  But we must reset debug_query_string to
1691 	 * ensure it doesn't become a dangling pointer.
1692 	 */
1693 	debug_query_string = NULL;
1694 
1695 	return true;
1696 }
1697 
1698 /*
1699  * Process any incoming messages while streaming. Also checks if the remote
1700  * end has closed the connection.
1701  */
1702 static void
ProcessRepliesIfAny(void)1703 ProcessRepliesIfAny(void)
1704 {
1705 	unsigned char firstchar;
1706 	int			maxmsglen;
1707 	int			r;
1708 	bool		received = false;
1709 
1710 	last_processing = GetCurrentTimestamp();
1711 
1712 	/*
1713 	 * If we already received a CopyDone from the frontend, any subsequent
1714 	 * message is the beginning of a new command, and should be processed in
1715 	 * the main processing loop.
1716 	 */
1717 	while (!streamingDoneReceiving)
1718 	{
1719 		pq_startmsgread();
1720 		r = pq_getbyte_if_available(&firstchar);
1721 		if (r < 0)
1722 		{
1723 			/* unexpected error or EOF */
1724 			ereport(COMMERROR,
1725 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
1726 					 errmsg("unexpected EOF on standby connection")));
1727 			proc_exit(0);
1728 		}
1729 		if (r == 0)
1730 		{
1731 			/* no data available without blocking */
1732 			pq_endmsgread();
1733 			break;
1734 		}
1735 
1736 		/* Validate message type and set packet size limit */
1737 		switch (firstchar)
1738 		{
1739 			case 'd':
1740 				maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1741 				break;
1742 			case 'c':
1743 			case 'X':
1744 				maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1745 				break;
1746 			default:
1747 				ereport(FATAL,
1748 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
1749 						 errmsg("invalid standby message type \"%c\"",
1750 								firstchar)));
1751 				maxmsglen = 0;	/* keep compiler quiet */
1752 				break;
1753 		}
1754 
1755 		/* Read the message contents */
1756 		resetStringInfo(&reply_message);
1757 		if (pq_getmessage(&reply_message, maxmsglen))
1758 		{
1759 			ereport(COMMERROR,
1760 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
1761 					 errmsg("unexpected EOF on standby connection")));
1762 			proc_exit(0);
1763 		}
1764 
1765 		/* ... and process it */
1766 		switch (firstchar)
1767 		{
1768 				/*
1769 				 * 'd' means a standby reply wrapped in a CopyData packet.
1770 				 */
1771 			case 'd':
1772 				ProcessStandbyMessage();
1773 				received = true;
1774 				break;
1775 
1776 				/*
1777 				 * CopyDone means the standby requested to finish streaming.
1778 				 * Reply with CopyDone, if we had not sent that already.
1779 				 */
1780 			case 'c':
1781 				if (!streamingDoneSending)
1782 				{
1783 					pq_putmessage_noblock('c', NULL, 0);
1784 					streamingDoneSending = true;
1785 				}
1786 
1787 				streamingDoneReceiving = true;
1788 				received = true;
1789 				break;
1790 
1791 				/*
1792 				 * 'X' means that the standby is closing down the socket.
1793 				 */
1794 			case 'X':
1795 				proc_exit(0);
1796 
1797 			default:
1798 				Assert(false);	/* NOT REACHED */
1799 		}
1800 	}
1801 
1802 	/*
1803 	 * Save the last reply timestamp if we've received at least one reply.
1804 	 */
1805 	if (received)
1806 	{
1807 		last_reply_timestamp = last_processing;
1808 		waiting_for_ping_response = false;
1809 	}
1810 }
1811 
1812 /*
1813  * Process a status update message received from standby.
1814  */
1815 static void
ProcessStandbyMessage(void)1816 ProcessStandbyMessage(void)
1817 {
1818 	char		msgtype;
1819 
1820 	/*
1821 	 * Check message type from the first byte.
1822 	 */
1823 	msgtype = pq_getmsgbyte(&reply_message);
1824 
1825 	switch (msgtype)
1826 	{
1827 		case 'r':
1828 			ProcessStandbyReplyMessage();
1829 			break;
1830 
1831 		case 'h':
1832 			ProcessStandbyHSFeedbackMessage();
1833 			break;
1834 
1835 		default:
1836 			ereport(COMMERROR,
1837 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
1838 					 errmsg("unexpected message type \"%c\"", msgtype)));
1839 			proc_exit(0);
1840 	}
1841 }
1842 
1843 /*
1844  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1845  */
1846 static void
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)1847 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1848 {
1849 	bool		changed = false;
1850 	ReplicationSlot *slot = MyReplicationSlot;
1851 
1852 	Assert(lsn != InvalidXLogRecPtr);
1853 	SpinLockAcquire(&slot->mutex);
1854 	if (slot->data.restart_lsn != lsn)
1855 	{
1856 		changed = true;
1857 		slot->data.restart_lsn = lsn;
1858 	}
1859 	SpinLockRelease(&slot->mutex);
1860 
1861 	if (changed)
1862 	{
1863 		ReplicationSlotMarkDirty();
1864 		ReplicationSlotsComputeRequiredLSN();
1865 	}
1866 
1867 	/*
1868 	 * One could argue that the slot should be saved to disk now, but that'd
1869 	 * be energy wasted - the worst lost information can do here is give us
1870 	 * wrong information in a statistics view - we'll just potentially be more
1871 	 * conservative in removing files.
1872 	 */
1873 }
1874 
1875 /*
1876  * Regular reply from standby advising of WAL locations on standby server.
1877  */
1878 static void
ProcessStandbyReplyMessage(void)1879 ProcessStandbyReplyMessage(void)
1880 {
1881 	XLogRecPtr	writePtr,
1882 				flushPtr,
1883 				applyPtr;
1884 	bool		replyRequested;
1885 	TimeOffset	writeLag,
1886 				flushLag,
1887 				applyLag;
1888 	bool		clearLagTimes;
1889 	TimestampTz now;
1890 	TimestampTz replyTime;
1891 
1892 	static bool fullyAppliedLastTime = false;
1893 
1894 	/* the caller already consumed the msgtype byte */
1895 	writePtr = pq_getmsgint64(&reply_message);
1896 	flushPtr = pq_getmsgint64(&reply_message);
1897 	applyPtr = pq_getmsgint64(&reply_message);
1898 	replyTime = pq_getmsgint64(&reply_message);
1899 	replyRequested = pq_getmsgbyte(&reply_message);
1900 
1901 	if (message_level_is_interesting(DEBUG2))
1902 	{
1903 		char	   *replyTimeStr;
1904 
1905 		/* Copy because timestamptz_to_str returns a static buffer */
1906 		replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1907 
1908 		elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1909 			 LSN_FORMAT_ARGS(writePtr),
1910 			 LSN_FORMAT_ARGS(flushPtr),
1911 			 LSN_FORMAT_ARGS(applyPtr),
1912 			 replyRequested ? " (reply requested)" : "",
1913 			 replyTimeStr);
1914 
1915 		pfree(replyTimeStr);
1916 	}
1917 
1918 	/* See if we can compute the round-trip lag for these positions. */
1919 	now = GetCurrentTimestamp();
1920 	writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1921 	flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1922 	applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1923 
1924 	/*
1925 	 * If the standby reports that it has fully replayed the WAL in two
1926 	 * consecutive reply messages, then the second such message must result
1927 	 * from wal_receiver_status_interval expiring on the standby.  This is a
1928 	 * convenient time to forget the lag times measured when it last
1929 	 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1930 	 * until more WAL traffic arrives.
1931 	 */
1932 	clearLagTimes = false;
1933 	if (applyPtr == sentPtr)
1934 	{
1935 		if (fullyAppliedLastTime)
1936 			clearLagTimes = true;
1937 		fullyAppliedLastTime = true;
1938 	}
1939 	else
1940 		fullyAppliedLastTime = false;
1941 
1942 	/* Send a reply if the standby requested one. */
1943 	if (replyRequested)
1944 		WalSndKeepalive(false);
1945 
1946 	/*
1947 	 * Update shared state for this WalSender process based on reply data from
1948 	 * standby.
1949 	 */
1950 	{
1951 		WalSnd	   *walsnd = MyWalSnd;
1952 
1953 		SpinLockAcquire(&walsnd->mutex);
1954 		walsnd->write = writePtr;
1955 		walsnd->flush = flushPtr;
1956 		walsnd->apply = applyPtr;
1957 		if (writeLag != -1 || clearLagTimes)
1958 			walsnd->writeLag = writeLag;
1959 		if (flushLag != -1 || clearLagTimes)
1960 			walsnd->flushLag = flushLag;
1961 		if (applyLag != -1 || clearLagTimes)
1962 			walsnd->applyLag = applyLag;
1963 		walsnd->replyTime = replyTime;
1964 		SpinLockRelease(&walsnd->mutex);
1965 	}
1966 
1967 	if (!am_cascading_walsender)
1968 		SyncRepReleaseWaiters();
1969 
1970 	/*
1971 	 * Advance our local xmin horizon when the client confirmed a flush.
1972 	 */
1973 	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1974 	{
1975 		if (SlotIsLogical(MyReplicationSlot))
1976 			LogicalConfirmReceivedLocation(flushPtr);
1977 		else
1978 			PhysicalConfirmReceivedLocation(flushPtr);
1979 	}
1980 }
1981 
1982 /* compute new replication slot xmin horizon if needed */
1983 static void
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin,TransactionId feedbackCatalogXmin)1984 PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
1985 {
1986 	bool		changed = false;
1987 	ReplicationSlot *slot = MyReplicationSlot;
1988 
1989 	SpinLockAcquire(&slot->mutex);
1990 	MyProc->xmin = InvalidTransactionId;
1991 
1992 	/*
1993 	 * For physical replication we don't need the interlock provided by xmin
1994 	 * and effective_xmin since the consequences of a missed increase are
1995 	 * limited to query cancellations, so set both at once.
1996 	 */
1997 	if (!TransactionIdIsNormal(slot->data.xmin) ||
1998 		!TransactionIdIsNormal(feedbackXmin) ||
1999 		TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2000 	{
2001 		changed = true;
2002 		slot->data.xmin = feedbackXmin;
2003 		slot->effective_xmin = feedbackXmin;
2004 	}
2005 	if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2006 		!TransactionIdIsNormal(feedbackCatalogXmin) ||
2007 		TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2008 	{
2009 		changed = true;
2010 		slot->data.catalog_xmin = feedbackCatalogXmin;
2011 		slot->effective_catalog_xmin = feedbackCatalogXmin;
2012 	}
2013 	SpinLockRelease(&slot->mutex);
2014 
2015 	if (changed)
2016 	{
2017 		ReplicationSlotMarkDirty();
2018 		ReplicationSlotsComputeRequiredXmin(false);
2019 	}
2020 }
2021 
2022 /*
2023  * Check that the provided xmin/epoch are sane, that is, not in the future
2024  * and not so far back as to be already wrapped around.
2025  *
2026  * Epoch of nextXid should be same as standby, or if the counter has
2027  * wrapped, then one greater than standby.
2028  *
2029  * This check doesn't care about whether clog exists for these xids
2030  * at all.
2031  */
2032 static bool
TransactionIdInRecentPast(TransactionId xid,uint32 epoch)2033 TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
2034 {
2035 	FullTransactionId nextFullXid;
2036 	TransactionId nextXid;
2037 	uint32		nextEpoch;
2038 
2039 	nextFullXid = ReadNextFullTransactionId();
2040 	nextXid = XidFromFullTransactionId(nextFullXid);
2041 	nextEpoch = EpochFromFullTransactionId(nextFullXid);
2042 
2043 	if (xid <= nextXid)
2044 	{
2045 		if (epoch != nextEpoch)
2046 			return false;
2047 	}
2048 	else
2049 	{
2050 		if (epoch + 1 != nextEpoch)
2051 			return false;
2052 	}
2053 
2054 	if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2055 		return false;			/* epoch OK, but it's wrapped around */
2056 
2057 	return true;
2058 }
2059 
2060 /*
2061  * Hot Standby feedback
2062  */
2063 static void
ProcessStandbyHSFeedbackMessage(void)2064 ProcessStandbyHSFeedbackMessage(void)
2065 {
2066 	TransactionId feedbackXmin;
2067 	uint32		feedbackEpoch;
2068 	TransactionId feedbackCatalogXmin;
2069 	uint32		feedbackCatalogEpoch;
2070 	TimestampTz replyTime;
2071 
2072 	/*
2073 	 * Decipher the reply message. The caller already consumed the msgtype
2074 	 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2075 	 * of this message.
2076 	 */
2077 	replyTime = pq_getmsgint64(&reply_message);
2078 	feedbackXmin = pq_getmsgint(&reply_message, 4);
2079 	feedbackEpoch = pq_getmsgint(&reply_message, 4);
2080 	feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2081 	feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2082 
2083 	if (message_level_is_interesting(DEBUG2))
2084 	{
2085 		char	   *replyTimeStr;
2086 
2087 		/* Copy because timestamptz_to_str returns a static buffer */
2088 		replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2089 
2090 		elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2091 			 feedbackXmin,
2092 			 feedbackEpoch,
2093 			 feedbackCatalogXmin,
2094 			 feedbackCatalogEpoch,
2095 			 replyTimeStr);
2096 
2097 		pfree(replyTimeStr);
2098 	}
2099 
2100 	/*
2101 	 * Update shared state for this WalSender process based on reply data from
2102 	 * standby.
2103 	 */
2104 	{
2105 		WalSnd	   *walsnd = MyWalSnd;
2106 
2107 		SpinLockAcquire(&walsnd->mutex);
2108 		walsnd->replyTime = replyTime;
2109 		SpinLockRelease(&walsnd->mutex);
2110 	}
2111 
2112 	/*
2113 	 * Unset WalSender's xmins if the feedback message values are invalid.
2114 	 * This happens when the downstream turned hot_standby_feedback off.
2115 	 */
2116 	if (!TransactionIdIsNormal(feedbackXmin)
2117 		&& !TransactionIdIsNormal(feedbackCatalogXmin))
2118 	{
2119 		MyProc->xmin = InvalidTransactionId;
2120 		if (MyReplicationSlot != NULL)
2121 			PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2122 		return;
2123 	}
2124 
2125 	/*
2126 	 * Check that the provided xmin/epoch are sane, that is, not in the future
2127 	 * and not so far back as to be already wrapped around.  Ignore if not.
2128 	 */
2129 	if (TransactionIdIsNormal(feedbackXmin) &&
2130 		!TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2131 		return;
2132 
2133 	if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2134 		!TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2135 		return;
2136 
2137 	/*
2138 	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2139 	 * the xmin will be taken into account by GetSnapshotData() /
2140 	 * ComputeXidHorizons().  This will hold back the removal of dead rows and
2141 	 * thereby prevent the generation of cleanup conflicts on the standby
2142 	 * server.
2143 	 *
2144 	 * There is a small window for a race condition here: although we just
2145 	 * checked that feedbackXmin precedes nextXid, the nextXid could have
2146 	 * gotten advanced between our fetching it and applying the xmin below,
2147 	 * perhaps far enough to make feedbackXmin wrap around.  In that case the
2148 	 * xmin we set here would be "in the future" and have no effect.  No point
2149 	 * in worrying about this since it's too late to save the desired data
2150 	 * anyway.  Assuming that the standby sends us an increasing sequence of
2151 	 * xmins, this could only happen during the first reply cycle, else our
2152 	 * own xmin would prevent nextXid from advancing so far.
2153 	 *
2154 	 * We don't bother taking the ProcArrayLock here.  Setting the xmin field
2155 	 * is assumed atomic, and there's no real need to prevent concurrent
2156 	 * horizon determinations.  (If we're moving our xmin forward, this is
2157 	 * obviously safe, and if we're moving it backwards, well, the data is at
2158 	 * risk already since a VACUUM could already have determined the horizon.)
2159 	 *
2160 	 * If we're using a replication slot we reserve the xmin via that,
2161 	 * otherwise via the walsender's PGPROC entry. We can only track the
2162 	 * catalog xmin separately when using a slot, so we store the least of the
2163 	 * two provided when not using a slot.
2164 	 *
2165 	 * XXX: It might make sense to generalize the ephemeral slot concept and
2166 	 * always use the slot mechanism to handle the feedback xmin.
2167 	 */
2168 	if (MyReplicationSlot != NULL)	/* XXX: persistency configurable? */
2169 		PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2170 	else
2171 	{
2172 		if (TransactionIdIsNormal(feedbackCatalogXmin)
2173 			&& TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2174 			MyProc->xmin = feedbackCatalogXmin;
2175 		else
2176 			MyProc->xmin = feedbackXmin;
2177 	}
2178 }
2179 
2180 /*
2181  * Compute how long send/receive loops should sleep.
2182  *
2183  * If wal_sender_timeout is enabled we want to wake up in time to send
2184  * keepalives and to abort the connection if wal_sender_timeout has been
2185  * reached.
2186  */
2187 static long
WalSndComputeSleeptime(TimestampTz now)2188 WalSndComputeSleeptime(TimestampTz now)
2189 {
2190 	long		sleeptime = 10000;	/* 10 s */
2191 
2192 	if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2193 	{
2194 		TimestampTz wakeup_time;
2195 
2196 		/*
2197 		 * At the latest stop sleeping once wal_sender_timeout has been
2198 		 * reached.
2199 		 */
2200 		wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2201 												  wal_sender_timeout);
2202 
2203 		/*
2204 		 * If no ping has been sent yet, wakeup when it's time to do so.
2205 		 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2206 		 * the timeout passed without a response.
2207 		 */
2208 		if (!waiting_for_ping_response)
2209 			wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2210 													  wal_sender_timeout / 2);
2211 
2212 		/* Compute relative time until wakeup. */
2213 		sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2214 	}
2215 
2216 	return sleeptime;
2217 }
2218 
2219 /*
2220  * Check whether there have been responses by the client within
2221  * wal_sender_timeout and shutdown if not.  Using last_processing as the
2222  * reference point avoids counting server-side stalls against the client.
2223  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2224  * postdate last_processing by more than wal_sender_timeout.  If that happens,
2225  * the client must reply almost immediately to avoid a timeout.  This rarely
2226  * affects the default configuration, under which clients spontaneously send a
2227  * message every standby_message_timeout = wal_sender_timeout/6 = 10s.  We
2228  * could eliminate that problem by recognizing timeout expiration at
2229  * wal_sender_timeout/2 after the keepalive.
2230  */
2231 static void
WalSndCheckTimeOut(void)2232 WalSndCheckTimeOut(void)
2233 {
2234 	TimestampTz timeout;
2235 
2236 	/* don't bail out if we're doing something that doesn't require timeouts */
2237 	if (last_reply_timestamp <= 0)
2238 		return;
2239 
2240 	timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2241 										  wal_sender_timeout);
2242 
2243 	if (wal_sender_timeout > 0 && last_processing >= timeout)
2244 	{
2245 		/*
2246 		 * Since typically expiration of replication timeout means
2247 		 * communication problem, we don't send the error message to the
2248 		 * standby.
2249 		 */
2250 		ereport(COMMERROR,
2251 				(errmsg("terminating walsender process due to replication timeout")));
2252 
2253 		WalSndShutdown();
2254 	}
2255 }
2256 
2257 /* Main loop of walsender process that streams the WAL over Copy messages. */
2258 static void
WalSndLoop(WalSndSendDataCallback send_data)2259 WalSndLoop(WalSndSendDataCallback send_data)
2260 {
2261 	/*
2262 	 * Initialize the last reply timestamp. That enables timeout processing
2263 	 * from hereon.
2264 	 */
2265 	last_reply_timestamp = GetCurrentTimestamp();
2266 	waiting_for_ping_response = false;
2267 
2268 	/*
2269 	 * Loop until we reach the end of this timeline or the client requests to
2270 	 * stop streaming.
2271 	 */
2272 	for (;;)
2273 	{
2274 		/* Clear any already-pending wakeups */
2275 		ResetLatch(MyLatch);
2276 
2277 		CHECK_FOR_INTERRUPTS();
2278 
2279 		/* Process any requests or signals received recently */
2280 		if (ConfigReloadPending)
2281 		{
2282 			ConfigReloadPending = false;
2283 			ProcessConfigFile(PGC_SIGHUP);
2284 			SyncRepInitConfig();
2285 		}
2286 
2287 		/* Check for input from the client */
2288 		ProcessRepliesIfAny();
2289 
2290 		/*
2291 		 * If we have received CopyDone from the client, sent CopyDone
2292 		 * ourselves, and the output buffer is empty, it's time to exit
2293 		 * streaming.
2294 		 */
2295 		if (streamingDoneReceiving && streamingDoneSending &&
2296 			!pq_is_send_pending())
2297 			break;
2298 
2299 		/*
2300 		 * If we don't have any pending data in the output buffer, try to send
2301 		 * some more.  If there is some, we don't bother to call send_data
2302 		 * again until we've flushed it ... but we'd better assume we are not
2303 		 * caught up.
2304 		 */
2305 		if (!pq_is_send_pending())
2306 			send_data();
2307 		else
2308 			WalSndCaughtUp = false;
2309 
2310 		/* Try to flush pending output to the client */
2311 		if (pq_flush_if_writable() != 0)
2312 			WalSndShutdown();
2313 
2314 		/* If nothing remains to be sent right now ... */
2315 		if (WalSndCaughtUp && !pq_is_send_pending())
2316 		{
2317 			/*
2318 			 * If we're in catchup state, move to streaming.  This is an
2319 			 * important state change for users to know about, since before
2320 			 * this point data loss might occur if the primary dies and we
2321 			 * need to failover to the standby. The state change is also
2322 			 * important for synchronous replication, since commits that
2323 			 * started to wait at that point might wait for some time.
2324 			 */
2325 			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2326 			{
2327 				ereport(DEBUG1,
2328 						(errmsg_internal("\"%s\" has now caught up with upstream server",
2329 										 application_name)));
2330 				WalSndSetState(WALSNDSTATE_STREAMING);
2331 			}
2332 
2333 			/*
2334 			 * When SIGUSR2 arrives, we send any outstanding logs up to the
2335 			 * shutdown checkpoint record (i.e., the latest record), wait for
2336 			 * them to be replicated to the standby, and exit. This may be a
2337 			 * normal termination at shutdown, or a promotion, the walsender
2338 			 * is not sure which.
2339 			 */
2340 			if (got_SIGUSR2)
2341 				WalSndDone(send_data);
2342 		}
2343 
2344 		/* Check for replication timeout. */
2345 		WalSndCheckTimeOut();
2346 
2347 		/* Send keepalive if the time has come */
2348 		WalSndKeepaliveIfNecessary();
2349 
2350 		/*
2351 		 * Block if we have unsent data.  XXX For logical replication, let
2352 		 * WalSndWaitForWal() handle any other blocking; idle receivers need
2353 		 * its additional actions.  For physical replication, also block if
2354 		 * caught up; its send_data does not block.
2355 		 */
2356 		if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2357 			 !streamingDoneSending) ||
2358 			pq_is_send_pending())
2359 		{
2360 			long		sleeptime;
2361 			int			wakeEvents;
2362 
2363 			if (!streamingDoneReceiving)
2364 				wakeEvents = WL_SOCKET_READABLE;
2365 			else
2366 				wakeEvents = 0;
2367 
2368 			/*
2369 			 * Use fresh timestamp, not last_processing, to reduce the chance
2370 			 * of reaching wal_sender_timeout before sending a keepalive.
2371 			 */
2372 			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
2373 
2374 			if (pq_is_send_pending())
2375 				wakeEvents |= WL_SOCKET_WRITEABLE;
2376 
2377 			/* Sleep until something happens or we time out */
2378 			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2379 		}
2380 	}
2381 }
2382 
2383 /* Initialize a per-walsender data structure for this walsender process */
2384 static void
InitWalSenderSlot(void)2385 InitWalSenderSlot(void)
2386 {
2387 	int			i;
2388 
2389 	/*
2390 	 * WalSndCtl should be set up already (we inherit this by fork() or
2391 	 * EXEC_BACKEND mechanism from the postmaster).
2392 	 */
2393 	Assert(WalSndCtl != NULL);
2394 	Assert(MyWalSnd == NULL);
2395 
2396 	/*
2397 	 * Find a free walsender slot and reserve it. This must not fail due to
2398 	 * the prior check for free WAL senders in InitProcess().
2399 	 */
2400 	for (i = 0; i < max_wal_senders; i++)
2401 	{
2402 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2403 
2404 		SpinLockAcquire(&walsnd->mutex);
2405 
2406 		if (walsnd->pid != 0)
2407 		{
2408 			SpinLockRelease(&walsnd->mutex);
2409 			continue;
2410 		}
2411 		else
2412 		{
2413 			/*
2414 			 * Found a free slot. Reserve it for us.
2415 			 */
2416 			walsnd->pid = MyProcPid;
2417 			walsnd->state = WALSNDSTATE_STARTUP;
2418 			walsnd->sentPtr = InvalidXLogRecPtr;
2419 			walsnd->needreload = false;
2420 			walsnd->write = InvalidXLogRecPtr;
2421 			walsnd->flush = InvalidXLogRecPtr;
2422 			walsnd->apply = InvalidXLogRecPtr;
2423 			walsnd->writeLag = -1;
2424 			walsnd->flushLag = -1;
2425 			walsnd->applyLag = -1;
2426 			walsnd->sync_standby_priority = 0;
2427 			walsnd->latch = &MyProc->procLatch;
2428 			walsnd->replyTime = 0;
2429 			SpinLockRelease(&walsnd->mutex);
2430 			/* don't need the lock anymore */
2431 			MyWalSnd = (WalSnd *) walsnd;
2432 
2433 			break;
2434 		}
2435 	}
2436 
2437 	Assert(MyWalSnd != NULL);
2438 
2439 	/* Arrange to clean up at walsender exit */
2440 	on_shmem_exit(WalSndKill, 0);
2441 }
2442 
2443 /* Destroy the per-walsender data structure for this walsender process */
2444 static void
WalSndKill(int code,Datum arg)2445 WalSndKill(int code, Datum arg)
2446 {
2447 	WalSnd	   *walsnd = MyWalSnd;
2448 
2449 	Assert(walsnd != NULL);
2450 
2451 	MyWalSnd = NULL;
2452 
2453 	SpinLockAcquire(&walsnd->mutex);
2454 	/* clear latch while holding the spinlock, so it can safely be read */
2455 	walsnd->latch = NULL;
2456 	/* Mark WalSnd struct as no longer being in use. */
2457 	walsnd->pid = 0;
2458 	SpinLockRelease(&walsnd->mutex);
2459 }
2460 
2461 /* XLogReaderRoutine->segment_open callback */
2462 static void
WalSndSegmentOpen(XLogReaderState * state,XLogSegNo nextSegNo,TimeLineID * tli_p)2463 WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
2464 				  TimeLineID *tli_p)
2465 {
2466 	char		path[MAXPGPATH];
2467 
2468 	/*-------
2469 	 * When reading from a historic timeline, and there is a timeline switch
2470 	 * within this segment, read from the WAL segment belonging to the new
2471 	 * timeline.
2472 	 *
2473 	 * For example, imagine that this server is currently on timeline 5, and
2474 	 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2475 	 * 0/13002088. In pg_wal, we have these files:
2476 	 *
2477 	 * ...
2478 	 * 000000040000000000000012
2479 	 * 000000040000000000000013
2480 	 * 000000050000000000000013
2481 	 * 000000050000000000000014
2482 	 * ...
2483 	 *
2484 	 * In this situation, when requested to send the WAL from segment 0x13, on
2485 	 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2486 	 * recovery prefers files from newer timelines, so if the segment was
2487 	 * restored from the archive on this server, the file belonging to the old
2488 	 * timeline, 000000040000000000000013, might not exist. Their contents are
2489 	 * equal up to the switchpoint, because at a timeline switch, the used
2490 	 * portion of the old segment is copied to the new file.  -------
2491 	 */
2492 	*tli_p = sendTimeLine;
2493 	if (sendTimeLineIsHistoric)
2494 	{
2495 		XLogSegNo	endSegNo;
2496 
2497 		XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2498 		if (nextSegNo == endSegNo)
2499 			*tli_p = sendTimeLineNextTLI;
2500 	}
2501 
2502 	XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2503 	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2504 	if (state->seg.ws_file >= 0)
2505 		return;
2506 
2507 	/*
2508 	 * If the file is not found, assume it's because the standby asked for a
2509 	 * too old WAL segment that has already been removed or recycled.
2510 	 */
2511 	if (errno == ENOENT)
2512 	{
2513 		char		xlogfname[MAXFNAMELEN];
2514 		int			save_errno = errno;
2515 
2516 		XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2517 		errno = save_errno;
2518 		ereport(ERROR,
2519 				(errcode_for_file_access(),
2520 				 errmsg("requested WAL segment %s has already been removed",
2521 						xlogfname)));
2522 	}
2523 	else
2524 		ereport(ERROR,
2525 				(errcode_for_file_access(),
2526 				 errmsg("could not open file \"%s\": %m",
2527 						path)));
2528 }
2529 
2530 /*
2531  * Send out the WAL in its normal physical/stored form.
2532  *
2533  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2534  * but not yet sent to the client, and buffer it in the libpq output
2535  * buffer.
2536  *
2537  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2538  * otherwise WalSndCaughtUp is set to false.
2539  */
2540 static void
XLogSendPhysical(void)2541 XLogSendPhysical(void)
2542 {
2543 	XLogRecPtr	SendRqstPtr;
2544 	XLogRecPtr	startptr;
2545 	XLogRecPtr	endptr;
2546 	Size		nbytes;
2547 	XLogSegNo	segno;
2548 	WALReadError errinfo;
2549 
2550 	/* If requested switch the WAL sender to the stopping state. */
2551 	if (got_STOPPING)
2552 		WalSndSetState(WALSNDSTATE_STOPPING);
2553 
2554 	if (streamingDoneSending)
2555 	{
2556 		WalSndCaughtUp = true;
2557 		return;
2558 	}
2559 
2560 	/* Figure out how far we can safely send the WAL. */
2561 	if (sendTimeLineIsHistoric)
2562 	{
2563 		/*
2564 		 * Streaming an old timeline that's in this server's history, but is
2565 		 * not the one we're currently inserting or replaying. It can be
2566 		 * streamed up to the point where we switched off that timeline.
2567 		 */
2568 		SendRqstPtr = sendTimeLineValidUpto;
2569 	}
2570 	else if (am_cascading_walsender)
2571 	{
2572 		/*
2573 		 * Streaming the latest timeline on a standby.
2574 		 *
2575 		 * Attempt to send all WAL that has already been replayed, so that we
2576 		 * know it's valid. If we're receiving WAL through streaming
2577 		 * replication, it's also OK to send any WAL that has been received
2578 		 * but not replayed.
2579 		 *
2580 		 * The timeline we're recovering from can change, or we can be
2581 		 * promoted. In either case, the current timeline becomes historic. We
2582 		 * need to detect that so that we don't try to stream past the point
2583 		 * where we switched to another timeline. We check for promotion or
2584 		 * timeline switch after calculating FlushPtr, to avoid a race
2585 		 * condition: if the timeline becomes historic just after we checked
2586 		 * that it was still current, it's still be OK to stream it up to the
2587 		 * FlushPtr that was calculated before it became historic.
2588 		 */
2589 		bool		becameHistoric = false;
2590 
2591 		SendRqstPtr = GetStandbyFlushRecPtr();
2592 
2593 		if (!RecoveryInProgress())
2594 		{
2595 			/*
2596 			 * We have been promoted. RecoveryInProgress() updated
2597 			 * ThisTimeLineID to the new current timeline.
2598 			 */
2599 			am_cascading_walsender = false;
2600 			becameHistoric = true;
2601 		}
2602 		else
2603 		{
2604 			/*
2605 			 * Still a cascading standby. But is the timeline we're sending
2606 			 * still the one recovery is recovering from? ThisTimeLineID was
2607 			 * updated by the GetStandbyFlushRecPtr() call above.
2608 			 */
2609 			if (sendTimeLine != ThisTimeLineID)
2610 				becameHistoric = true;
2611 		}
2612 
2613 		if (becameHistoric)
2614 		{
2615 			/*
2616 			 * The timeline we were sending has become historic. Read the
2617 			 * timeline history file of the new timeline to see where exactly
2618 			 * we forked off from the timeline we were sending.
2619 			 */
2620 			List	   *history;
2621 
2622 			history = readTimeLineHistory(ThisTimeLineID);
2623 			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
2624 
2625 			Assert(sendTimeLine < sendTimeLineNextTLI);
2626 			list_free_deep(history);
2627 
2628 			sendTimeLineIsHistoric = true;
2629 
2630 			SendRqstPtr = sendTimeLineValidUpto;
2631 		}
2632 	}
2633 	else
2634 	{
2635 		/*
2636 		 * Streaming the current timeline on a primary.
2637 		 *
2638 		 * Attempt to send all data that's already been written out and
2639 		 * fsync'd to disk.  We cannot go further than what's been written out
2640 		 * given the current implementation of WALRead().  And in any case
2641 		 * it's unsafe to send WAL that is not securely down to disk on the
2642 		 * primary: if the primary subsequently crashes and restarts, standbys
2643 		 * must not have applied any WAL that got lost on the primary.
2644 		 */
2645 		SendRqstPtr = GetFlushRecPtr();
2646 	}
2647 
2648 	/*
2649 	 * Record the current system time as an approximation of the time at which
2650 	 * this WAL location was written for the purposes of lag tracking.
2651 	 *
2652 	 * In theory we could make XLogFlush() record a time in shmem whenever WAL
2653 	 * is flushed and we could get that time as well as the LSN when we call
2654 	 * GetFlushRecPtr() above (and likewise for the cascading standby
2655 	 * equivalent), but rather than putting any new code into the hot WAL path
2656 	 * it seems good enough to capture the time here.  We should reach this
2657 	 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2658 	 * may take some time, we read the WAL flush pointer and take the time
2659 	 * very close to together here so that we'll get a later position if it is
2660 	 * still moving.
2661 	 *
2662 	 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2663 	 * this gives us a cheap approximation for the WAL flush time for this
2664 	 * LSN.
2665 	 *
2666 	 * Note that the LSN is not necessarily the LSN for the data contained in
2667 	 * the present message; it's the end of the WAL, which might be further
2668 	 * ahead.  All the lag tracking machinery cares about is finding out when
2669 	 * that arbitrary LSN is eventually reported as written, flushed and
2670 	 * applied, so that it can measure the elapsed time.
2671 	 */
2672 	LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2673 
2674 	/*
2675 	 * If this is a historic timeline and we've reached the point where we
2676 	 * forked to the next timeline, stop streaming.
2677 	 *
2678 	 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2679 	 * startup process will normally replay all WAL that has been received
2680 	 * from the primary, before promoting, but if the WAL streaming is
2681 	 * terminated at a WAL page boundary, the valid portion of the timeline
2682 	 * might end in the middle of a WAL record. We might've already sent the
2683 	 * first half of that partial WAL record to the cascading standby, so that
2684 	 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2685 	 * replay the partial WAL record either, so it can still follow our
2686 	 * timeline switch.
2687 	 */
2688 	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
2689 	{
2690 		/* close the current file. */
2691 		if (xlogreader->seg.ws_file >= 0)
2692 			wal_segment_close(xlogreader);
2693 
2694 		/* Send CopyDone */
2695 		pq_putmessage_noblock('c', NULL, 0);
2696 		streamingDoneSending = true;
2697 
2698 		WalSndCaughtUp = true;
2699 
2700 		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2701 			 LSN_FORMAT_ARGS(sendTimeLineValidUpto),
2702 			 LSN_FORMAT_ARGS(sentPtr));
2703 		return;
2704 	}
2705 
2706 	/* Do we have any work to do? */
2707 	Assert(sentPtr <= SendRqstPtr);
2708 	if (SendRqstPtr <= sentPtr)
2709 	{
2710 		WalSndCaughtUp = true;
2711 		return;
2712 	}
2713 
2714 	/*
2715 	 * Figure out how much to send in one message. If there's no more than
2716 	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2717 	 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2718 	 *
2719 	 * The rounding is not only for performance reasons. Walreceiver relies on
2720 	 * the fact that we never split a WAL record across two messages. Since a
2721 	 * long WAL record is split at page boundary into continuation records,
2722 	 * page boundary is always a safe cut-off point. We also assume that
2723 	 * SendRqstPtr never points to the middle of a WAL record.
2724 	 */
2725 	startptr = sentPtr;
2726 	endptr = startptr;
2727 	endptr += MAX_SEND_SIZE;
2728 
2729 	/* if we went beyond SendRqstPtr, back off */
2730 	if (SendRqstPtr <= endptr)
2731 	{
2732 		endptr = SendRqstPtr;
2733 		if (sendTimeLineIsHistoric)
2734 			WalSndCaughtUp = false;
2735 		else
2736 			WalSndCaughtUp = true;
2737 	}
2738 	else
2739 	{
2740 		/* round down to page boundary. */
2741 		endptr -= (endptr % XLOG_BLCKSZ);
2742 		WalSndCaughtUp = false;
2743 	}
2744 
2745 	nbytes = endptr - startptr;
2746 	Assert(nbytes <= MAX_SEND_SIZE);
2747 
2748 	/*
2749 	 * OK to read and send the slice.
2750 	 */
2751 	resetStringInfo(&output_message);
2752 	pq_sendbyte(&output_message, 'w');
2753 
2754 	pq_sendint64(&output_message, startptr);	/* dataStart */
2755 	pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2756 	pq_sendint64(&output_message, 0);	/* sendtime, filled in last */
2757 
2758 	/*
2759 	 * Read the log directly into the output buffer to avoid extra memcpy
2760 	 * calls.
2761 	 */
2762 	enlargeStringInfo(&output_message, nbytes);
2763 
2764 retry:
2765 	if (!WALRead(xlogreader,
2766 				 &output_message.data[output_message.len],
2767 				 startptr,
2768 				 nbytes,
2769 				 xlogreader->seg.ws_tli,	/* Pass the current TLI because
2770 											 * only WalSndSegmentOpen controls
2771 											 * whether new TLI is needed. */
2772 				 &errinfo))
2773 		WALReadRaiseError(&errinfo);
2774 
2775 	/* See logical_read_xlog_page(). */
2776 	XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2777 	CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
2778 
2779 	/*
2780 	 * During recovery, the currently-open WAL file might be replaced with the
2781 	 * file of the same name retrieved from archive. So we always need to
2782 	 * check what we read was valid after reading into the buffer. If it's
2783 	 * invalid, we try to open and read the file again.
2784 	 */
2785 	if (am_cascading_walsender)
2786 	{
2787 		WalSnd	   *walsnd = MyWalSnd;
2788 		bool		reload;
2789 
2790 		SpinLockAcquire(&walsnd->mutex);
2791 		reload = walsnd->needreload;
2792 		walsnd->needreload = false;
2793 		SpinLockRelease(&walsnd->mutex);
2794 
2795 		if (reload && xlogreader->seg.ws_file >= 0)
2796 		{
2797 			wal_segment_close(xlogreader);
2798 
2799 			goto retry;
2800 		}
2801 	}
2802 
2803 	output_message.len += nbytes;
2804 	output_message.data[output_message.len] = '\0';
2805 
2806 	/*
2807 	 * Fill the send timestamp last, so that it is taken as late as possible.
2808 	 */
2809 	resetStringInfo(&tmpbuf);
2810 	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2811 	memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2812 		   tmpbuf.data, sizeof(int64));
2813 
2814 	pq_putmessage_noblock('d', output_message.data, output_message.len);
2815 
2816 	sentPtr = endptr;
2817 
2818 	/* Update shared memory status */
2819 	{
2820 		WalSnd	   *walsnd = MyWalSnd;
2821 
2822 		SpinLockAcquire(&walsnd->mutex);
2823 		walsnd->sentPtr = sentPtr;
2824 		SpinLockRelease(&walsnd->mutex);
2825 	}
2826 
2827 	/* Report progress of XLOG streaming in PS display */
2828 	if (update_process_title)
2829 	{
2830 		char		activitymsg[50];
2831 
2832 		snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2833 				 LSN_FORMAT_ARGS(sentPtr));
2834 		set_ps_display(activitymsg);
2835 	}
2836 }
2837 
2838 /*
2839  * Stream out logically decoded data.
2840  */
2841 static void
XLogSendLogical(void)2842 XLogSendLogical(void)
2843 {
2844 	XLogRecord *record;
2845 	char	   *errm;
2846 
2847 	/*
2848 	 * We'll use the current flush point to determine whether we've caught up.
2849 	 * This variable is static in order to cache it across calls.  Caching is
2850 	 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2851 	 * spinlock.
2852 	 */
2853 	static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2854 
2855 	/*
2856 	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2857 	 * true in WalSndWaitForWal, if we're actually waiting. We also set to
2858 	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2859 	 * didn't wait - i.e. when we're shutting down.
2860 	 */
2861 	WalSndCaughtUp = false;
2862 
2863 	record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
2864 
2865 	/* xlog record was invalid */
2866 	if (errm != NULL)
2867 		elog(ERROR, "%s", errm);
2868 
2869 	if (record != NULL)
2870 	{
2871 		/*
2872 		 * Note the lack of any call to LagTrackerWrite() which is handled by
2873 		 * WalSndUpdateProgress which is called by output plugin through
2874 		 * logical decoding write api.
2875 		 */
2876 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2877 
2878 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2879 	}
2880 
2881 	/*
2882 	 * If first time through in this session, initialize flushPtr.  Otherwise,
2883 	 * we only need to update flushPtr if EndRecPtr is past it.
2884 	 */
2885 	if (flushPtr == InvalidXLogRecPtr)
2886 		flushPtr = GetFlushRecPtr();
2887 	else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2888 		flushPtr = GetFlushRecPtr();
2889 
2890 	/* If EndRecPtr is still past our flushPtr, it means we caught up. */
2891 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2892 		WalSndCaughtUp = true;
2893 
2894 	/*
2895 	 * If we're caught up and have been requested to stop, have WalSndLoop()
2896 	 * terminate the connection in an orderly manner, after writing out all
2897 	 * the pending data.
2898 	 */
2899 	if (WalSndCaughtUp && got_STOPPING)
2900 		got_SIGUSR2 = true;
2901 
2902 	/* Update shared memory status */
2903 	{
2904 		WalSnd	   *walsnd = MyWalSnd;
2905 
2906 		SpinLockAcquire(&walsnd->mutex);
2907 		walsnd->sentPtr = sentPtr;
2908 		SpinLockRelease(&walsnd->mutex);
2909 	}
2910 }
2911 
2912 /*
2913  * Shutdown if the sender is caught up.
2914  *
2915  * NB: This should only be called when the shutdown signal has been received
2916  * from postmaster.
2917  *
2918  * Note that if we determine that there's still more data to send, this
2919  * function will return control to the caller.
2920  */
2921 static void
WalSndDone(WalSndSendDataCallback send_data)2922 WalSndDone(WalSndSendDataCallback send_data)
2923 {
2924 	XLogRecPtr	replicatedPtr;
2925 
2926 	/* ... let's just be real sure we're caught up ... */
2927 	send_data();
2928 
2929 	/*
2930 	 * To figure out whether all WAL has successfully been replicated, check
2931 	 * flush location if valid, write otherwise. Tools like pg_receivewal will
2932 	 * usually (unless in synchronous mode) return an invalid flush location.
2933 	 */
2934 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2935 		MyWalSnd->write : MyWalSnd->flush;
2936 
2937 	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2938 		!pq_is_send_pending())
2939 	{
2940 		QueryCompletion qc;
2941 
2942 		/* Inform the standby that XLOG streaming is done */
2943 		SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2944 		EndCommand(&qc, DestRemote, false);
2945 		pq_flush();
2946 
2947 		proc_exit(0);
2948 	}
2949 	if (!waiting_for_ping_response)
2950 		WalSndKeepalive(true);
2951 }
2952 
2953 /*
2954  * Returns the latest point in WAL that has been safely flushed to disk, and
2955  * can be sent to the standby. This should only be called when in recovery,
2956  * ie. we're streaming to a cascaded standby.
2957  *
2958  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2959  * replayed WAL record.
2960  */
2961 static XLogRecPtr
GetStandbyFlushRecPtr(void)2962 GetStandbyFlushRecPtr(void)
2963 {
2964 	XLogRecPtr	replayPtr;
2965 	TimeLineID	replayTLI;
2966 	XLogRecPtr	receivePtr;
2967 	TimeLineID	receiveTLI;
2968 	XLogRecPtr	result;
2969 
2970 	/*
2971 	 * We can safely send what's already been replayed. Also, if walreceiver
2972 	 * is streaming WAL from the same timeline, we can send anything that it
2973 	 * has streamed, but hasn't been replayed yet.
2974 	 */
2975 
2976 	receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2977 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
2978 
2979 	ThisTimeLineID = replayTLI;
2980 
2981 	result = replayPtr;
2982 	if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2983 		result = receivePtr;
2984 
2985 	return result;
2986 }
2987 
2988 /*
2989  * Request walsenders to reload the currently-open WAL file
2990  */
2991 void
WalSndRqstFileReload(void)2992 WalSndRqstFileReload(void)
2993 {
2994 	int			i;
2995 
2996 	for (i = 0; i < max_wal_senders; i++)
2997 	{
2998 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2999 
3000 		SpinLockAcquire(&walsnd->mutex);
3001 		if (walsnd->pid == 0)
3002 		{
3003 			SpinLockRelease(&walsnd->mutex);
3004 			continue;
3005 		}
3006 		walsnd->needreload = true;
3007 		SpinLockRelease(&walsnd->mutex);
3008 	}
3009 }
3010 
3011 /*
3012  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
3013  */
3014 void
HandleWalSndInitStopping(void)3015 HandleWalSndInitStopping(void)
3016 {
3017 	Assert(am_walsender);
3018 
3019 	/*
3020 	 * If replication has not yet started, die like with SIGTERM. If
3021 	 * replication is active, only set a flag and wake up the main loop. It
3022 	 * will send any outstanding WAL, wait for it to be replicated to the
3023 	 * standby, and then exit gracefully.
3024 	 */
3025 	if (!replication_active)
3026 		kill(MyProcPid, SIGTERM);
3027 	else
3028 		got_STOPPING = true;
3029 }
3030 
3031 /*
3032  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3033  * sender should already have been switched to WALSNDSTATE_STOPPING at
3034  * this point.
3035  */
3036 static void
WalSndLastCycleHandler(SIGNAL_ARGS)3037 WalSndLastCycleHandler(SIGNAL_ARGS)
3038 {
3039 	int			save_errno = errno;
3040 
3041 	got_SIGUSR2 = true;
3042 	SetLatch(MyLatch);
3043 
3044 	errno = save_errno;
3045 }
3046 
3047 /* Set up signal handlers */
3048 void
WalSndSignals(void)3049 WalSndSignals(void)
3050 {
3051 	/* Set up signal handlers */
3052 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
3053 	pqsignal(SIGINT, StatementCancelHandler);	/* query cancel */
3054 	pqsignal(SIGTERM, die);		/* request shutdown */
3055 	/* SIGQUIT handler was already set up by InitPostmasterChild */
3056 	InitializeTimeouts();		/* establishes SIGALRM handler */
3057 	pqsignal(SIGPIPE, SIG_IGN);
3058 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3059 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
3060 												 * shutdown */
3061 
3062 	/* Reset some signals that are accepted by postmaster but not here */
3063 	pqsignal(SIGCHLD, SIG_DFL);
3064 }
3065 
3066 /* Report shared-memory space needed by WalSndShmemInit */
3067 Size
WalSndShmemSize(void)3068 WalSndShmemSize(void)
3069 {
3070 	Size		size = 0;
3071 
3072 	size = offsetof(WalSndCtlData, walsnds);
3073 	size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3074 
3075 	return size;
3076 }
3077 
3078 /* Allocate and initialize walsender-related shared memory */
3079 void
WalSndShmemInit(void)3080 WalSndShmemInit(void)
3081 {
3082 	bool		found;
3083 	int			i;
3084 
3085 	WalSndCtl = (WalSndCtlData *)
3086 		ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3087 
3088 	if (!found)
3089 	{
3090 		/* First time through, so initialize */
3091 		MemSet(WalSndCtl, 0, WalSndShmemSize());
3092 
3093 		for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3094 			SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3095 
3096 		for (i = 0; i < max_wal_senders; i++)
3097 		{
3098 			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3099 
3100 			SpinLockInit(&walsnd->mutex);
3101 		}
3102 	}
3103 }
3104 
3105 /*
3106  * Wake up all walsenders
3107  *
3108  * This will be called inside critical sections, so throwing an error is not
3109  * advisable.
3110  */
3111 void
WalSndWakeup(void)3112 WalSndWakeup(void)
3113 {
3114 	int			i;
3115 
3116 	for (i = 0; i < max_wal_senders; i++)
3117 	{
3118 		Latch	   *latch;
3119 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3120 
3121 		/*
3122 		 * Get latch pointer with spinlock held, for the unlikely case that
3123 		 * pointer reads aren't atomic (as they're 8 bytes).
3124 		 */
3125 		SpinLockAcquire(&walsnd->mutex);
3126 		latch = walsnd->latch;
3127 		SpinLockRelease(&walsnd->mutex);
3128 
3129 		if (latch != NULL)
3130 			SetLatch(latch);
3131 	}
3132 }
3133 
3134 /*
3135  * Wait for readiness on the FeBe socket, or a timeout.  The mask should be
3136  * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags.  Exit
3137  * on postmaster death.
3138  */
3139 static void
WalSndWait(uint32 socket_events,long timeout,uint32 wait_event)3140 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3141 {
3142 	WaitEvent	event;
3143 
3144 	ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3145 	if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3146 		(event.events & WL_POSTMASTER_DEATH))
3147 		proc_exit(1);
3148 }
3149 
3150 /*
3151  * Signal all walsenders to move to stopping state.
3152  *
3153  * This will trigger walsenders to move to a state where no further WAL can be
3154  * generated. See this file's header for details.
3155  */
3156 void
WalSndInitStopping(void)3157 WalSndInitStopping(void)
3158 {
3159 	int			i;
3160 
3161 	for (i = 0; i < max_wal_senders; i++)
3162 	{
3163 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3164 		pid_t		pid;
3165 
3166 		SpinLockAcquire(&walsnd->mutex);
3167 		pid = walsnd->pid;
3168 		SpinLockRelease(&walsnd->mutex);
3169 
3170 		if (pid == 0)
3171 			continue;
3172 
3173 		SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3174 	}
3175 }
3176 
3177 /*
3178  * Wait that all the WAL senders have quit or reached the stopping state. This
3179  * is used by the checkpointer to control when the shutdown checkpoint can
3180  * safely be performed.
3181  */
3182 void
WalSndWaitStopping(void)3183 WalSndWaitStopping(void)
3184 {
3185 	for (;;)
3186 	{
3187 		int			i;
3188 		bool		all_stopped = true;
3189 
3190 		for (i = 0; i < max_wal_senders; i++)
3191 		{
3192 			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3193 
3194 			SpinLockAcquire(&walsnd->mutex);
3195 
3196 			if (walsnd->pid == 0)
3197 			{
3198 				SpinLockRelease(&walsnd->mutex);
3199 				continue;
3200 			}
3201 
3202 			if (walsnd->state != WALSNDSTATE_STOPPING)
3203 			{
3204 				all_stopped = false;
3205 				SpinLockRelease(&walsnd->mutex);
3206 				break;
3207 			}
3208 			SpinLockRelease(&walsnd->mutex);
3209 		}
3210 
3211 		/* safe to leave if confirmation is done for all WAL senders */
3212 		if (all_stopped)
3213 			return;
3214 
3215 		pg_usleep(10000L);		/* wait for 10 msec */
3216 	}
3217 }
3218 
3219 /* Set state for current walsender (only called in walsender) */
3220 void
WalSndSetState(WalSndState state)3221 WalSndSetState(WalSndState state)
3222 {
3223 	WalSnd	   *walsnd = MyWalSnd;
3224 
3225 	Assert(am_walsender);
3226 
3227 	if (walsnd->state == state)
3228 		return;
3229 
3230 	SpinLockAcquire(&walsnd->mutex);
3231 	walsnd->state = state;
3232 	SpinLockRelease(&walsnd->mutex);
3233 }
3234 
3235 /*
3236  * Return a string constant representing the state. This is used
3237  * in system views, and should *not* be translated.
3238  */
3239 static const char *
WalSndGetStateString(WalSndState state)3240 WalSndGetStateString(WalSndState state)
3241 {
3242 	switch (state)
3243 	{
3244 		case WALSNDSTATE_STARTUP:
3245 			return "startup";
3246 		case WALSNDSTATE_BACKUP:
3247 			return "backup";
3248 		case WALSNDSTATE_CATCHUP:
3249 			return "catchup";
3250 		case WALSNDSTATE_STREAMING:
3251 			return "streaming";
3252 		case WALSNDSTATE_STOPPING:
3253 			return "stopping";
3254 	}
3255 	return "UNKNOWN";
3256 }
3257 
3258 static Interval *
offset_to_interval(TimeOffset offset)3259 offset_to_interval(TimeOffset offset)
3260 {
3261 	Interval   *result = palloc(sizeof(Interval));
3262 
3263 	result->month = 0;
3264 	result->day = 0;
3265 	result->time = offset;
3266 
3267 	return result;
3268 }
3269 
3270 /*
3271  * Returns activity of walsenders, including pids and xlog locations sent to
3272  * standby servers.
3273  */
3274 Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)3275 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3276 {
3277 #define PG_STAT_GET_WAL_SENDERS_COLS	12
3278 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3279 	TupleDesc	tupdesc;
3280 	Tuplestorestate *tupstore;
3281 	MemoryContext per_query_ctx;
3282 	MemoryContext oldcontext;
3283 	SyncRepStandbyData *sync_standbys;
3284 	int			num_standbys;
3285 	int			i;
3286 
3287 	/* check to see if caller supports us returning a tuplestore */
3288 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3289 		ereport(ERROR,
3290 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3291 				 errmsg("set-valued function called in context that cannot accept a set")));
3292 	if (!(rsinfo->allowedModes & SFRM_Materialize))
3293 		ereport(ERROR,
3294 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3295 				 errmsg("materialize mode required, but it is not allowed in this context")));
3296 
3297 	/* Build a tuple descriptor for our result type */
3298 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3299 		elog(ERROR, "return type must be a row type");
3300 
3301 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3302 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
3303 
3304 	tupstore = tuplestore_begin_heap(true, false, work_mem);
3305 	rsinfo->returnMode = SFRM_Materialize;
3306 	rsinfo->setResult = tupstore;
3307 	rsinfo->setDesc = tupdesc;
3308 
3309 	MemoryContextSwitchTo(oldcontext);
3310 
3311 	/*
3312 	 * Get the currently active synchronous standbys.  This could be out of
3313 	 * date before we're done, but we'll use the data anyway.
3314 	 */
3315 	num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3316 
3317 	for (i = 0; i < max_wal_senders; i++)
3318 	{
3319 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
3320 		XLogRecPtr	sentPtr;
3321 		XLogRecPtr	write;
3322 		XLogRecPtr	flush;
3323 		XLogRecPtr	apply;
3324 		TimeOffset	writeLag;
3325 		TimeOffset	flushLag;
3326 		TimeOffset	applyLag;
3327 		int			priority;
3328 		int			pid;
3329 		WalSndState state;
3330 		TimestampTz replyTime;
3331 		bool		is_sync_standby;
3332 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
3333 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3334 		int			j;
3335 
3336 		/* Collect data from shared memory */
3337 		SpinLockAcquire(&walsnd->mutex);
3338 		if (walsnd->pid == 0)
3339 		{
3340 			SpinLockRelease(&walsnd->mutex);
3341 			continue;
3342 		}
3343 		pid = walsnd->pid;
3344 		sentPtr = walsnd->sentPtr;
3345 		state = walsnd->state;
3346 		write = walsnd->write;
3347 		flush = walsnd->flush;
3348 		apply = walsnd->apply;
3349 		writeLag = walsnd->writeLag;
3350 		flushLag = walsnd->flushLag;
3351 		applyLag = walsnd->applyLag;
3352 		priority = walsnd->sync_standby_priority;
3353 		replyTime = walsnd->replyTime;
3354 		SpinLockRelease(&walsnd->mutex);
3355 
3356 		/*
3357 		 * Detect whether walsender is/was considered synchronous.  We can
3358 		 * provide some protection against stale data by checking the PID
3359 		 * along with walsnd_index.
3360 		 */
3361 		is_sync_standby = false;
3362 		for (j = 0; j < num_standbys; j++)
3363 		{
3364 			if (sync_standbys[j].walsnd_index == i &&
3365 				sync_standbys[j].pid == pid)
3366 			{
3367 				is_sync_standby = true;
3368 				break;
3369 			}
3370 		}
3371 
3372 		memset(nulls, 0, sizeof(nulls));
3373 		values[0] = Int32GetDatum(pid);
3374 
3375 		if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3376 		{
3377 			/*
3378 			 * Only superusers and members of pg_read_all_stats can see
3379 			 * details. Other users only get the pid value to know it's a
3380 			 * walsender, but no details.
3381 			 */
3382 			MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3383 		}
3384 		else
3385 		{
3386 			values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3387 
3388 			if (XLogRecPtrIsInvalid(sentPtr))
3389 				nulls[2] = true;
3390 			values[2] = LSNGetDatum(sentPtr);
3391 
3392 			if (XLogRecPtrIsInvalid(write))
3393 				nulls[3] = true;
3394 			values[3] = LSNGetDatum(write);
3395 
3396 			if (XLogRecPtrIsInvalid(flush))
3397 				nulls[4] = true;
3398 			values[4] = LSNGetDatum(flush);
3399 
3400 			if (XLogRecPtrIsInvalid(apply))
3401 				nulls[5] = true;
3402 			values[5] = LSNGetDatum(apply);
3403 
3404 			/*
3405 			 * Treat a standby such as a pg_basebackup background process
3406 			 * which always returns an invalid flush location, as an
3407 			 * asynchronous standby.
3408 			 */
3409 			priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3410 
3411 			if (writeLag < 0)
3412 				nulls[6] = true;
3413 			else
3414 				values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3415 
3416 			if (flushLag < 0)
3417 				nulls[7] = true;
3418 			else
3419 				values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3420 
3421 			if (applyLag < 0)
3422 				nulls[8] = true;
3423 			else
3424 				values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3425 
3426 			values[9] = Int32GetDatum(priority);
3427 
3428 			/*
3429 			 * More easily understood version of standby state. This is purely
3430 			 * informational.
3431 			 *
3432 			 * In quorum-based sync replication, the role of each standby
3433 			 * listed in synchronous_standby_names can be changing very
3434 			 * frequently. Any standbys considered as "sync" at one moment can
3435 			 * be switched to "potential" ones at the next moment. So, it's
3436 			 * basically useless to report "sync" or "potential" as their sync
3437 			 * states. We report just "quorum" for them.
3438 			 */
3439 			if (priority == 0)
3440 				values[10] = CStringGetTextDatum("async");
3441 			else if (is_sync_standby)
3442 				values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3443 					CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3444 			else
3445 				values[10] = CStringGetTextDatum("potential");
3446 
3447 			if (replyTime == 0)
3448 				nulls[11] = true;
3449 			else
3450 				values[11] = TimestampTzGetDatum(replyTime);
3451 		}
3452 
3453 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3454 	}
3455 
3456 	/* clean up and return the tuplestore */
3457 	tuplestore_donestoring(tupstore);
3458 
3459 	return (Datum) 0;
3460 }
3461 
3462 /*
3463  * Send a keepalive message to standby.
3464  *
3465  * If requestReply is set, the message requests the other party to send
3466  * a message back to us, for heartbeat purposes.  We also set a flag to
3467  * let nearby code that we're waiting for that response, to avoid
3468  * repeated requests.
3469  */
3470 static void
WalSndKeepalive(bool requestReply)3471 WalSndKeepalive(bool requestReply)
3472 {
3473 	elog(DEBUG2, "sending replication keepalive");
3474 
3475 	/* construct the message... */
3476 	resetStringInfo(&output_message);
3477 	pq_sendbyte(&output_message, 'k');
3478 	pq_sendint64(&output_message, sentPtr);
3479 	pq_sendint64(&output_message, GetCurrentTimestamp());
3480 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
3481 
3482 	/* ... and send it wrapped in CopyData */
3483 	pq_putmessage_noblock('d', output_message.data, output_message.len);
3484 
3485 	/* Set local flag */
3486 	if (requestReply)
3487 		waiting_for_ping_response = true;
3488 }
3489 
3490 /*
3491  * Send keepalive message if too much time has elapsed.
3492  */
3493 static void
WalSndKeepaliveIfNecessary(void)3494 WalSndKeepaliveIfNecessary(void)
3495 {
3496 	TimestampTz ping_time;
3497 
3498 	/*
3499 	 * Don't send keepalive messages if timeouts are globally disabled or
3500 	 * we're doing something not partaking in timeouts.
3501 	 */
3502 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3503 		return;
3504 
3505 	if (waiting_for_ping_response)
3506 		return;
3507 
3508 	/*
3509 	 * If half of wal_sender_timeout has lapsed without receiving any reply
3510 	 * from the standby, send a keep-alive message to the standby requesting
3511 	 * an immediate reply.
3512 	 */
3513 	ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3514 											wal_sender_timeout / 2);
3515 	if (last_processing >= ping_time)
3516 	{
3517 		WalSndKeepalive(true);
3518 
3519 		/* Try to flush pending output to the client */
3520 		if (pq_flush_if_writable() != 0)
3521 			WalSndShutdown();
3522 	}
3523 }
3524 
3525 /*
3526  * Record the end of the WAL and the time it was flushed locally, so that
3527  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3528  * eventually reported to have been written, flushed and applied by the
3529  * standby in a reply message.
3530  */
3531 static void
LagTrackerWrite(XLogRecPtr lsn,TimestampTz local_flush_time)3532 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3533 {
3534 	bool		buffer_full;
3535 	int			new_write_head;
3536 	int			i;
3537 
3538 	if (!am_walsender)
3539 		return;
3540 
3541 	/*
3542 	 * If the lsn hasn't advanced since last time, then do nothing.  This way
3543 	 * we only record a new sample when new WAL has been written.
3544 	 */
3545 	if (lag_tracker->last_lsn == lsn)
3546 		return;
3547 	lag_tracker->last_lsn = lsn;
3548 
3549 	/*
3550 	 * If advancing the write head of the circular buffer would crash into any
3551 	 * of the read heads, then the buffer is full.  In other words, the
3552 	 * slowest reader (presumably apply) is the one that controls the release
3553 	 * of space.
3554 	 */
3555 	new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3556 	buffer_full = false;
3557 	for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3558 	{
3559 		if (new_write_head == lag_tracker->read_heads[i])
3560 			buffer_full = true;
3561 	}
3562 
3563 	/*
3564 	 * If the buffer is full, for now we just rewind by one slot and overwrite
3565 	 * the last sample, as a simple (if somewhat uneven) way to lower the
3566 	 * sampling rate.  There may be better adaptive compaction algorithms.
3567 	 */
3568 	if (buffer_full)
3569 	{
3570 		new_write_head = lag_tracker->write_head;
3571 		if (lag_tracker->write_head > 0)
3572 			lag_tracker->write_head--;
3573 		else
3574 			lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3575 	}
3576 
3577 	/* Store a sample at the current write head position. */
3578 	lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3579 	lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3580 	lag_tracker->write_head = new_write_head;
3581 }
3582 
3583 /*
3584  * Find out how much time has elapsed between the moment WAL location 'lsn'
3585  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3586  * We have a separate read head for each of the reported LSN locations we
3587  * receive in replies from standby; 'head' controls which read head is
3588  * used.  Whenever a read head crosses an LSN which was written into the
3589  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3590  * find out the time this LSN (or an earlier one) was flushed locally, and
3591  * therefore compute the lag.
3592  *
3593  * Return -1 if no new sample data is available, and otherwise the elapsed
3594  * time in microseconds.
3595  */
3596 static TimeOffset
LagTrackerRead(int head,XLogRecPtr lsn,TimestampTz now)3597 LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
3598 {
3599 	TimestampTz time = 0;
3600 
3601 	/* Read all unread samples up to this LSN or end of buffer. */
3602 	while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3603 		   lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3604 	{
3605 		time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3606 		lag_tracker->last_read[head] =
3607 			lag_tracker->buffer[lag_tracker->read_heads[head]];
3608 		lag_tracker->read_heads[head] =
3609 			(lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3610 	}
3611 
3612 	/*
3613 	 * If the lag tracker is empty, that means the standby has processed
3614 	 * everything we've ever sent so we should now clear 'last_read'.  If we
3615 	 * didn't do that, we'd risk using a stale and irrelevant sample for
3616 	 * interpolation at the beginning of the next burst of WAL after a period
3617 	 * of idleness.
3618 	 */
3619 	if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3620 		lag_tracker->last_read[head].time = 0;
3621 
3622 	if (time > now)
3623 	{
3624 		/* If the clock somehow went backwards, treat as not found. */
3625 		return -1;
3626 	}
3627 	else if (time == 0)
3628 	{
3629 		/*
3630 		 * We didn't cross a time.  If there is a future sample that we
3631 		 * haven't reached yet, and we've already reached at least one sample,
3632 		 * let's interpolate the local flushed time.  This is mainly useful
3633 		 * for reporting a completely stuck apply position as having
3634 		 * increasing lag, since otherwise we'd have to wait for it to
3635 		 * eventually start moving again and cross one of our samples before
3636 		 * we can show the lag increasing.
3637 		 */
3638 		if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3639 		{
3640 			/* There are no future samples, so we can't interpolate. */
3641 			return -1;
3642 		}
3643 		else if (lag_tracker->last_read[head].time != 0)
3644 		{
3645 			/* We can interpolate between last_read and the next sample. */
3646 			double		fraction;
3647 			WalTimeSample prev = lag_tracker->last_read[head];
3648 			WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3649 
3650 			if (lsn < prev.lsn)
3651 			{
3652 				/*
3653 				 * Reported LSNs shouldn't normally go backwards, but it's
3654 				 * possible when there is a timeline change.  Treat as not
3655 				 * found.
3656 				 */
3657 				return -1;
3658 			}
3659 
3660 			Assert(prev.lsn < next.lsn);
3661 
3662 			if (prev.time > next.time)
3663 			{
3664 				/* If the clock somehow went backwards, treat as not found. */
3665 				return -1;
3666 			}
3667 
3668 			/* See how far we are between the previous and next samples. */
3669 			fraction =
3670 				(double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3671 
3672 			/* Scale the local flush time proportionally. */
3673 			time = (TimestampTz)
3674 				((double) prev.time + (next.time - prev.time) * fraction);
3675 		}
3676 		else
3677 		{
3678 			/*
3679 			 * We have only a future sample, implying that we were entirely
3680 			 * caught up but and now there is a new burst of WAL and the
3681 			 * standby hasn't processed the first sample yet.  Until the
3682 			 * standby reaches the future sample the best we can do is report
3683 			 * the hypothetical lag if that sample were to be replayed now.
3684 			 */
3685 			time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3686 		}
3687 	}
3688 
3689 	/* Return the elapsed time since local flush time in microseconds. */
3690 	Assert(time != 0);
3691 	return now - time;
3692 }
3693