1 /*-------------------------------------------------------------------------
2  *
3  * walreceiver.c
4  *
5  * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6  * is the process in the standby server that takes charge of receiving
7  * XLOG records from a primary server during streaming replication.
8  *
9  * When the startup process determines that it's time to start streaming,
10  * it instructs postmaster to start walreceiver. Walreceiver first connects
11  * to the primary server (it will be served by a walsender process
12  * in the primary server), and then keeps receiving XLOG records and
13  * writing them to the disk as long as the connection is alive. As XLOG
14  * records are received and flushed to disk, it updates the
15  * WalRcv->receivedUpto variable in shared memory, to inform the startup
16  * process of how far it can proceed with XLOG replay.
17  *
18  * If the primary server ends streaming, but doesn't disconnect, walreceiver
19  * goes into "waiting" mode, and waits for the startup process to give new
20  * instructions. The startup process will treat that the same as
21  * disconnection, and will rescan the archive/pg_wal directory. But when the
22  * startup process wants to try streaming replication again, it will just
23  * nudge the existing walreceiver process that's waiting, instead of launching
24  * a new one.
25  *
26  * Normal termination is by SIGTERM, which instructs the walreceiver to
27  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
28  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
29  * of the connection and a FATAL error are treated not as a crash but as
30  * normal operation.
31  *
32  * This file contains the server-facing parts of walreceiver. The libpq-
33  * specific parts are in the libpqwalreceiver module. It's loaded
34  * dynamically to avoid linking the server with libpq.
35  *
36  * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group
37  *
38  *
39  * IDENTIFICATION
40  *	  src/backend/replication/walreceiver.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45 
46 #include <signal.h>
47 #include <unistd.h>
48 
49 #include "access/htup_details.h"
50 #include "access/timeline.h"
51 #include "access/transam.h"
52 #include "access/xlog_internal.h"
53 #include "catalog/pg_authid.h"
54 #include "catalog/pg_type.h"
55 #include "common/ip.h"
56 #include "funcapi.h"
57 #include "libpq/pqformat.h"
58 #include "libpq/pqsignal.h"
59 #include "miscadmin.h"
60 #include "pgstat.h"
61 #include "replication/walreceiver.h"
62 #include "replication/walsender.h"
63 #include "storage/ipc.h"
64 #include "storage/pmsignal.h"
65 #include "storage/procarray.h"
66 #include "utils/builtins.h"
67 #include "utils/guc.h"
68 #include "utils/pg_lsn.h"
69 #include "utils/ps_status.h"
70 #include "utils/resowner.h"
71 #include "utils/timestamp.h"
72 
73 
/* GUC variables */
int			wal_receiver_status_interval;
int			wal_receiver_timeout;	/* in ms; a value <= 0 disables the check */
bool		hot_standby_feedback;

/*
 * libpqwalreceiver connection.  NULL until walrcv_connect() succeeds;
 * WalRcvDie() disconnects it at exit if it was ever established.
 */
static WalReceiverConn *wrconn = NULL;

/*
 * Function table for the dynamically loaded libpqwalreceiver module.
 * WalReceiverMain() treats it still being NULL after load_file() as a
 * failure to initialize.
 */
WalReceiverFunctionsType *WalReceiverFunctions = NULL;

#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */

/*
 * These variables are used similarly to openLogFile/SegNo/Off,
 * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
 * corresponding to the filename of recvFile.  recvFile is -1 whenever no
 * segment is currently open.
 */
static int	recvFile = -1;
static TimeLineID recvFileTLI = 0;
static XLogSegNo recvSegNo = 0;
static uint32 recvOff = 0;

/*
 * Flags set by interrupt handlers of walreceiver for later service in the
 * main loop.
 */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.
 */
static struct
{
	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
}			LogstreamResult;

/*
 * Message buffers, (re)initialized by WalReceiverMain() each time streaming
 * starts.  incoming_message holds the fixed-size header of the message
 * currently being parsed by XLogWalRcvProcessMsg().
 */
static StringInfoData reply_message;
static StringInfoData incoming_message;

/* Prototypes for private functions */
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvClose(XLogRecPtr recptr);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);

/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
132 
133 
134 /*
135  * Process any interrupts the walreceiver process may have received.
136  * This should be called any time the process's latch has become set.
137  *
138  * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
139  * SIGTERM signal handler, because the signal might arrive in the middle of
140  * some critical operation, like while we're holding a spinlock.  Instead, the
141  * signal handler sets a flag variable as well as setting the process's latch.
142  * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
143  * latch has become set.  Operations that could block for a long time, such as
144  * reading from a remote server, must pay attention to the latch too; see
145  * libpqrcv_PQgetResult for example.
146  */
147 void
ProcessWalRcvInterrupts(void)148 ProcessWalRcvInterrupts(void)
149 {
150 	/*
151 	 * Although walreceiver interrupt handling doesn't use the same scheme as
152 	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
153 	 * any incoming signals on Win32.
154 	 */
155 	CHECK_FOR_INTERRUPTS();
156 
157 	if (got_SIGTERM)
158 	{
159 		ereport(FATAL,
160 				(errcode(ERRCODE_ADMIN_SHUTDOWN),
161 				 errmsg("terminating walreceiver process due to administrator command")));
162 	}
163 }
164 
165 
/*
 * Main entry point for walreceiver process.
 *
 * Startup steps:
 *   - mark the walreceiver as running in shared memory;
 *   - install signal handlers;
 *   - load libpqwalreceiver;
 *   - connect to the primary.
 *
 * It then loops forever.  Each iteration verifies the primary's system
 * identifier and timeline and fetches any missing timeline history files.
 * It then streams WAL from the requested start point until the primary ends
 * streaming.  Finally it closes the open segment and waits for the startup
 * process to supply a new start point.
 *
 * This function never returns normally; the process leaves via
 * proc_exit()/exit() or by raising an error.
 */
void
WalReceiverMain(void)
{
	char		conninfo[MAXCONNINFO];
	char	   *tmp_conninfo;
	char		slotname[NAMEDATALEN];
	XLogRecPtr	startpoint;
	TimeLineID	startpointTLI;
	TimeLineID	primaryTLI;
	bool		first_stream;
	WalRcvData *walrcv = WalRcv;
	TimestampTz last_recv_timestamp;
	TimestampTz now;
	bool		ping_sent;
	char	   *err;
	char	   *sender_host = NULL;
	int			sender_port = 0;

	/*
	 * WalRcv should be set up already (if we are a backend, we inherit this
	 * by fork() or EXEC_BACKEND mechanism from the postmaster).
	 */
	Assert(walrcv != NULL);

	now = GetCurrentTimestamp();

	/*
	 * Mark walreceiver as running in shared memory.
	 *
	 * Do this as early as possible, so that if we fail later on, we'll set
	 * state to STOPPED. If we die before this, the startup process will keep
	 * waiting for us to start up, until it times out.
	 */
	SpinLockAcquire(&walrcv->mutex);
	Assert(walrcv->pid == 0);
	switch (walrcv->walRcvState)
	{
		case WALRCV_STOPPING:
			/* If we've already been requested to stop, don't start up. */
			walrcv->walRcvState = WALRCV_STOPPED;
			/* fall through */

		case WALRCV_STOPPED:
			SpinLockRelease(&walrcv->mutex);
			proc_exit(1);
			break;

		case WALRCV_STARTING:
			/* The usual case */
			break;

		case WALRCV_WAITING:
		case WALRCV_STREAMING:
		case WALRCV_RESTARTING:
		default:
			/* Shouldn't happen */
			SpinLockRelease(&walrcv->mutex);
			elog(PANIC, "walreceiver still running according to shared memory state");
	}
	/* Advertise our PID so that the startup process can kill us */
	walrcv->pid = MyProcPid;
	walrcv->walRcvState = WALRCV_STREAMING;

	/* Fetch information required to start streaming */
	walrcv->ready_to_display = false;
	strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
	strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
	startpoint = walrcv->receiveStart;
	startpointTLI = walrcv->receiveStartTLI;

	/* Initialise to a sanish value */
	walrcv->lastMsgSendTime =
		walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;

	/* Report the latch to use to awaken this process */
	walrcv->latch = &MyProc->procLatch;

	SpinLockRelease(&walrcv->mutex);

	/* Arrange to clean up at walreceiver exit */
	on_shmem_exit(WalRcvDie, 0);

	/* Properly accept or ignore signals the postmaster might send us */
	pqsignal(SIGHUP, WalRcvSigHupHandler);	/* set flag to read config file */
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, WalRcvShutdownHandler);	/* request shutdown */
	pqsignal(SIGQUIT, WalRcvQuickDieHandler);	/* hard crash time */
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
	pqsignal(SIGUSR2, SIG_IGN);

	/* Reset some signals that are accepted by postmaster but not here */
	pqsignal(SIGCHLD, SIG_DFL);
	pqsignal(SIGTTIN, SIG_DFL);
	pqsignal(SIGTTOU, SIG_DFL);
	pqsignal(SIGCONT, SIG_DFL);
	pqsignal(SIGWINCH, SIG_DFL);

	/* We allow SIGQUIT (quickdie) at all times */
	sigdelset(&BlockSig, SIGQUIT);

	/* Load the libpq-specific functions */
	load_file("libpqwalreceiver", false);
	if (WalReceiverFunctions == NULL)
		elog(ERROR, "libpqwalreceiver didn't initialize correctly");

	/*
	 * Create a resource owner to keep track of our resources (not clear that
	 * we need this, but may as well have one).
	 */
	CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");

	/* Unblock signals (they were blocked when the postmaster forked us) */
	PG_SETMASK(&UnBlockSig);

	/* Establish the connection to the primary for XLOG streaming */
	wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
	if (!wrconn)
		ereport(ERROR,
				(errmsg("could not connect to the primary server: %s", err)));

	/*
	 * Save user-visible connection string.  This clobbers the original
	 * conninfo, for security. Also save host and port of the sender server
	 * this walreceiver is connected to.
	 */
	tmp_conninfo = walrcv_get_conninfo(wrconn);
	walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
	SpinLockAcquire(&walrcv->mutex);
	memset(walrcv->conninfo, 0, MAXCONNINFO);
	if (tmp_conninfo)
		strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);

	memset(walrcv->sender_host, 0, NI_MAXHOST);
	if (sender_host)
		strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);

	walrcv->sender_port = sender_port;
	walrcv->ready_to_display = true;
	SpinLockRelease(&walrcv->mutex);

	if (tmp_conninfo)
		pfree(tmp_conninfo);

	if (sender_host)
		pfree(sender_host);

	/* first_stream only selects which "started/restarted" LOG line is used */
	first_stream = true;
	for (;;)
	{
		char	   *primary_sysid;
		char		standby_sysid[32];
		int			server_version;
		WalRcvStreamOptions options;

		/*
		 * Check that we're connected to a valid server using the
		 * IDENTIFY_SYSTEM replication command.
		 */
		primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
											   &server_version);

		snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
				 GetSystemIdentifier());
		if (strcmp(primary_sysid, standby_sysid) != 0)
		{
			ereport(ERROR,
					(errmsg("database system identifier differs between the primary and standby"),
					 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
							   primary_sysid, standby_sysid)));
		}

		/*
		 * Confirm that the current timeline of the primary is the same or
		 * ahead of ours.
		 */
		if (primaryTLI < startpointTLI)
			ereport(ERROR,
					(errmsg("highest timeline %u of the primary is behind recovery timeline %u",
							primaryTLI, startpointTLI)));

		/*
		 * Get any missing history files. We do this always, even when we're
		 * not interested in that timeline, so that if we're promoted to
		 * become the master later on, we don't select the same timeline that
		 * was already used in the current master. This isn't bullet-proof -
		 * you'll need some external software to manage your cluster if you
		 * need to ensure that a unique timeline id is chosen in every case,
		 * but let's avoid the confusion of timeline id collisions where we
		 * can.
		 */
		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);

		/*
		 * Start streaming.
		 *
		 * We'll try to start at the requested starting point and timeline,
		 * even if it's different from the server's latest timeline. In case
		 * we've already reached the end of the old timeline, the server will
		 * finish the streaming immediately, and we will go back to await
		 * orders from the startup process. If recovery_target_timeline is
		 * 'latest', the startup process will scan pg_wal and find the new
		 * history file, bump recovery target timeline, and ask us to restart
		 * on the new timeline.
		 */
		options.logical = false;
		options.startpoint = startpoint;
		options.slotname = slotname[0] != '\0' ? slotname : NULL;
		options.proto.physical.startpointTLI = startpointTLI;
		/* XLogWalRcvWrite() tags newly opened segments with ThisTimeLineID */
		ThisTimeLineID = startpointTLI;
		if (walrcv_startstreaming(wrconn, &options))
		{
			if (first_stream)
				ereport(LOG,
						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
								(uint32) (startpoint >> 32), (uint32) startpoint,
								startpointTLI)));
			else
				ereport(LOG,
						(errmsg("restarted WAL streaming at %X/%X on timeline %u",
								(uint32) (startpoint >> 32), (uint32) startpoint,
								startpointTLI)));
			first_stream = false;

			/* Initialize LogstreamResult and buffers for processing messages */
			LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
			initStringInfo(&reply_message);
			initStringInfo(&incoming_message);

			/* Initialize the last recv timestamp */
			last_recv_timestamp = GetCurrentTimestamp();
			ping_sent = false;

			/* Loop until end-of-streaming or error */
			for (;;)
			{
				char	   *buf;
				int			len;
				bool		endofwal = false;
				pgsocket	wait_fd = PGINVALID_SOCKET;
				int			rc;

				/*
				 * Exit walreceiver if we're not in recovery. This should not
				 * happen, but cross-check the status here.
				 */
				if (!RecoveryInProgress())
					ereport(FATAL,
							(errmsg("cannot continue WAL streaming, recovery has already ended")));

				/* Process any requests or signals received recently */
				ProcessWalRcvInterrupts();

				if (got_SIGHUP)
				{
					got_SIGHUP = false;
					ProcessConfigFile(PGC_SIGHUP);
					XLogWalRcvSendHSFeedback(true);
				}

				/*
				 * See if we can read data immediately.  As handled below:
				 * len > 0 is one message (type byte followed by payload),
				 * len == 0 means nothing more can be read right now (wait_fd
				 * then names the socket to wait on), and len < 0 means the
				 * primary has ended streaming.
				 */
				len = walrcv_receive(wrconn, &buf, &wait_fd);
				if (len != 0)
				{
					/*
					 * Process the received data, and any subsequent data we
					 * can read without blocking.
					 */
					for (;;)
					{
						if (len > 0)
						{
							/*
							 * Something was received from master, so reset
							 * timeout
							 */
							last_recv_timestamp = GetCurrentTimestamp();
							ping_sent = false;
							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
						}
						else if (len == 0)
							break;
						else if (len < 0)
						{
							ereport(LOG,
									(errmsg("replication terminated by primary server"),
									 errdetail("End of WAL reached on timeline %u at %X/%X.",
											   startpointTLI,
											   (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
							endofwal = true;
							break;
						}
						len = walrcv_receive(wrconn, &buf, &wait_fd);
					}

					/* Let the master know that we received some data. */
					XLogWalRcvSendReply(false, false);

					/*
					 * If we've written some records, flush them to disk and
					 * let the startup process and primary server know about
					 * them.
					 */
					XLogWalRcvFlush(false);
				}

				/* Check if we need to exit the streaming loop. */
				if (endofwal)
					break;

				/*
				 * Ideally we would reuse a WaitEventSet object repeatedly
				 * here to avoid the overheads of WaitLatchOrSocket on epoll
				 * systems, but we can't be sure that libpq (or any other
				 * walreceiver implementation) has the same socket (even if
				 * the fd is the same number, it may have been closed and
				 * reopened since the last time).  In future, if there is a
				 * function for removing sockets from WaitEventSet, then we
				 * could add and remove just the socket each time, potentially
				 * avoiding some system calls.
				 */
				Assert(wait_fd != PGINVALID_SOCKET);
				rc = WaitLatchOrSocket(walrcv->latch,
									   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
									   WL_TIMEOUT | WL_LATCH_SET,
									   wait_fd,
									   NAPTIME_PER_CYCLE,
									   WAIT_EVENT_WAL_RECEIVER_MAIN);
				if (rc & WL_LATCH_SET)
				{
					ResetLatch(walrcv->latch);
					ProcessWalRcvInterrupts();

					if (walrcv->force_reply)
					{
						/*
						 * The recovery process has asked us to send apply
						 * feedback now.  Make sure the flag is really set to
						 * false in shared memory before sending the reply, so
						 * we don't miss a new request for a reply.
						 */
						walrcv->force_reply = false;
						pg_memory_barrier();
						XLogWalRcvSendReply(true, false);
					}
				}
				if (rc & WL_POSTMASTER_DEATH)
				{
					/*
					 * Emergency bailout if postmaster has died.  This is to
					 * avoid the necessity for manual cleanup of all
					 * postmaster children.
					 */
					exit(1);
				}
				if (rc & WL_TIMEOUT)
				{
					/*
					 * We didn't receive anything new. If we haven't heard
					 * anything from the server for more than
					 * wal_receiver_timeout / 2, ping the server. Also, if
					 * it's been longer than wal_receiver_status_interval
					 * since the last update we sent, send a status update to
					 * the master anyway, to report any progress in applying
					 * WAL.
					 */
					bool		requestReply = false;

					/*
					 * Check if time since last receive from the primary has
					 * reached the configured limit.
					 */
					if (wal_receiver_timeout > 0)
					{
						/* NOTE: this local deliberately shadows the outer 'now' */
						TimestampTz now = GetCurrentTimestamp();
						TimestampTz timeout;

						timeout =
							TimestampTzPlusMilliseconds(last_recv_timestamp,
														wal_receiver_timeout);

						if (now >= timeout)
							ereport(ERROR,
									(errmsg("terminating walreceiver due to timeout")));

						/*
						 * We didn't receive anything new, for half of
						 * receiver replication timeout. Ping the server.
						 */
						if (!ping_sent)
						{
							timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
																  (wal_receiver_timeout / 2));
							if (now >= timeout)
							{
								requestReply = true;
								ping_sent = true;
							}
						}
					}

					XLogWalRcvSendReply(requestReply, requestReply);
					XLogWalRcvSendHSFeedback(false);
				}
			}

			/*
			 * The backend finished streaming. Exit streaming COPY-mode from
			 * our side, too.
			 */
			walrcv_endstreaming(wrconn, &primaryTLI);

			/*
			 * If the server had switched to a new timeline that we didn't
			 * know about when we began streaming, fetch its timeline history
			 * file now.
			 */
			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
		}
		else
			ereport(LOG,
					(errmsg("primary server contains no more WAL on requested timeline %u",
							startpointTLI)));

		/*
		 * End of WAL reached on the requested timeline. Close the last
		 * segment, and await for new orders from the startup process.
		 * (recvFile is -1 when no segment is open.)
		 */
		if (recvFile >= 0)
		{
			char		xlogfname[MAXFNAMELEN];

			XLogWalRcvFlush(false);
			if (close(recvFile) != 0)
				ereport(PANIC,
						(errcode_for_file_access(),
						 errmsg("could not close log segment %s: %m",
								XLogFileNameP(recvFileTLI, recvSegNo))));

			/*
			 * Create .done file forcibly to prevent the streamed segment from
			 * being archived later.
			 */
			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
				XLogArchiveForceDone(xlogfname);
			else
				XLogArchiveNotify(xlogfname);
		}
		recvFile = -1;

		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
	}
	/* not reached */
}
624 
625 /*
626  * Wait for startup process to set receiveStart and receiveStartTLI.
627  */
628 static void
WalRcvWaitForStartPosition(XLogRecPtr * startpoint,TimeLineID * startpointTLI)629 WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
630 {
631 	WalRcvData *walrcv = WalRcv;
632 	int			state;
633 
634 	SpinLockAcquire(&walrcv->mutex);
635 	state = walrcv->walRcvState;
636 	if (state != WALRCV_STREAMING)
637 	{
638 		SpinLockRelease(&walrcv->mutex);
639 		if (state == WALRCV_STOPPING)
640 			proc_exit(0);
641 		else
642 			elog(FATAL, "unexpected walreceiver state");
643 	}
644 	walrcv->walRcvState = WALRCV_WAITING;
645 	walrcv->receiveStart = InvalidXLogRecPtr;
646 	walrcv->receiveStartTLI = 0;
647 	SpinLockRelease(&walrcv->mutex);
648 
649 	if (update_process_title)
650 		set_ps_display("idle", false);
651 
652 	/*
653 	 * nudge startup process to notice that we've stopped streaming and are
654 	 * now waiting for instructions.
655 	 */
656 	WakeupRecovery();
657 	for (;;)
658 	{
659 		ResetLatch(walrcv->latch);
660 
661 		/*
662 		 * Emergency bailout if postmaster has died.  This is to avoid the
663 		 * necessity for manual cleanup of all postmaster children.
664 		 */
665 		if (!PostmasterIsAlive())
666 			exit(1);
667 
668 		ProcessWalRcvInterrupts();
669 
670 		SpinLockAcquire(&walrcv->mutex);
671 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
672 			   walrcv->walRcvState == WALRCV_WAITING ||
673 			   walrcv->walRcvState == WALRCV_STOPPING);
674 		if (walrcv->walRcvState == WALRCV_RESTARTING)
675 		{
676 			/* we don't expect primary_conninfo to change */
677 			*startpoint = walrcv->receiveStart;
678 			*startpointTLI = walrcv->receiveStartTLI;
679 			walrcv->walRcvState = WALRCV_STREAMING;
680 			SpinLockRelease(&walrcv->mutex);
681 			break;
682 		}
683 		if (walrcv->walRcvState == WALRCV_STOPPING)
684 		{
685 			/*
686 			 * We should've received SIGTERM if the startup process wants us
687 			 * to die, but might as well check it here too.
688 			 */
689 			SpinLockRelease(&walrcv->mutex);
690 			exit(1);
691 		}
692 		SpinLockRelease(&walrcv->mutex);
693 
694 		WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
695 				  WAIT_EVENT_WAL_RECEIVER_WAIT_START);
696 	}
697 
698 	if (update_process_title)
699 	{
700 		char		activitymsg[50];
701 
702 		snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
703 				 (uint32) (*startpoint >> 32),
704 				 (uint32) *startpoint);
705 		set_ps_display(activitymsg, false);
706 	}
707 }
708 
709 /*
710  * Fetch any missing timeline history files between 'first' and 'last'
711  * (inclusive) from the server.
712  */
713 static void
WalRcvFetchTimeLineHistoryFiles(TimeLineID first,TimeLineID last)714 WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
715 {
716 	TimeLineID	tli;
717 
718 	for (tli = first; tli <= last; tli++)
719 	{
720 		/* there's no history file for timeline 1 */
721 		if (tli != 1 && !existsTimeLineHistory(tli))
722 		{
723 			char	   *fname;
724 			char	   *content;
725 			int			len;
726 			char		expectedfname[MAXFNAMELEN];
727 
728 			ereport(LOG,
729 					(errmsg("fetching timeline history file for timeline %u from primary server",
730 							tli)));
731 
732 			walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
733 
734 			/*
735 			 * Check that the filename on the master matches what we
736 			 * calculated ourselves. This is just a sanity check, it should
737 			 * always match.
738 			 */
739 			TLHistoryFileName(expectedfname, tli);
740 			if (strcmp(fname, expectedfname) != 0)
741 				ereport(ERROR,
742 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
743 						 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
744 										 tli)));
745 
746 			/*
747 			 * Write the file to pg_wal.
748 			 */
749 			writeTimeLineHistoryFile(tli, content, len);
750 
751 			/*
752 			 * Mark the streamed history file as ready for archiving
753 			 * if archive_mode is always.
754 			 */
755 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
756 				XLogArchiveForceDone(fname);
757 			else
758 				XLogArchiveNotify(fname);
759 
760 			pfree(fname);
761 			pfree(content);
762 		}
763 	}
764 }
765 
766 /*
767  * Mark us as STOPPED in shared memory at exit.
768  */
769 static void
WalRcvDie(int code,Datum arg)770 WalRcvDie(int code, Datum arg)
771 {
772 	WalRcvData *walrcv = WalRcv;
773 
774 	/* Ensure that all WAL records received are flushed to disk */
775 	XLogWalRcvFlush(true);
776 
777 	/* Mark ourselves inactive in shared memory */
778 	SpinLockAcquire(&walrcv->mutex);
779 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
780 		   walrcv->walRcvState == WALRCV_RESTARTING ||
781 		   walrcv->walRcvState == WALRCV_STARTING ||
782 		   walrcv->walRcvState == WALRCV_WAITING ||
783 		   walrcv->walRcvState == WALRCV_STOPPING);
784 	Assert(walrcv->pid == MyProcPid);
785 	walrcv->walRcvState = WALRCV_STOPPED;
786 	walrcv->pid = 0;
787 	walrcv->ready_to_display = false;
788 	walrcv->latch = NULL;
789 	SpinLockRelease(&walrcv->mutex);
790 
791 	/* Terminate the connection gracefully. */
792 	if (wrconn != NULL)
793 		walrcv_disconnect(wrconn);
794 
795 	/* Wake up the startup process to notice promptly that we're gone */
796 	WakeupRecovery();
797 }
798 
799 /* SIGHUP: set flag to re-read config file at next convenient time */
800 static void
WalRcvSigHupHandler(SIGNAL_ARGS)801 WalRcvSigHupHandler(SIGNAL_ARGS)
802 {
803 	got_SIGHUP = true;
804 }
805 
806 
807 /* SIGUSR1: used by latch mechanism */
808 static void
WalRcvSigUsr1Handler(SIGNAL_ARGS)809 WalRcvSigUsr1Handler(SIGNAL_ARGS)
810 {
811 	int			save_errno = errno;
812 
813 	latch_sigusr1_handler();
814 
815 	errno = save_errno;
816 }
817 
818 /* SIGTERM: set flag for ProcessWalRcvInterrupts */
819 static void
WalRcvShutdownHandler(SIGNAL_ARGS)820 WalRcvShutdownHandler(SIGNAL_ARGS)
821 {
822 	int			save_errno = errno;
823 
824 	got_SIGTERM = true;
825 
826 	if (WalRcv->latch)
827 		SetLatch(WalRcv->latch);
828 
829 	errno = save_errno;
830 }
831 
832 /*
833  * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
834  *
835  * Some backend has bought the farm, so we need to stop what we're doing and
836  * exit.
837  */
838 static void
WalRcvQuickDieHandler(SIGNAL_ARGS)839 WalRcvQuickDieHandler(SIGNAL_ARGS)
840 {
841 	/*
842 	 * We DO NOT want to run proc_exit() or atexit() callbacks -- we're here
843 	 * because shared memory may be corrupted, so we don't want to try to
844 	 * clean up our transaction.  Just nail the windows shut and get out of
845 	 * town.  The callbacks wouldn't be safe to run from a signal handler,
846 	 * anyway.
847 	 *
848 	 * Note we use _exit(2) not _exit(0).  This is to force the postmaster
849 	 * into a system reset cycle if someone sends a manual SIGQUIT to a
850 	 * random backend.  This is necessary precisely because we don't clean up
851 	 * our shared memory state.  (The "dead man switch" mechanism in
852 	 * pmsignal.c should ensure the postmaster sees this as a crash, too, but
853 	 * no harm in being doubly sure.)
854 	 */
855 	_exit(2);
856 }
857 
858 /*
859  * Accept the message from XLOG stream, and process it.
860  */
861 static void
XLogWalRcvProcessMsg(unsigned char type,char * buf,Size len)862 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
863 {
864 	int			hdrlen;
865 	XLogRecPtr	dataStart;
866 	XLogRecPtr	walEnd;
867 	TimestampTz sendTime;
868 	bool		replyRequested;
869 
870 	resetStringInfo(&incoming_message);
871 
872 	switch (type)
873 	{
874 		case 'w':				/* WAL records */
875 			{
876 				/* copy message to StringInfo */
877 				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
878 				if (len < hdrlen)
879 					ereport(ERROR,
880 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
881 							 errmsg_internal("invalid WAL message received from primary")));
882 				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
883 
884 				/* read the fields */
885 				dataStart = pq_getmsgint64(&incoming_message);
886 				walEnd = pq_getmsgint64(&incoming_message);
887 				sendTime = pq_getmsgint64(&incoming_message);
888 				ProcessWalSndrMessage(walEnd, sendTime);
889 
890 				buf += hdrlen;
891 				len -= hdrlen;
892 				XLogWalRcvWrite(buf, len, dataStart);
893 				break;
894 			}
895 		case 'k':				/* Keepalive */
896 			{
897 				/* copy message to StringInfo */
898 				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
899 				if (len != hdrlen)
900 					ereport(ERROR,
901 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
902 							 errmsg_internal("invalid keepalive message received from primary")));
903 				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
904 
905 				/* read the fields */
906 				walEnd = pq_getmsgint64(&incoming_message);
907 				sendTime = pq_getmsgint64(&incoming_message);
908 				replyRequested = pq_getmsgbyte(&incoming_message);
909 
910 				ProcessWalSndrMessage(walEnd, sendTime);
911 
912 				/* If the primary requested a reply, send one immediately */
913 				if (replyRequested)
914 					XLogWalRcvSendReply(true, false);
915 				break;
916 			}
917 		default:
918 			ereport(ERROR,
919 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
920 					 errmsg_internal("invalid replication message type %d",
921 									 type)));
922 	}
923 }
924 
925 /*
926  * Write XLOG data to disk.
927  */
928 static void
XLogWalRcvWrite(char * buf,Size nbytes,XLogRecPtr recptr)929 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
930 {
931 	int			startoff;
932 	int			byteswritten;
933 
934 	while (nbytes > 0)
935 	{
936 		int			segbytes;
937 
938 		/* Close the current segment if it's completed */
939 		if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
940 			XLogWalRcvClose(recptr);
941 
942 		if (recvFile < 0)
943 		{
944 			bool		use_existent = true;
945 
946 			/* Create/use new log file */
947 			XLByteToSeg(recptr, recvSegNo, wal_segment_size);
948 			recvFile = XLogFileInit(recvSegNo, &use_existent, true);
949 			recvFileTLI = ThisTimeLineID;
950 			recvOff = 0;
951 		}
952 
953 		/* Calculate the start offset of the received logs */
954 		startoff = XLogSegmentOffset(recptr, wal_segment_size);
955 
956 		if (startoff + nbytes > wal_segment_size)
957 			segbytes = wal_segment_size - startoff;
958 		else
959 			segbytes = nbytes;
960 
961 		/* Need to seek in the file? */
962 		if (recvOff != startoff)
963 		{
964 			if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
965 				ereport(PANIC,
966 						(errcode_for_file_access(),
967 						 errmsg("could not seek in log segment %s to offset %u: %m",
968 								XLogFileNameP(recvFileTLI, recvSegNo),
969 								startoff)));
970 			recvOff = startoff;
971 		}
972 
973 		/* OK to write the logs */
974 		errno = 0;
975 
976 		byteswritten = write(recvFile, buf, segbytes);
977 		if (byteswritten <= 0)
978 		{
979 			/* if write didn't set errno, assume no disk space */
980 			if (errno == 0)
981 				errno = ENOSPC;
982 			ereport(PANIC,
983 					(errcode_for_file_access(),
984 					 errmsg("could not write to log segment %s "
985 							"at offset %u, length %lu: %m",
986 							XLogFileNameP(recvFileTLI, recvSegNo),
987 							recvOff, (unsigned long) segbytes)));
988 		}
989 
990 		/* Update state for write */
991 		recptr += byteswritten;
992 
993 		recvOff += byteswritten;
994 		nbytes -= byteswritten;
995 		buf += byteswritten;
996 
997 		LogstreamResult.Write = recptr;
998 	}
999 
1000 	/*
1001 	 * Close the current segment if it's fully written up in the last cycle of
1002 	 * the loop, to create its archive notification file soon. Otherwise WAL
1003 	 * archiving of the segment will be delayed until any data in the next
1004 	 * segment is received and written.
1005 	 */
1006 	if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
1007 		XLogWalRcvClose(recptr);
1008 }
1009 
1010 /*
1011  * Flush the log to disk.
1012  *
1013  * If we're in the midst of dying, it's unwise to do anything that might throw
1014  * an error, so we skip sending a reply in that case.
1015  */
1016 static void
XLogWalRcvFlush(bool dying)1017 XLogWalRcvFlush(bool dying)
1018 {
1019 	if (LogstreamResult.Flush < LogstreamResult.Write)
1020 	{
1021 		WalRcvData *walrcv = WalRcv;
1022 
1023 		issue_xlog_fsync(recvFile, recvSegNo);
1024 
1025 		LogstreamResult.Flush = LogstreamResult.Write;
1026 
1027 		/* Update shared-memory status */
1028 		SpinLockAcquire(&walrcv->mutex);
1029 		if (walrcv->receivedUpto < LogstreamResult.Flush)
1030 		{
1031 			walrcv->latestChunkStart = walrcv->receivedUpto;
1032 			walrcv->receivedUpto = LogstreamResult.Flush;
1033 			walrcv->receivedTLI = ThisTimeLineID;
1034 		}
1035 		SpinLockRelease(&walrcv->mutex);
1036 
1037 		/* Signal the startup process and walsender that new WAL has arrived */
1038 		WakeupRecovery();
1039 		if (AllowCascadeReplication())
1040 			WalSndWakeup();
1041 
1042 		/* Report XLOG streaming progress in PS display */
1043 		if (update_process_title)
1044 		{
1045 			char		activitymsg[50];
1046 
1047 			snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1048 					 (uint32) (LogstreamResult.Write >> 32),
1049 					 (uint32) LogstreamResult.Write);
1050 			set_ps_display(activitymsg, false);
1051 		}
1052 
1053 		/* Also let the master know that we made some progress */
1054 		if (!dying)
1055 		{
1056 			XLogWalRcvSendReply(false, false);
1057 			XLogWalRcvSendHSFeedback(false);
1058 		}
1059 	}
1060 }
1061 
1062 /*
1063  * Close the current segment.
1064  *
1065  * Flush the segment to disk before closing it. Otherwise we have to
1066  * reopen and fsync it later.
1067  *
1068  * Create an archive notification file since the segment is known completed.
1069  */
1070 static void
XLogWalRcvClose(XLogRecPtr recptr)1071 XLogWalRcvClose(XLogRecPtr recptr)
1072 {
1073 	char		xlogfname[MAXFNAMELEN];
1074 
1075 	Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1076 
1077 	/*
1078 	 * fsync() and close current file before we switch to next one. We would
1079 	 * otherwise have to reopen this file to fsync it later
1080 	 */
1081 	XLogWalRcvFlush(false);
1082 
1083 	XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1084 
1085 	/*
1086 	 * XLOG segment files will be re-read by recovery in startup process soon,
1087 	 * so we don't advise the OS to release cache pages associated with the
1088 	 * file like XLogFileClose() does.
1089 	 */
1090 	if (close(recvFile) != 0)
1091 		ereport(PANIC,
1092 				(errcode_for_file_access(),
1093 				 errmsg("could not close log segment %s: %m",
1094 						xlogfname)));
1095 
1096 	/*
1097 	 * Create .done file forcibly to prevent the streamed segment from being
1098 	 * archived later.
1099 	 */
1100 	if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1101 		XLogArchiveForceDone(xlogfname);
1102 	else
1103 		XLogArchiveNotify(xlogfname);
1104 
1105 	recvFile = -1;
1106 }
1107 
1108 /*
1109  * Send reply message to primary, indicating our current WAL locations, oldest
1110  * xmin and the current time.
1111  *
1112  * If 'force' is not set, the message is only sent if enough time has
1113  * passed since last status update to reach wal_receiver_status_interval.
1114  * If wal_receiver_status_interval is disabled altogether and 'force' is
1115  * false, this is a no-op.
1116  *
1117  * If 'requestReply' is true, requests the server to reply immediately upon
1118  * receiving this message. This is used for heartbearts, when approaching
1119  * wal_receiver_timeout.
1120  */
1121 static void
XLogWalRcvSendReply(bool force,bool requestReply)1122 XLogWalRcvSendReply(bool force, bool requestReply)
1123 {
1124 	static XLogRecPtr writePtr = 0;
1125 	static XLogRecPtr flushPtr = 0;
1126 	XLogRecPtr	applyPtr;
1127 	static TimestampTz sendTime = 0;
1128 	TimestampTz now;
1129 
1130 	/*
1131 	 * If the user doesn't want status to be reported to the master, be sure
1132 	 * to exit before doing anything at all.
1133 	 */
1134 	if (!force && wal_receiver_status_interval <= 0)
1135 		return;
1136 
1137 	/* Get current timestamp. */
1138 	now = GetCurrentTimestamp();
1139 
1140 	/*
1141 	 * We can compare the write and flush positions to the last message we
1142 	 * sent without taking any lock, but the apply position requires a spin
1143 	 * lock, so we don't check that unless something else has changed or 10
1144 	 * seconds have passed.  This means that the apply WAL location will
1145 	 * appear, from the master's point of view, to lag slightly, but since
1146 	 * this is only for reporting purposes and only on idle systems, that's
1147 	 * probably OK.
1148 	 */
1149 	if (!force
1150 		&& writePtr == LogstreamResult.Write
1151 		&& flushPtr == LogstreamResult.Flush
1152 		&& !TimestampDifferenceExceeds(sendTime, now,
1153 									   wal_receiver_status_interval * 1000))
1154 		return;
1155 	sendTime = now;
1156 
1157 	/* Construct a new message */
1158 	writePtr = LogstreamResult.Write;
1159 	flushPtr = LogstreamResult.Flush;
1160 	applyPtr = GetXLogReplayRecPtr(NULL);
1161 
1162 	resetStringInfo(&reply_message);
1163 	pq_sendbyte(&reply_message, 'r');
1164 	pq_sendint64(&reply_message, writePtr);
1165 	pq_sendint64(&reply_message, flushPtr);
1166 	pq_sendint64(&reply_message, applyPtr);
1167 	pq_sendint64(&reply_message, GetCurrentTimestamp());
1168 	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1169 
1170 	/* Send it */
1171 	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1172 		 (uint32) (writePtr >> 32), (uint32) writePtr,
1173 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1174 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1175 		 requestReply ? " (reply requested)" : "");
1176 
1177 	walrcv_send(wrconn, reply_message.data, reply_message.len);
1178 }
1179 
1180 /*
1181  * Send hot standby feedback message to primary, plus the current time,
1182  * in case they don't have a watch.
1183  *
1184  * If the user disables feedback, send one final message to tell sender
1185  * to forget about the xmin on this standby. We also send this message
1186  * on first connect because a previous connection might have set xmin
1187  * on a replication slot. (If we're not using a slot it's harmless to
1188  * send a feedback message explicitly setting InvalidTransactionId).
1189  */
1190 static void
XLogWalRcvSendHSFeedback(bool immed)1191 XLogWalRcvSendHSFeedback(bool immed)
1192 {
1193 	TimestampTz now;
1194 	TransactionId nextXid;
1195 	uint32		xmin_epoch,
1196 				catalog_xmin_epoch;
1197 	TransactionId xmin,
1198 				catalog_xmin;
1199 	static TimestampTz sendTime = 0;
1200 
1201 	/* initially true so we always send at least one feedback message */
1202 	static bool master_has_standby_xmin = true;
1203 
1204 	/*
1205 	 * If the user doesn't want status to be reported to the master, be sure
1206 	 * to exit before doing anything at all.
1207 	 */
1208 	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1209 		!master_has_standby_xmin)
1210 		return;
1211 
1212 	/* Get current timestamp. */
1213 	now = GetCurrentTimestamp();
1214 
1215 	if (!immed)
1216 	{
1217 		/*
1218 		 * Send feedback at most once per wal_receiver_status_interval.
1219 		 */
1220 		if (!TimestampDifferenceExceeds(sendTime, now,
1221 										wal_receiver_status_interval * 1000))
1222 			return;
1223 		sendTime = now;
1224 	}
1225 
1226 	/*
1227 	 * If Hot Standby is not yet accepting connections there is nothing to
1228 	 * send. Check this after the interval has expired to reduce number of
1229 	 * calls.
1230 	 *
1231 	 * Bailing out here also ensures that we don't send feedback until we've
1232 	 * read our own replication slot state, so we don't tell the master to
1233 	 * discard needed xmin or catalog_xmin from any slots that may exist on
1234 	 * this replica.
1235 	 */
1236 	if (!HotStandbyActive())
1237 		return;
1238 
1239 	/*
1240 	 * Make the expensive call to get the oldest xmin once we are certain
1241 	 * everything else has been checked.
1242 	 */
1243 	if (hot_standby_feedback)
1244 	{
1245 		TransactionId slot_xmin;
1246 
1247 		/*
1248 		 * Usually GetOldestXmin() would include both global replication slot
1249 		 * xmin and catalog_xmin in its calculations, but we want to derive
1250 		 * separate values for each of those. So we ask for an xmin that
1251 		 * excludes the catalog_xmin.
1252 		 */
1253 		xmin = GetOldestXmin(NULL,
1254 							 PROCARRAY_FLAGS_DEFAULT | PROCARRAY_SLOTS_XMIN);
1255 
1256 		ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1257 
1258 		if (TransactionIdIsValid(slot_xmin) &&
1259 			TransactionIdPrecedes(slot_xmin, xmin))
1260 			xmin = slot_xmin;
1261 	}
1262 	else
1263 	{
1264 		xmin = InvalidTransactionId;
1265 		catalog_xmin = InvalidTransactionId;
1266 	}
1267 
1268 	/*
1269 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1270 	 * the epoch boundary.
1271 	 */
1272 	GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1273 	catalog_xmin_epoch = xmin_epoch;
1274 	if (nextXid < xmin)
1275 		xmin_epoch--;
1276 	if (nextXid < catalog_xmin)
1277 		catalog_xmin_epoch--;
1278 
1279 	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1280 		 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1281 
1282 	/* Construct the message and send it. */
1283 	resetStringInfo(&reply_message);
1284 	pq_sendbyte(&reply_message, 'h');
1285 	pq_sendint64(&reply_message, GetCurrentTimestamp());
1286 	pq_sendint32(&reply_message, xmin);
1287 	pq_sendint32(&reply_message, xmin_epoch);
1288 	pq_sendint32(&reply_message, catalog_xmin);
1289 	pq_sendint32(&reply_message, catalog_xmin_epoch);
1290 	walrcv_send(wrconn, reply_message.data, reply_message.len);
1291 	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1292 		master_has_standby_xmin = true;
1293 	else
1294 		master_has_standby_xmin = false;
1295 }
1296 
1297 /*
1298  * Update shared memory status upon receiving a message from primary.
1299  *
1300  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1301  * message, reported by primary.
1302  */
1303 static void
ProcessWalSndrMessage(XLogRecPtr walEnd,TimestampTz sendTime)1304 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1305 {
1306 	WalRcvData *walrcv = WalRcv;
1307 
1308 	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1309 
1310 	/* Update shared-memory status */
1311 	SpinLockAcquire(&walrcv->mutex);
1312 	if (walrcv->latestWalEnd < walEnd)
1313 		walrcv->latestWalEndTime = sendTime;
1314 	walrcv->latestWalEnd = walEnd;
1315 	walrcv->lastMsgSendTime = sendTime;
1316 	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1317 	SpinLockRelease(&walrcv->mutex);
1318 
1319 	if (log_min_messages <= DEBUG2)
1320 	{
1321 		char	   *sendtime;
1322 		char	   *receipttime;
1323 		int			applyDelay;
1324 
1325 		/* Copy because timestamptz_to_str returns a static buffer */
1326 		sendtime = pstrdup(timestamptz_to_str(sendTime));
1327 		receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1328 		applyDelay = GetReplicationApplyDelay();
1329 
1330 		/* apply delay is not available */
1331 		if (applyDelay == -1)
1332 			elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1333 				 sendtime,
1334 				 receipttime,
1335 				 GetReplicationTransferLatency());
1336 		else
1337 			elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1338 				 sendtime,
1339 				 receipttime,
1340 				 applyDelay,
1341 				 GetReplicationTransferLatency());
1342 
1343 		pfree(sendtime);
1344 		pfree(receipttime);
1345 	}
1346 }
1347 
1348 /*
1349  * Wake up the walreceiver main loop.
1350  *
1351  * This is called by the startup process whenever interesting xlog records
1352  * are applied, so that walreceiver can check if it needs to send an apply
1353  * notification back to the master which may be waiting in a COMMIT with
1354  * synchronous_commit = remote_apply.
1355  */
1356 void
WalRcvForceReply(void)1357 WalRcvForceReply(void)
1358 {
1359 	Latch	   *latch;
1360 
1361 	WalRcv->force_reply = true;
1362 	/* fetching the latch pointer might not be atomic, so use spinlock */
1363 	SpinLockAcquire(&WalRcv->mutex);
1364 	latch = WalRcv->latch;
1365 	SpinLockRelease(&WalRcv->mutex);
1366 	if (latch)
1367 		SetLatch(latch);
1368 }
1369 
1370 /*
1371  * Return a string constant representing the state. This is used
1372  * in system functions and views, and should *not* be translated.
1373  */
1374 static const char *
WalRcvGetStateString(WalRcvState state)1375 WalRcvGetStateString(WalRcvState state)
1376 {
1377 	switch (state)
1378 	{
1379 		case WALRCV_STOPPED:
1380 			return "stopped";
1381 		case WALRCV_STARTING:
1382 			return "starting";
1383 		case WALRCV_STREAMING:
1384 			return "streaming";
1385 		case WALRCV_WAITING:
1386 			return "waiting";
1387 		case WALRCV_RESTARTING:
1388 			return "restarting";
1389 		case WALRCV_STOPPING:
1390 			return "stopping";
1391 	}
1392 	return "UNKNOWN";
1393 }
1394 
1395 /*
1396  * Returns activity of WAL receiver, including pid, state and xlog locations
1397  * received from the WAL sender of another server.
1398  */
1399 Datum
pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)1400 pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1401 {
1402 	TupleDesc	tupdesc;
1403 	Datum	   *values;
1404 	bool	   *nulls;
1405 	int			pid;
1406 	bool		ready_to_display;
1407 	WalRcvState state;
1408 	XLogRecPtr	receive_start_lsn;
1409 	TimeLineID	receive_start_tli;
1410 	XLogRecPtr	received_lsn;
1411 	TimeLineID	received_tli;
1412 	TimestampTz last_send_time;
1413 	TimestampTz last_receipt_time;
1414 	XLogRecPtr	latest_end_lsn;
1415 	TimestampTz latest_end_time;
1416 	char		sender_host[NI_MAXHOST];
1417 	int			sender_port = 0;
1418 	char		slotname[NAMEDATALEN];
1419 	char		conninfo[MAXCONNINFO];
1420 
1421 	/* Take a lock to ensure value consistency */
1422 	SpinLockAcquire(&WalRcv->mutex);
1423 	pid = (int) WalRcv->pid;
1424 	ready_to_display = WalRcv->ready_to_display;
1425 	state = WalRcv->walRcvState;
1426 	receive_start_lsn = WalRcv->receiveStart;
1427 	receive_start_tli = WalRcv->receiveStartTLI;
1428 	received_lsn = WalRcv->receivedUpto;
1429 	received_tli = WalRcv->receivedTLI;
1430 	last_send_time = WalRcv->lastMsgSendTime;
1431 	last_receipt_time = WalRcv->lastMsgReceiptTime;
1432 	latest_end_lsn = WalRcv->latestWalEnd;
1433 	latest_end_time = WalRcv->latestWalEndTime;
1434 	strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1435 	strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1436 	sender_port = WalRcv->sender_port;
1437 	strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1438 	SpinLockRelease(&WalRcv->mutex);
1439 
1440 	/*
1441 	 * No WAL receiver (or not ready yet), just return a tuple with NULL
1442 	 * values
1443 	 */
1444 	if (pid == 0 || !ready_to_display)
1445 		PG_RETURN_NULL();
1446 
1447 	/* determine result type */
1448 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1449 		elog(ERROR, "return type must be a row type");
1450 
1451 	values = palloc0(sizeof(Datum) * tupdesc->natts);
1452 	nulls = palloc0(sizeof(bool) * tupdesc->natts);
1453 
1454 	/* Fetch values */
1455 	values[0] = Int32GetDatum(pid);
1456 
1457 	if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1458 	{
1459 		/*
1460 		 * Only superusers and members of pg_read_all_stats can see details.
1461 		 * Other users only get the pid value to know whether it is a WAL
1462 		 * receiver, but no details.
1463 		 */
1464 		MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1465 	}
1466 	else
1467 	{
1468 		values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1469 
1470 		if (XLogRecPtrIsInvalid(receive_start_lsn))
1471 			nulls[2] = true;
1472 		else
1473 			values[2] = LSNGetDatum(receive_start_lsn);
1474 		values[3] = Int32GetDatum(receive_start_tli);
1475 		if (XLogRecPtrIsInvalid(received_lsn))
1476 			nulls[4] = true;
1477 		else
1478 			values[4] = LSNGetDatum(received_lsn);
1479 		values[5] = Int32GetDatum(received_tli);
1480 		if (last_send_time == 0)
1481 			nulls[6] = true;
1482 		else
1483 			values[6] = TimestampTzGetDatum(last_send_time);
1484 		if (last_receipt_time == 0)
1485 			nulls[7] = true;
1486 		else
1487 			values[7] = TimestampTzGetDatum(last_receipt_time);
1488 		if (XLogRecPtrIsInvalid(latest_end_lsn))
1489 			nulls[8] = true;
1490 		else
1491 			values[8] = LSNGetDatum(latest_end_lsn);
1492 		if (latest_end_time == 0)
1493 			nulls[9] = true;
1494 		else
1495 			values[9] = TimestampTzGetDatum(latest_end_time);
1496 		if (*slotname == '\0')
1497 			nulls[10] = true;
1498 		else
1499 			values[10] = CStringGetTextDatum(slotname);
1500 		if (*sender_host == '\0')
1501 			nulls[11] = true;
1502 		else
1503 			values[11] = CStringGetTextDatum(sender_host);
1504 		if (sender_port == 0)
1505 			nulls[12] = true;
1506 		else
1507 			values[12] = Int32GetDatum(sender_port);
1508 		if (*conninfo == '\0')
1509 			nulls[13] = true;
1510 		else
1511 			values[13] = CStringGetTextDatum(conninfo);
1512 	}
1513 
1514 	/* Returns the record as Datum */
1515 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1516 }
1517