1 /*-------------------------------------------------------------------------
2 *
3 * walreceiver.c
4 *
5 * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 * is the process in the standby server that takes charge of receiving
7 * XLOG records from a primary server during streaming replication.
8 *
9 * When the startup process determines that it's time to start streaming,
10 * it instructs postmaster to start walreceiver. Walreceiver first connects
11 * to the primary server (it will be served by a walsender process
12 * in the primary server), and then keeps receiving XLOG records and
13 * writing them to the disk as long as the connection is alive. As XLOG
14 * records are received and flushed to disk, it updates the
15 * WalRcv->receivedUpto variable in shared memory, to inform the startup
16 * process of how far it can proceed with XLOG replay.
17 *
18 * If the primary server ends streaming, but doesn't disconnect, walreceiver
19 * goes into "waiting" mode, and waits for the startup process to give new
20 * instructions. The startup process will treat that the same as
21 * disconnection, and will rescan the archive/pg_wal directory. But when the
22 * startup process wants to try streaming replication again, it will just
23 * nudge the existing walreceiver process that's waiting, instead of launching
24 * a new one.
25 *
 * Normal termination is by SIGTERM, which instructs the walreceiver to
 * exit at the next convenient point (ProcessWalRcvInterrupts turns it into
 * a FATAL error). Emergency termination is by SIGQUIT; like any postmaster
 * child process, the walreceiver will simply abort and exit on SIGQUIT. A
 * close of the connection and a FATAL error are treated not as a crash but
 * as normal operation.
31 *
32 * This file contains the server-facing parts of walreceiver. The libpq-
33 * specific parts are in the libpqwalreceiver module. It's loaded
34 * dynamically to avoid linking the server with libpq.
35 *
36 * Portions Copyright (c) 2010-2018, PostgreSQL Global Development Group
37 *
38 *
39 * IDENTIFICATION
40 * src/backend/replication/walreceiver.c
41 *
42 *-------------------------------------------------------------------------
43 */
44 #include "postgres.h"
45
46 #include <signal.h>
47 #include <unistd.h>
48
49 #include "access/htup_details.h"
50 #include "access/timeline.h"
51 #include "access/transam.h"
52 #include "access/xlog_internal.h"
53 #include "catalog/pg_authid.h"
54 #include "catalog/pg_type.h"
55 #include "common/ip.h"
56 #include "funcapi.h"
57 #include "libpq/pqformat.h"
58 #include "libpq/pqsignal.h"
59 #include "miscadmin.h"
60 #include "pgstat.h"
61 #include "replication/walreceiver.h"
62 #include "replication/walsender.h"
63 #include "storage/ipc.h"
64 #include "storage/pmsignal.h"
65 #include "storage/procarray.h"
66 #include "utils/builtins.h"
67 #include "utils/guc.h"
68 #include "utils/pg_lsn.h"
69 #include "utils/ps_status.h"
70 #include "utils/resowner.h"
71 #include "utils/timestamp.h"
72
73
/* GUC variables */
int			wal_receiver_status_interval;
int			wal_receiver_timeout;	/* milliseconds; <= 0 disables the receive
									 * timeout check in the main loop */
bool		hot_standby_feedback;

/* libpqwalreceiver connection (NULL until connected) */
static WalReceiverConn *wrconn = NULL;

/*
 * Function table of the walreceiver implementation; expected to be filled in
 * by libpqwalreceiver when WalReceiverMain loads it (it is checked for NULL
 * right after load_file()).
 */
WalReceiverFunctionsType *WalReceiverFunctions = NULL;

#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */

/*
 * These variables are used similarly to openLogFile/SegNo/Off,
 * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
 * corresponding to the filename of recvFile.  recvFile is -1 while no
 * segment is open; recvOff is the current write offset within the open
 * segment.
 */
static int	recvFile = -1;
static TimeLineID recvFileTLI = 0;
static XLogSegNo recvSegNo = 0;
static uint32 recvOff = 0;

/*
 * Flags set by interrupt handlers of walreceiver for later service in the
 * main loop.
 */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.
 */
static struct
{
	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
}			LogstreamResult;

/*
 * Reusable message buffers.  incoming_message holds the fixed-size header of
 * each message from the primary while it is parsed (XLogWalRcvProcessMsg);
 * reply_message is presumably the buffer for messages we send back to the
 * primary -- NOTE(review): its users are outside this view, confirm.
 */
static StringInfoData reply_message;
static StringInfoData incoming_message;

/* Prototypes for private functions */
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvClose(XLogRecPtr recptr);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);

/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
132
133
134 /*
135 * Process any interrupts the walreceiver process may have received.
136 * This should be called any time the process's latch has become set.
137 *
138 * Currently, only SIGTERM is of interest. We can't just exit(1) within the
139 * SIGTERM signal handler, because the signal might arrive in the middle of
140 * some critical operation, like while we're holding a spinlock. Instead, the
141 * signal handler sets a flag variable as well as setting the process's latch.
142 * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
143 * latch has become set. Operations that could block for a long time, such as
144 * reading from a remote server, must pay attention to the latch too; see
145 * libpqrcv_PQgetResult for example.
146 */
147 void
ProcessWalRcvInterrupts(void)148 ProcessWalRcvInterrupts(void)
149 {
150 /*
151 * Although walreceiver interrupt handling doesn't use the same scheme as
152 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
153 * any incoming signals on Win32.
154 */
155 CHECK_FOR_INTERRUPTS();
156
157 if (got_SIGTERM)
158 {
159 ereport(FATAL,
160 (errcode(ERRCODE_ADMIN_SHUTDOWN),
161 errmsg("terminating walreceiver process due to administrator command")));
162 }
163 }
164
165
166 /* Main entry point for walreceiver process */
167 void
WalReceiverMain(void)168 WalReceiverMain(void)
169 {
170 char conninfo[MAXCONNINFO];
171 char *tmp_conninfo;
172 char slotname[NAMEDATALEN];
173 XLogRecPtr startpoint;
174 TimeLineID startpointTLI;
175 TimeLineID primaryTLI;
176 bool first_stream;
177 WalRcvData *walrcv = WalRcv;
178 TimestampTz last_recv_timestamp;
179 TimestampTz now;
180 bool ping_sent;
181 char *err;
182 char *sender_host = NULL;
183 int sender_port = 0;
184
185 /*
186 * WalRcv should be set up already (if we are a backend, we inherit this
187 * by fork() or EXEC_BACKEND mechanism from the postmaster).
188 */
189 Assert(walrcv != NULL);
190
191 now = GetCurrentTimestamp();
192
193 /*
194 * Mark walreceiver as running in shared memory.
195 *
196 * Do this as early as possible, so that if we fail later on, we'll set
197 * state to STOPPED. If we die before this, the startup process will keep
198 * waiting for us to start up, until it times out.
199 */
200 SpinLockAcquire(&walrcv->mutex);
201 Assert(walrcv->pid == 0);
202 switch (walrcv->walRcvState)
203 {
204 case WALRCV_STOPPING:
205 /* If we've already been requested to stop, don't start up. */
206 walrcv->walRcvState = WALRCV_STOPPED;
207 /* fall through */
208
209 case WALRCV_STOPPED:
210 SpinLockRelease(&walrcv->mutex);
211 proc_exit(1);
212 break;
213
214 case WALRCV_STARTING:
215 /* The usual case */
216 break;
217
218 case WALRCV_WAITING:
219 case WALRCV_STREAMING:
220 case WALRCV_RESTARTING:
221 default:
222 /* Shouldn't happen */
223 SpinLockRelease(&walrcv->mutex);
224 elog(PANIC, "walreceiver still running according to shared memory state");
225 }
226 /* Advertise our PID so that the startup process can kill us */
227 walrcv->pid = MyProcPid;
228 walrcv->walRcvState = WALRCV_STREAMING;
229
230 /* Fetch information required to start streaming */
231 walrcv->ready_to_display = false;
232 strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
233 strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
234 startpoint = walrcv->receiveStart;
235 startpointTLI = walrcv->receiveStartTLI;
236
237 /* Initialise to a sanish value */
238 walrcv->lastMsgSendTime =
239 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
240
241 /* Report the latch to use to awaken this process */
242 walrcv->latch = &MyProc->procLatch;
243
244 SpinLockRelease(&walrcv->mutex);
245
246 /* Arrange to clean up at walreceiver exit */
247 on_shmem_exit(WalRcvDie, 0);
248
249 /* Properly accept or ignore signals the postmaster might send us */
250 pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
251 pqsignal(SIGINT, SIG_IGN);
252 pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
253 pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
254 pqsignal(SIGALRM, SIG_IGN);
255 pqsignal(SIGPIPE, SIG_IGN);
256 pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
257 pqsignal(SIGUSR2, SIG_IGN);
258
259 /* Reset some signals that are accepted by postmaster but not here */
260 pqsignal(SIGCHLD, SIG_DFL);
261 pqsignal(SIGTTIN, SIG_DFL);
262 pqsignal(SIGTTOU, SIG_DFL);
263 pqsignal(SIGCONT, SIG_DFL);
264 pqsignal(SIGWINCH, SIG_DFL);
265
266 /* We allow SIGQUIT (quickdie) at all times */
267 sigdelset(&BlockSig, SIGQUIT);
268
269 /* Load the libpq-specific functions */
270 load_file("libpqwalreceiver", false);
271 if (WalReceiverFunctions == NULL)
272 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
273
274 /*
275 * Create a resource owner to keep track of our resources (not clear that
276 * we need this, but may as well have one).
277 */
278 CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
279
280 /* Unblock signals (they were blocked when the postmaster forked us) */
281 PG_SETMASK(&UnBlockSig);
282
283 /* Establish the connection to the primary for XLOG streaming */
284 wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
285 if (!wrconn)
286 ereport(ERROR,
287 (errmsg("could not connect to the primary server: %s", err)));
288
289 /*
290 * Save user-visible connection string. This clobbers the original
291 * conninfo, for security. Also save host and port of the sender server
292 * this walreceiver is connected to.
293 */
294 tmp_conninfo = walrcv_get_conninfo(wrconn);
295 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
296 SpinLockAcquire(&walrcv->mutex);
297 memset(walrcv->conninfo, 0, MAXCONNINFO);
298 if (tmp_conninfo)
299 strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
300
301 memset(walrcv->sender_host, 0, NI_MAXHOST);
302 if (sender_host)
303 strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
304
305 walrcv->sender_port = sender_port;
306 walrcv->ready_to_display = true;
307 SpinLockRelease(&walrcv->mutex);
308
309 if (tmp_conninfo)
310 pfree(tmp_conninfo);
311
312 if (sender_host)
313 pfree(sender_host);
314
315 first_stream = true;
316 for (;;)
317 {
318 char *primary_sysid;
319 char standby_sysid[32];
320 int server_version;
321 WalRcvStreamOptions options;
322
323 /*
324 * Check that we're connected to a valid server using the
325 * IDENTIFY_SYSTEM replication command.
326 */
327 primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
328 &server_version);
329
330 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
331 GetSystemIdentifier());
332 if (strcmp(primary_sysid, standby_sysid) != 0)
333 {
334 ereport(ERROR,
335 (errmsg("database system identifier differs between the primary and standby"),
336 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
337 primary_sysid, standby_sysid)));
338 }
339
340 /*
341 * Confirm that the current timeline of the primary is the same or
342 * ahead of ours.
343 */
344 if (primaryTLI < startpointTLI)
345 ereport(ERROR,
346 (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
347 primaryTLI, startpointTLI)));
348
349 /*
350 * Get any missing history files. We do this always, even when we're
351 * not interested in that timeline, so that if we're promoted to
352 * become the master later on, we don't select the same timeline that
353 * was already used in the current master. This isn't bullet-proof -
354 * you'll need some external software to manage your cluster if you
355 * need to ensure that a unique timeline id is chosen in every case,
356 * but let's avoid the confusion of timeline id collisions where we
357 * can.
358 */
359 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
360
361 /*
362 * Start streaming.
363 *
364 * We'll try to start at the requested starting point and timeline,
365 * even if it's different from the server's latest timeline. In case
366 * we've already reached the end of the old timeline, the server will
367 * finish the streaming immediately, and we will go back to await
368 * orders from the startup process. If recovery_target_timeline is
369 * 'latest', the startup process will scan pg_wal and find the new
370 * history file, bump recovery target timeline, and ask us to restart
371 * on the new timeline.
372 */
373 options.logical = false;
374 options.startpoint = startpoint;
375 options.slotname = slotname[0] != '\0' ? slotname : NULL;
376 options.proto.physical.startpointTLI = startpointTLI;
377 ThisTimeLineID = startpointTLI;
378 if (walrcv_startstreaming(wrconn, &options))
379 {
380 if (first_stream)
381 ereport(LOG,
382 (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
383 (uint32) (startpoint >> 32), (uint32) startpoint,
384 startpointTLI)));
385 else
386 ereport(LOG,
387 (errmsg("restarted WAL streaming at %X/%X on timeline %u",
388 (uint32) (startpoint >> 32), (uint32) startpoint,
389 startpointTLI)));
390 first_stream = false;
391
392 /* Initialize LogstreamResult and buffers for processing messages */
393 LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
394 initStringInfo(&reply_message);
395 initStringInfo(&incoming_message);
396
397 /* Initialize the last recv timestamp */
398 last_recv_timestamp = GetCurrentTimestamp();
399 ping_sent = false;
400
401 /* Loop until end-of-streaming or error */
402 for (;;)
403 {
404 char *buf;
405 int len;
406 bool endofwal = false;
407 pgsocket wait_fd = PGINVALID_SOCKET;
408 int rc;
409
410 /*
411 * Exit walreceiver if we're not in recovery. This should not
412 * happen, but cross-check the status here.
413 */
414 if (!RecoveryInProgress())
415 ereport(FATAL,
416 (errmsg("cannot continue WAL streaming, recovery has already ended")));
417
418 /* Process any requests or signals received recently */
419 ProcessWalRcvInterrupts();
420
421 if (got_SIGHUP)
422 {
423 got_SIGHUP = false;
424 ProcessConfigFile(PGC_SIGHUP);
425 XLogWalRcvSendHSFeedback(true);
426 }
427
428 /* See if we can read data immediately */
429 len = walrcv_receive(wrconn, &buf, &wait_fd);
430 if (len != 0)
431 {
432 /*
433 * Process the received data, and any subsequent data we
434 * can read without blocking.
435 */
436 for (;;)
437 {
438 if (len > 0)
439 {
440 /*
441 * Something was received from master, so reset
442 * timeout
443 */
444 last_recv_timestamp = GetCurrentTimestamp();
445 ping_sent = false;
446 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
447 }
448 else if (len == 0)
449 break;
450 else if (len < 0)
451 {
452 ereport(LOG,
453 (errmsg("replication terminated by primary server"),
454 errdetail("End of WAL reached on timeline %u at %X/%X.",
455 startpointTLI,
456 (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
457 endofwal = true;
458 break;
459 }
460 len = walrcv_receive(wrconn, &buf, &wait_fd);
461 }
462
463 /* Let the master know that we received some data. */
464 XLogWalRcvSendReply(false, false);
465
466 /*
467 * If we've written some records, flush them to disk and
468 * let the startup process and primary server know about
469 * them.
470 */
471 XLogWalRcvFlush(false);
472 }
473
474 /* Check if we need to exit the streaming loop. */
475 if (endofwal)
476 break;
477
478 /*
479 * Ideally we would reuse a WaitEventSet object repeatedly
480 * here to avoid the overheads of WaitLatchOrSocket on epoll
481 * systems, but we can't be sure that libpq (or any other
482 * walreceiver implementation) has the same socket (even if
483 * the fd is the same number, it may have been closed and
484 * reopened since the last time). In future, if there is a
485 * function for removing sockets from WaitEventSet, then we
486 * could add and remove just the socket each time, potentially
487 * avoiding some system calls.
488 */
489 Assert(wait_fd != PGINVALID_SOCKET);
490 rc = WaitLatchOrSocket(walrcv->latch,
491 WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
492 WL_TIMEOUT | WL_LATCH_SET,
493 wait_fd,
494 NAPTIME_PER_CYCLE,
495 WAIT_EVENT_WAL_RECEIVER_MAIN);
496 if (rc & WL_LATCH_SET)
497 {
498 ResetLatch(walrcv->latch);
499 ProcessWalRcvInterrupts();
500
501 if (walrcv->force_reply)
502 {
503 /*
504 * The recovery process has asked us to send apply
505 * feedback now. Make sure the flag is really set to
506 * false in shared memory before sending the reply, so
507 * we don't miss a new request for a reply.
508 */
509 walrcv->force_reply = false;
510 pg_memory_barrier();
511 XLogWalRcvSendReply(true, false);
512 }
513 }
514 if (rc & WL_POSTMASTER_DEATH)
515 {
516 /*
517 * Emergency bailout if postmaster has died. This is to
518 * avoid the necessity for manual cleanup of all
519 * postmaster children.
520 */
521 exit(1);
522 }
523 if (rc & WL_TIMEOUT)
524 {
525 /*
526 * We didn't receive anything new. If we haven't heard
527 * anything from the server for more than
528 * wal_receiver_timeout / 2, ping the server. Also, if
529 * it's been longer than wal_receiver_status_interval
530 * since the last update we sent, send a status update to
531 * the master anyway, to report any progress in applying
532 * WAL.
533 */
534 bool requestReply = false;
535
536 /*
537 * Check if time since last receive from standby has
538 * reached the configured limit.
539 */
540 if (wal_receiver_timeout > 0)
541 {
542 TimestampTz now = GetCurrentTimestamp();
543 TimestampTz timeout;
544
545 timeout =
546 TimestampTzPlusMilliseconds(last_recv_timestamp,
547 wal_receiver_timeout);
548
549 if (now >= timeout)
550 ereport(ERROR,
551 (errmsg("terminating walreceiver due to timeout")));
552
553 /*
554 * We didn't receive anything new, for half of
555 * receiver replication timeout. Ping the server.
556 */
557 if (!ping_sent)
558 {
559 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
560 (wal_receiver_timeout / 2));
561 if (now >= timeout)
562 {
563 requestReply = true;
564 ping_sent = true;
565 }
566 }
567 }
568
569 XLogWalRcvSendReply(requestReply, requestReply);
570 XLogWalRcvSendHSFeedback(false);
571 }
572 }
573
574 /*
575 * The backend finished streaming. Exit streaming COPY-mode from
576 * our side, too.
577 */
578 walrcv_endstreaming(wrconn, &primaryTLI);
579
580 /*
581 * If the server had switched to a new timeline that we didn't
582 * know about when we began streaming, fetch its timeline history
583 * file now.
584 */
585 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
586 }
587 else
588 ereport(LOG,
589 (errmsg("primary server contains no more WAL on requested timeline %u",
590 startpointTLI)));
591
592 /*
593 * End of WAL reached on the requested timeline. Close the last
594 * segment, and await for new orders from the startup process.
595 */
596 if (recvFile >= 0)
597 {
598 char xlogfname[MAXFNAMELEN];
599
600 XLogWalRcvFlush(false);
601 if (close(recvFile) != 0)
602 ereport(PANIC,
603 (errcode_for_file_access(),
604 errmsg("could not close log segment %s: %m",
605 XLogFileNameP(recvFileTLI, recvSegNo))));
606
607 /*
608 * Create .done file forcibly to prevent the streamed segment from
609 * being archived later.
610 */
611 XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
612 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
613 XLogArchiveForceDone(xlogfname);
614 else
615 XLogArchiveNotify(xlogfname);
616 }
617 recvFile = -1;
618
619 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
620 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
621 }
622 /* not reached */
623 }
624
625 /*
626 * Wait for startup process to set receiveStart and receiveStartTLI.
627 */
628 static void
WalRcvWaitForStartPosition(XLogRecPtr * startpoint,TimeLineID * startpointTLI)629 WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
630 {
631 WalRcvData *walrcv = WalRcv;
632 int state;
633
634 SpinLockAcquire(&walrcv->mutex);
635 state = walrcv->walRcvState;
636 if (state != WALRCV_STREAMING)
637 {
638 SpinLockRelease(&walrcv->mutex);
639 if (state == WALRCV_STOPPING)
640 proc_exit(0);
641 else
642 elog(FATAL, "unexpected walreceiver state");
643 }
644 walrcv->walRcvState = WALRCV_WAITING;
645 walrcv->receiveStart = InvalidXLogRecPtr;
646 walrcv->receiveStartTLI = 0;
647 SpinLockRelease(&walrcv->mutex);
648
649 if (update_process_title)
650 set_ps_display("idle", false);
651
652 /*
653 * nudge startup process to notice that we've stopped streaming and are
654 * now waiting for instructions.
655 */
656 WakeupRecovery();
657 for (;;)
658 {
659 ResetLatch(walrcv->latch);
660
661 /*
662 * Emergency bailout if postmaster has died. This is to avoid the
663 * necessity for manual cleanup of all postmaster children.
664 */
665 if (!PostmasterIsAlive())
666 exit(1);
667
668 ProcessWalRcvInterrupts();
669
670 SpinLockAcquire(&walrcv->mutex);
671 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
672 walrcv->walRcvState == WALRCV_WAITING ||
673 walrcv->walRcvState == WALRCV_STOPPING);
674 if (walrcv->walRcvState == WALRCV_RESTARTING)
675 {
676 /* we don't expect primary_conninfo to change */
677 *startpoint = walrcv->receiveStart;
678 *startpointTLI = walrcv->receiveStartTLI;
679 walrcv->walRcvState = WALRCV_STREAMING;
680 SpinLockRelease(&walrcv->mutex);
681 break;
682 }
683 if (walrcv->walRcvState == WALRCV_STOPPING)
684 {
685 /*
686 * We should've received SIGTERM if the startup process wants us
687 * to die, but might as well check it here too.
688 */
689 SpinLockRelease(&walrcv->mutex);
690 exit(1);
691 }
692 SpinLockRelease(&walrcv->mutex);
693
694 WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
695 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
696 }
697
698 if (update_process_title)
699 {
700 char activitymsg[50];
701
702 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
703 (uint32) (*startpoint >> 32),
704 (uint32) *startpoint);
705 set_ps_display(activitymsg, false);
706 }
707 }
708
709 /*
710 * Fetch any missing timeline history files between 'first' and 'last'
711 * (inclusive) from the server.
712 */
713 static void
WalRcvFetchTimeLineHistoryFiles(TimeLineID first,TimeLineID last)714 WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
715 {
716 TimeLineID tli;
717
718 for (tli = first; tli <= last; tli++)
719 {
720 /* there's no history file for timeline 1 */
721 if (tli != 1 && !existsTimeLineHistory(tli))
722 {
723 char *fname;
724 char *content;
725 int len;
726 char expectedfname[MAXFNAMELEN];
727
728 ereport(LOG,
729 (errmsg("fetching timeline history file for timeline %u from primary server",
730 tli)));
731
732 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
733
734 /*
735 * Check that the filename on the master matches what we
736 * calculated ourselves. This is just a sanity check, it should
737 * always match.
738 */
739 TLHistoryFileName(expectedfname, tli);
740 if (strcmp(fname, expectedfname) != 0)
741 ereport(ERROR,
742 (errcode(ERRCODE_PROTOCOL_VIOLATION),
743 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
744 tli)));
745
746 /*
747 * Write the file to pg_wal.
748 */
749 writeTimeLineHistoryFile(tli, content, len);
750
751 /*
752 * Mark the streamed history file as ready for archiving
753 * if archive_mode is always.
754 */
755 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
756 XLogArchiveForceDone(fname);
757 else
758 XLogArchiveNotify(fname);
759
760 pfree(fname);
761 pfree(content);
762 }
763 }
764 }
765
766 /*
767 * Mark us as STOPPED in shared memory at exit.
768 */
769 static void
WalRcvDie(int code,Datum arg)770 WalRcvDie(int code, Datum arg)
771 {
772 WalRcvData *walrcv = WalRcv;
773
774 /* Ensure that all WAL records received are flushed to disk */
775 XLogWalRcvFlush(true);
776
777 /* Mark ourselves inactive in shared memory */
778 SpinLockAcquire(&walrcv->mutex);
779 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
780 walrcv->walRcvState == WALRCV_RESTARTING ||
781 walrcv->walRcvState == WALRCV_STARTING ||
782 walrcv->walRcvState == WALRCV_WAITING ||
783 walrcv->walRcvState == WALRCV_STOPPING);
784 Assert(walrcv->pid == MyProcPid);
785 walrcv->walRcvState = WALRCV_STOPPED;
786 walrcv->pid = 0;
787 walrcv->ready_to_display = false;
788 walrcv->latch = NULL;
789 SpinLockRelease(&walrcv->mutex);
790
791 /* Terminate the connection gracefully. */
792 if (wrconn != NULL)
793 walrcv_disconnect(wrconn);
794
795 /* Wake up the startup process to notice promptly that we're gone */
796 WakeupRecovery();
797 }
798
799 /* SIGHUP: set flag to re-read config file at next convenient time */
800 static void
WalRcvSigHupHandler(SIGNAL_ARGS)801 WalRcvSigHupHandler(SIGNAL_ARGS)
802 {
803 got_SIGHUP = true;
804 }
805
806
/*
 * SIGUSR1 handler: delegate to the latch machinery, keeping errno intact for
 * whatever code the signal interrupted.
 */
static void
WalRcvSigUsr1Handler(SIGNAL_ARGS)
{
	int			saved_errno;

	saved_errno = errno;
	latch_sigusr1_handler();
	errno = saved_errno;
}
817
818 /* SIGTERM: set flag for ProcessWalRcvInterrupts */
819 static void
WalRcvShutdownHandler(SIGNAL_ARGS)820 WalRcvShutdownHandler(SIGNAL_ARGS)
821 {
822 int save_errno = errno;
823
824 got_SIGTERM = true;
825
826 if (WalRcv->latch)
827 SetLatch(WalRcv->latch);
828
829 errno = save_errno;
830 }
831
/*
 * WalRcvQuickDieHandler: SIGQUIT from the postmaster.
 *
 * Another backend has crashed, so stop immediately.  Shared memory may be
 * corrupted, so no proc_exit() or atexit() cleanup is attempted; those
 * callbacks would not be safe to run inside a signal handler anyway.
 *
 * The exit status is 2, not 0, so that a manual SIGQUIT sent to an arbitrary
 * backend still forces the postmaster into a reset cycle.  That is required
 * precisely because our shared-memory state is left uncleaned.  The
 * "dead man switch" in pmsignal.c should make the postmaster treat this as a
 * crash too; the non-zero status is a second safeguard.
 */
static void
WalRcvQuickDieHandler(SIGNAL_ARGS)
{
	_exit(2);
}
857
858 /*
859 * Accept the message from XLOG stream, and process it.
860 */
861 static void
XLogWalRcvProcessMsg(unsigned char type,char * buf,Size len)862 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
863 {
864 int hdrlen;
865 XLogRecPtr dataStart;
866 XLogRecPtr walEnd;
867 TimestampTz sendTime;
868 bool replyRequested;
869
870 resetStringInfo(&incoming_message);
871
872 switch (type)
873 {
874 case 'w': /* WAL records */
875 {
876 /* copy message to StringInfo */
877 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
878 if (len < hdrlen)
879 ereport(ERROR,
880 (errcode(ERRCODE_PROTOCOL_VIOLATION),
881 errmsg_internal("invalid WAL message received from primary")));
882 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
883
884 /* read the fields */
885 dataStart = pq_getmsgint64(&incoming_message);
886 walEnd = pq_getmsgint64(&incoming_message);
887 sendTime = pq_getmsgint64(&incoming_message);
888 ProcessWalSndrMessage(walEnd, sendTime);
889
890 buf += hdrlen;
891 len -= hdrlen;
892 XLogWalRcvWrite(buf, len, dataStart);
893 break;
894 }
895 case 'k': /* Keepalive */
896 {
897 /* copy message to StringInfo */
898 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
899 if (len != hdrlen)
900 ereport(ERROR,
901 (errcode(ERRCODE_PROTOCOL_VIOLATION),
902 errmsg_internal("invalid keepalive message received from primary")));
903 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
904
905 /* read the fields */
906 walEnd = pq_getmsgint64(&incoming_message);
907 sendTime = pq_getmsgint64(&incoming_message);
908 replyRequested = pq_getmsgbyte(&incoming_message);
909
910 ProcessWalSndrMessage(walEnd, sendTime);
911
912 /* If the primary requested a reply, send one immediately */
913 if (replyRequested)
914 XLogWalRcvSendReply(true, false);
915 break;
916 }
917 default:
918 ereport(ERROR,
919 (errcode(ERRCODE_PROTOCOL_VIOLATION),
920 errmsg_internal("invalid replication message type %d",
921 type)));
922 }
923 }
924
925 /*
926 * Write XLOG data to disk.
927 */
928 static void
XLogWalRcvWrite(char * buf,Size nbytes,XLogRecPtr recptr)929 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
930 {
931 int startoff;
932 int byteswritten;
933
934 while (nbytes > 0)
935 {
936 int segbytes;
937
938 /* Close the current segment if it's completed */
939 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
940 XLogWalRcvClose(recptr);
941
942 if (recvFile < 0)
943 {
944 bool use_existent = true;
945
946 /* Create/use new log file */
947 XLByteToSeg(recptr, recvSegNo, wal_segment_size);
948 recvFile = XLogFileInit(recvSegNo, &use_existent, true);
949 recvFileTLI = ThisTimeLineID;
950 recvOff = 0;
951 }
952
953 /* Calculate the start offset of the received logs */
954 startoff = XLogSegmentOffset(recptr, wal_segment_size);
955
956 if (startoff + nbytes > wal_segment_size)
957 segbytes = wal_segment_size - startoff;
958 else
959 segbytes = nbytes;
960
961 /* Need to seek in the file? */
962 if (recvOff != startoff)
963 {
964 if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
965 ereport(PANIC,
966 (errcode_for_file_access(),
967 errmsg("could not seek in log segment %s to offset %u: %m",
968 XLogFileNameP(recvFileTLI, recvSegNo),
969 startoff)));
970 recvOff = startoff;
971 }
972
973 /* OK to write the logs */
974 errno = 0;
975
976 byteswritten = write(recvFile, buf, segbytes);
977 if (byteswritten <= 0)
978 {
979 /* if write didn't set errno, assume no disk space */
980 if (errno == 0)
981 errno = ENOSPC;
982 ereport(PANIC,
983 (errcode_for_file_access(),
984 errmsg("could not write to log segment %s "
985 "at offset %u, length %lu: %m",
986 XLogFileNameP(recvFileTLI, recvSegNo),
987 recvOff, (unsigned long) segbytes)));
988 }
989
990 /* Update state for write */
991 recptr += byteswritten;
992
993 recvOff += byteswritten;
994 nbytes -= byteswritten;
995 buf += byteswritten;
996
997 LogstreamResult.Write = recptr;
998 }
999
1000 /*
1001 * Close the current segment if it's fully written up in the last cycle of
1002 * the loop, to create its archive notification file soon. Otherwise WAL
1003 * archiving of the segment will be delayed until any data in the next
1004 * segment is received and written.
1005 */
1006 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
1007 XLogWalRcvClose(recptr);
1008 }
1009
1010 /*
1011 * Flush the log to disk.
1012 *
1013 * If we're in the midst of dying, it's unwise to do anything that might throw
1014 * an error, so we skip sending a reply in that case.
1015 */
1016 static void
XLogWalRcvFlush(bool dying)1017 XLogWalRcvFlush(bool dying)
1018 {
1019 if (LogstreamResult.Flush < LogstreamResult.Write)
1020 {
1021 WalRcvData *walrcv = WalRcv;
1022
1023 issue_xlog_fsync(recvFile, recvSegNo);
1024
1025 LogstreamResult.Flush = LogstreamResult.Write;
1026
1027 /* Update shared-memory status */
1028 SpinLockAcquire(&walrcv->mutex);
1029 if (walrcv->receivedUpto < LogstreamResult.Flush)
1030 {
1031 walrcv->latestChunkStart = walrcv->receivedUpto;
1032 walrcv->receivedUpto = LogstreamResult.Flush;
1033 walrcv->receivedTLI = ThisTimeLineID;
1034 }
1035 SpinLockRelease(&walrcv->mutex);
1036
1037 /* Signal the startup process and walsender that new WAL has arrived */
1038 WakeupRecovery();
1039 if (AllowCascadeReplication())
1040 WalSndWakeup();
1041
1042 /* Report XLOG streaming progress in PS display */
1043 if (update_process_title)
1044 {
1045 char activitymsg[50];
1046
1047 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1048 (uint32) (LogstreamResult.Write >> 32),
1049 (uint32) LogstreamResult.Write);
1050 set_ps_display(activitymsg, false);
1051 }
1052
1053 /* Also let the master know that we made some progress */
1054 if (!dying)
1055 {
1056 XLogWalRcvSendReply(false, false);
1057 XLogWalRcvSendHSFeedback(false);
1058 }
1059 }
1060 }
1061
1062 /*
1063 * Close the current segment.
1064 *
1065 * Flush the segment to disk before closing it. Otherwise we have to
1066 * reopen and fsync it later.
1067 *
1068 * Create an archive notification file since the segment is known completed.
1069 */
1070 static void
XLogWalRcvClose(XLogRecPtr recptr)1071 XLogWalRcvClose(XLogRecPtr recptr)
1072 {
1073 char xlogfname[MAXFNAMELEN];
1074
1075 Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1076
1077 /*
1078 * fsync() and close current file before we switch to next one. We would
1079 * otherwise have to reopen this file to fsync it later
1080 */
1081 XLogWalRcvFlush(false);
1082
1083 XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1084
1085 /*
1086 * XLOG segment files will be re-read by recovery in startup process soon,
1087 * so we don't advise the OS to release cache pages associated with the
1088 * file like XLogFileClose() does.
1089 */
1090 if (close(recvFile) != 0)
1091 ereport(PANIC,
1092 (errcode_for_file_access(),
1093 errmsg("could not close log segment %s: %m",
1094 xlogfname)));
1095
1096 /*
1097 * Create .done file forcibly to prevent the streamed segment from being
1098 * archived later.
1099 */
1100 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1101 XLogArchiveForceDone(xlogfname);
1102 else
1103 XLogArchiveNotify(xlogfname);
1104
1105 recvFile = -1;
1106 }
1107
1108 /*
1109 * Send reply message to primary, indicating our current WAL locations, oldest
1110 * xmin and the current time.
1111 *
1112 * If 'force' is not set, the message is only sent if enough time has
1113 * passed since last status update to reach wal_receiver_status_interval.
1114 * If wal_receiver_status_interval is disabled altogether and 'force' is
1115 * false, this is a no-op.
1116 *
1117 * If 'requestReply' is true, requests the server to reply immediately upon
1118 * receiving this message. This is used for heartbearts, when approaching
1119 * wal_receiver_timeout.
1120 */
1121 static void
XLogWalRcvSendReply(bool force,bool requestReply)1122 XLogWalRcvSendReply(bool force, bool requestReply)
1123 {
1124 static XLogRecPtr writePtr = 0;
1125 static XLogRecPtr flushPtr = 0;
1126 XLogRecPtr applyPtr;
1127 static TimestampTz sendTime = 0;
1128 TimestampTz now;
1129
1130 /*
1131 * If the user doesn't want status to be reported to the master, be sure
1132 * to exit before doing anything at all.
1133 */
1134 if (!force && wal_receiver_status_interval <= 0)
1135 return;
1136
1137 /* Get current timestamp. */
1138 now = GetCurrentTimestamp();
1139
1140 /*
1141 * We can compare the write and flush positions to the last message we
1142 * sent without taking any lock, but the apply position requires a spin
1143 * lock, so we don't check that unless something else has changed or 10
1144 * seconds have passed. This means that the apply WAL location will
1145 * appear, from the master's point of view, to lag slightly, but since
1146 * this is only for reporting purposes and only on idle systems, that's
1147 * probably OK.
1148 */
1149 if (!force
1150 && writePtr == LogstreamResult.Write
1151 && flushPtr == LogstreamResult.Flush
1152 && !TimestampDifferenceExceeds(sendTime, now,
1153 wal_receiver_status_interval * 1000))
1154 return;
1155 sendTime = now;
1156
1157 /* Construct a new message */
1158 writePtr = LogstreamResult.Write;
1159 flushPtr = LogstreamResult.Flush;
1160 applyPtr = GetXLogReplayRecPtr(NULL);
1161
1162 resetStringInfo(&reply_message);
1163 pq_sendbyte(&reply_message, 'r');
1164 pq_sendint64(&reply_message, writePtr);
1165 pq_sendint64(&reply_message, flushPtr);
1166 pq_sendint64(&reply_message, applyPtr);
1167 pq_sendint64(&reply_message, GetCurrentTimestamp());
1168 pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1169
1170 /* Send it */
1171 elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1172 (uint32) (writePtr >> 32), (uint32) writePtr,
1173 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1174 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1175 requestReply ? " (reply requested)" : "");
1176
1177 walrcv_send(wrconn, reply_message.data, reply_message.len);
1178 }
1179
1180 /*
1181 * Send hot standby feedback message to primary, plus the current time,
1182 * in case they don't have a watch.
1183 *
1184 * If the user disables feedback, send one final message to tell sender
1185 * to forget about the xmin on this standby. We also send this message
1186 * on first connect because a previous connection might have set xmin
1187 * on a replication slot. (If we're not using a slot it's harmless to
1188 * send a feedback message explicitly setting InvalidTransactionId).
1189 */
1190 static void
XLogWalRcvSendHSFeedback(bool immed)1191 XLogWalRcvSendHSFeedback(bool immed)
1192 {
1193 TimestampTz now;
1194 TransactionId nextXid;
1195 uint32 xmin_epoch,
1196 catalog_xmin_epoch;
1197 TransactionId xmin,
1198 catalog_xmin;
1199 static TimestampTz sendTime = 0;
1200
1201 /* initially true so we always send at least one feedback message */
1202 static bool master_has_standby_xmin = true;
1203
1204 /*
1205 * If the user doesn't want status to be reported to the master, be sure
1206 * to exit before doing anything at all.
1207 */
1208 if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1209 !master_has_standby_xmin)
1210 return;
1211
1212 /* Get current timestamp. */
1213 now = GetCurrentTimestamp();
1214
1215 if (!immed)
1216 {
1217 /*
1218 * Send feedback at most once per wal_receiver_status_interval.
1219 */
1220 if (!TimestampDifferenceExceeds(sendTime, now,
1221 wal_receiver_status_interval * 1000))
1222 return;
1223 sendTime = now;
1224 }
1225
1226 /*
1227 * If Hot Standby is not yet accepting connections there is nothing to
1228 * send. Check this after the interval has expired to reduce number of
1229 * calls.
1230 *
1231 * Bailing out here also ensures that we don't send feedback until we've
1232 * read our own replication slot state, so we don't tell the master to
1233 * discard needed xmin or catalog_xmin from any slots that may exist on
1234 * this replica.
1235 */
1236 if (!HotStandbyActive())
1237 return;
1238
1239 /*
1240 * Make the expensive call to get the oldest xmin once we are certain
1241 * everything else has been checked.
1242 */
1243 if (hot_standby_feedback)
1244 {
1245 TransactionId slot_xmin;
1246
1247 /*
1248 * Usually GetOldestXmin() would include both global replication slot
1249 * xmin and catalog_xmin in its calculations, but we want to derive
1250 * separate values for each of those. So we ask for an xmin that
1251 * excludes the catalog_xmin.
1252 */
1253 xmin = GetOldestXmin(NULL,
1254 PROCARRAY_FLAGS_DEFAULT | PROCARRAY_SLOTS_XMIN);
1255
1256 ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1257
1258 if (TransactionIdIsValid(slot_xmin) &&
1259 TransactionIdPrecedes(slot_xmin, xmin))
1260 xmin = slot_xmin;
1261 }
1262 else
1263 {
1264 xmin = InvalidTransactionId;
1265 catalog_xmin = InvalidTransactionId;
1266 }
1267
1268 /*
1269 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1270 * the epoch boundary.
1271 */
1272 GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1273 catalog_xmin_epoch = xmin_epoch;
1274 if (nextXid < xmin)
1275 xmin_epoch--;
1276 if (nextXid < catalog_xmin)
1277 catalog_xmin_epoch--;
1278
1279 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1280 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1281
1282 /* Construct the message and send it. */
1283 resetStringInfo(&reply_message);
1284 pq_sendbyte(&reply_message, 'h');
1285 pq_sendint64(&reply_message, GetCurrentTimestamp());
1286 pq_sendint32(&reply_message, xmin);
1287 pq_sendint32(&reply_message, xmin_epoch);
1288 pq_sendint32(&reply_message, catalog_xmin);
1289 pq_sendint32(&reply_message, catalog_xmin_epoch);
1290 walrcv_send(wrconn, reply_message.data, reply_message.len);
1291 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1292 master_has_standby_xmin = true;
1293 else
1294 master_has_standby_xmin = false;
1295 }
1296
1297 /*
1298 * Update shared memory status upon receiving a message from primary.
1299 *
1300 * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1301 * message, reported by primary.
1302 */
1303 static void
ProcessWalSndrMessage(XLogRecPtr walEnd,TimestampTz sendTime)1304 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1305 {
1306 WalRcvData *walrcv = WalRcv;
1307
1308 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1309
1310 /* Update shared-memory status */
1311 SpinLockAcquire(&walrcv->mutex);
1312 if (walrcv->latestWalEnd < walEnd)
1313 walrcv->latestWalEndTime = sendTime;
1314 walrcv->latestWalEnd = walEnd;
1315 walrcv->lastMsgSendTime = sendTime;
1316 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1317 SpinLockRelease(&walrcv->mutex);
1318
1319 if (log_min_messages <= DEBUG2)
1320 {
1321 char *sendtime;
1322 char *receipttime;
1323 int applyDelay;
1324
1325 /* Copy because timestamptz_to_str returns a static buffer */
1326 sendtime = pstrdup(timestamptz_to_str(sendTime));
1327 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1328 applyDelay = GetReplicationApplyDelay();
1329
1330 /* apply delay is not available */
1331 if (applyDelay == -1)
1332 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1333 sendtime,
1334 receipttime,
1335 GetReplicationTransferLatency());
1336 else
1337 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1338 sendtime,
1339 receipttime,
1340 applyDelay,
1341 GetReplicationTransferLatency());
1342
1343 pfree(sendtime);
1344 pfree(receipttime);
1345 }
1346 }
1347
1348 /*
1349 * Wake up the walreceiver main loop.
1350 *
1351 * This is called by the startup process whenever interesting xlog records
1352 * are applied, so that walreceiver can check if it needs to send an apply
1353 * notification back to the master which may be waiting in a COMMIT with
1354 * synchronous_commit = remote_apply.
1355 */
1356 void
WalRcvForceReply(void)1357 WalRcvForceReply(void)
1358 {
1359 Latch *latch;
1360
1361 WalRcv->force_reply = true;
1362 /* fetching the latch pointer might not be atomic, so use spinlock */
1363 SpinLockAcquire(&WalRcv->mutex);
1364 latch = WalRcv->latch;
1365 SpinLockRelease(&WalRcv->mutex);
1366 if (latch)
1367 SetLatch(latch);
1368 }
1369
1370 /*
1371 * Return a string constant representing the state. This is used
1372 * in system functions and views, and should *not* be translated.
1373 */
1374 static const char *
WalRcvGetStateString(WalRcvState state)1375 WalRcvGetStateString(WalRcvState state)
1376 {
1377 switch (state)
1378 {
1379 case WALRCV_STOPPED:
1380 return "stopped";
1381 case WALRCV_STARTING:
1382 return "starting";
1383 case WALRCV_STREAMING:
1384 return "streaming";
1385 case WALRCV_WAITING:
1386 return "waiting";
1387 case WALRCV_RESTARTING:
1388 return "restarting";
1389 case WALRCV_STOPPING:
1390 return "stopping";
1391 }
1392 return "UNKNOWN";
1393 }
1394
1395 /*
1396 * Returns activity of WAL receiver, including pid, state and xlog locations
1397 * received from the WAL sender of another server.
1398 */
1399 Datum
pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)1400 pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1401 {
1402 TupleDesc tupdesc;
1403 Datum *values;
1404 bool *nulls;
1405 int pid;
1406 bool ready_to_display;
1407 WalRcvState state;
1408 XLogRecPtr receive_start_lsn;
1409 TimeLineID receive_start_tli;
1410 XLogRecPtr received_lsn;
1411 TimeLineID received_tli;
1412 TimestampTz last_send_time;
1413 TimestampTz last_receipt_time;
1414 XLogRecPtr latest_end_lsn;
1415 TimestampTz latest_end_time;
1416 char sender_host[NI_MAXHOST];
1417 int sender_port = 0;
1418 char slotname[NAMEDATALEN];
1419 char conninfo[MAXCONNINFO];
1420
1421 /* Take a lock to ensure value consistency */
1422 SpinLockAcquire(&WalRcv->mutex);
1423 pid = (int) WalRcv->pid;
1424 ready_to_display = WalRcv->ready_to_display;
1425 state = WalRcv->walRcvState;
1426 receive_start_lsn = WalRcv->receiveStart;
1427 receive_start_tli = WalRcv->receiveStartTLI;
1428 received_lsn = WalRcv->receivedUpto;
1429 received_tli = WalRcv->receivedTLI;
1430 last_send_time = WalRcv->lastMsgSendTime;
1431 last_receipt_time = WalRcv->lastMsgReceiptTime;
1432 latest_end_lsn = WalRcv->latestWalEnd;
1433 latest_end_time = WalRcv->latestWalEndTime;
1434 strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1435 strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1436 sender_port = WalRcv->sender_port;
1437 strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1438 SpinLockRelease(&WalRcv->mutex);
1439
1440 /*
1441 * No WAL receiver (or not ready yet), just return a tuple with NULL
1442 * values
1443 */
1444 if (pid == 0 || !ready_to_display)
1445 PG_RETURN_NULL();
1446
1447 /* determine result type */
1448 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1449 elog(ERROR, "return type must be a row type");
1450
1451 values = palloc0(sizeof(Datum) * tupdesc->natts);
1452 nulls = palloc0(sizeof(bool) * tupdesc->natts);
1453
1454 /* Fetch values */
1455 values[0] = Int32GetDatum(pid);
1456
1457 if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1458 {
1459 /*
1460 * Only superusers and members of pg_read_all_stats can see details.
1461 * Other users only get the pid value to know whether it is a WAL
1462 * receiver, but no details.
1463 */
1464 MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1465 }
1466 else
1467 {
1468 values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1469
1470 if (XLogRecPtrIsInvalid(receive_start_lsn))
1471 nulls[2] = true;
1472 else
1473 values[2] = LSNGetDatum(receive_start_lsn);
1474 values[3] = Int32GetDatum(receive_start_tli);
1475 if (XLogRecPtrIsInvalid(received_lsn))
1476 nulls[4] = true;
1477 else
1478 values[4] = LSNGetDatum(received_lsn);
1479 values[5] = Int32GetDatum(received_tli);
1480 if (last_send_time == 0)
1481 nulls[6] = true;
1482 else
1483 values[6] = TimestampTzGetDatum(last_send_time);
1484 if (last_receipt_time == 0)
1485 nulls[7] = true;
1486 else
1487 values[7] = TimestampTzGetDatum(last_receipt_time);
1488 if (XLogRecPtrIsInvalid(latest_end_lsn))
1489 nulls[8] = true;
1490 else
1491 values[8] = LSNGetDatum(latest_end_lsn);
1492 if (latest_end_time == 0)
1493 nulls[9] = true;
1494 else
1495 values[9] = TimestampTzGetDatum(latest_end_time);
1496 if (*slotname == '\0')
1497 nulls[10] = true;
1498 else
1499 values[10] = CStringGetTextDatum(slotname);
1500 if (*sender_host == '\0')
1501 nulls[11] = true;
1502 else
1503 values[11] = CStringGetTextDatum(sender_host);
1504 if (sender_port == 0)
1505 nulls[12] = true;
1506 else
1507 values[12] = Int32GetDatum(sender_port);
1508 if (*conninfo == '\0')
1509 nulls[13] = true;
1510 else
1511 values[13] = CStringGetTextDatum(conninfo);
1512 }
1513
1514 /* Returns the record as Datum */
1515 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1516 }
1517