1 /*-------------------------------------------------------------------------
2  *
3  * walreceiver.c
4  *
5  * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6  * is the process in the standby server that takes charge of receiving
7  * XLOG records from a primary server during streaming replication.
8  *
9  * When the startup process determines that it's time to start streaming,
10  * it instructs postmaster to start walreceiver. Walreceiver first connects
11  * to the primary server (it will be served by a walsender process
12  * in the primary server), and then keeps receiving XLOG records and
13  * writing them to the disk as long as the connection is alive. As XLOG
14  * records are received and flushed to disk, it updates the
15  * WalRcv->flushedUpto variable in shared memory, to inform the startup
16  * process of how far it can proceed with XLOG replay.
17  *
18  * A WAL receiver cannot directly load GUC parameters used when establishing
19  * its connection to the primary. Instead it relies on parameter values
20  * that are passed down by the startup process when streaming is requested.
21  * This applies, for example, to the replication slot and the connection
22  * string to be used for the connection with the primary.
23  *
24  * If the primary server ends streaming, but doesn't disconnect, walreceiver
25  * goes into "waiting" mode, and waits for the startup process to give new
26  * instructions. The startup process will treat that the same as
27  * disconnection, and will rescan the archive/pg_wal directory. But when the
28  * startup process wants to try streaming replication again, it will just
29  * nudge the existing walreceiver process that's waiting, instead of launching
30  * a new one.
31  *
32  * Normal termination is by SIGTERM, which instructs the walreceiver to
33  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
34  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
35  * of the connection and a FATAL error are treated not as a crash but as
36  * normal operation.
37  *
38  * This file contains the server-facing parts of walreceiver. The libpq-
39  * specific parts are in the libpqwalreceiver module. It's loaded
40  * dynamically to avoid linking the server with libpq.
41  *
42  * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
43  *
44  *
45  * IDENTIFICATION
46  *	  src/backend/replication/walreceiver.c
47  *
48  *-------------------------------------------------------------------------
49  */
50 #include "postgres.h"
51 
52 #include <unistd.h>
53 
54 #include "access/htup_details.h"
55 #include "access/timeline.h"
56 #include "access/transam.h"
57 #include "access/xlog_internal.h"
58 #include "access/xlogarchive.h"
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "common/ip.h"
62 #include "funcapi.h"
63 #include "libpq/pqformat.h"
64 #include "libpq/pqsignal.h"
65 #include "miscadmin.h"
66 #include "pgstat.h"
67 #include "postmaster/interrupt.h"
68 #include "replication/walreceiver.h"
69 #include "replication/walsender.h"
70 #include "storage/ipc.h"
71 #include "storage/pmsignal.h"
72 #include "storage/proc.h"
73 #include "storage/procarray.h"
74 #include "storage/procsignal.h"
75 #include "utils/acl.h"
76 #include "utils/builtins.h"
77 #include "utils/guc.h"
78 #include "utils/pg_lsn.h"
79 #include "utils/ps_status.h"
80 #include "utils/resowner.h"
81 #include "utils/timestamp.h"
82 
83 
/*
 * GUC variables.  (Other variables that affect walreceiver are in xlog.c
 * because they're passed down from the startup process, for better
 * synchronization.)
 */
int			wal_receiver_status_interval;
int			wal_receiver_timeout;
bool		hot_standby_feedback;

/*
 * libpqwalreceiver connection.  NULL until walrcv_connect() succeeds in
 * WalReceiverMain; WalRcvDie only disconnects when it is non-NULL.
 */
static WalReceiverConn *wrconn = NULL;

/*
 * Function table for the dynamically loaded libpqwalreceiver module.  It is
 * expected to be non-NULL once load_file("libpqwalreceiver") has run;
 * WalReceiverMain raises an error otherwise.
 */
WalReceiverFunctionsType *WalReceiverFunctions = NULL;

#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */

/*
 * These variables are used similarly to openLogFile/SegNo,
 * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
 * corresponding to the filename of recvFile.  recvFile is -1 whenever no
 * segment file is currently open.
 */
static int	recvFile = -1;
static TimeLineID recvFileTLI = 0;
static XLogSegNo recvSegNo = 0;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.
 */
static struct
{
	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
}			LogstreamResult;

/*
 * Reusable message buffers.  Both are initialized each time streaming
 * starts.  incoming_message is reset at the start of every received
 * message; reply_message is used when building replies to the primary.
 */
static StringInfoData reply_message;
static StringInfoData incoming_message;

/* Prototypes for private functions */
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvClose(XLogRecPtr recptr);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
132 
133 /*
134  * Process any interrupts the walreceiver process may have received.
135  * This should be called any time the process's latch has become set.
136  *
137  * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
138  * SIGTERM signal handler, because the signal might arrive in the middle of
139  * some critical operation, like while we're holding a spinlock.  Instead, the
140  * signal handler sets a flag variable as well as setting the process's latch.
141  * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
142  * latch has become set.  Operations that could block for a long time, such as
143  * reading from a remote server, must pay attention to the latch too; see
144  * libpqrcv_PQgetResult for example.
145  */
146 void
ProcessWalRcvInterrupts(void)147 ProcessWalRcvInterrupts(void)
148 {
149 	/*
150 	 * Although walreceiver interrupt handling doesn't use the same scheme as
151 	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
152 	 * any incoming signals on Win32, and also to make sure we process any
153 	 * barrier events.
154 	 */
155 	CHECK_FOR_INTERRUPTS();
156 
157 	if (ShutdownRequestPending)
158 	{
159 		ereport(FATAL,
160 				(errcode(ERRCODE_ADMIN_SHUTDOWN),
161 				 errmsg("terminating walreceiver process due to administrator command")));
162 	}
163 }
164 
165 
166 /* Main entry point for walreceiver process */
167 void
WalReceiverMain(void)168 WalReceiverMain(void)
169 {
170 	char		conninfo[MAXCONNINFO];
171 	char	   *tmp_conninfo;
172 	char		slotname[NAMEDATALEN];
173 	bool		is_temp_slot;
174 	XLogRecPtr	startpoint;
175 	TimeLineID	startpointTLI;
176 	TimeLineID	primaryTLI;
177 	bool		first_stream;
178 	WalRcvData *walrcv = WalRcv;
179 	TimestampTz last_recv_timestamp;
180 	TimestampTz now;
181 	bool		ping_sent;
182 	char	   *err;
183 	char	   *sender_host = NULL;
184 	int			sender_port = 0;
185 
186 	/*
187 	 * WalRcv should be set up already (if we are a backend, we inherit this
188 	 * by fork() or EXEC_BACKEND mechanism from the postmaster).
189 	 */
190 	Assert(walrcv != NULL);
191 
192 	now = GetCurrentTimestamp();
193 
194 	/*
195 	 * Mark walreceiver as running in shared memory.
196 	 *
197 	 * Do this as early as possible, so that if we fail later on, we'll set
198 	 * state to STOPPED. If we die before this, the startup process will keep
199 	 * waiting for us to start up, until it times out.
200 	 */
201 	SpinLockAcquire(&walrcv->mutex);
202 	Assert(walrcv->pid == 0);
203 	switch (walrcv->walRcvState)
204 	{
205 		case WALRCV_STOPPING:
206 			/* If we've already been requested to stop, don't start up. */
207 			walrcv->walRcvState = WALRCV_STOPPED;
208 			/* fall through */
209 
210 		case WALRCV_STOPPED:
211 			SpinLockRelease(&walrcv->mutex);
212 			ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
213 			proc_exit(1);
214 			break;
215 
216 		case WALRCV_STARTING:
217 			/* The usual case */
218 			break;
219 
220 		case WALRCV_WAITING:
221 		case WALRCV_STREAMING:
222 		case WALRCV_RESTARTING:
223 		default:
224 			/* Shouldn't happen */
225 			SpinLockRelease(&walrcv->mutex);
226 			elog(PANIC, "walreceiver still running according to shared memory state");
227 	}
228 	/* Advertise our PID so that the startup process can kill us */
229 	walrcv->pid = MyProcPid;
230 	walrcv->walRcvState = WALRCV_STREAMING;
231 
232 	/* Fetch information required to start streaming */
233 	walrcv->ready_to_display = false;
234 	strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
235 	strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
236 	is_temp_slot = walrcv->is_temp_slot;
237 	startpoint = walrcv->receiveStart;
238 	startpointTLI = walrcv->receiveStartTLI;
239 
240 	/*
241 	 * At most one of is_temp_slot and slotname can be set; otherwise,
242 	 * RequestXLogStreaming messed up.
243 	 */
244 	Assert(!is_temp_slot || (slotname[0] == '\0'));
245 
246 	/* Initialise to a sanish value */
247 	walrcv->lastMsgSendTime =
248 		walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
249 
250 	/* Report the latch to use to awaken this process */
251 	walrcv->latch = &MyProc->procLatch;
252 
253 	SpinLockRelease(&walrcv->mutex);
254 
255 	pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
256 
257 	/* Arrange to clean up at walreceiver exit */
258 	on_shmem_exit(WalRcvDie, 0);
259 
260 	/* Properly accept or ignore signals the postmaster might send us */
261 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
262 													 * file */
263 	pqsignal(SIGINT, SIG_IGN);
264 	pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
265 	/* SIGQUIT handler was already set up by InitPostmasterChild */
266 	pqsignal(SIGALRM, SIG_IGN);
267 	pqsignal(SIGPIPE, SIG_IGN);
268 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
269 	pqsignal(SIGUSR2, SIG_IGN);
270 
271 	/* Reset some signals that are accepted by postmaster but not here */
272 	pqsignal(SIGCHLD, SIG_DFL);
273 
274 	/* Load the libpq-specific functions */
275 	load_file("libpqwalreceiver", false);
276 	if (WalReceiverFunctions == NULL)
277 		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
278 
279 	/* Unblock signals (they were blocked when the postmaster forked us) */
280 	PG_SETMASK(&UnBlockSig);
281 
282 	/* Establish the connection to the primary for XLOG streaming */
283 	wrconn = walrcv_connect(conninfo, false,
284 							cluster_name[0] ? cluster_name : "walreceiver",
285 							&err);
286 	if (!wrconn)
287 		ereport(ERROR,
288 				(errcode(ERRCODE_CONNECTION_FAILURE),
289 				 errmsg("could not connect to the primary server: %s", err)));
290 
291 	/*
292 	 * Save user-visible connection string.  This clobbers the original
293 	 * conninfo, for security. Also save host and port of the sender server
294 	 * this walreceiver is connected to.
295 	 */
296 	tmp_conninfo = walrcv_get_conninfo(wrconn);
297 	walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
298 	SpinLockAcquire(&walrcv->mutex);
299 	memset(walrcv->conninfo, 0, MAXCONNINFO);
300 	if (tmp_conninfo)
301 		strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
302 
303 	memset(walrcv->sender_host, 0, NI_MAXHOST);
304 	if (sender_host)
305 		strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
306 
307 	walrcv->sender_port = sender_port;
308 	walrcv->ready_to_display = true;
309 	SpinLockRelease(&walrcv->mutex);
310 
311 	if (tmp_conninfo)
312 		pfree(tmp_conninfo);
313 
314 	if (sender_host)
315 		pfree(sender_host);
316 
317 	first_stream = true;
318 	for (;;)
319 	{
320 		char	   *primary_sysid;
321 		char		standby_sysid[32];
322 		WalRcvStreamOptions options;
323 
324 		/*
325 		 * Check that we're connected to a valid server using the
326 		 * IDENTIFY_SYSTEM replication command.
327 		 */
328 		primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
329 
330 		snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
331 				 GetSystemIdentifier());
332 		if (strcmp(primary_sysid, standby_sysid) != 0)
333 		{
334 			ereport(ERROR,
335 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
336 					 errmsg("database system identifier differs between the primary and standby"),
337 					 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
338 							   primary_sysid, standby_sysid)));
339 		}
340 
341 		/*
342 		 * Confirm that the current timeline of the primary is the same or
343 		 * ahead of ours.
344 		 */
345 		if (primaryTLI < startpointTLI)
346 			ereport(ERROR,
347 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
348 					 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
349 							primaryTLI, startpointTLI)));
350 
351 		/*
352 		 * Get any missing history files. We do this always, even when we're
353 		 * not interested in that timeline, so that if we're promoted to
354 		 * become the primary later on, we don't select the same timeline that
355 		 * was already used in the current primary. This isn't bullet-proof -
356 		 * you'll need some external software to manage your cluster if you
357 		 * need to ensure that a unique timeline id is chosen in every case,
358 		 * but let's avoid the confusion of timeline id collisions where we
359 		 * can.
360 		 */
361 		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
362 
363 		/*
364 		 * Create temporary replication slot if requested, and update slot
365 		 * name in shared memory.  (Note the slot name cannot already be set
366 		 * in this case.)
367 		 */
368 		if (is_temp_slot)
369 		{
370 			snprintf(slotname, sizeof(slotname),
371 					 "pg_walreceiver_%lld",
372 					 (long long int) walrcv_get_backend_pid(wrconn));
373 
374 			walrcv_create_slot(wrconn, slotname, true, 0, NULL);
375 
376 			SpinLockAcquire(&walrcv->mutex);
377 			strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
378 			SpinLockRelease(&walrcv->mutex);
379 		}
380 
381 		/*
382 		 * Start streaming.
383 		 *
384 		 * We'll try to start at the requested starting point and timeline,
385 		 * even if it's different from the server's latest timeline. In case
386 		 * we've already reached the end of the old timeline, the server will
387 		 * finish the streaming immediately, and we will go back to await
388 		 * orders from the startup process. If recovery_target_timeline is
389 		 * 'latest', the startup process will scan pg_wal and find the new
390 		 * history file, bump recovery target timeline, and ask us to restart
391 		 * on the new timeline.
392 		 */
393 		options.logical = false;
394 		options.startpoint = startpoint;
395 		options.slotname = slotname[0] != '\0' ? slotname : NULL;
396 		options.proto.physical.startpointTLI = startpointTLI;
397 		ThisTimeLineID = startpointTLI;
398 		if (walrcv_startstreaming(wrconn, &options))
399 		{
400 			if (first_stream)
401 				ereport(LOG,
402 						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
403 								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
404 			else
405 				ereport(LOG,
406 						(errmsg("restarted WAL streaming at %X/%X on timeline %u",
407 								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
408 			first_stream = false;
409 
410 			/* Initialize LogstreamResult and buffers for processing messages */
411 			LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
412 			initStringInfo(&reply_message);
413 			initStringInfo(&incoming_message);
414 
415 			/* Initialize the last recv timestamp */
416 			last_recv_timestamp = GetCurrentTimestamp();
417 			ping_sent = false;
418 
419 			/* Loop until end-of-streaming or error */
420 			for (;;)
421 			{
422 				char	   *buf;
423 				int			len;
424 				bool		endofwal = false;
425 				pgsocket	wait_fd = PGINVALID_SOCKET;
426 				int			rc;
427 
428 				/*
429 				 * Exit walreceiver if we're not in recovery. This should not
430 				 * happen, but cross-check the status here.
431 				 */
432 				if (!RecoveryInProgress())
433 					ereport(FATAL,
434 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
435 							 errmsg("cannot continue WAL streaming, recovery has already ended")));
436 
437 				/* Process any requests or signals received recently */
438 				ProcessWalRcvInterrupts();
439 
440 				if (ConfigReloadPending)
441 				{
442 					ConfigReloadPending = false;
443 					ProcessConfigFile(PGC_SIGHUP);
444 					XLogWalRcvSendHSFeedback(true);
445 				}
446 
447 				/* See if we can read data immediately */
448 				len = walrcv_receive(wrconn, &buf, &wait_fd);
449 				if (len != 0)
450 				{
451 					/*
452 					 * Process the received data, and any subsequent data we
453 					 * can read without blocking.
454 					 */
455 					for (;;)
456 					{
457 						if (len > 0)
458 						{
459 							/*
460 							 * Something was received from primary, so reset
461 							 * timeout
462 							 */
463 							last_recv_timestamp = GetCurrentTimestamp();
464 							ping_sent = false;
465 							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
466 						}
467 						else if (len == 0)
468 							break;
469 						else if (len < 0)
470 						{
471 							ereport(LOG,
472 									(errmsg("replication terminated by primary server"),
473 									 errdetail("End of WAL reached on timeline %u at %X/%X.",
474 											   startpointTLI,
475 											   LSN_FORMAT_ARGS(LogstreamResult.Write))));
476 							endofwal = true;
477 							break;
478 						}
479 						len = walrcv_receive(wrconn, &buf, &wait_fd);
480 					}
481 
482 					/* Let the primary know that we received some data. */
483 					XLogWalRcvSendReply(false, false);
484 
485 					/*
486 					 * If we've written some records, flush them to disk and
487 					 * let the startup process and primary server know about
488 					 * them.
489 					 */
490 					XLogWalRcvFlush(false);
491 				}
492 
493 				/* Check if we need to exit the streaming loop. */
494 				if (endofwal)
495 					break;
496 
497 				/*
498 				 * Ideally we would reuse a WaitEventSet object repeatedly
499 				 * here to avoid the overheads of WaitLatchOrSocket on epoll
500 				 * systems, but we can't be sure that libpq (or any other
501 				 * walreceiver implementation) has the same socket (even if
502 				 * the fd is the same number, it may have been closed and
503 				 * reopened since the last time).  In future, if there is a
504 				 * function for removing sockets from WaitEventSet, then we
505 				 * could add and remove just the socket each time, potentially
506 				 * avoiding some system calls.
507 				 */
508 				Assert(wait_fd != PGINVALID_SOCKET);
509 				rc = WaitLatchOrSocket(MyLatch,
510 									   WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
511 									   WL_TIMEOUT | WL_LATCH_SET,
512 									   wait_fd,
513 									   NAPTIME_PER_CYCLE,
514 									   WAIT_EVENT_WAL_RECEIVER_MAIN);
515 				if (rc & WL_LATCH_SET)
516 				{
517 					ResetLatch(MyLatch);
518 					ProcessWalRcvInterrupts();
519 
520 					if (walrcv->force_reply)
521 					{
522 						/*
523 						 * The recovery process has asked us to send apply
524 						 * feedback now.  Make sure the flag is really set to
525 						 * false in shared memory before sending the reply, so
526 						 * we don't miss a new request for a reply.
527 						 */
528 						walrcv->force_reply = false;
529 						pg_memory_barrier();
530 						XLogWalRcvSendReply(true, false);
531 					}
532 				}
533 				if (rc & WL_TIMEOUT)
534 				{
535 					/*
536 					 * We didn't receive anything new. If we haven't heard
537 					 * anything from the server for more than
538 					 * wal_receiver_timeout / 2, ping the server. Also, if
539 					 * it's been longer than wal_receiver_status_interval
540 					 * since the last update we sent, send a status update to
541 					 * the primary anyway, to report any progress in applying
542 					 * WAL.
543 					 */
544 					bool		requestReply = false;
545 
546 					/*
547 					 * Check if time since last receive from primary has
548 					 * reached the configured limit.
549 					 */
550 					if (wal_receiver_timeout > 0)
551 					{
552 						TimestampTz now = GetCurrentTimestamp();
553 						TimestampTz timeout;
554 
555 						timeout =
556 							TimestampTzPlusMilliseconds(last_recv_timestamp,
557 														wal_receiver_timeout);
558 
559 						if (now >= timeout)
560 							ereport(ERROR,
561 									(errcode(ERRCODE_CONNECTION_FAILURE),
562 									 errmsg("terminating walreceiver due to timeout")));
563 
564 						/*
565 						 * We didn't receive anything new, for half of
566 						 * receiver replication timeout. Ping the server.
567 						 */
568 						if (!ping_sent)
569 						{
570 							timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
571 																  (wal_receiver_timeout / 2));
572 							if (now >= timeout)
573 							{
574 								requestReply = true;
575 								ping_sent = true;
576 							}
577 						}
578 					}
579 
580 					XLogWalRcvSendReply(requestReply, requestReply);
581 					XLogWalRcvSendHSFeedback(false);
582 				}
583 			}
584 
585 			/*
586 			 * The backend finished streaming. Exit streaming COPY-mode from
587 			 * our side, too.
588 			 */
589 			walrcv_endstreaming(wrconn, &primaryTLI);
590 
591 			/*
592 			 * If the server had switched to a new timeline that we didn't
593 			 * know about when we began streaming, fetch its timeline history
594 			 * file now.
595 			 */
596 			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
597 		}
598 		else
599 			ereport(LOG,
600 					(errmsg("primary server contains no more WAL on requested timeline %u",
601 							startpointTLI)));
602 
603 		/*
604 		 * End of WAL reached on the requested timeline. Close the last
605 		 * segment, and await for new orders from the startup process.
606 		 */
607 		if (recvFile >= 0)
608 		{
609 			char		xlogfname[MAXFNAMELEN];
610 
611 			XLogWalRcvFlush(false);
612 			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
613 			if (close(recvFile) != 0)
614 				ereport(PANIC,
615 						(errcode_for_file_access(),
616 						 errmsg("could not close log segment %s: %m",
617 								xlogfname)));
618 
619 			/*
620 			 * Create .done file forcibly to prevent the streamed segment from
621 			 * being archived later.
622 			 */
623 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
624 				XLogArchiveForceDone(xlogfname);
625 			else
626 				XLogArchiveNotify(xlogfname);
627 		}
628 		recvFile = -1;
629 
630 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
631 		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
632 	}
633 	/* not reached */
634 }
635 
636 /*
637  * Wait for startup process to set receiveStart and receiveStartTLI.
638  */
639 static void
WalRcvWaitForStartPosition(XLogRecPtr * startpoint,TimeLineID * startpointTLI)640 WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
641 {
642 	WalRcvData *walrcv = WalRcv;
643 	int			state;
644 
645 	SpinLockAcquire(&walrcv->mutex);
646 	state = walrcv->walRcvState;
647 	if (state != WALRCV_STREAMING)
648 	{
649 		SpinLockRelease(&walrcv->mutex);
650 		if (state == WALRCV_STOPPING)
651 			proc_exit(0);
652 		else
653 			elog(FATAL, "unexpected walreceiver state");
654 	}
655 	walrcv->walRcvState = WALRCV_WAITING;
656 	walrcv->receiveStart = InvalidXLogRecPtr;
657 	walrcv->receiveStartTLI = 0;
658 	SpinLockRelease(&walrcv->mutex);
659 
660 	set_ps_display("idle");
661 
662 	/*
663 	 * nudge startup process to notice that we've stopped streaming and are
664 	 * now waiting for instructions.
665 	 */
666 	WakeupRecovery();
667 	for (;;)
668 	{
669 		ResetLatch(MyLatch);
670 
671 		ProcessWalRcvInterrupts();
672 
673 		SpinLockAcquire(&walrcv->mutex);
674 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
675 			   walrcv->walRcvState == WALRCV_WAITING ||
676 			   walrcv->walRcvState == WALRCV_STOPPING);
677 		if (walrcv->walRcvState == WALRCV_RESTARTING)
678 		{
679 			/*
680 			 * No need to handle changes in primary_conninfo or
681 			 * primary_slotname here. Startup process will signal us to
682 			 * terminate in case those change.
683 			 */
684 			*startpoint = walrcv->receiveStart;
685 			*startpointTLI = walrcv->receiveStartTLI;
686 			walrcv->walRcvState = WALRCV_STREAMING;
687 			SpinLockRelease(&walrcv->mutex);
688 			break;
689 		}
690 		if (walrcv->walRcvState == WALRCV_STOPPING)
691 		{
692 			/*
693 			 * We should've received SIGTERM if the startup process wants us
694 			 * to die, but might as well check it here too.
695 			 */
696 			SpinLockRelease(&walrcv->mutex);
697 			exit(1);
698 		}
699 		SpinLockRelease(&walrcv->mutex);
700 
701 		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
702 						 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
703 	}
704 
705 	if (update_process_title)
706 	{
707 		char		activitymsg[50];
708 
709 		snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
710 				 LSN_FORMAT_ARGS(*startpoint));
711 		set_ps_display(activitymsg);
712 	}
713 }
714 
715 /*
716  * Fetch any missing timeline history files between 'first' and 'last'
717  * (inclusive) from the server.
718  */
719 static void
WalRcvFetchTimeLineHistoryFiles(TimeLineID first,TimeLineID last)720 WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
721 {
722 	TimeLineID	tli;
723 
724 	for (tli = first; tli <= last; tli++)
725 	{
726 		/* there's no history file for timeline 1 */
727 		if (tli != 1 && !existsTimeLineHistory(tli))
728 		{
729 			char	   *fname;
730 			char	   *content;
731 			int			len;
732 			char		expectedfname[MAXFNAMELEN];
733 
734 			ereport(LOG,
735 					(errmsg("fetching timeline history file for timeline %u from primary server",
736 							tli)));
737 
738 			walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
739 
740 			/*
741 			 * Check that the filename on the primary matches what we
742 			 * calculated ourselves. This is just a sanity check, it should
743 			 * always match.
744 			 */
745 			TLHistoryFileName(expectedfname, tli);
746 			if (strcmp(fname, expectedfname) != 0)
747 				ereport(ERROR,
748 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
749 						 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
750 										 tli)));
751 
752 			/*
753 			 * Write the file to pg_wal.
754 			 */
755 			writeTimeLineHistoryFile(tli, content, len);
756 
757 			/*
758 			 * Mark the streamed history file as ready for archiving if
759 			 * archive_mode is always.
760 			 */
761 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
762 				XLogArchiveForceDone(fname);
763 			else
764 				XLogArchiveNotify(fname);
765 
766 			pfree(fname);
767 			pfree(content);
768 		}
769 	}
770 }
771 
/*
 * Mark us as STOPPED in shared memory at exit.
 *
 * Registered with on_shmem_exit() in WalReceiverMain.  In order, it:
 *   1. flushes any WAL written but not yet fsync'd (in "dying" mode, which
 *      skips sending a reply to the primary);
 *   2. resets our shared-memory slot (state, pid, latch, display flag)
 *      under the spinlock;
 *   3. wakes anyone waiting on walRcvStoppedCV;
 *   4. closes the replication connection, if one was established; and
 *   5. wakes the startup process.
 */
static void
WalRcvDie(int code, Datum arg)
{
	WalRcvData *walrcv = WalRcv;

	/* Ensure that all WAL records received are flushed to disk */
	XLogWalRcvFlush(true);

	/* Mark ourselves inactive in shared memory */
	SpinLockAcquire(&walrcv->mutex);
	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
		   walrcv->walRcvState == WALRCV_RESTARTING ||
		   walrcv->walRcvState == WALRCV_STARTING ||
		   walrcv->walRcvState == WALRCV_WAITING ||
		   walrcv->walRcvState == WALRCV_STOPPING);
	Assert(walrcv->pid == MyProcPid);
	walrcv->walRcvState = WALRCV_STOPPED;
	walrcv->pid = 0;
	walrcv->ready_to_display = false;
	/* Clear the latch pointer so nobody tries to wake this exiting process */
	walrcv->latch = NULL;
	SpinLockRelease(&walrcv->mutex);

	/* Broadcast outside the spinlock, after the state change is visible */
	ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);

	/* Terminate the connection gracefully. */
	if (wrconn != NULL)
		walrcv_disconnect(wrconn);

	/* Wake up the startup process to notice promptly that we're gone */
	WakeupRecovery();
}
806 
807 /*
808  * Accept the message from XLOG stream, and process it.
809  */
810 static void
XLogWalRcvProcessMsg(unsigned char type,char * buf,Size len)811 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
812 {
813 	int			hdrlen;
814 	XLogRecPtr	dataStart;
815 	XLogRecPtr	walEnd;
816 	TimestampTz sendTime;
817 	bool		replyRequested;
818 
819 	resetStringInfo(&incoming_message);
820 
821 	switch (type)
822 	{
823 		case 'w':				/* WAL records */
824 			{
825 				/* copy message to StringInfo */
826 				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
827 				if (len < hdrlen)
828 					ereport(ERROR,
829 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
830 							 errmsg_internal("invalid WAL message received from primary")));
831 				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
832 
833 				/* read the fields */
834 				dataStart = pq_getmsgint64(&incoming_message);
835 				walEnd = pq_getmsgint64(&incoming_message);
836 				sendTime = pq_getmsgint64(&incoming_message);
837 				ProcessWalSndrMessage(walEnd, sendTime);
838 
839 				buf += hdrlen;
840 				len -= hdrlen;
841 				XLogWalRcvWrite(buf, len, dataStart);
842 				break;
843 			}
844 		case 'k':				/* Keepalive */
845 			{
846 				/* copy message to StringInfo */
847 				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
848 				if (len != hdrlen)
849 					ereport(ERROR,
850 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
851 							 errmsg_internal("invalid keepalive message received from primary")));
852 				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
853 
854 				/* read the fields */
855 				walEnd = pq_getmsgint64(&incoming_message);
856 				sendTime = pq_getmsgint64(&incoming_message);
857 				replyRequested = pq_getmsgbyte(&incoming_message);
858 
859 				ProcessWalSndrMessage(walEnd, sendTime);
860 
861 				/* If the primary requested a reply, send one immediately */
862 				if (replyRequested)
863 					XLogWalRcvSendReply(true, false);
864 				break;
865 			}
866 		default:
867 			ereport(ERROR,
868 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
869 					 errmsg_internal("invalid replication message type %d",
870 									 type)));
871 	}
872 }
873 
/*
 * Write XLOG data to disk.
 *
 * 'buf' holds 'nbytes' bytes of WAL that belong at WAL position 'recptr'.
 * The data may cross segment boundaries.  Each piece is written into the
 * segment file that contains it: a completed segment is closed, and the
 * next one is opened (via XLogFileInit) as needed.
 *
 * On return, LogstreamResult.Write and the shared writtenUpto counter point
 * just past the written data.  Nothing is fsync'd here; that happens in
 * XLogWalRcvFlush.  A failed write is reported at PANIC level.
 */
static void
XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
	int			startoff;
	int			byteswritten;

	while (nbytes > 0)
	{
		int			segbytes;

		/* Close the current segment if it's completed */
		if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
			XLogWalRcvClose(recptr);

		if (recvFile < 0)
		{
			bool		use_existent = true;

			/* Create/use new log file */
			XLByteToSeg(recptr, recvSegNo, wal_segment_size);
			recvFile = XLogFileInit(recvSegNo, &use_existent, true);
			/* Remember which timeline's file name this segment was opened as */
			recvFileTLI = ThisTimeLineID;
		}

		/* Calculate the start offset of the received logs */
		startoff = XLogSegmentOffset(recptr, wal_segment_size);

		/* Write no further than the end of the current segment this pass */
		if (startoff + nbytes > wal_segment_size)
			segbytes = wal_segment_size - startoff;
		else
			segbytes = nbytes;

		/* OK to write the logs */
		/* Clear errno so a failure that doesn't set it can be detected below */
		errno = 0;

		byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
		if (byteswritten <= 0)
		{
			char		xlogfname[MAXFNAMELEN];
			int			save_errno;

			/* if write didn't set errno, assume no disk space */
			if (errno == 0)
				errno = ENOSPC;

			/* Preserve errno across XLogFileName so %m reports the write error */
			save_errno = errno;
			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
			errno = save_errno;
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not write to log segment %s "
							"at offset %u, length %lu: %m",
							xlogfname, startoff, (unsigned long) segbytes)));
		}

		/*
		 * Update state for write.  A short (partial) write only advances by
		 * the bytes actually written; the loop retries the remainder.
		 */
		recptr += byteswritten;

		nbytes -= byteswritten;
		buf += byteswritten;

		LogstreamResult.Write = recptr;
	}

	/* Update shared-memory status */
	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);

	/*
	 * Close the current segment if it's fully written up in the last cycle of
	 * the loop, to create its archive notification file soon. Otherwise WAL
	 * archiving of the segment will be delayed until any data in the next
	 * segment is received and written.
	 */
	if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
		XLogWalRcvClose(recptr);
}
953 
954 /*
955  * Flush the log to disk.
956  *
957  * If we're in the midst of dying, it's unwise to do anything that might throw
958  * an error, so we skip sending a reply in that case.
959  */
960 static void
XLogWalRcvFlush(bool dying)961 XLogWalRcvFlush(bool dying)
962 {
963 	if (LogstreamResult.Flush < LogstreamResult.Write)
964 	{
965 		WalRcvData *walrcv = WalRcv;
966 
967 		issue_xlog_fsync(recvFile, recvSegNo);
968 
969 		LogstreamResult.Flush = LogstreamResult.Write;
970 
971 		/* Update shared-memory status */
972 		SpinLockAcquire(&walrcv->mutex);
973 		if (walrcv->flushedUpto < LogstreamResult.Flush)
974 		{
975 			walrcv->latestChunkStart = walrcv->flushedUpto;
976 			walrcv->flushedUpto = LogstreamResult.Flush;
977 			walrcv->receivedTLI = ThisTimeLineID;
978 		}
979 		SpinLockRelease(&walrcv->mutex);
980 
981 		/* Signal the startup process and walsender that new WAL has arrived */
982 		WakeupRecovery();
983 		if (AllowCascadeReplication())
984 			WalSndWakeup();
985 
986 		/* Report XLOG streaming progress in PS display */
987 		if (update_process_title)
988 		{
989 			char		activitymsg[50];
990 
991 			snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
992 					 LSN_FORMAT_ARGS(LogstreamResult.Write));
993 			set_ps_display(activitymsg);
994 		}
995 
996 		/* Also let the primary know that we made some progress */
997 		if (!dying)
998 		{
999 			XLogWalRcvSendReply(false, false);
1000 			XLogWalRcvSendHSFeedback(false);
1001 		}
1002 	}
1003 }
1004 
1005 /*
1006  * Close the current segment.
1007  *
1008  * Flush the segment to disk before closing it. Otherwise we have to
1009  * reopen and fsync it later.
1010  *
1011  * Create an archive notification file since the segment is known completed.
1012  */
1013 static void
XLogWalRcvClose(XLogRecPtr recptr)1014 XLogWalRcvClose(XLogRecPtr recptr)
1015 {
1016 	char		xlogfname[MAXFNAMELEN];
1017 
1018 	Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1019 
1020 	/*
1021 	 * fsync() and close current file before we switch to next one. We would
1022 	 * otherwise have to reopen this file to fsync it later
1023 	 */
1024 	XLogWalRcvFlush(false);
1025 
1026 	XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1027 
1028 	/*
1029 	 * XLOG segment files will be re-read by recovery in startup process soon,
1030 	 * so we don't advise the OS to release cache pages associated with the
1031 	 * file like XLogFileClose() does.
1032 	 */
1033 	if (close(recvFile) != 0)
1034 		ereport(PANIC,
1035 				(errcode_for_file_access(),
1036 				 errmsg("could not close log segment %s: %m",
1037 						xlogfname)));
1038 
1039 	/*
1040 	 * Create .done file forcibly to prevent the streamed segment from being
1041 	 * archived later.
1042 	 */
1043 	if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1044 		XLogArchiveForceDone(xlogfname);
1045 	else
1046 		XLogArchiveNotify(xlogfname);
1047 
1048 	recvFile = -1;
1049 }
1050 
1051 /*
1052  * Send reply message to primary, indicating our current WAL locations, oldest
1053  * xmin and the current time.
1054  *
1055  * If 'force' is not set, the message is only sent if enough time has
1056  * passed since last status update to reach wal_receiver_status_interval.
1057  * If wal_receiver_status_interval is disabled altogether and 'force' is
1058  * false, this is a no-op.
1059  *
1060  * If 'requestReply' is true, requests the server to reply immediately upon
1061  * receiving this message. This is used for heartbeats, when approaching
1062  * wal_receiver_timeout.
1063  */
1064 static void
XLogWalRcvSendReply(bool force,bool requestReply)1065 XLogWalRcvSendReply(bool force, bool requestReply)
1066 {
1067 	static XLogRecPtr writePtr = 0;
1068 	static XLogRecPtr flushPtr = 0;
1069 	XLogRecPtr	applyPtr;
1070 	static TimestampTz sendTime = 0;
1071 	TimestampTz now;
1072 
1073 	/*
1074 	 * If the user doesn't want status to be reported to the primary, be sure
1075 	 * to exit before doing anything at all.
1076 	 */
1077 	if (!force && wal_receiver_status_interval <= 0)
1078 		return;
1079 
1080 	/* Get current timestamp. */
1081 	now = GetCurrentTimestamp();
1082 
1083 	/*
1084 	 * We can compare the write and flush positions to the last message we
1085 	 * sent without taking any lock, but the apply position requires a spin
1086 	 * lock, so we don't check that unless something else has changed or 10
1087 	 * seconds have passed.  This means that the apply WAL location will
1088 	 * appear, from the primary's point of view, to lag slightly, but since
1089 	 * this is only for reporting purposes and only on idle systems, that's
1090 	 * probably OK.
1091 	 */
1092 	if (!force
1093 		&& writePtr == LogstreamResult.Write
1094 		&& flushPtr == LogstreamResult.Flush
1095 		&& !TimestampDifferenceExceeds(sendTime, now,
1096 									   wal_receiver_status_interval * 1000))
1097 		return;
1098 	sendTime = now;
1099 
1100 	/* Construct a new message */
1101 	writePtr = LogstreamResult.Write;
1102 	flushPtr = LogstreamResult.Flush;
1103 	applyPtr = GetXLogReplayRecPtr(NULL);
1104 
1105 	resetStringInfo(&reply_message);
1106 	pq_sendbyte(&reply_message, 'r');
1107 	pq_sendint64(&reply_message, writePtr);
1108 	pq_sendint64(&reply_message, flushPtr);
1109 	pq_sendint64(&reply_message, applyPtr);
1110 	pq_sendint64(&reply_message, GetCurrentTimestamp());
1111 	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1112 
1113 	/* Send it */
1114 	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1115 		 LSN_FORMAT_ARGS(writePtr),
1116 		 LSN_FORMAT_ARGS(flushPtr),
1117 		 LSN_FORMAT_ARGS(applyPtr),
1118 		 requestReply ? " (reply requested)" : "");
1119 
1120 	walrcv_send(wrconn, reply_message.data, reply_message.len);
1121 }
1122 
1123 /*
1124  * Send hot standby feedback message to primary, plus the current time,
1125  * in case they don't have a watch.
1126  *
1127  * If the user disables feedback, send one final message to tell sender
1128  * to forget about the xmin on this standby. We also send this message
1129  * on first connect because a previous connection might have set xmin
1130  * on a replication slot. (If we're not using a slot it's harmless to
1131  * send a feedback message explicitly setting InvalidTransactionId).
1132  */
1133 static void
XLogWalRcvSendHSFeedback(bool immed)1134 XLogWalRcvSendHSFeedback(bool immed)
1135 {
1136 	TimestampTz now;
1137 	FullTransactionId nextFullXid;
1138 	TransactionId nextXid;
1139 	uint32		xmin_epoch,
1140 				catalog_xmin_epoch;
1141 	TransactionId xmin,
1142 				catalog_xmin;
1143 	static TimestampTz sendTime = 0;
1144 
1145 	/* initially true so we always send at least one feedback message */
1146 	static bool primary_has_standby_xmin = true;
1147 
1148 	/*
1149 	 * If the user doesn't want status to be reported to the primary, be sure
1150 	 * to exit before doing anything at all.
1151 	 */
1152 	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1153 		!primary_has_standby_xmin)
1154 		return;
1155 
1156 	/* Get current timestamp. */
1157 	now = GetCurrentTimestamp();
1158 
1159 	if (!immed)
1160 	{
1161 		/*
1162 		 * Send feedback at most once per wal_receiver_status_interval.
1163 		 */
1164 		if (!TimestampDifferenceExceeds(sendTime, now,
1165 										wal_receiver_status_interval * 1000))
1166 			return;
1167 		sendTime = now;
1168 	}
1169 
1170 	/*
1171 	 * If Hot Standby is not yet accepting connections there is nothing to
1172 	 * send. Check this after the interval has expired to reduce number of
1173 	 * calls.
1174 	 *
1175 	 * Bailing out here also ensures that we don't send feedback until we've
1176 	 * read our own replication slot state, so we don't tell the primary to
1177 	 * discard needed xmin or catalog_xmin from any slots that may exist on
1178 	 * this replica.
1179 	 */
1180 	if (!HotStandbyActive())
1181 		return;
1182 
1183 	/*
1184 	 * Make the expensive call to get the oldest xmin once we are certain
1185 	 * everything else has been checked.
1186 	 */
1187 	if (hot_standby_feedback)
1188 	{
1189 		GetReplicationHorizons(&xmin, &catalog_xmin);
1190 	}
1191 	else
1192 	{
1193 		xmin = InvalidTransactionId;
1194 		catalog_xmin = InvalidTransactionId;
1195 	}
1196 
1197 	/*
1198 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1199 	 * the epoch boundary.
1200 	 */
1201 	nextFullXid = ReadNextFullTransactionId();
1202 	nextXid = XidFromFullTransactionId(nextFullXid);
1203 	xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1204 	catalog_xmin_epoch = xmin_epoch;
1205 	if (nextXid < xmin)
1206 		xmin_epoch--;
1207 	if (nextXid < catalog_xmin)
1208 		catalog_xmin_epoch--;
1209 
1210 	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1211 		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1212 
1213 	/* Construct the message and send it. */
1214 	resetStringInfo(&reply_message);
1215 	pq_sendbyte(&reply_message, 'h');
1216 	pq_sendint64(&reply_message, GetCurrentTimestamp());
1217 	pq_sendint32(&reply_message, xmin);
1218 	pq_sendint32(&reply_message, xmin_epoch);
1219 	pq_sendint32(&reply_message, catalog_xmin);
1220 	pq_sendint32(&reply_message, catalog_xmin_epoch);
1221 	walrcv_send(wrconn, reply_message.data, reply_message.len);
1222 	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1223 		primary_has_standby_xmin = true;
1224 	else
1225 		primary_has_standby_xmin = false;
1226 }
1227 
1228 /*
1229  * Update shared memory status upon receiving a message from primary.
1230  *
1231  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1232  * message, reported by primary.
1233  */
1234 static void
ProcessWalSndrMessage(XLogRecPtr walEnd,TimestampTz sendTime)1235 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1236 {
1237 	WalRcvData *walrcv = WalRcv;
1238 
1239 	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1240 
1241 	/* Update shared-memory status */
1242 	SpinLockAcquire(&walrcv->mutex);
1243 	if (walrcv->latestWalEnd < walEnd)
1244 		walrcv->latestWalEndTime = sendTime;
1245 	walrcv->latestWalEnd = walEnd;
1246 	walrcv->lastMsgSendTime = sendTime;
1247 	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1248 	SpinLockRelease(&walrcv->mutex);
1249 
1250 	if (message_level_is_interesting(DEBUG2))
1251 	{
1252 		char	   *sendtime;
1253 		char	   *receipttime;
1254 		int			applyDelay;
1255 
1256 		/* Copy because timestamptz_to_str returns a static buffer */
1257 		sendtime = pstrdup(timestamptz_to_str(sendTime));
1258 		receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1259 		applyDelay = GetReplicationApplyDelay();
1260 
1261 		/* apply delay is not available */
1262 		if (applyDelay == -1)
1263 			elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1264 				 sendtime,
1265 				 receipttime,
1266 				 GetReplicationTransferLatency());
1267 		else
1268 			elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1269 				 sendtime,
1270 				 receipttime,
1271 				 applyDelay,
1272 				 GetReplicationTransferLatency());
1273 
1274 		pfree(sendtime);
1275 		pfree(receipttime);
1276 	}
1277 }
1278 
1279 /*
1280  * Wake up the walreceiver main loop.
1281  *
1282  * This is called by the startup process whenever interesting xlog records
1283  * are applied, so that walreceiver can check if it needs to send an apply
1284  * notification back to the primary which may be waiting in a COMMIT with
1285  * synchronous_commit = remote_apply.
1286  */
1287 void
WalRcvForceReply(void)1288 WalRcvForceReply(void)
1289 {
1290 	Latch	   *latch;
1291 
1292 	WalRcv->force_reply = true;
1293 	/* fetching the latch pointer might not be atomic, so use spinlock */
1294 	SpinLockAcquire(&WalRcv->mutex);
1295 	latch = WalRcv->latch;
1296 	SpinLockRelease(&WalRcv->mutex);
1297 	if (latch)
1298 		SetLatch(latch);
1299 }
1300 
1301 /*
1302  * Return a string constant representing the state. This is used
1303  * in system functions and views, and should *not* be translated.
1304  */
1305 static const char *
WalRcvGetStateString(WalRcvState state)1306 WalRcvGetStateString(WalRcvState state)
1307 {
1308 	switch (state)
1309 	{
1310 		case WALRCV_STOPPED:
1311 			return "stopped";
1312 		case WALRCV_STARTING:
1313 			return "starting";
1314 		case WALRCV_STREAMING:
1315 			return "streaming";
1316 		case WALRCV_WAITING:
1317 			return "waiting";
1318 		case WALRCV_RESTARTING:
1319 			return "restarting";
1320 		case WALRCV_STOPPING:
1321 			return "stopping";
1322 	}
1323 	return "UNKNOWN";
1324 }
1325 
1326 /*
1327  * Returns activity of WAL receiver, including pid, state and xlog locations
1328  * received from the WAL sender of another server.
1329  */
1330 Datum
pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)1331 pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1332 {
1333 	TupleDesc	tupdesc;
1334 	Datum	   *values;
1335 	bool	   *nulls;
1336 	int			pid;
1337 	bool		ready_to_display;
1338 	WalRcvState state;
1339 	XLogRecPtr	receive_start_lsn;
1340 	TimeLineID	receive_start_tli;
1341 	XLogRecPtr	written_lsn;
1342 	XLogRecPtr	flushed_lsn;
1343 	TimeLineID	received_tli;
1344 	TimestampTz last_send_time;
1345 	TimestampTz last_receipt_time;
1346 	XLogRecPtr	latest_end_lsn;
1347 	TimestampTz latest_end_time;
1348 	char		sender_host[NI_MAXHOST];
1349 	int			sender_port = 0;
1350 	char		slotname[NAMEDATALEN];
1351 	char		conninfo[MAXCONNINFO];
1352 
1353 	/* Take a lock to ensure value consistency */
1354 	SpinLockAcquire(&WalRcv->mutex);
1355 	pid = (int) WalRcv->pid;
1356 	ready_to_display = WalRcv->ready_to_display;
1357 	state = WalRcv->walRcvState;
1358 	receive_start_lsn = WalRcv->receiveStart;
1359 	receive_start_tli = WalRcv->receiveStartTLI;
1360 	flushed_lsn = WalRcv->flushedUpto;
1361 	received_tli = WalRcv->receivedTLI;
1362 	last_send_time = WalRcv->lastMsgSendTime;
1363 	last_receipt_time = WalRcv->lastMsgReceiptTime;
1364 	latest_end_lsn = WalRcv->latestWalEnd;
1365 	latest_end_time = WalRcv->latestWalEndTime;
1366 	strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1367 	strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1368 	sender_port = WalRcv->sender_port;
1369 	strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1370 	SpinLockRelease(&WalRcv->mutex);
1371 
1372 	/*
1373 	 * No WAL receiver (or not ready yet), just return a tuple with NULL
1374 	 * values
1375 	 */
1376 	if (pid == 0 || !ready_to_display)
1377 		PG_RETURN_NULL();
1378 
1379 	/*
1380 	 * Read "writtenUpto" without holding a spinlock.  Note that it may not be
1381 	 * consistent with the other shared variables of the WAL receiver
1382 	 * protected by a spinlock, but this should not be used for data integrity
1383 	 * checks.
1384 	 */
1385 	written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1386 
1387 	/* determine result type */
1388 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1389 		elog(ERROR, "return type must be a row type");
1390 
1391 	values = palloc0(sizeof(Datum) * tupdesc->natts);
1392 	nulls = palloc0(sizeof(bool) * tupdesc->natts);
1393 
1394 	/* Fetch values */
1395 	values[0] = Int32GetDatum(pid);
1396 
1397 	if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1398 	{
1399 		/*
1400 		 * Only superusers and members of pg_read_all_stats can see details.
1401 		 * Other users only get the pid value to know whether it is a WAL
1402 		 * receiver, but no details.
1403 		 */
1404 		MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1405 	}
1406 	else
1407 	{
1408 		values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1409 
1410 		if (XLogRecPtrIsInvalid(receive_start_lsn))
1411 			nulls[2] = true;
1412 		else
1413 			values[2] = LSNGetDatum(receive_start_lsn);
1414 		values[3] = Int32GetDatum(receive_start_tli);
1415 		if (XLogRecPtrIsInvalid(written_lsn))
1416 			nulls[4] = true;
1417 		else
1418 			values[4] = LSNGetDatum(written_lsn);
1419 		if (XLogRecPtrIsInvalid(flushed_lsn))
1420 			nulls[5] = true;
1421 		else
1422 			values[5] = LSNGetDatum(flushed_lsn);
1423 		values[6] = Int32GetDatum(received_tli);
1424 		if (last_send_time == 0)
1425 			nulls[7] = true;
1426 		else
1427 			values[7] = TimestampTzGetDatum(last_send_time);
1428 		if (last_receipt_time == 0)
1429 			nulls[8] = true;
1430 		else
1431 			values[8] = TimestampTzGetDatum(last_receipt_time);
1432 		if (XLogRecPtrIsInvalid(latest_end_lsn))
1433 			nulls[9] = true;
1434 		else
1435 			values[9] = LSNGetDatum(latest_end_lsn);
1436 		if (latest_end_time == 0)
1437 			nulls[10] = true;
1438 		else
1439 			values[10] = TimestampTzGetDatum(latest_end_time);
1440 		if (*slotname == '\0')
1441 			nulls[11] = true;
1442 		else
1443 			values[11] = CStringGetTextDatum(slotname);
1444 		if (*sender_host == '\0')
1445 			nulls[12] = true;
1446 		else
1447 			values[12] = CStringGetTextDatum(sender_host);
1448 		if (sender_port == 0)
1449 			nulls[13] = true;
1450 		else
1451 			values[13] = Int32GetDatum(sender_port);
1452 		if (*conninfo == '\0')
1453 			nulls[14] = true;
1454 		else
1455 			values[14] = CStringGetTextDatum(conninfo);
1456 	}
1457 
1458 	/* Returns the record as Datum */
1459 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1460 }
1461