1 /*-------------------------------------------------------------------------
2  *
3  * walreceiver.c
4  *
5  * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6  * is the process in the standby server that takes charge of receiving
7  * XLOG records from a primary server during streaming replication.
8  *
9  * When the startup process determines that it's time to start streaming,
10  * it instructs postmaster to start walreceiver. Walreceiver first connects
11  * to the primary server (it will be served by a walsender process
12  * in the primary server), and then keeps receiving XLOG records and
13  * writing them to the disk as long as the connection is alive. As XLOG
14  * records are received and flushed to disk, it updates the
15  * WalRcv->receivedUpto variable in shared memory, to inform the startup
16  * process of how far it can proceed with XLOG replay.
17  *
18  * If the primary server ends streaming, but doesn't disconnect, walreceiver
19  * goes into "waiting" mode, and waits for the startup process to give new
20  * instructions. The startup process will treat that the same as
21  * disconnection, and will rescan the archive/pg_xlog directory. But when the
22  * startup process wants to try streaming replication again, it will just
23  * nudge the existing walreceiver process that's waiting, instead of launching
24  * a new one.
25  *
26  * Normal termination is by SIGTERM, which instructs the walreceiver to
27  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
28  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
29  * of the connection and a FATAL error are treated not as a crash but as
30  * normal operation.
31  *
32  * This file contains the server-facing parts of walreceiver. The libpq-
33  * specific parts are in the libpqwalreceiver module. It's loaded
34  * dynamically to avoid linking the server with libpq.
35  *
36  * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
37  *
38  *
39  * IDENTIFICATION
40  *	  src/backend/replication/walreceiver.c
41  *
42  *-------------------------------------------------------------------------
43  */
44 #include "postgres.h"
45 
46 #include <signal.h>
47 #include <unistd.h>
48 
49 #include "access/htup_details.h"
50 #include "access/timeline.h"
51 #include "access/transam.h"
52 #include "access/xlog_internal.h"
53 #include "catalog/pg_type.h"
54 #include "funcapi.h"
55 #include "libpq/pqformat.h"
56 #include "libpq/pqsignal.h"
57 #include "miscadmin.h"
58 #include "replication/walreceiver.h"
59 #include "replication/walsender.h"
60 #include "storage/ipc.h"
61 #include "storage/pmsignal.h"
62 #include "storage/procarray.h"
63 #include "utils/builtins.h"
64 #include "utils/guc.h"
65 #include "utils/pg_lsn.h"
66 #include "utils/ps_status.h"
67 #include "utils/resowner.h"
68 #include "utils/timestamp.h"
69 
70 
71 /* GUC variables */
72 int			wal_receiver_status_interval;
73 int			wal_receiver_timeout;
74 bool		hot_standby_feedback;
75 
76 /* libpqreceiver hooks to these when loaded */
77 walrcv_connect_type walrcv_connect = NULL;
78 walrcv_get_conninfo_type walrcv_get_conninfo = NULL;
79 walrcv_identify_system_type walrcv_identify_system = NULL;
80 walrcv_startstreaming_type walrcv_startstreaming = NULL;
81 walrcv_endstreaming_type walrcv_endstreaming = NULL;
82 walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
83 walrcv_receive_type walrcv_receive = NULL;
84 walrcv_send_type walrcv_send = NULL;
85 walrcv_disconnect_type walrcv_disconnect = NULL;
86 
87 #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
88 
89 /*
90  * These variables are used similarly to openLogFile/SegNo/Off,
91  * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
92  * corresponding the filename of recvFile.
93  */
94 static int	recvFile = -1;
95 static TimeLineID recvFileTLI = 0;
96 static XLogSegNo recvSegNo = 0;
97 static uint32 recvOff = 0;
98 
99 /*
100  * Flags set by interrupt handlers of walreceiver for later service in the
101  * main loop.
102  */
103 static volatile sig_atomic_t got_SIGHUP = false;
104 static volatile sig_atomic_t got_SIGTERM = false;
105 
106 /*
107  * LogstreamResult indicates the byte positions that we have already
108  * written/fsynced.
109  */
110 static struct
111 {
112 	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
113 	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
114 }	LogstreamResult;
115 
116 static StringInfoData reply_message;
117 static StringInfoData incoming_message;
118 
119 /*
120  * About SIGTERM handling:
121  *
122  * We can't just exit(1) within SIGTERM signal handler, because the signal
123  * might arrive in the middle of some critical operation, like while we're
124  * holding a spinlock. We also can't just set a flag in signal handler and
125  * check it in the main loop, because we perform some blocking operations
126  * like libpqrcv_PQexec(), which can take a long time to finish.
127  *
128  * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
129  * safe for the signal handler to elog(FATAL) immediately. Otherwise it just
130  * sets got_SIGTERM flag, which is checked in the main loop when convenient.
131  *
132  * This is very much like what regular backends do with ImmediateInterruptOK,
133  * ProcessInterrupts() etc.
134  */
135 static volatile bool WalRcvImmediateInterruptOK = false;
136 
137 /* Prototypes for private functions */
138 static void ProcessWalRcvInterrupts(void);
139 static void EnableWalRcvImmediateExit(void);
140 static void DisableWalRcvImmediateExit(void);
141 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
142 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
143 static void WalRcvDie(int code, Datum arg);
144 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
145 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
146 static void XLogWalRcvFlush(bool dying);
147 static void XLogWalRcvClose(XLogRecPtr recptr);
148 static void XLogWalRcvSendReply(bool force, bool requestReply);
149 static void XLogWalRcvSendHSFeedback(bool immed);
150 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
151 
152 /* Signal handlers */
153 static void WalRcvSigHupHandler(SIGNAL_ARGS);
154 static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
155 static void WalRcvShutdownHandler(SIGNAL_ARGS);
156 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
157 
158 
159 static void
ProcessWalRcvInterrupts(void)160 ProcessWalRcvInterrupts(void)
161 {
162 	/*
163 	 * Although walreceiver interrupt handling doesn't use the same scheme as
164 	 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
165 	 * any incoming signals on Win32.
166 	 */
167 	CHECK_FOR_INTERRUPTS();
168 
169 	if (got_SIGTERM)
170 	{
171 		WalRcvImmediateInterruptOK = false;
172 		ereport(FATAL,
173 				(errcode(ERRCODE_ADMIN_SHUTDOWN),
174 				 errmsg("terminating walreceiver process due to administrator command")));
175 	}
176 }
177 
178 static void
EnableWalRcvImmediateExit(void)179 EnableWalRcvImmediateExit(void)
180 {
181 	WalRcvImmediateInterruptOK = true;
182 	ProcessWalRcvInterrupts();
183 }
184 
185 static void
DisableWalRcvImmediateExit(void)186 DisableWalRcvImmediateExit(void)
187 {
188 	WalRcvImmediateInterruptOK = false;
189 	ProcessWalRcvInterrupts();
190 }
191 
192 /* Main entry point for walreceiver process */
193 void
WalReceiverMain(void)194 WalReceiverMain(void)
195 {
196 	char		conninfo[MAXCONNINFO];
197 	char	   *tmp_conninfo;
198 	char		slotname[NAMEDATALEN];
199 	XLogRecPtr	startpoint;
200 	TimeLineID	startpointTLI;
201 	TimeLineID	primaryTLI;
202 	bool		first_stream;
203 	WalRcvData *walrcv = WalRcv;
204 	TimestampTz last_recv_timestamp;
205 	TimestampTz now;
206 	bool		ping_sent;
207 
208 	/*
209 	 * WalRcv should be set up already (if we are a backend, we inherit this
210 	 * by fork() or EXEC_BACKEND mechanism from the postmaster).
211 	 */
212 	Assert(walrcv != NULL);
213 
214 	now = GetCurrentTimestamp();
215 
216 	/*
217 	 * Mark walreceiver as running in shared memory.
218 	 *
219 	 * Do this as early as possible, so that if we fail later on, we'll set
220 	 * state to STOPPED. If we die before this, the startup process will keep
221 	 * waiting for us to start up, until it times out.
222 	 */
223 	SpinLockAcquire(&walrcv->mutex);
224 	Assert(walrcv->pid == 0);
225 	switch (walrcv->walRcvState)
226 	{
227 		case WALRCV_STOPPING:
228 			/* If we've already been requested to stop, don't start up. */
229 			walrcv->walRcvState = WALRCV_STOPPED;
230 			/* fall through */
231 
232 		case WALRCV_STOPPED:
233 			SpinLockRelease(&walrcv->mutex);
234 			proc_exit(1);
235 			break;
236 
237 		case WALRCV_STARTING:
238 			/* The usual case */
239 			break;
240 
241 		case WALRCV_WAITING:
242 		case WALRCV_STREAMING:
243 		case WALRCV_RESTARTING:
244 		default:
245 			/* Shouldn't happen */
246 			SpinLockRelease(&walrcv->mutex);
247 			elog(PANIC, "walreceiver still running according to shared memory state");
248 	}
249 	/* Advertise our PID so that the startup process can kill us */
250 	walrcv->pid = MyProcPid;
251 	walrcv->walRcvState = WALRCV_STREAMING;
252 
253 	/* Fetch information required to start streaming */
254 	walrcv->ready_to_display = false;
255 	strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
256 	strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
257 	startpoint = walrcv->receiveStart;
258 	startpointTLI = walrcv->receiveStartTLI;
259 
260 	/* Initialise to a sanish value */
261 	walrcv->lastMsgSendTime =
262 		walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
263 
264 	SpinLockRelease(&walrcv->mutex);
265 
266 	/* Arrange to clean up at walreceiver exit */
267 	on_shmem_exit(WalRcvDie, 0);
268 
269 	OwnLatch(&walrcv->latch);
270 
271 	/* Properly accept or ignore signals the postmaster might send us */
272 	pqsignal(SIGHUP, WalRcvSigHupHandler);		/* set flag to read config
273 												 * file */
274 	pqsignal(SIGINT, SIG_IGN);
275 	pqsignal(SIGTERM, WalRcvShutdownHandler);	/* request shutdown */
276 	pqsignal(SIGQUIT, WalRcvQuickDieHandler);	/* hard crash time */
277 	pqsignal(SIGALRM, SIG_IGN);
278 	pqsignal(SIGPIPE, SIG_IGN);
279 	pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
280 	pqsignal(SIGUSR2, SIG_IGN);
281 
282 	/* Reset some signals that are accepted by postmaster but not here */
283 	pqsignal(SIGCHLD, SIG_DFL);
284 	pqsignal(SIGTTIN, SIG_DFL);
285 	pqsignal(SIGTTOU, SIG_DFL);
286 	pqsignal(SIGCONT, SIG_DFL);
287 	pqsignal(SIGWINCH, SIG_DFL);
288 
289 	/* We allow SIGQUIT (quickdie) at all times */
290 	sigdelset(&BlockSig, SIGQUIT);
291 
292 	/* Load the libpq-specific functions */
293 	load_file("libpqwalreceiver", false);
294 	if (walrcv_connect == NULL ||
295 		walrcv_get_conninfo == NULL ||
296 		walrcv_startstreaming == NULL ||
297 		walrcv_endstreaming == NULL ||
298 		walrcv_identify_system == NULL ||
299 		walrcv_readtimelinehistoryfile == NULL ||
300 		walrcv_receive == NULL || walrcv_send == NULL ||
301 		walrcv_disconnect == NULL)
302 		elog(ERROR, "libpqwalreceiver didn't initialize correctly");
303 
304 	/*
305 	 * Create a resource owner to keep track of our resources (not clear that
306 	 * we need this, but may as well have one).
307 	 */
308 	CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
309 
310 	/* Unblock signals (they were blocked when the postmaster forked us) */
311 	PG_SETMASK(&UnBlockSig);
312 
313 	/* Establish the connection to the primary for XLOG streaming */
314 	EnableWalRcvImmediateExit();
315 	walrcv_connect(conninfo);
316 	DisableWalRcvImmediateExit();
317 
318 	/*
319 	 * Save user-visible connection string.  This clobbers the original
320 	 * conninfo, for security.
321 	 */
322 	tmp_conninfo = walrcv_get_conninfo();
323 	SpinLockAcquire(&walrcv->mutex);
324 	memset(walrcv->conninfo, 0, MAXCONNINFO);
325 	if (tmp_conninfo)
326 		strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
327 	walrcv->ready_to_display = true;
328 	SpinLockRelease(&walrcv->mutex);
329 
330 	if (tmp_conninfo)
331 		pfree(tmp_conninfo);
332 
333 	first_stream = true;
334 	for (;;)
335 	{
336 		/*
337 		 * Check that we're connected to a valid server using the
338 		 * IDENTIFY_SYSTEM replication command,
339 		 */
340 		EnableWalRcvImmediateExit();
341 		walrcv_identify_system(&primaryTLI);
342 		DisableWalRcvImmediateExit();
343 
344 		/*
345 		 * Confirm that the current timeline of the primary is the same or
346 		 * ahead of ours.
347 		 */
348 		if (primaryTLI < startpointTLI)
349 			ereport(ERROR,
350 					(errmsg("highest timeline %u of the primary is behind recovery timeline %u",
351 							primaryTLI, startpointTLI)));
352 
353 		/*
354 		 * Get any missing history files. We do this always, even when we're
355 		 * not interested in that timeline, so that if we're promoted to
356 		 * become the master later on, we don't select the same timeline that
357 		 * was already used in the current master. This isn't bullet-proof -
358 		 * you'll need some external software to manage your cluster if you
359 		 * need to ensure that a unique timeline id is chosen in every case,
360 		 * but let's avoid the confusion of timeline id collisions where we
361 		 * can.
362 		 */
363 		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
364 
365 		/*
366 		 * Start streaming.
367 		 *
368 		 * We'll try to start at the requested starting point and timeline,
369 		 * even if it's different from the server's latest timeline. In case
370 		 * we've already reached the end of the old timeline, the server will
371 		 * finish the streaming immediately, and we will go back to await
372 		 * orders from the startup process. If recovery_target_timeline is
373 		 * 'latest', the startup process will scan pg_xlog and find the new
374 		 * history file, bump recovery target timeline, and ask us to restart
375 		 * on the new timeline.
376 		 */
377 		ThisTimeLineID = startpointTLI;
378 		if (walrcv_startstreaming(startpointTLI, startpoint,
379 								  slotname[0] != '\0' ? slotname : NULL))
380 		{
381 			if (first_stream)
382 				ereport(LOG,
383 						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
384 							(uint32) (startpoint >> 32), (uint32) startpoint,
385 								startpointTLI)));
386 			else
387 				ereport(LOG,
388 				   (errmsg("restarted WAL streaming at %X/%X on timeline %u",
389 						   (uint32) (startpoint >> 32), (uint32) startpoint,
390 						   startpointTLI)));
391 			first_stream = false;
392 
393 			/* Initialize LogstreamResult and buffers for processing messages */
394 			LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
395 			initStringInfo(&reply_message);
396 			initStringInfo(&incoming_message);
397 
398 			/* Initialize the last recv timestamp */
399 			last_recv_timestamp = GetCurrentTimestamp();
400 			ping_sent = false;
401 
402 			/* Loop until end-of-streaming or error */
403 			for (;;)
404 			{
405 				char	   *buf;
406 				int			len;
407 				bool		endofwal = false;
408 				pgsocket	wait_fd = PGINVALID_SOCKET;
409 				int			rc;
410 
411 				/*
412 				 * Exit walreceiver if we're not in recovery. This should not
413 				 * happen, but cross-check the status here.
414 				 */
415 				if (!RecoveryInProgress())
416 					ereport(FATAL,
417 							(errmsg("cannot continue WAL streaming, recovery has already ended")));
418 
419 				/* Process any requests or signals received recently */
420 				ProcessWalRcvInterrupts();
421 
422 				if (got_SIGHUP)
423 				{
424 					got_SIGHUP = false;
425 					ProcessConfigFile(PGC_SIGHUP);
426 					XLogWalRcvSendHSFeedback(true);
427 				}
428 
429 				/* See if we can read data immediately */
430 				len = walrcv_receive(&buf, &wait_fd);
431 				if (len != 0)
432 				{
433 					/*
434 					 * Process the received data, and any subsequent data we
435 					 * can read without blocking.
436 					 */
437 					for (;;)
438 					{
439 						if (len > 0)
440 						{
441 							/*
442 							 * Something was received from master, so reset
443 							 * timeout
444 							 */
445 							last_recv_timestamp = GetCurrentTimestamp();
446 							ping_sent = false;
447 							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
448 						}
449 						else if (len == 0)
450 							break;
451 						else if (len < 0)
452 						{
453 							ereport(LOG,
454 									(errmsg("replication terminated by primary server"),
455 									 errdetail("End of WAL reached on timeline %u at %X/%X.",
456 											   startpointTLI,
457 											   (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
458 							endofwal = true;
459 							break;
460 						}
461 						len = walrcv_receive(&buf, &wait_fd);
462 					}
463 
464 					/* Let the master know that we received some data. */
465 					XLogWalRcvSendReply(false, false);
466 
467 					/*
468 					 * If we've written some records, flush them to disk and
469 					 * let the startup process and primary server know about
470 					 * them.
471 					 */
472 					XLogWalRcvFlush(false);
473 				}
474 
475 				/* Check if we need to exit the streaming loop. */
476 				if (endofwal)
477 					break;
478 
479 				/*
480 				 * Ideally we would reuse a WaitEventSet object repeatedly
481 				 * here to avoid the overheads of WaitLatchOrSocket on epoll
482 				 * systems, but we can't be sure that libpq (or any other
483 				 * walreceiver implementation) has the same socket (even if
484 				 * the fd is the same number, it may have been closed and
485 				 * reopened since the last time).  In future, if there is a
486 				 * function for removing sockets from WaitEventSet, then we
487 				 * could add and remove just the socket each time, potentially
488 				 * avoiding some system calls.
489 				 */
490 				Assert(wait_fd != PGINVALID_SOCKET);
491 				rc = WaitLatchOrSocket(&walrcv->latch,
492 								   WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
493 									   WL_TIMEOUT | WL_LATCH_SET,
494 									   wait_fd,
495 									   NAPTIME_PER_CYCLE);
496 				if (rc & WL_LATCH_SET)
497 				{
498 					ResetLatch(&walrcv->latch);
499 					if (walrcv->force_reply)
500 					{
501 						/*
502 						 * The recovery process has asked us to send apply
503 						 * feedback now.  Make sure the flag is really set to
504 						 * false in shared memory before sending the reply, so
505 						 * we don't miss a new request for a reply.
506 						 */
507 						walrcv->force_reply = false;
508 						pg_memory_barrier();
509 						XLogWalRcvSendReply(true, false);
510 					}
511 				}
512 				if (rc & WL_POSTMASTER_DEATH)
513 				{
514 					/*
515 					 * Emergency bailout if postmaster has died.  This is to
516 					 * avoid the necessity for manual cleanup of all
517 					 * postmaster children.
518 					 */
519 					exit(1);
520 				}
521 				if (rc & WL_TIMEOUT)
522 				{
523 					/*
524 					 * We didn't receive anything new. If we haven't heard
525 					 * anything from the server for more than
526 					 * wal_receiver_timeout / 2, ping the server. Also, if
527 					 * it's been longer than wal_receiver_status_interval
528 					 * since the last update we sent, send a status update to
529 					 * the master anyway, to report any progress in applying
530 					 * WAL.
531 					 */
532 					bool		requestReply = false;
533 
534 					/*
535 					 * Check if time since last receive from standby has
536 					 * reached the configured limit.
537 					 */
538 					if (wal_receiver_timeout > 0)
539 					{
540 						TimestampTz now = GetCurrentTimestamp();
541 						TimestampTz timeout;
542 
543 						timeout =
544 							TimestampTzPlusMilliseconds(last_recv_timestamp,
545 														wal_receiver_timeout);
546 
547 						if (now >= timeout)
548 							ereport(ERROR,
549 									(errmsg("terminating walreceiver due to timeout")));
550 
551 						/*
552 						 * We didn't receive anything new, for half of
553 						 * receiver replication timeout. Ping the server.
554 						 */
555 						if (!ping_sent)
556 						{
557 							timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
558 												 (wal_receiver_timeout / 2));
559 							if (now >= timeout)
560 							{
561 								requestReply = true;
562 								ping_sent = true;
563 							}
564 						}
565 					}
566 
567 					XLogWalRcvSendReply(requestReply, requestReply);
568 					XLogWalRcvSendHSFeedback(false);
569 				}
570 			}
571 
572 			/*
573 			 * The backend finished streaming. Exit streaming COPY-mode from
574 			 * our side, too.
575 			 */
576 			EnableWalRcvImmediateExit();
577 			walrcv_endstreaming(&primaryTLI);
578 			DisableWalRcvImmediateExit();
579 
580 			/*
581 			 * If the server had switched to a new timeline that we didn't
582 			 * know about when we began streaming, fetch its timeline history
583 			 * file now.
584 			 */
585 			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
586 		}
587 		else
588 			ereport(LOG,
589 					(errmsg("primary server contains no more WAL on requested timeline %u",
590 							startpointTLI)));
591 
592 		/*
593 		 * End of WAL reached on the requested timeline. Close the last
594 		 * segment, and await for new orders from the startup process.
595 		 */
596 		if (recvFile >= 0)
597 		{
598 			char		xlogfname[MAXFNAMELEN];
599 
600 			XLogWalRcvFlush(false);
601 			if (close(recvFile) != 0)
602 				ereport(PANIC,
603 						(errcode_for_file_access(),
604 						 errmsg("could not close log segment %s: %m",
605 								XLogFileNameP(recvFileTLI, recvSegNo))));
606 
607 			/*
608 			 * Create .done file forcibly to prevent the streamed segment from
609 			 * being archived later.
610 			 */
611 			XLogFileName(xlogfname, recvFileTLI, recvSegNo);
612 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
613 				XLogArchiveForceDone(xlogfname);
614 			else
615 				XLogArchiveNotify(xlogfname);
616 		}
617 		recvFile = -1;
618 
619 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
620 		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
621 	}
622 	/* not reached */
623 }
624 
625 /*
626  * Wait for startup process to set receiveStart and receiveStartTLI.
627  */
628 static void
WalRcvWaitForStartPosition(XLogRecPtr * startpoint,TimeLineID * startpointTLI)629 WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
630 {
631 	WalRcvData *walrcv = WalRcv;
632 	int			state;
633 
634 	SpinLockAcquire(&walrcv->mutex);
635 	state = walrcv->walRcvState;
636 	if (state != WALRCV_STREAMING)
637 	{
638 		SpinLockRelease(&walrcv->mutex);
639 		if (state == WALRCV_STOPPING)
640 			proc_exit(0);
641 		else
642 			elog(FATAL, "unexpected walreceiver state");
643 	}
644 	walrcv->walRcvState = WALRCV_WAITING;
645 	walrcv->receiveStart = InvalidXLogRecPtr;
646 	walrcv->receiveStartTLI = 0;
647 	SpinLockRelease(&walrcv->mutex);
648 
649 	if (update_process_title)
650 		set_ps_display("idle", false);
651 
652 	/*
653 	 * nudge startup process to notice that we've stopped streaming and are
654 	 * now waiting for instructions.
655 	 */
656 	WakeupRecovery();
657 	for (;;)
658 	{
659 		ResetLatch(&walrcv->latch);
660 
661 		/*
662 		 * Emergency bailout if postmaster has died.  This is to avoid the
663 		 * necessity for manual cleanup of all postmaster children.
664 		 */
665 		if (!PostmasterIsAlive())
666 			exit(1);
667 
668 		ProcessWalRcvInterrupts();
669 
670 		SpinLockAcquire(&walrcv->mutex);
671 		Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
672 			   walrcv->walRcvState == WALRCV_WAITING ||
673 			   walrcv->walRcvState == WALRCV_STOPPING);
674 		if (walrcv->walRcvState == WALRCV_RESTARTING)
675 		{
676 			/* we don't expect primary_conninfo to change */
677 			*startpoint = walrcv->receiveStart;
678 			*startpointTLI = walrcv->receiveStartTLI;
679 			walrcv->walRcvState = WALRCV_STREAMING;
680 			SpinLockRelease(&walrcv->mutex);
681 			break;
682 		}
683 		if (walrcv->walRcvState == WALRCV_STOPPING)
684 		{
685 			/*
686 			 * We should've received SIGTERM if the startup process wants us
687 			 * to die, but might as well check it here too.
688 			 */
689 			SpinLockRelease(&walrcv->mutex);
690 			exit(1);
691 		}
692 		SpinLockRelease(&walrcv->mutex);
693 
694 		WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
695 	}
696 
697 	if (update_process_title)
698 	{
699 		char		activitymsg[50];
700 
701 		snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
702 				 (uint32) (*startpoint >> 32),
703 				 (uint32) *startpoint);
704 		set_ps_display(activitymsg, false);
705 	}
706 }
707 
708 /*
709  * Fetch any missing timeline history files between 'first' and 'last'
710  * (inclusive) from the server.
711  */
712 static void
WalRcvFetchTimeLineHistoryFiles(TimeLineID first,TimeLineID last)713 WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
714 {
715 	TimeLineID	tli;
716 
717 	for (tli = first; tli <= last; tli++)
718 	{
719 		/* there's no history file for timeline 1 */
720 		if (tli != 1 && !existsTimeLineHistory(tli))
721 		{
722 			char	   *fname;
723 			char	   *content;
724 			int			len;
725 			char		expectedfname[MAXFNAMELEN];
726 
727 			ereport(LOG,
728 					(errmsg("fetching timeline history file for timeline %u from primary server",
729 							tli)));
730 
731 			EnableWalRcvImmediateExit();
732 			walrcv_readtimelinehistoryfile(tli, &fname, &content, &len);
733 			DisableWalRcvImmediateExit();
734 
735 			/*
736 			 * Check that the filename on the master matches what we
737 			 * calculated ourselves. This is just a sanity check, it should
738 			 * always match.
739 			 */
740 			TLHistoryFileName(expectedfname, tli);
741 			if (strcmp(fname, expectedfname) != 0)
742 				ereport(ERROR,
743 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
744 						 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
745 										 tli)));
746 
747 			/*
748 			 * Write the file to pg_xlog.
749 			 */
750 			writeTimeLineHistoryFile(tli, content, len);
751 
752 			/*
753 			 * Mark the streamed history file as ready for archiving
754 			 * if archive_mode is always.
755 			 */
756 			if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
757 				XLogArchiveForceDone(fname);
758 			else
759 				XLogArchiveNotify(fname);
760 
761 			pfree(fname);
762 			pfree(content);
763 		}
764 	}
765 }
766 
767 /*
768  * Mark us as STOPPED in shared memory at exit.
769  */
770 static void
WalRcvDie(int code,Datum arg)771 WalRcvDie(int code, Datum arg)
772 {
773 	WalRcvData *walrcv = WalRcv;
774 
775 	/* Ensure that all WAL records received are flushed to disk */
776 	XLogWalRcvFlush(true);
777 
778 	DisownLatch(&walrcv->latch);
779 
780 	SpinLockAcquire(&walrcv->mutex);
781 	Assert(walrcv->walRcvState == WALRCV_STREAMING ||
782 		   walrcv->walRcvState == WALRCV_RESTARTING ||
783 		   walrcv->walRcvState == WALRCV_STARTING ||
784 		   walrcv->walRcvState == WALRCV_WAITING ||
785 		   walrcv->walRcvState == WALRCV_STOPPING);
786 	Assert(walrcv->pid == MyProcPid);
787 	walrcv->walRcvState = WALRCV_STOPPED;
788 	walrcv->pid = 0;
789 	walrcv->ready_to_display = false;
790 	SpinLockRelease(&walrcv->mutex);
791 
792 	/* Terminate the connection gracefully. */
793 	if (walrcv_disconnect != NULL)
794 		walrcv_disconnect();
795 
796 	/* Wake up the startup process to notice promptly that we're gone */
797 	WakeupRecovery();
798 }
799 
800 /* SIGHUP: set flag to re-read config file at next convenient time */
801 static void
WalRcvSigHupHandler(SIGNAL_ARGS)802 WalRcvSigHupHandler(SIGNAL_ARGS)
803 {
804 	got_SIGHUP = true;
805 }
806 
807 
/*
 * SIGUSR1 handler: used by the latch mechanism.  errno is saved and
 * restored so the interrupted code does not see it change.
 */
static void
WalRcvSigUsr1Handler(SIGNAL_ARGS)
{
	int			saved_errno = errno;

	latch_sigusr1_handler();

	errno = saved_errno;
}
818 
819 /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
820 static void
WalRcvShutdownHandler(SIGNAL_ARGS)821 WalRcvShutdownHandler(SIGNAL_ARGS)
822 {
823 	int			save_errno = errno;
824 
825 	got_SIGTERM = true;
826 
827 	SetLatch(&WalRcv->latch);
828 
829 	/* Don't joggle the elbow of proc_exit */
830 	if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
831 		ProcessWalRcvInterrupts();
832 
833 	errno = save_errno;
834 }
835 
/*
 * SIGQUIT handler, invoked when the postmaster signals SIGQUIT.
 *
 * Some backend has bought the farm, so we must drop whatever we are doing
 * and exit at once.
 */
static void
WalRcvQuickDieHandler(SIGNAL_ARGS)
{
	/*
	 * Deliberately skip proc_exit() and any atexit() callbacks: we got here
	 * because shared memory may be corrupted, so attempting to clean up our
	 * transaction is not wanted.  Just nail the windows shut and get out of
	 * town.  Those callbacks would not be safe to run from a signal handler
	 * in any case.
	 *
	 * The exit status is 2 rather than 0 on purpose.  It forces the
	 * postmaster into a system reset cycle if someone manually sends
	 * SIGQUIT to a random backend, which is needed exactly because our
	 * shared memory state is left uncleaned.  (The "dead man switch"
	 * mechanism in pmsignal.c should make the postmaster treat this as a
	 * crash as well, but there is no harm in being doubly sure.)
	 */
	_exit(2);
}
861 
862 /*
863  * Accept the message from XLOG stream, and process it.
864  */
865 static void
XLogWalRcvProcessMsg(unsigned char type,char * buf,Size len)866 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
867 {
868 	int			hdrlen;
869 	XLogRecPtr	dataStart;
870 	XLogRecPtr	walEnd;
871 	TimestampTz sendTime;
872 	bool		replyRequested;
873 
874 	resetStringInfo(&incoming_message);
875 
876 	switch (type)
877 	{
878 		case 'w':				/* WAL records */
879 			{
880 				/* copy message to StringInfo */
881 				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
882 				if (len < hdrlen)
883 					ereport(ERROR,
884 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
885 							 errmsg_internal("invalid WAL message received from primary")));
886 				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
887 
888 				/* read the fields */
889 				dataStart = pq_getmsgint64(&incoming_message);
890 				walEnd = pq_getmsgint64(&incoming_message);
891 				sendTime = IntegerTimestampToTimestampTz(
892 										  pq_getmsgint64(&incoming_message));
893 				ProcessWalSndrMessage(walEnd, sendTime);
894 
895 				buf += hdrlen;
896 				len -= hdrlen;
897 				XLogWalRcvWrite(buf, len, dataStart);
898 				break;
899 			}
900 		case 'k':				/* Keepalive */
901 			{
902 				/* copy message to StringInfo */
903 				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
904 				if (len != hdrlen)
905 					ereport(ERROR,
906 							(errcode(ERRCODE_PROTOCOL_VIOLATION),
907 							 errmsg_internal("invalid keepalive message received from primary")));
908 				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
909 
910 				/* read the fields */
911 				walEnd = pq_getmsgint64(&incoming_message);
912 				sendTime = IntegerTimestampToTimestampTz(
913 										  pq_getmsgint64(&incoming_message));
914 				replyRequested = pq_getmsgbyte(&incoming_message);
915 
916 				ProcessWalSndrMessage(walEnd, sendTime);
917 
918 				/* If the primary requested a reply, send one immediately */
919 				if (replyRequested)
920 					XLogWalRcvSendReply(true, false);
921 				break;
922 			}
923 		default:
924 			ereport(ERROR,
925 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
926 					 errmsg_internal("invalid replication message type %d",
927 									 type)));
928 	}
929 }
930 
931 /*
932  * Write XLOG data to disk.
933  */
934 static void
XLogWalRcvWrite(char * buf,Size nbytes,XLogRecPtr recptr)935 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
936 {
937 	int			startoff;
938 	int			byteswritten;
939 
940 	while (nbytes > 0)
941 	{
942 		int			segbytes;
943 
944 		/* Close the current segment if it's completed */
945 		if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo))
946 			XLogWalRcvClose(recptr);
947 
948 		if (recvFile < 0)
949 		{
950 			bool		use_existent = true;
951 
952 			/* Create/use new log file */
953 			XLByteToSeg(recptr, recvSegNo);
954 			recvFile = XLogFileInit(recvSegNo, &use_existent, true);
955 			recvFileTLI = ThisTimeLineID;
956 			recvOff = 0;
957 		}
958 
959 		/* Calculate the start offset of the received logs */
960 		startoff = recptr % XLogSegSize;
961 
962 		if (startoff + nbytes > XLogSegSize)
963 			segbytes = XLogSegSize - startoff;
964 		else
965 			segbytes = nbytes;
966 
967 		/* Need to seek in the file? */
968 		if (recvOff != startoff)
969 		{
970 			if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
971 				ereport(PANIC,
972 						(errcode_for_file_access(),
973 				  errmsg("could not seek in log segment %s to offset %u: %m",
974 						 XLogFileNameP(recvFileTLI, recvSegNo),
975 						 startoff)));
976 			recvOff = startoff;
977 		}
978 
979 		/* OK to write the logs */
980 		errno = 0;
981 
982 		byteswritten = write(recvFile, buf, segbytes);
983 		if (byteswritten <= 0)
984 		{
985 			/* if write didn't set errno, assume no disk space */
986 			if (errno == 0)
987 				errno = ENOSPC;
988 			ereport(PANIC,
989 					(errcode_for_file_access(),
990 					 errmsg("could not write to log segment %s "
991 							"at offset %u, length %lu: %m",
992 							XLogFileNameP(recvFileTLI, recvSegNo),
993 							recvOff, (unsigned long) segbytes)));
994 		}
995 
996 		/* Update state for write */
997 		recptr += byteswritten;
998 
999 		recvOff += byteswritten;
1000 		nbytes -= byteswritten;
1001 		buf += byteswritten;
1002 
1003 		LogstreamResult.Write = recptr;
1004 	}
1005 
1006 	/*
1007 	 * Close the current segment if it's fully written up in the last cycle of
1008 	 * the loop, to create its archive notification file soon. Otherwise WAL
1009 	 * archiving of the segment will be delayed until any data in the next
1010 	 * segment is received and written.
1011 	 */
1012 	if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo))
1013 		XLogWalRcvClose(recptr);
1014 }
1015 
1016 /*
1017  * Flush the log to disk.
1018  *
1019  * If we're in the midst of dying, it's unwise to do anything that might throw
1020  * an error, so we skip sending a reply in that case.
1021  */
1022 static void
XLogWalRcvFlush(bool dying)1023 XLogWalRcvFlush(bool dying)
1024 {
1025 	if (LogstreamResult.Flush < LogstreamResult.Write)
1026 	{
1027 		WalRcvData *walrcv = WalRcv;
1028 
1029 		issue_xlog_fsync(recvFile, recvSegNo);
1030 
1031 		LogstreamResult.Flush = LogstreamResult.Write;
1032 
1033 		/* Update shared-memory status */
1034 		SpinLockAcquire(&walrcv->mutex);
1035 		if (walrcv->receivedUpto < LogstreamResult.Flush)
1036 		{
1037 			walrcv->latestChunkStart = walrcv->receivedUpto;
1038 			walrcv->receivedUpto = LogstreamResult.Flush;
1039 			walrcv->receivedTLI = ThisTimeLineID;
1040 		}
1041 		SpinLockRelease(&walrcv->mutex);
1042 
1043 		/* Signal the startup process and walsender that new WAL has arrived */
1044 		WakeupRecovery();
1045 		if (AllowCascadeReplication())
1046 			WalSndWakeup();
1047 
1048 		/* Report XLOG streaming progress in PS display */
1049 		if (update_process_title)
1050 		{
1051 			char		activitymsg[50];
1052 
1053 			snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1054 					 (uint32) (LogstreamResult.Write >> 32),
1055 					 (uint32) LogstreamResult.Write);
1056 			set_ps_display(activitymsg, false);
1057 		}
1058 
1059 		/* Also let the master know that we made some progress */
1060 		if (!dying)
1061 		{
1062 			XLogWalRcvSendReply(false, false);
1063 			XLogWalRcvSendHSFeedback(false);
1064 		}
1065 	}
1066 }
1067 
1068 /*
1069  * Close the current segment.
1070  *
1071  * Flush the segment to disk before closing it. Otherwise we have to
1072  * reopen and fsync it later.
1073  *
1074  * Create an archive notification file since the segment is known completed.
1075  */
1076 static void
XLogWalRcvClose(XLogRecPtr recptr)1077 XLogWalRcvClose(XLogRecPtr recptr)
1078 {
1079 	char		xlogfname[MAXFNAMELEN];
1080 
1081 	Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo));
1082 
1083 	/*
1084 	 * fsync() and close current file before we switch to next one. We would
1085 	 * otherwise have to reopen this file to fsync it later
1086 	 */
1087 	XLogWalRcvFlush(false);
1088 
1089 	XLogFileName(xlogfname, recvFileTLI, recvSegNo);
1090 
1091 	/*
1092 	 * XLOG segment files will be re-read by recovery in startup process soon,
1093 	 * so we don't advise the OS to release cache pages associated with the
1094 	 * file like XLogFileClose() does.
1095 	 */
1096 	if (close(recvFile) != 0)
1097 		ereport(PANIC,
1098 				(errcode_for_file_access(),
1099 				 errmsg("could not close log segment %s: %m",
1100 						xlogfname)));
1101 
1102 	/*
1103 	 * Create .done file forcibly to prevent the streamed segment from being
1104 	 * archived later.
1105 	 */
1106 	if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1107 		XLogArchiveForceDone(xlogfname);
1108 	else
1109 		XLogArchiveNotify(xlogfname);
1110 
1111 	recvFile = -1;
1112 }
1113 
1114 /*
1115  * Send reply message to primary, indicating our current XLOG positions, oldest
1116  * xmin and the current time.
1117  *
1118  * If 'force' is not set, the message is only sent if enough time has
1119  * passed since last status update to reach wal_receiver_status_interval.
1120  * If wal_receiver_status_interval is disabled altogether and 'force' is
1121  * false, this is a no-op.
1122  *
1123  * If 'requestReply' is true, requests the server to reply immediately upon
1124  * receiving this message. This is used for heartbearts, when approaching
1125  * wal_receiver_timeout.
1126  */
1127 static void
XLogWalRcvSendReply(bool force,bool requestReply)1128 XLogWalRcvSendReply(bool force, bool requestReply)
1129 {
1130 	static XLogRecPtr writePtr = 0;
1131 	static XLogRecPtr flushPtr = 0;
1132 	XLogRecPtr	applyPtr;
1133 	static TimestampTz sendTime = 0;
1134 	TimestampTz now;
1135 
1136 	/*
1137 	 * If the user doesn't want status to be reported to the master, be sure
1138 	 * to exit before doing anything at all.
1139 	 */
1140 	if (!force && wal_receiver_status_interval <= 0)
1141 		return;
1142 
1143 	/* Get current timestamp. */
1144 	now = GetCurrentTimestamp();
1145 
1146 	/*
1147 	 * We can compare the write and flush positions to the last message we
1148 	 * sent without taking any lock, but the apply position requires a spin
1149 	 * lock, so we don't check that unless something else has changed or 10
1150 	 * seconds have passed.  This means that the apply log position will
1151 	 * appear, from the master's point of view, to lag slightly, but since
1152 	 * this is only for reporting purposes and only on idle systems, that's
1153 	 * probably OK.
1154 	 */
1155 	if (!force
1156 		&& writePtr == LogstreamResult.Write
1157 		&& flushPtr == LogstreamResult.Flush
1158 		&& !TimestampDifferenceExceeds(sendTime, now,
1159 									   wal_receiver_status_interval * 1000))
1160 		return;
1161 	sendTime = now;
1162 
1163 	/* Construct a new message */
1164 	writePtr = LogstreamResult.Write;
1165 	flushPtr = LogstreamResult.Flush;
1166 	applyPtr = GetXLogReplayRecPtr(NULL);
1167 
1168 	resetStringInfo(&reply_message);
1169 	pq_sendbyte(&reply_message, 'r');
1170 	pq_sendint64(&reply_message, writePtr);
1171 	pq_sendint64(&reply_message, flushPtr);
1172 	pq_sendint64(&reply_message, applyPtr);
1173 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
1174 	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1175 
1176 	/* Send it */
1177 	elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1178 		 (uint32) (writePtr >> 32), (uint32) writePtr,
1179 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1180 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1181 		 requestReply ? " (reply requested)" : "");
1182 
1183 	walrcv_send(reply_message.data, reply_message.len);
1184 }
1185 
1186 /*
1187  * Send hot standby feedback message to primary, plus the current time,
1188  * in case they don't have a watch.
1189  *
1190  * If the user disables feedback, send one final message to tell sender
1191  * to forget about the xmin on this standby. We also send this message
1192  * on first connect because a previous connection might have set xmin
1193  * on a replication slot. (If we're not using a slot it's harmless to
1194  * send a feedback message explicitly setting InvalidTransactionId).
1195  */
1196 static void
XLogWalRcvSendHSFeedback(bool immed)1197 XLogWalRcvSendHSFeedback(bool immed)
1198 {
1199 	TimestampTz now;
1200 	TransactionId nextXid;
1201 	uint32		nextEpoch;
1202 	TransactionId xmin;
1203 	static TimestampTz sendTime = 0;
1204 	/* initially true so we always send at least one feedback message */
1205 	static bool master_has_standby_xmin = true;
1206 
1207 	/*
1208 	 * If the user doesn't want status to be reported to the master, be sure
1209 	 * to exit before doing anything at all.
1210 	 */
1211 	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1212 		!master_has_standby_xmin)
1213 		return;
1214 
1215 	/* Get current timestamp. */
1216 	now = GetCurrentTimestamp();
1217 
1218 	if (!immed)
1219 	{
1220 		/*
1221 		 * Send feedback at most once per wal_receiver_status_interval.
1222 		 */
1223 		if (!TimestampDifferenceExceeds(sendTime, now,
1224 										wal_receiver_status_interval * 1000))
1225 			return;
1226 		sendTime = now;
1227 	}
1228 
1229 	/*
1230 	 * If Hot Standby is not yet accepting connections there is nothing to
1231 	 * send. Check this after the interval has expired to reduce number of
1232 	 * calls.
1233 	 *
1234 	 * Bailing out here also ensures that we don't send feedback until we've
1235 	 * read our own replication slot state, so we don't tell the master to
1236 	 * discard needed xmin or catalog_xmin from any slots that may exist
1237 	 * on this replica.
1238 	 */
1239 	if (!HotStandbyActive())
1240 		return;
1241 
1242 	/*
1243 	 * Make the expensive call to get the oldest xmin once we are certain
1244 	 * everything else has been checked.
1245 	 */
1246 	if (hot_standby_feedback)
1247 		xmin = GetOldestXmin(NULL, false);
1248 	else
1249 		xmin = InvalidTransactionId;
1250 
1251 	/*
1252 	 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1253 	 * the epoch boundary.
1254 	 */
1255 	GetNextXidAndEpoch(&nextXid, &nextEpoch);
1256 	if (nextXid < xmin)
1257 		nextEpoch--;
1258 
1259 	elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
1260 		 xmin, nextEpoch);
1261 
1262 	/* Construct the message and send it. */
1263 	resetStringInfo(&reply_message);
1264 	pq_sendbyte(&reply_message, 'h');
1265 	pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
1266 	pq_sendint(&reply_message, xmin, 4);
1267 	pq_sendint(&reply_message, nextEpoch, 4);
1268 	walrcv_send(reply_message.data, reply_message.len);
1269 	if (TransactionIdIsValid(xmin))
1270 		master_has_standby_xmin = true;
1271 	else
1272 		master_has_standby_xmin = false;
1273 }
1274 
1275 /*
1276  * Update shared memory status upon receiving a message from primary.
1277  *
1278  * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1279  * message, reported by primary.
1280  */
1281 static void
ProcessWalSndrMessage(XLogRecPtr walEnd,TimestampTz sendTime)1282 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1283 {
1284 	WalRcvData *walrcv = WalRcv;
1285 
1286 	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1287 
1288 	/* Update shared-memory status */
1289 	SpinLockAcquire(&walrcv->mutex);
1290 	if (walrcv->latestWalEnd < walEnd)
1291 		walrcv->latestWalEndTime = sendTime;
1292 	walrcv->latestWalEnd = walEnd;
1293 	walrcv->lastMsgSendTime = sendTime;
1294 	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1295 	SpinLockRelease(&walrcv->mutex);
1296 
1297 	if (log_min_messages <= DEBUG2)
1298 	{
1299 		char	   *sendtime;
1300 		char	   *receipttime;
1301 		int			applyDelay;
1302 
1303 		/* Copy because timestamptz_to_str returns a static buffer */
1304 		sendtime = pstrdup(timestamptz_to_str(sendTime));
1305 		receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1306 		applyDelay = GetReplicationApplyDelay();
1307 
1308 		/* apply delay is not available */
1309 		if (applyDelay == -1)
1310 			elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1311 				 sendtime,
1312 				 receipttime,
1313 				 GetReplicationTransferLatency());
1314 		else
1315 			elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1316 				 sendtime,
1317 				 receipttime,
1318 				 applyDelay,
1319 				 GetReplicationTransferLatency());
1320 
1321 		pfree(sendtime);
1322 		pfree(receipttime);
1323 	}
1324 }
1325 
1326 /*
1327  * Wake up the walreceiver main loop.
1328  *
1329  * This is called by the startup process whenever interesting xlog records
1330  * are applied, so that walreceiver can check if it needs to send an apply
1331  * notification back to the master which may be waiting in a COMMIT with
1332  * synchronous_commit = remote_apply.
1333  */
1334 void
WalRcvForceReply(void)1335 WalRcvForceReply(void)
1336 {
1337 	WalRcv->force_reply = true;
1338 	SetLatch(&WalRcv->latch);
1339 }
1340 
1341 /*
1342  * Return a string constant representing the state. This is used
1343  * in system functions and views, and should *not* be translated.
1344  */
1345 static const char *
WalRcvGetStateString(WalRcvState state)1346 WalRcvGetStateString(WalRcvState state)
1347 {
1348 	switch (state)
1349 	{
1350 		case WALRCV_STOPPED:
1351 			return "stopped";
1352 		case WALRCV_STARTING:
1353 			return "starting";
1354 		case WALRCV_STREAMING:
1355 			return "streaming";
1356 		case WALRCV_WAITING:
1357 			return "waiting";
1358 		case WALRCV_RESTARTING:
1359 			return "restarting";
1360 		case WALRCV_STOPPING:
1361 			return "stopping";
1362 	}
1363 	return "UNKNOWN";
1364 }
1365 
1366 /*
1367  * Returns activity of WAL receiver, including pid, state and xlog locations
1368  * received from the WAL sender of another server.
1369  */
1370 Datum
pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)1371 pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1372 {
1373 	TupleDesc	tupdesc;
1374 	Datum	   *values;
1375 	bool	   *nulls;
1376 	WalRcvData *walrcv = WalRcv;
1377 	WalRcvState state;
1378 	XLogRecPtr	receive_start_lsn;
1379 	TimeLineID	receive_start_tli;
1380 	XLogRecPtr	received_lsn;
1381 	TimeLineID	received_tli;
1382 	TimestampTz last_send_time;
1383 	TimestampTz last_receipt_time;
1384 	XLogRecPtr	latest_end_lsn;
1385 	TimestampTz latest_end_time;
1386 	char		slotname[NAMEDATALEN];
1387 	char		conninfo[MAXCONNINFO];
1388 
1389 	/*
1390 	 * No WAL receiver (or not ready yet), just return a tuple with NULL
1391 	 * values
1392 	 */
1393 	if (walrcv->pid == 0 || !walrcv->ready_to_display)
1394 		PG_RETURN_NULL();
1395 
1396 	/* determine result type */
1397 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1398 		elog(ERROR, "return type must be a row type");
1399 
1400 	values = palloc0(sizeof(Datum) * tupdesc->natts);
1401 	nulls = palloc0(sizeof(bool) * tupdesc->natts);
1402 
1403 	/* Take a lock to ensure value consistency */
1404 	SpinLockAcquire(&walrcv->mutex);
1405 	state = walrcv->walRcvState;
1406 	receive_start_lsn = walrcv->receiveStart;
1407 	receive_start_tli = walrcv->receiveStartTLI;
1408 	received_lsn = walrcv->receivedUpto;
1409 	received_tli = walrcv->receivedTLI;
1410 	last_send_time = walrcv->lastMsgSendTime;
1411 	last_receipt_time = walrcv->lastMsgReceiptTime;
1412 	latest_end_lsn = walrcv->latestWalEnd;
1413 	latest_end_time = walrcv->latestWalEndTime;
1414 	strlcpy(slotname, (char *) walrcv->slotname, sizeof(slotname));
1415 	strlcpy(conninfo, (char *) walrcv->conninfo, sizeof(conninfo));
1416 	SpinLockRelease(&walrcv->mutex);
1417 
1418 	/* Fetch values */
1419 	values[0] = Int32GetDatum(walrcv->pid);
1420 
1421 	if (!superuser())
1422 	{
1423 		/*
1424 		 * Only superusers can see details. Other users only get the pid value
1425 		 * to know whether it is a WAL receiver, but no details.
1426 		 */
1427 		MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1428 	}
1429 	else
1430 	{
1431 		values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1432 
1433 		if (XLogRecPtrIsInvalid(receive_start_lsn))
1434 			nulls[2] = true;
1435 		else
1436 			values[2] = LSNGetDatum(receive_start_lsn);
1437 		values[3] = Int32GetDatum(receive_start_tli);
1438 		if (XLogRecPtrIsInvalid(received_lsn))
1439 			nulls[4] = true;
1440 		else
1441 			values[4] = LSNGetDatum(received_lsn);
1442 		values[5] = Int32GetDatum(received_tli);
1443 		if (last_send_time == 0)
1444 			nulls[6] = true;
1445 		else
1446 			values[6] = TimestampTzGetDatum(last_send_time);
1447 		if (last_receipt_time == 0)
1448 			nulls[7] = true;
1449 		else
1450 			values[7] = TimestampTzGetDatum(last_receipt_time);
1451 		if (XLogRecPtrIsInvalid(latest_end_lsn))
1452 			nulls[8] = true;
1453 		else
1454 			values[8] = LSNGetDatum(latest_end_lsn);
1455 		if (latest_end_time == 0)
1456 			nulls[9] = true;
1457 		else
1458 			values[9] = TimestampTzGetDatum(latest_end_time);
1459 		if (*slotname == '\0')
1460 			nulls[10] = true;
1461 		else
1462 			values[10] = CStringGetTextDatum(slotname);
1463 		if (*conninfo == '\0')
1464 			nulls[11] = true;
1465 		else
1466 			values[11] = CStringGetTextDatum(conninfo);
1467 	}
1468 
1469 	/* Returns the record as Datum */
1470 	PG_RETURN_DATUM(HeapTupleGetDatum(
1471 								   heap_form_tuple(tupdesc, values, nulls)));
1472 }
1473