1 /*-------------------------------------------------------------------------
2  *
3  * walreceiver.c
4  *
5  * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6  * is the process in the standby server that takes charge of receiving
7  * XLOG records from a primary server during streaming replication.
8  *
9  * When the startup process determines that it's time to start streaming,
10  * it instructs postmaster to start walreceiver. Walreceiver first connects
11  * to the primary server (it will be served by a walsender process
12  * in the primary server), and then keeps receiving XLOG records and
13  * writing them to the disk as long as the connection is alive. As XLOG
14  * records are received and flushed to disk, it updates the
15  * WalRcv->receivedUpto variable in shared memory, to inform the startup
16  * process of how far it can proceed with XLOG replay.
17  *
18  * If the primary server ends streaming, but doesn't disconnect, walreceiver
19  * goes into "waiting" mode, and waits for the startup process to give new
20  * instructions. The startup process will treat that the same as
21  * disconnection, and will rescan the archive/pg_wal directory. But when the
22  * startup process wants to try streaming replication again, it will just
23  * nudge the existing walreceiver process that's waiting, instead of launching
24  * a new one.
25  *
26  * Normal termination is by SIGTERM, which instructs the walreceiver to
27  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
28  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
29  * of the connection and a FATAL error are treated not as a crash but as
30  * normal operation.
31  *
32  * This file contains the server-facing parts of walreceiver. The libpq-
33  * specific parts are in the libpqwalreceiver module. It's loaded
34  * dynamically to avoid linking the server with libpq.
35  *
36  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
37  *
38  *
39  * IDENTIFICATION
40  *	  src/backend/replication/walreceiver.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45 
46 #include <signal.h>
47 #include <unistd.h>
48 
49 #include "access/htup_details.h"
50 #include "access/timeline.h"
51 #include "access/transam.h"
52 #include "access/xlog_internal.h"
53 #include "catalog/pg_authid.h"
54 #include "catalog/pg_type.h"
55 #include "funcapi.h"
56 #include "libpq/pqformat.h"
57 #include "libpq/pqsignal.h"
58 #include "miscadmin.h"
59 #include "pgstat.h"
60 #include "replication/walreceiver.h"
61 #include "replication/walsender.h"
62 #include "storage/ipc.h"
63 #include "storage/pmsignal.h"
64 #include "storage/procarray.h"
65 #include "utils/builtins.h"
66 #include "utils/guc.h"
67 #include "utils/pg_lsn.h"
68 #include "utils/ps_status.h"
69 #include "utils/resowner.h"
70 #include "utils/timestamp.h"
71 
72 
73 /* GUC variables */
/*
 * Status-update interval: per the streaming loop in WalReceiverMain, a status
 * update is sent to the primary once this much time has passed since the last
 * one.  NOTE(review): the unit is not visible in this part of the file --
 * confirm against the GUC definition.
 */
int			wal_receiver_status_interval;

/*
 * Maximum time, in milliseconds, that may pass without receiving anything
 * from the primary before walreceiver gives up with an error.  Values <= 0
 * disable the check (see the timeout handling in WalReceiverMain).
 */
int			wal_receiver_timeout;

/*
 * NOTE(review): presumably controls whether XLogWalRcvSendHSFeedback()
 * reports anything to the primary; its use is outside this part of the file.
 */
bool		hot_standby_feedback;

/* libpqwalreceiver connection */
static WalReceiverConn *wrconn = NULL;

/*
 * Expected to be non-NULL once "libpqwalreceiver" has been loaded;
 * WalReceiverMain raises an error if it is still NULL after load_file().
 */
WalReceiverFunctionsType *WalReceiverFunctions = NULL;

#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */

/*
 * These variables are used similarly to openLogFile/SegNo/Off,
 * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
 * corresponding to the filename of recvFile.
 *
 * recvFile is -1 whenever no segment file is currently open.
 */
static int	recvFile = -1;
static TimeLineID recvFileTLI = 0;
static XLogSegNo recvSegNo = 0;
static uint32 recvOff = 0;

/*
 * Flags set by interrupt handlers of walreceiver for later service in the
 * main loop.
 */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.
 */
static struct
{
	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
}			LogstreamResult;

/*
 * Message buffers; both are (re)initialized each time streaming starts in
 * WalReceiverMain.  incoming_message holds the header of the message being
 * parsed by XLogWalRcvProcessMsg.
 */
static StringInfoData reply_message;
static StringInfoData incoming_message;

/* Prototypes for private functions */
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvClose(XLogRecPtr recptr);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);

/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
132 
133 /*
134  * Process any interrupts the walreceiver process may have received.
135  * This should be called any time the process's latch has become set.
136  *
137  * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
138  * SIGTERM signal handler, because the signal might arrive in the middle of
139  * some critical operation, like while we're holding a spinlock.  Instead, the
140  * signal handler sets a flag variable as well as setting the process's latch.
141  * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
142  * latch has become set.  Operations that could block for a long time, such as
143  * reading from a remote server, must pay attention to the latch too; see
144  * libpqrcv_PQgetResult for example.
145  */
146 void
ProcessWalRcvInterrupts(void)147 ProcessWalRcvInterrupts(void)
148 {
149 	/*
150 	 * Although walreceiver interrupt handling doesn't use the same scheme as
151 	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
152 	 * any incoming signals on Win32.
153 	 */
154 	CHECK_FOR_INTERRUPTS();
155 
156 	if (got_SIGTERM)
157 	{
158 		ereport(FATAL,
159 				(errcode(ERRCODE_ADMIN_SHUTDOWN),
160 				 errmsg("terminating walreceiver process due to administrator command")));
161 	}
162 }
163 
164 
165 /* Main entry point for walreceiver process */
166 void
WalReceiverMain(void)167 WalReceiverMain(void)
168 {
169 	char		conninfo[MAXCONNINFO];
170 	char	   *tmp_conninfo;
171 	char		slotname[NAMEDATALEN];
172 	XLogRecPtr	startpoint;
173 	TimeLineID	startpointTLI;
174 	TimeLineID	primaryTLI;
175 	bool		first_stream;
176 	WalRcvData *walrcv = WalRcv;
177 	TimestampTz last_recv_timestamp;
178 	TimestampTz now;
179 	bool		ping_sent;
180 	char	   *err;
181 
182 	/*
183 	 * WalRcv should be set up already (if we are a backend, we inherit this
184 	 * by fork() or EXEC_BACKEND mechanism from the postmaster).
185 	 */
186 	Assert(walrcv != NULL);
187 
188 	now = GetCurrentTimestamp();
189 
190 	/*
191 	 * Mark walreceiver as running in shared memory.
192 	 *
193 	 * Do this as early as possible, so that if we fail later on, we'll set
194 	 * state to STOPPED. If we die before this, the startup process will keep
195 	 * waiting for us to start up, until it times out.
196 	 */
197 	SpinLockAcquire(&walrcv->mutex);
198 	Assert(walrcv->pid == 0);
199 	switch (walrcv->walRcvState)
200 	{
201 		case WALRCV_STOPPING:
202 			/* If we've already been requested to stop, don't start up. */
203 			walrcv->walRcvState = WALRCV_STOPPED;
204 			/* fall through */
205 
206 		case WALRCV_STOPPED:
207 			SpinLockRelease(&walrcv->mutex);
208 			proc_exit(1);
209 			break;
210 
211 		case WALRCV_STARTING:
212 			/* The usual case */
213 			break;
214 
215 		case WALRCV_WAITING:
216 		case WALRCV_STREAMING:
217 		case WALRCV_RESTARTING:
218 		default:
219 			/* Shouldn't happen */
220 			SpinLockRelease(&walrcv->mutex);
221 			elog(PANIC, "walreceiver still running according to shared memory state");
222 	}
223 	/* Advertise our PID so that the startup process can kill us */
224 	walrcv->pid = MyProcPid;
225 	walrcv->walRcvState = WALRCV_STREAMING;
226 
227 	/* Fetch information required to start streaming */
228 	walrcv->ready_to_display = false;
229 	strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
230 	strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
231 	startpoint = walrcv->receiveStart;
232 	startpointTLI = walrcv->receiveStartTLI;
233 
234 	/* Initialise to a sanish value */
235 	walrcv->lastMsgSendTime =
236 		walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
237 
238 	/* Report the latch to use to awaken this process */
239 	walrcv->latch = &MyProc->procLatch;
240 
241 	SpinLockRelease(&walrcv->mutex);
242 
243 	/* Arrange to clean up at walreceiver exit */
244 	on_shmem_exit(WalRcvDie, 0);
245 
246 	/* Properly accept or ignore signals the postmaster might send us */
247 	pqsignal(SIGHUP, WalRcvSigHupHandler);	/* set flag to read config file */
248 	pqsignal(SIGINT, SIG_IGN);
249 	pqsignal(SIGTERM, WalRcvShutdownHandler);	/* request shutdown */
250 	pqsignal(SIGQUIT, WalRcvQuickDieHandler);	/* hard crash time */
251 	pqsignal(SIGALRM, SIG_IGN);
252 	pqsignal(SIGPIPE, SIG_IGN);
253 	pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
254 	pqsignal(SIGUSR2, SIG_IGN);
255 
256 	/* Reset some signals that are accepted by postmaster but not here */
257 	pqsignal(SIGCHLD, SIG_DFL);
258 	pqsignal(SIGTTIN, SIG_DFL);
259 	pqsignal(SIGTTOU, SIG_DFL);
260 	pqsignal(SIGCONT, SIG_DFL);
261 	pqsignal(SIGWINCH, SIG_DFL);
262 
263 	/* We allow SIGQUIT (quickdie) at all times */
264 	sigdelset(&BlockSig, SIGQUIT);
265 
266 	/* Load the libpq-specific functions */
267 	load_file("libpqwalreceiver", false);
268 	if (WalReceiverFunctions == NULL)
269 		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
270 
271 	/*
272 	 * Create a resource owner to keep track of our resources (not clear that
273 	 * we need this, but may as well have one).
274 	 */
275 	CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
276 
277 	/* Unblock signals (they were blocked when the postmaster forked us) */
278 	PG_SETMASK(&UnBlockSig);
279 
280 	/* Establish the connection to the primary for XLOG streaming */
281 	wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
282 	if (!wrconn)
283 		ereport(ERROR,
284 				(errmsg("could not connect to the primary server: %s", err)));
285 
286 	/*
287 	 * Save user-visible connection string.  This clobbers the original
288 	 * conninfo, for security.
289 	 */
290 	tmp_conninfo = walrcv_get_conninfo(wrconn);
291 	SpinLockAcquire(&walrcv->mutex);
292 	memset(walrcv->conninfo, 0, MAXCONNINFO);
293 	if (tmp_conninfo)
294 		strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
295 	walrcv->ready_to_display = true;
296 	SpinLockRelease(&walrcv->mutex);
297 
298 	if (tmp_conninfo)
299 		pfree(tmp_conninfo);
300 
301 	first_stream = true;
302 	for (;;)
303 	{
304 		char	   *primary_sysid;
305 		char		standby_sysid[32];
306 		int			server_version;
307 		WalRcvStreamOptions options;
308 
309 		/*
310 		 * Check that we're connected to a valid server using the
311 		 * IDENTIFY_SYSTEM replication command.
312 		 */
313 		primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
314 											   &server_version);
315 
316 		snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
317 				 GetSystemIdentifier());
318 		if (strcmp(primary_sysid, standby_sysid) != 0)
319 		{
320 			ereport(ERROR,
321 					(errmsg("database system identifier differs between the primary and standby"),
322 					 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
323 							   primary_sysid, standby_sysid)));
324 		}
325 
326 		/*
327 		 * Confirm that the current timeline of the primary is the same or
328 		 * ahead of ours.
329 		 */
330 		if (primaryTLI < startpointTLI)
331 			ereport(ERROR,
332 					(errmsg("highest timeline %u of the primary is behind recovery timeline %u",
333 							primaryTLI, startpointTLI)));
334 
335 		/*
336 		 * Get any missing history files. We do this always, even when we're
337 		 * not interested in that timeline, so that if we're promoted to
338 		 * become the master later on, we don't select the same timeline that
339 		 * was already used in the current master. This isn't bullet-proof -
340 		 * you'll need some external software to manage your cluster if you
341 		 * need to ensure that a unique timeline id is chosen in every case,
342 		 * but let's avoid the confusion of timeline id collisions where we
343 		 * can.
344 		 */
345 		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
346 
347 		/*
348 		 * Start streaming.
349 		 *
350 		 * We'll try to start at the requested starting point and timeline,
351 		 * even if it's different from the server's latest timeline. In case
352 		 * we've already reached the end of the old timeline, the server will
353 		 * finish the streaming immediately, and we will go back to await
354 		 * orders from the startup process. If recovery_target_timeline is
355 		 * 'latest', the startup process will scan pg_wal and find the new
356 		 * history file, bump recovery target timeline, and ask us to restart
357 		 * on the new timeline.
358 		 */
359 		options.logical = false;
360 		options.startpoint = startpoint;
361 		options.slotname = slotname[0] != '\0' ? slotname : NULL;
362 		options.proto.physical.startpointTLI = startpointTLI;
363 		ThisTimeLineID = startpointTLI;
364 		if (walrcv_startstreaming(wrconn, &options))
365 		{
366 			if (first_stream)
367 				ereport(LOG,
368 						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
369 								(uint32) (startpoint >> 32), (uint32) startpoint,
370 								startpointTLI)));
371 			else
372 				ereport(LOG,
373 						(errmsg("restarted WAL streaming at %X/%X on timeline %u",
374 								(uint32) (startpoint >> 32), (uint32) startpoint,
375 								startpointTLI)));
376 			first_stream = false;
377 
378 			/* Initialize LogstreamResult and buffers for processing messages */
379 			LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
380 			initStringInfo(&reply_message);
381 			initStringInfo(&incoming_message);
382 
383 			/* Initialize the last recv timestamp */
384 			last_recv_timestamp = GetCurrentTimestamp();
385 			ping_sent = false;
386 
387 			/* Loop until end-of-streaming or error */
388 			for (;;)
389 			{
390 				char	   *buf;
391 				int			len;
392 				bool		endofwal = false;
393 				pgsocket	wait_fd = PGINVALID_SOCKET;
394 				int			rc;
395 
396 				/*
397 				 * Exit walreceiver if we're not in recovery. This should not
398 				 * happen, but cross-check the status here.
399 				 */
400 				if (!RecoveryInProgress())
401 					ereport(FATAL,
402 							(errmsg("cannot continue WAL streaming, recovery has already ended")));
403 
404 				/* Process any requests or signals received recently */
405 				ProcessWalRcvInterrupts();
406 
407 				if (got_SIGHUP)
408 				{
409 					got_SIGHUP = false;
410 					ProcessConfigFile(PGC_SIGHUP);
411 					XLogWalRcvSendHSFeedback(true);
412 				}
413 
414 				/* See if we can read data immediately */
415 				len = walrcv_receive(wrconn, &buf, &wait_fd);
416 				if (len != 0)
417 				{
418 					/*
419 					 * Process the received data, and any subsequent data we
420 					 * can read without blocking.
421 					 */
422 					for (;;)
423 					{
424 						if (len > 0)
425 						{
426 							/*
427 							 * Something was received from master, so reset
428 							 * timeout
429 							 */
430 							last_recv_timestamp = GetCurrentTimestamp();
431 							ping_sent = false;
432 							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
433 						}
434 						else if (len == 0)
435 							break;
436 						else if (len < 0)
437 						{
438 							ereport(LOG,
439 									(errmsg("replication terminated by primary server"),
440 									 errdetail("End of WAL reached on timeline %u at %X/%X.",
441 											   startpointTLI,
442 											   (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
443 							endofwal = true;
444 							break;
445 						}
446 						len = walrcv_receive(wrconn, &buf, &wait_fd);
447 					}
448 
449 					/* Let the master know that we received some data. */
450 					XLogWalRcvSendReply(false, false);
451 
452 					/*
453 					 * If we've written some records, flush them to disk and
454 					 * let the startup process and primary server know about
455 					 * them.
456 					 */
457 					XLogWalRcvFlush(false);
458 				}
459 
460 				/* Check if we need to exit the streaming loop. */
461 				if (endofwal)
462 					break;
463 
464 				/*
465 				 * Ideally we would reuse a WaitEventSet object repeatedly
466 				 * here to avoid the overheads of WaitLatchOrSocket on epoll
467 				 * systems, but we can't be sure that libpq (or any other
468 				 * walreceiver implementation) has the same socket (even if
469 				 * the fd is the same number, it may have been closed and
470 				 * reopened since the last time).  In future, if there is a
471 				 * function for removing sockets from WaitEventSet, then we
472 				 * could add and remove just the socket each time, potentially
473 				 * avoiding some system calls.
474 				 */
475 				Assert(wait_fd != PGINVALID_SOCKET);
476 				rc = WaitLatchOrSocket(walrcv->latch,
477 									   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
478 									   WL_TIMEOUT | WL_LATCH_SET,
479 									   wait_fd,
480 									   NAPTIME_PER_CYCLE,
481 									   WAIT_EVENT_WAL_RECEIVER_MAIN);
482 				if (rc & WL_LATCH_SET)
483 				{
484 					ResetLatch(walrcv->latch);
485 					ProcessWalRcvInterrupts();
486 
487 					if (walrcv->force_reply)
488 					{
489 						/*
490 						 * The recovery process has asked us to send apply
491 						 * feedback now.  Make sure the flag is really set to
492 						 * false in shared memory before sending the reply, so
493 						 * we don't miss a new request for a reply.
494 						 */
495 						walrcv->force_reply = false;
496 						pg_memory_barrier();
497 						XLogWalRcvSendReply(true, false);
498 					}
499 				}
500 				if (rc & WL_POSTMASTER_DEATH)
501 				{
502 					/*
503 					 * Emergency bailout if postmaster has died.  This is to
504 					 * avoid the necessity for manual cleanup of all
505 					 * postmaster children.
506 					 */
507 					exit(1);
508 				}
509 				if (rc & WL_TIMEOUT)
510 				{
511 					/*
512 					 * We didn't receive anything new. If we haven't heard
513 					 * anything from the server for more than
514 					 * wal_receiver_timeout / 2, ping the server. Also, if
515 					 * it's been longer than wal_receiver_status_interval
516 					 * since the last update we sent, send a status update to
517 					 * the master anyway, to report any progress in applying
518 					 * WAL.
519 					 */
520 					bool		requestReply = false;
521 
522 					/*
523 					 * Check if time since last receive from standby has
524 					 * reached the configured limit.
525 					 */
526 					if (wal_receiver_timeout > 0)
527 					{
528 						TimestampTz now = GetCurrentTimestamp();
529 						TimestampTz timeout;
530 
531 						timeout =
532 							TimestampTzPlusMilliseconds(last_recv_timestamp,
533 														wal_receiver_timeout);
534 
535 						if (now >= timeout)
536 							ereport(ERROR,
537 									(errmsg("terminating walreceiver due to timeout")));
538 
539 						/*
540 						 * We didn't receive anything new, for half of
541 						 * receiver replication timeout. Ping the server.
542 						 */
543 						if (!ping_sent)
544 						{
545 							timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
546 																  (wal_receiver_timeout / 2));
547 							if (now >= timeout)
548 							{
549 								requestReply = true;
550 								ping_sent = true;
551 							}
552 						}
553 					}
554 
555 					XLogWalRcvSendReply(requestReply, requestReply);
556 					XLogWalRcvSendHSFeedback(false);
557 				}
558 			}
559 
560 			/*
561 			 * The backend finished streaming. Exit streaming COPY-mode from
562 			 * our side, too.
563 			 */
564 			walrcv_endstreaming(wrconn, &primaryTLI);
565 
566 			/*
567 			 * If the server had switched to a new timeline that we didn't
568 			 * know about when we began streaming, fetch its timeline history
569 			 * file now.
570 			 */
571 			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
572 		}
573 		else
574 			ereport(LOG,
575 					(errmsg("primary server contains no more WAL on requested timeline %u",
576 							startpointTLI)));
577 
578 		/*
579 		 * End of WAL reached on the requested timeline. Close the last
580 		 * segment, and await for new orders from the startup process.
581 		 */
582 		if (recvFile >= 0)
583 		{
584 			char		xlogfname[MAXFNAMELEN];
585 
586 			XLogWalRcvFlush(false);
587 			if (close(recvFile) != 0)
588 				ereport(PANIC,
589 						(errcode_for_file_access(),
590 						 errmsg("could not close log segment %s: %m",
591 								XLogFileNameP(recvFileTLI, recvSegNo))));
592 
593 			/*
594 			 * Create .done file forcibly to prevent the streamed segment from
595 			 * being archived later.
596 			 */
597 			XLogFileName(xlogfname, recvFileTLI, recvSegNo);
598 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
599 				XLogArchiveForceDone(xlogfname);
600 			else
601 				XLogArchiveNotify(xlogfname);
602 		}
603 		recvFile = -1;
604 
605 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
606 		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
607 	}
608 	/* not reached */
609 }
610 
611 /*
612  * Wait for startup process to set receiveStart and receiveStartTLI.
613  */
614 static void
WalRcvWaitForStartPosition(XLogRecPtr * startpoint,TimeLineID * startpointTLI)615 WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
616 {
617 	WalRcvData *walrcv = WalRcv;
618 	int			state;
619 
620 	SpinLockAcquire(&walrcv->mutex);
621 	state = walrcv->walRcvState;
622 	if (state != WALRCV_STREAMING)
623 	{
624 		SpinLockRelease(&walrcv->mutex);
625 		if (state == WALRCV_STOPPING)
626 			proc_exit(0);
627 		else
628 			elog(FATAL, "unexpected walreceiver state");
629 	}
630 	walrcv->walRcvState = WALRCV_WAITING;
631 	walrcv->receiveStart = InvalidXLogRecPtr;
632 	walrcv->receiveStartTLI = 0;
633 	SpinLockRelease(&walrcv->mutex);
634 
635 	if (update_process_title)
636 		set_ps_display("idle", false);
637 
638 	/*
639 	 * nudge startup process to notice that we've stopped streaming and are
640 	 * now waiting for instructions.
641 	 */
642 	WakeupRecovery();
643 	for (;;)
644 	{
645 		ResetLatch(walrcv->latch);
646 
647 		/*
648 		 * Emergency bailout if postmaster has died.  This is to avoid the
649 		 * necessity for manual cleanup of all postmaster children.
650 		 */
651 		if (!PostmasterIsAlive())
652 			exit(1);
653 
654 		ProcessWalRcvInterrupts();
655 
656 		SpinLockAcquire(&walrcv->mutex);
657 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
658 			   walrcv->walRcvState == WALRCV_WAITING ||
659 			   walrcv->walRcvState == WALRCV_STOPPING);
660 		if (walrcv->walRcvState == WALRCV_RESTARTING)
661 		{
662 			/* we don't expect primary_conninfo to change */
663 			*startpoint = walrcv->receiveStart;
664 			*startpointTLI = walrcv->receiveStartTLI;
665 			walrcv->walRcvState = WALRCV_STREAMING;
666 			SpinLockRelease(&walrcv->mutex);
667 			break;
668 		}
669 		if (walrcv->walRcvState == WALRCV_STOPPING)
670 		{
671 			/*
672 			 * We should've received SIGTERM if the startup process wants us
673 			 * to die, but might as well check it here too.
674 			 */
675 			SpinLockRelease(&walrcv->mutex);
676 			exit(1);
677 		}
678 		SpinLockRelease(&walrcv->mutex);
679 
680 		WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
681 				  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
682 	}
683 
684 	if (update_process_title)
685 	{
686 		char		activitymsg[50];
687 
688 		snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
689 				 (uint32) (*startpoint >> 32),
690 				 (uint32) *startpoint);
691 		set_ps_display(activitymsg, false);
692 	}
693 }
694 
695 /*
696  * Fetch any missing timeline history files between 'first' and 'last'
697  * (inclusive) from the server.
698  */
699 static void
WalRcvFetchTimeLineHistoryFiles(TimeLineID first,TimeLineID last)700 WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
701 {
702 	TimeLineID	tli;
703 
704 	for (tli = first; tli <= last; tli++)
705 	{
706 		/* there's no history file for timeline 1 */
707 		if (tli != 1 && !existsTimeLineHistory(tli))
708 		{
709 			char	   *fname;
710 			char	   *content;
711 			int			len;
712 			char		expectedfname[MAXFNAMELEN];
713 
714 			ereport(LOG,
715 					(errmsg("fetching timeline history file for timeline %u from primary server",
716 							tli)));
717 
718 			walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
719 
720 			/*
721 			 * Check that the filename on the master matches what we
722 			 * calculated ourselves. This is just a sanity check, it should
723 			 * always match.
724 			 */
725 			TLHistoryFileName(expectedfname, tli);
726 			if (strcmp(fname, expectedfname) != 0)
727 				ereport(ERROR,
728 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
729 						 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
730 										 tli)));
731 
732 			/*
733 			 * Write the file to pg_wal.
734 			 */
735 			writeTimeLineHistoryFile(tli, content, len);
736 
737 			/*
738 			 * Mark the streamed history file as ready for archiving
739 			 * if archive_mode is always.
740 			 */
741 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
742 				XLogArchiveForceDone(fname);
743 			else
744 				XLogArchiveNotify(fname);
745 
746 			pfree(fname);
747 			pfree(content);
748 		}
749 	}
750 }
751 
752 /*
753  * Mark us as STOPPED in shared memory at exit.
754  */
755 static void
WalRcvDie(int code,Datum arg)756 WalRcvDie(int code, Datum arg)
757 {
758 	WalRcvData *walrcv = WalRcv;
759 
760 	/* Ensure that all WAL records received are flushed to disk */
761 	XLogWalRcvFlush(true);
762 
763 	/* Mark ourselves inactive in shared memory */
764 	SpinLockAcquire(&walrcv->mutex);
765 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
766 		   walrcv->walRcvState == WALRCV_RESTARTING ||
767 		   walrcv->walRcvState == WALRCV_STARTING ||
768 		   walrcv->walRcvState == WALRCV_WAITING ||
769 		   walrcv->walRcvState == WALRCV_STOPPING);
770 	Assert(walrcv->pid == MyProcPid);
771 	walrcv->walRcvState = WALRCV_STOPPED;
772 	walrcv->pid = 0;
773 	walrcv->ready_to_display = false;
774 	walrcv->latch = NULL;
775 	SpinLockRelease(&walrcv->mutex);
776 
777 	/* Terminate the connection gracefully. */
778 	if (wrconn != NULL)
779 		walrcv_disconnect(wrconn);
780 
781 	/* Wake up the startup process to notice promptly that we're gone */
782 	WakeupRecovery();
783 }
784 
785 /* SIGHUP: set flag to re-read config file at next convenient time */
786 static void
WalRcvSigHupHandler(SIGNAL_ARGS)787 WalRcvSigHupHandler(SIGNAL_ARGS)
788 {
789 	got_SIGHUP = true;
790 }
791 
792 
/*
 * SIGUSR1: forwarded to the latch machinery.
 *
 * errno is preserved across the call so the interrupted code does not see
 * a value changed by the handler.
 */
static void
WalRcvSigUsr1Handler(SIGNAL_ARGS)
{
	const int	saved_errno = errno;

	latch_sigusr1_handler();
	errno = saved_errno;
}
803 
804 /* SIGTERM: set flag for ProcessWalRcvInterrupts */
805 static void
WalRcvShutdownHandler(SIGNAL_ARGS)806 WalRcvShutdownHandler(SIGNAL_ARGS)
807 {
808 	int			save_errno = errno;
809 
810 	got_SIGTERM = true;
811 
812 	if (WalRcv->latch)
813 		SetLatch(WalRcv->latch);
814 
815 	errno = save_errno;
816 }
817 
/*
 * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
 *
 * Some backend has bought the farm, so we need to stop what we're doing and
 * exit.  The process terminates immediately with status 2; no cleanup of
 * any kind is attempted.
 */
static void
WalRcvQuickDieHandler(SIGNAL_ARGS)
{
	/*
	 * We DO NOT want to run proc_exit() or atexit() callbacks -- we're here
	 * because shared memory may be corrupted, so we don't want to try to
	 * clean up our transaction.  Just nail the windows shut and get out of
	 * town.  The callbacks wouldn't be safe to run from a signal handler,
	 * anyway.
	 *
	 * Note we use _exit(2) not _exit(0).  This is to force the postmaster
	 * into a system reset cycle if someone sends a manual SIGQUIT to a
	 * random backend.  This is necessary precisely because we don't clean up
	 * our shared memory state.  (The "dead man switch" mechanism in
	 * pmsignal.c should ensure the postmaster sees this as a crash, too, but
	 * no harm in being doubly sure.)
	 */
	_exit(2);
}
843 
844 /*
845  * Accept the message from XLOG stream, and process it.
846  */
847 static void
XLogWalRcvProcessMsg(unsigned char type,char * buf,Size len)848 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
849 {
850 	int			hdrlen;
851 	XLogRecPtr	dataStart;
852 	XLogRecPtr	walEnd;
853 	TimestampTz sendTime;
854 	bool		replyRequested;
855 
856 	resetStringInfo(&incoming_message);
857 
858 	switch (type)
859 	{
860 		case 'w':				/* WAL records */
861 			{
862 				/* copy message to StringInfo */
863 				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
864 				if (len < hdrlen)
865 					ereport(ERROR,
866 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
867 							 errmsg_internal("invalid WAL message received from primary")));
868 				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
869 
870 				/* read the fields */
871 				dataStart = pq_getmsgint64(&incoming_message);
872 				walEnd = pq_getmsgint64(&incoming_message);
873 				sendTime = pq_getmsgint64(&incoming_message);
874 				ProcessWalSndrMessage(walEnd, sendTime);
875 
876 				buf += hdrlen;
877 				len -= hdrlen;
878 				XLogWalRcvWrite(buf, len, dataStart);
879 				break;
880 			}
881 		case 'k':				/* Keepalive */
882 			{
883 				/* copy message to StringInfo */
884 				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
885 				if (len != hdrlen)
886 					ereport(ERROR,
887 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
888 							 errmsg_internal("invalid keepalive message received from primary")));
889 				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
890 
891 				/* read the fields */
892 				walEnd = pq_getmsgint64(&incoming_message);
893 				sendTime = pq_getmsgint64(&incoming_message);
894 				replyRequested = pq_getmsgbyte(&incoming_message);
895 
896 				ProcessWalSndrMessage(walEnd, sendTime);
897 
898 				/* If the primary requested a reply, send one immediately */
899 				if (replyRequested)
900 					XLogWalRcvSendReply(true, false);
901 				break;
902 			}
903 		default:
904 			ereport(ERROR,
905 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
906 					 errmsg_internal("invalid replication message type %d",
907 									 type)));
908 	}
909 }
910 
911 /*
912  * Write XLOG data to disk.
913  */
914 static void
XLogWalRcvWrite(char * buf,Size nbytes,XLogRecPtr recptr)915 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
916 {
917 	int			startoff;
918 	int			byteswritten;
919 
920 	while (nbytes > 0)
921 	{
922 		int			segbytes;
923 
924 		/* Close the current segment if it's completed */
925 		if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo))
926 			XLogWalRcvClose(recptr);
927 
928 		if (recvFile < 0)
929 		{
930 			bool		use_existent = true;
931 
932 			/* Create/use new log file */
933 			XLByteToSeg(recptr, recvSegNo);
934 			recvFile = XLogFileInit(recvSegNo, &use_existent, true);
935 			recvFileTLI = ThisTimeLineID;
936 			recvOff = 0;
937 		}
938 
939 		/* Calculate the start offset of the received logs */
940 		startoff = recptr % XLogSegSize;
941 
942 		if (startoff + nbytes > XLogSegSize)
943 			segbytes = XLogSegSize - startoff;
944 		else
945 			segbytes = nbytes;
946 
947 		/* Need to seek in the file? */
948 		if (recvOff != startoff)
949 		{
950 			if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
951 				ereport(PANIC,
952 						(errcode_for_file_access(),
953 						 errmsg("could not seek in log segment %s to offset %u: %m",
954 								XLogFileNameP(recvFileTLI, recvSegNo),
955 								startoff)));
956 			recvOff = startoff;
957 		}
958 
959 		/* OK to write the logs */
960 		errno = 0;
961 
962 		byteswritten = write(recvFile, buf, segbytes);
963 		if (byteswritten <= 0)
964 		{
965 			/* if write didn't set errno, assume no disk space */
966 			if (errno == 0)
967 				errno = ENOSPC;
968 			ereport(PANIC,
969 					(errcode_for_file_access(),
970 					 errmsg("could not write to log segment %s "
971 							"at offset %u, length %lu: %m",
972 							XLogFileNameP(recvFileTLI, recvSegNo),
973 							recvOff, (unsigned long) segbytes)));
974 		}
975 
976 		/* Update state for write */
977 		recptr += byteswritten;
978 
979 		recvOff += byteswritten;
980 		nbytes -= byteswritten;
981 		buf += byteswritten;
982 
983 		LogstreamResult.Write = recptr;
984 	}
985 
986 	/*
987 	 * Close the current segment if it's fully written up in the last cycle of
988 	 * the loop, to create its archive notification file soon. Otherwise WAL
989 	 * archiving of the segment will be delayed until any data in the next
990 	 * segment is received and written.
991 	 */
992 	if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo))
993 		XLogWalRcvClose(recptr);
994 }
995 
996 /*
997  * Flush the log to disk.
998  *
999  * If we're in the midst of dying, it's unwise to do anything that might throw
1000  * an error, so we skip sending a reply in that case.
1001  */
1002 static void
XLogWalRcvFlush(bool dying)1003 XLogWalRcvFlush(bool dying)
1004 {
1005 	if (LogstreamResult.Flush < LogstreamResult.Write)
1006 	{
1007 		WalRcvData *walrcv = WalRcv;
1008 
1009 		issue_xlog_fsync(recvFile, recvSegNo);
1010 
1011 		LogstreamResult.Flush = LogstreamResult.Write;
1012 
1013 		/* Update shared-memory status */
1014 		SpinLockAcquire(&walrcv->mutex);
1015 		if (walrcv->receivedUpto < LogstreamResult.Flush)
1016 		{
1017 			walrcv->latestChunkStart = walrcv->receivedUpto;
1018 			walrcv->receivedUpto = LogstreamResult.Flush;
1019 			walrcv->receivedTLI = ThisTimeLineID;
1020 		}
1021 		SpinLockRelease(&walrcv->mutex);
1022 
1023 		/* Signal the startup process and walsender that new WAL has arrived */
1024 		WakeupRecovery();
1025 		if (AllowCascadeReplication())
1026 			WalSndWakeup();
1027 
1028 		/* Report XLOG streaming progress in PS display */
1029 		if (update_process_title)
1030 		{
1031 			char		activitymsg[50];
1032 
1033 			snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1034 					 (uint32) (LogstreamResult.Write >> 32),
1035 					 (uint32) LogstreamResult.Write);
1036 			set_ps_display(activitymsg, false);
1037 		}
1038 
1039 		/* Also let the master know that we made some progress */
1040 		if (!dying)
1041 		{
1042 			XLogWalRcvSendReply(false, false);
1043 			XLogWalRcvSendHSFeedback(false);
1044 		}
1045 	}
1046 }
1047 
1048 /*
1049  * Close the current segment.
1050  *
1051  * Flush the segment to disk before closing it. Otherwise we have to
1052  * reopen and fsync it later.
1053  *
1054  * Create an archive notification file since the segment is known completed.
1055  */
1056 static void
XLogWalRcvClose(XLogRecPtr recptr)1057 XLogWalRcvClose(XLogRecPtr recptr)
1058 {
1059 	char		xlogfname[MAXFNAMELEN];
1060 
1061 	Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo));
1062 
1063 	/*
1064 	 * fsync() and close current file before we switch to next one. We would
1065 	 * otherwise have to reopen this file to fsync it later
1066 	 */
1067 	XLogWalRcvFlush(false);
1068 
1069 	XLogFileName(xlogfname, recvFileTLI, recvSegNo);
1070 
1071 	/*
1072 	 * XLOG segment files will be re-read by recovery in startup process soon,
1073 	 * so we don't advise the OS to release cache pages associated with the
1074 	 * file like XLogFileClose() does.
1075 	 */
1076 	if (close(recvFile) != 0)
1077 		ereport(PANIC,
1078 				(errcode_for_file_access(),
1079 				 errmsg("could not close log segment %s: %m",
1080 						xlogfname)));
1081 
1082 	/*
1083 	 * Create .done file forcibly to prevent the streamed segment from being
1084 	 * archived later.
1085 	 */
1086 	if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1087 		XLogArchiveForceDone(xlogfname);
1088 	else
1089 		XLogArchiveNotify(xlogfname);
1090 
1091 	recvFile = -1;
1092 }
1093 
1094 /*
1095  * Send reply message to primary, indicating our current WAL locations, oldest
1096  * xmin and the current time.
1097  *
1098  * If 'force' is not set, the message is only sent if enough time has
1099  * passed since last status update to reach wal_receiver_status_interval.
1100  * If wal_receiver_status_interval is disabled altogether and 'force' is
1101  * false, this is a no-op.
1102  *
1103  * If 'requestReply' is true, requests the server to reply immediately upon
1104  * receiving this message. This is used for heartbearts, when approaching
1105  * wal_receiver_timeout.
1106  */
1107 static void
XLogWalRcvSendReply(bool force,bool requestReply)1108 XLogWalRcvSendReply(bool force, bool requestReply)
1109 {
1110 	static XLogRecPtr writePtr = 0;
1111 	static XLogRecPtr flushPtr = 0;
1112 	XLogRecPtr	applyPtr;
1113 	static TimestampTz sendTime = 0;
1114 	TimestampTz now;
1115 
1116 	/*
1117 	 * If the user doesn't want status to be reported to the master, be sure
1118 	 * to exit before doing anything at all.
1119 	 */
1120 	if (!force && wal_receiver_status_interval <= 0)
1121 		return;
1122 
1123 	/* Get current timestamp. */
1124 	now = GetCurrentTimestamp();
1125 
1126 	/*
1127 	 * We can compare the write and flush positions to the last message we
1128 	 * sent without taking any lock, but the apply position requires a spin
1129 	 * lock, so we don't check that unless something else has changed or 10
1130 	 * seconds have passed.  This means that the apply WAL location will
1131 	 * appear, from the master's point of view, to lag slightly, but since
1132 	 * this is only for reporting purposes and only on idle systems, that's
1133 	 * probably OK.
1134 	 */
1135 	if (!force
1136 		&& writePtr == LogstreamResult.Write
1137 		&& flushPtr == LogstreamResult.Flush
1138 		&& !TimestampDifferenceExceeds(sendTime, now,
1139 									   wal_receiver_status_interval * 1000))
1140 		return;
1141 	sendTime = now;
1142 
1143 	/* Construct a new message */
1144 	writePtr = LogstreamResult.Write;
1145 	flushPtr = LogstreamResult.Flush;
1146 	applyPtr = GetXLogReplayRecPtr(NULL);
1147 
1148 	resetStringInfo(&reply_message);
1149 	pq_sendbyte(&reply_message, 'r');
1150 	pq_sendint64(&reply_message, writePtr);
1151 	pq_sendint64(&reply_message, flushPtr);
1152 	pq_sendint64(&reply_message, applyPtr);
1153 	pq_sendint64(&reply_message, GetCurrentTimestamp());
1154 	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1155 
1156 	/* Send it */
1157 	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1158 		 (uint32) (writePtr >> 32), (uint32) writePtr,
1159 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1160 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1161 		 requestReply ? " (reply requested)" : "");
1162 
1163 	walrcv_send(wrconn, reply_message.data, reply_message.len);
1164 }
1165 
1166 /*
1167  * Send hot standby feedback message to primary, plus the current time,
1168  * in case they don't have a watch.
1169  *
1170  * If the user disables feedback, send one final message to tell sender
1171  * to forget about the xmin on this standby. We also send this message
1172  * on first connect because a previous connection might have set xmin
1173  * on a replication slot. (If we're not using a slot it's harmless to
1174  * send a feedback message explicitly setting InvalidTransactionId).
1175  */
1176 static void
XLogWalRcvSendHSFeedback(bool immed)1177 XLogWalRcvSendHSFeedback(bool immed)
1178 {
1179 	TimestampTz now;
1180 	TransactionId nextXid;
1181 	uint32		xmin_epoch,
1182 				catalog_xmin_epoch;
1183 	TransactionId xmin,
1184 				catalog_xmin;
1185 	static TimestampTz sendTime = 0;
1186 
1187 	/* initially true so we always send at least one feedback message */
1188 	static bool master_has_standby_xmin = true;
1189 
1190 	/*
1191 	 * If the user doesn't want status to be reported to the master, be sure
1192 	 * to exit before doing anything at all.
1193 	 */
1194 	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1195 		!master_has_standby_xmin)
1196 		return;
1197 
1198 	/* Get current timestamp. */
1199 	now = GetCurrentTimestamp();
1200 
1201 	if (!immed)
1202 	{
1203 		/*
1204 		 * Send feedback at most once per wal_receiver_status_interval.
1205 		 */
1206 		if (!TimestampDifferenceExceeds(sendTime, now,
1207 										wal_receiver_status_interval * 1000))
1208 			return;
1209 		sendTime = now;
1210 	}
1211 
1212 	/*
1213 	 * If Hot Standby is not yet accepting connections there is nothing to
1214 	 * send. Check this after the interval has expired to reduce number of
1215 	 * calls.
1216 	 *
1217 	 * Bailing out here also ensures that we don't send feedback until we've
1218 	 * read our own replication slot state, so we don't tell the master to
1219 	 * discard needed xmin or catalog_xmin from any slots that may exist on
1220 	 * this replica.
1221 	 */
1222 	if (!HotStandbyActive())
1223 		return;
1224 
1225 	/*
1226 	 * Make the expensive call to get the oldest xmin once we are certain
1227 	 * everything else has been checked.
1228 	 */
1229 	if (hot_standby_feedback)
1230 	{
1231 		TransactionId slot_xmin;
1232 
1233 		/*
1234 		 * Usually GetOldestXmin() would include both global replication slot
1235 		 * xmin and catalog_xmin in its calculations, but we want to derive
1236 		 * separate values for each of those. So we ask for an xmin that
1237 		 * excludes the catalog_xmin.
1238 		 */
1239 		xmin = GetOldestXmin(NULL,
1240 							 PROCARRAY_FLAGS_DEFAULT | PROCARRAY_SLOTS_XMIN);
1241 
1242 		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1243 
1244 		if (TransactionIdIsValid(slot_xmin) &&
1245 			TransactionIdPrecedes(slot_xmin, xmin))
1246 			xmin = slot_xmin;
1247 	}
1248 	else
1249 	{
1250 		xmin = InvalidTransactionId;
1251 		catalog_xmin = InvalidTransactionId;
1252 	}
1253 
1254 	/*
1255 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1256 	 * the epoch boundary.
1257 	 */
1258 	GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1259 	catalog_xmin_epoch = xmin_epoch;
1260 	if (nextXid < xmin)
1261 		xmin_epoch--;
1262 	if (nextXid < catalog_xmin)
1263 		catalog_xmin_epoch--;
1264 
1265 	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1266 		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1267 
1268 	/* Construct the message and send it. */
1269 	resetStringInfo(&reply_message);
1270 	pq_sendbyte(&reply_message, 'h');
1271 	pq_sendint64(&reply_message, GetCurrentTimestamp());
1272 	pq_sendint(&reply_message, xmin, 4);
1273 	pq_sendint(&reply_message, xmin_epoch, 4);
1274 	pq_sendint(&reply_message, catalog_xmin, 4);
1275 	pq_sendint(&reply_message, catalog_xmin_epoch, 4);
1276 	walrcv_send(wrconn, reply_message.data, reply_message.len);
1277 	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1278 		master_has_standby_xmin = true;
1279 	else
1280 		master_has_standby_xmin = false;
1281 }
1282 
1283 /*
1284  * Update shared memory status upon receiving a message from primary.
1285  *
1286  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1287  * message, reported by primary.
1288  */
1289 static void
ProcessWalSndrMessage(XLogRecPtr walEnd,TimestampTz sendTime)1290 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1291 {
1292 	WalRcvData *walrcv = WalRcv;
1293 
1294 	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1295 
1296 	/* Update shared-memory status */
1297 	SpinLockAcquire(&walrcv->mutex);
1298 	if (walrcv->latestWalEnd < walEnd)
1299 		walrcv->latestWalEndTime = sendTime;
1300 	walrcv->latestWalEnd = walEnd;
1301 	walrcv->lastMsgSendTime = sendTime;
1302 	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1303 	SpinLockRelease(&walrcv->mutex);
1304 
1305 	if (log_min_messages <= DEBUG2)
1306 	{
1307 		char	   *sendtime;
1308 		char	   *receipttime;
1309 		int			applyDelay;
1310 
1311 		/* Copy because timestamptz_to_str returns a static buffer */
1312 		sendtime = pstrdup(timestamptz_to_str(sendTime));
1313 		receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1314 		applyDelay = GetReplicationApplyDelay();
1315 
1316 		/* apply delay is not available */
1317 		if (applyDelay == -1)
1318 			elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1319 				 sendtime,
1320 				 receipttime,
1321 				 GetReplicationTransferLatency());
1322 		else
1323 			elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1324 				 sendtime,
1325 				 receipttime,
1326 				 applyDelay,
1327 				 GetReplicationTransferLatency());
1328 
1329 		pfree(sendtime);
1330 		pfree(receipttime);
1331 	}
1332 }
1333 
1334 /*
1335  * Wake up the walreceiver main loop.
1336  *
1337  * This is called by the startup process whenever interesting xlog records
1338  * are applied, so that walreceiver can check if it needs to send an apply
1339  * notification back to the master which may be waiting in a COMMIT with
1340  * synchronous_commit = remote_apply.
1341  */
1342 void
WalRcvForceReply(void)1343 WalRcvForceReply(void)
1344 {
1345 	Latch	   *latch;
1346 
1347 	WalRcv->force_reply = true;
1348 	/* fetching the latch pointer might not be atomic, so use spinlock */
1349 	SpinLockAcquire(&WalRcv->mutex);
1350 	latch = WalRcv->latch;
1351 	SpinLockRelease(&WalRcv->mutex);
1352 	if (latch)
1353 		SetLatch(latch);
1354 }
1355 
1356 /*
1357  * Return a string constant representing the state. This is used
1358  * in system functions and views, and should *not* be translated.
1359  */
1360 static const char *
WalRcvGetStateString(WalRcvState state)1361 WalRcvGetStateString(WalRcvState state)
1362 {
1363 	switch (state)
1364 	{
1365 		case WALRCV_STOPPED:
1366 			return "stopped";
1367 		case WALRCV_STARTING:
1368 			return "starting";
1369 		case WALRCV_STREAMING:
1370 			return "streaming";
1371 		case WALRCV_WAITING:
1372 			return "waiting";
1373 		case WALRCV_RESTARTING:
1374 			return "restarting";
1375 		case WALRCV_STOPPING:
1376 			return "stopping";
1377 	}
1378 	return "UNKNOWN";
1379 }
1380 
1381 /*
1382  * Returns activity of WAL receiver, including pid, state and xlog locations
1383  * received from the WAL sender of another server.
1384  */
1385 Datum
pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)1386 pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1387 {
1388 	TupleDesc	tupdesc;
1389 	Datum	   *values;
1390 	bool	   *nulls;
1391 	int			pid;
1392 	bool		ready_to_display;
1393 	WalRcvState state;
1394 	XLogRecPtr	receive_start_lsn;
1395 	TimeLineID	receive_start_tli;
1396 	XLogRecPtr	received_lsn;
1397 	TimeLineID	received_tli;
1398 	TimestampTz last_send_time;
1399 	TimestampTz last_receipt_time;
1400 	XLogRecPtr	latest_end_lsn;
1401 	TimestampTz latest_end_time;
1402 	char		slotname[NAMEDATALEN];
1403 	char		conninfo[MAXCONNINFO];
1404 
1405 	/* Take a lock to ensure value consistency */
1406 	SpinLockAcquire(&WalRcv->mutex);
1407 	pid = (int) WalRcv->pid;
1408 	ready_to_display = WalRcv->ready_to_display;
1409 	state = WalRcv->walRcvState;
1410 	receive_start_lsn = WalRcv->receiveStart;
1411 	receive_start_tli = WalRcv->receiveStartTLI;
1412 	received_lsn = WalRcv->receivedUpto;
1413 	received_tli = WalRcv->receivedTLI;
1414 	last_send_time = WalRcv->lastMsgSendTime;
1415 	last_receipt_time = WalRcv->lastMsgReceiptTime;
1416 	latest_end_lsn = WalRcv->latestWalEnd;
1417 	latest_end_time = WalRcv->latestWalEndTime;
1418 	strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1419 	strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1420 	SpinLockRelease(&WalRcv->mutex);
1421 
1422 	/*
1423 	 * No WAL receiver (or not ready yet), just return a tuple with NULL
1424 	 * values
1425 	 */
1426 	if (pid == 0 || !ready_to_display)
1427 		PG_RETURN_NULL();
1428 
1429 	/* determine result type */
1430 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1431 		elog(ERROR, "return type must be a row type");
1432 
1433 	values = palloc0(sizeof(Datum) * tupdesc->natts);
1434 	nulls = palloc0(sizeof(bool) * tupdesc->natts);
1435 
1436 	/* Fetch values */
1437 	values[0] = Int32GetDatum(pid);
1438 
1439 	if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1440 	{
1441 		/*
1442 		 * Only superusers and members of pg_read_all_stats can see details.
1443 		 * Other users only get the pid value
1444 		 * to know whether it is a WAL receiver, but no details.
1445 		 */
1446 		MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1447 	}
1448 	else
1449 	{
1450 		values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1451 
1452 		if (XLogRecPtrIsInvalid(receive_start_lsn))
1453 			nulls[2] = true;
1454 		else
1455 			values[2] = LSNGetDatum(receive_start_lsn);
1456 		values[3] = Int32GetDatum(receive_start_tli);
1457 		if (XLogRecPtrIsInvalid(received_lsn))
1458 			nulls[4] = true;
1459 		else
1460 			values[4] = LSNGetDatum(received_lsn);
1461 		values[5] = Int32GetDatum(received_tli);
1462 		if (last_send_time == 0)
1463 			nulls[6] = true;
1464 		else
1465 			values[6] = TimestampTzGetDatum(last_send_time);
1466 		if (last_receipt_time == 0)
1467 			nulls[7] = true;
1468 		else
1469 			values[7] = TimestampTzGetDatum(last_receipt_time);
1470 		if (XLogRecPtrIsInvalid(latest_end_lsn))
1471 			nulls[8] = true;
1472 		else
1473 			values[8] = LSNGetDatum(latest_end_lsn);
1474 		if (latest_end_time == 0)
1475 			nulls[9] = true;
1476 		else
1477 			values[9] = TimestampTzGetDatum(latest_end_time);
1478 		if (*slotname == '\0')
1479 			nulls[10] = true;
1480 		else
1481 			values[10] = CStringGetTextDatum(slotname);
1482 		if (*conninfo == '\0')
1483 			nulls[11] = true;
1484 		else
1485 			values[11] = CStringGetTextDatum(conninfo);
1486 	}
1487 
1488 	/* Returns the record as Datum */
1489 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1490 }
1491