1 /*-------------------------------------------------------------------------
2 *
3 * walreceiver.c
4 *
5 * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 * is the process in the standby server that takes charge of receiving
7 * XLOG records from a primary server during streaming replication.
8 *
9 * When the startup process determines that it's time to start streaming,
10 * it instructs postmaster to start walreceiver. Walreceiver first connects
11 * to the primary server (it will be served by a walsender process
12 * in the primary server), and then keeps receiving XLOG records and
13 * writing them to the disk as long as the connection is alive. As XLOG
14 * records are received and flushed to disk, it updates the
15 * WalRcv->receivedUpto variable in shared memory, to inform the startup
16 * process of how far it can proceed with XLOG replay.
17 *
18 * If the primary server ends streaming, but doesn't disconnect, walreceiver
19 * goes into "waiting" mode, and waits for the startup process to give new
20 * instructions. The startup process will treat that the same as
21 * disconnection, and will rescan the archive/pg_wal directory. But when the
22 * startup process wants to try streaming replication again, it will just
23 * nudge the existing walreceiver process that's waiting, instead of launching
24 * a new one.
25 *
26 * Normal termination is by SIGTERM, which instructs the walreceiver to
27 * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
28 * process, the walreceiver will simply abort and exit on SIGQUIT. A close
29 * of the connection and a FATAL error are treated not as a crash but as
30 * normal operation.
31 *
32 * This file contains the server-facing parts of walreceiver. The libpq-
33 * specific parts are in the libpqwalreceiver module. It's loaded
34 * dynamically to avoid linking the server with libpq.
35 *
36 * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
37 *
38 *
39 * IDENTIFICATION
40 * src/backend/replication/walreceiver.c
41 *
42 *-------------------------------------------------------------------------
43 */
44 #include "postgres.h"
45
46 #include <signal.h>
47 #include <unistd.h>
48
49 #include "access/htup_details.h"
50 #include "access/timeline.h"
51 #include "access/transam.h"
52 #include "access/xlog_internal.h"
53 #include "catalog/pg_authid.h"
54 #include "catalog/pg_type.h"
55 #include "funcapi.h"
56 #include "libpq/pqformat.h"
57 #include "libpq/pqsignal.h"
58 #include "miscadmin.h"
59 #include "pgstat.h"
60 #include "replication/walreceiver.h"
61 #include "replication/walsender.h"
62 #include "storage/ipc.h"
63 #include "storage/pmsignal.h"
64 #include "storage/procarray.h"
65 #include "utils/builtins.h"
66 #include "utils/guc.h"
67 #include "utils/pg_lsn.h"
68 #include "utils/ps_status.h"
69 #include "utils/resowner.h"
70 #include "utils/timestamp.h"
71
72
/*
 * GUC variables
 *
 * wal_receiver_status_interval: consulted when deciding whether a periodic
 * status update is due (see the timeout handling in WalReceiverMain).
 * wal_receiver_timeout: in milliseconds; when greater than zero the main loop
 * gives up if nothing has been received for that long, and pings the primary
 * after half of it.
 * hot_standby_feedback: NOTE(review): presumably consulted by
 * XLogWalRcvSendHSFeedback(), which is not shown here -- confirm.
 */
int			wal_receiver_status_interval;
int			wal_receiver_timeout;
bool		hot_standby_feedback;

/*
 * libpqwalreceiver connection
 *
 * wrconn is the connection handle obtained from walrcv_connect() in
 * WalReceiverMain.  WalReceiverFunctions is expected to be non-NULL once the
 * "libpqwalreceiver" module has been loaded; WalReceiverMain treats a NULL
 * value after load_file() as an initialization failure.
 */
static WalReceiverConn *wrconn = NULL;
WalReceiverFunctionsType *WalReceiverFunctions = NULL;

#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */

/*
 * These variables are used similarly to openLogFile/SegNo/Off,
 * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
 * corresponding to the filename of recvFile.
 *
 * recvFile is -1 whenever no segment file is currently open.
 */
static int	recvFile = -1;
static TimeLineID recvFileTLI = 0;
static XLogSegNo recvSegNo = 0;
static uint32 recvOff = 0;

/*
 * Flags set by interrupt handlers of walreceiver for later service in the
 * main loop.
 *
 * got_SIGHUP is set by WalRcvSigHupHandler and got_SIGTERM by
 * WalRcvShutdownHandler.
 */
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.
 */
static struct
{
	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
}			LogstreamResult;

/*
 * Message buffers, set up in WalReceiverMain when streaming starts.
 * incoming_message holds the header of the message currently being decoded
 * by XLogWalRcvProcessMsg.  NOTE(review): reply_message is presumably filled
 * by the reply-sending functions, which are not shown here -- confirm.
 */
static StringInfoData reply_message;
static StringInfoData incoming_message;
113
/*
 * Prototypes for private functions
 *
 * All of these are local to this file.
 */
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvClose(XLogRecPtr recptr);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);

/*
 * Signal handlers
 *
 * Installed with pqsignal() in WalReceiverMain for SIGHUP, SIGUSR1, SIGTERM
 * and SIGQUIT respectively.
 */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
131
132
133 /*
134 * Process any interrupts the walreceiver process may have received.
135 * This should be called any time the process's latch has become set.
136 *
137 * Currently, only SIGTERM is of interest. We can't just exit(1) within the
138 * SIGTERM signal handler, because the signal might arrive in the middle of
139 * some critical operation, like while we're holding a spinlock. Instead, the
140 * signal handler sets a flag variable as well as setting the process's latch.
141 * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
142 * latch has become set. Operations that could block for a long time, such as
143 * reading from a remote server, must pay attention to the latch too; see
144 * libpqrcv_PQgetResult for example.
145 */
146 void
ProcessWalRcvInterrupts(void)147 ProcessWalRcvInterrupts(void)
148 {
149 /*
150 * Although walreceiver interrupt handling doesn't use the same scheme as
151 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
152 * any incoming signals on Win32.
153 */
154 CHECK_FOR_INTERRUPTS();
155
156 if (got_SIGTERM)
157 {
158 ereport(FATAL,
159 (errcode(ERRCODE_ADMIN_SHUTDOWN),
160 errmsg("terminating walreceiver process due to administrator command")));
161 }
162 }
163
164
165 /* Main entry point for walreceiver process */
166 void
WalReceiverMain(void)167 WalReceiverMain(void)
168 {
169 char conninfo[MAXCONNINFO];
170 char *tmp_conninfo;
171 char slotname[NAMEDATALEN];
172 XLogRecPtr startpoint;
173 TimeLineID startpointTLI;
174 TimeLineID primaryTLI;
175 bool first_stream;
176 WalRcvData *walrcv = WalRcv;
177 TimestampTz last_recv_timestamp;
178 TimestampTz now;
179 bool ping_sent;
180 char *err;
181
182 /*
183 * WalRcv should be set up already (if we are a backend, we inherit this
184 * by fork() or EXEC_BACKEND mechanism from the postmaster).
185 */
186 Assert(walrcv != NULL);
187
188 now = GetCurrentTimestamp();
189
190 /*
191 * Mark walreceiver as running in shared memory.
192 *
193 * Do this as early as possible, so that if we fail later on, we'll set
194 * state to STOPPED. If we die before this, the startup process will keep
195 * waiting for us to start up, until it times out.
196 */
197 SpinLockAcquire(&walrcv->mutex);
198 Assert(walrcv->pid == 0);
199 switch (walrcv->walRcvState)
200 {
201 case WALRCV_STOPPING:
202 /* If we've already been requested to stop, don't start up. */
203 walrcv->walRcvState = WALRCV_STOPPED;
204 /* fall through */
205
206 case WALRCV_STOPPED:
207 SpinLockRelease(&walrcv->mutex);
208 proc_exit(1);
209 break;
210
211 case WALRCV_STARTING:
212 /* The usual case */
213 break;
214
215 case WALRCV_WAITING:
216 case WALRCV_STREAMING:
217 case WALRCV_RESTARTING:
218 default:
219 /* Shouldn't happen */
220 SpinLockRelease(&walrcv->mutex);
221 elog(PANIC, "walreceiver still running according to shared memory state");
222 }
223 /* Advertise our PID so that the startup process can kill us */
224 walrcv->pid = MyProcPid;
225 walrcv->walRcvState = WALRCV_STREAMING;
226
227 /* Fetch information required to start streaming */
228 walrcv->ready_to_display = false;
229 strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
230 strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
231 startpoint = walrcv->receiveStart;
232 startpointTLI = walrcv->receiveStartTLI;
233
234 /* Initialise to a sanish value */
235 walrcv->lastMsgSendTime =
236 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
237
238 /* Report the latch to use to awaken this process */
239 walrcv->latch = &MyProc->procLatch;
240
241 SpinLockRelease(&walrcv->mutex);
242
243 /* Arrange to clean up at walreceiver exit */
244 on_shmem_exit(WalRcvDie, 0);
245
246 /* Properly accept or ignore signals the postmaster might send us */
247 pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
248 pqsignal(SIGINT, SIG_IGN);
249 pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */
250 pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
251 pqsignal(SIGALRM, SIG_IGN);
252 pqsignal(SIGPIPE, SIG_IGN);
253 pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
254 pqsignal(SIGUSR2, SIG_IGN);
255
256 /* Reset some signals that are accepted by postmaster but not here */
257 pqsignal(SIGCHLD, SIG_DFL);
258 pqsignal(SIGTTIN, SIG_DFL);
259 pqsignal(SIGTTOU, SIG_DFL);
260 pqsignal(SIGCONT, SIG_DFL);
261 pqsignal(SIGWINCH, SIG_DFL);
262
263 /* We allow SIGQUIT (quickdie) at all times */
264 sigdelset(&BlockSig, SIGQUIT);
265
266 /* Load the libpq-specific functions */
267 load_file("libpqwalreceiver", false);
268 if (WalReceiverFunctions == NULL)
269 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
270
271 /*
272 * Create a resource owner to keep track of our resources (not clear that
273 * we need this, but may as well have one).
274 */
275 CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
276
277 /* Unblock signals (they were blocked when the postmaster forked us) */
278 PG_SETMASK(&UnBlockSig);
279
280 /* Establish the connection to the primary for XLOG streaming */
281 wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
282 if (!wrconn)
283 ereport(ERROR,
284 (errmsg("could not connect to the primary server: %s", err)));
285
286 /*
287 * Save user-visible connection string. This clobbers the original
288 * conninfo, for security.
289 */
290 tmp_conninfo = walrcv_get_conninfo(wrconn);
291 SpinLockAcquire(&walrcv->mutex);
292 memset(walrcv->conninfo, 0, MAXCONNINFO);
293 if (tmp_conninfo)
294 strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
295 walrcv->ready_to_display = true;
296 SpinLockRelease(&walrcv->mutex);
297
298 if (tmp_conninfo)
299 pfree(tmp_conninfo);
300
301 first_stream = true;
302 for (;;)
303 {
304 char *primary_sysid;
305 char standby_sysid[32];
306 int server_version;
307 WalRcvStreamOptions options;
308
309 /*
310 * Check that we're connected to a valid server using the
311 * IDENTIFY_SYSTEM replication command.
312 */
313 primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
314 &server_version);
315
316 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
317 GetSystemIdentifier());
318 if (strcmp(primary_sysid, standby_sysid) != 0)
319 {
320 ereport(ERROR,
321 (errmsg("database system identifier differs between the primary and standby"),
322 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
323 primary_sysid, standby_sysid)));
324 }
325
326 /*
327 * Confirm that the current timeline of the primary is the same or
328 * ahead of ours.
329 */
330 if (primaryTLI < startpointTLI)
331 ereport(ERROR,
332 (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
333 primaryTLI, startpointTLI)));
334
335 /*
336 * Get any missing history files. We do this always, even when we're
337 * not interested in that timeline, so that if we're promoted to
338 * become the master later on, we don't select the same timeline that
339 * was already used in the current master. This isn't bullet-proof -
340 * you'll need some external software to manage your cluster if you
341 * need to ensure that a unique timeline id is chosen in every case,
342 * but let's avoid the confusion of timeline id collisions where we
343 * can.
344 */
345 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
346
347 /*
348 * Start streaming.
349 *
350 * We'll try to start at the requested starting point and timeline,
351 * even if it's different from the server's latest timeline. In case
352 * we've already reached the end of the old timeline, the server will
353 * finish the streaming immediately, and we will go back to await
354 * orders from the startup process. If recovery_target_timeline is
355 * 'latest', the startup process will scan pg_wal and find the new
356 * history file, bump recovery target timeline, and ask us to restart
357 * on the new timeline.
358 */
359 options.logical = false;
360 options.startpoint = startpoint;
361 options.slotname = slotname[0] != '\0' ? slotname : NULL;
362 options.proto.physical.startpointTLI = startpointTLI;
363 ThisTimeLineID = startpointTLI;
364 if (walrcv_startstreaming(wrconn, &options))
365 {
366 if (first_stream)
367 ereport(LOG,
368 (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
369 (uint32) (startpoint >> 32), (uint32) startpoint,
370 startpointTLI)));
371 else
372 ereport(LOG,
373 (errmsg("restarted WAL streaming at %X/%X on timeline %u",
374 (uint32) (startpoint >> 32), (uint32) startpoint,
375 startpointTLI)));
376 first_stream = false;
377
378 /* Initialize LogstreamResult and buffers for processing messages */
379 LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
380 initStringInfo(&reply_message);
381 initStringInfo(&incoming_message);
382
383 /* Initialize the last recv timestamp */
384 last_recv_timestamp = GetCurrentTimestamp();
385 ping_sent = false;
386
387 /* Loop until end-of-streaming or error */
388 for (;;)
389 {
390 char *buf;
391 int len;
392 bool endofwal = false;
393 pgsocket wait_fd = PGINVALID_SOCKET;
394 int rc;
395
396 /*
397 * Exit walreceiver if we're not in recovery. This should not
398 * happen, but cross-check the status here.
399 */
400 if (!RecoveryInProgress())
401 ereport(FATAL,
402 (errmsg("cannot continue WAL streaming, recovery has already ended")));
403
404 /* Process any requests or signals received recently */
405 ProcessWalRcvInterrupts();
406
407 if (got_SIGHUP)
408 {
409 got_SIGHUP = false;
410 ProcessConfigFile(PGC_SIGHUP);
411 XLogWalRcvSendHSFeedback(true);
412 }
413
414 /* See if we can read data immediately */
415 len = walrcv_receive(wrconn, &buf, &wait_fd);
416 if (len != 0)
417 {
418 /*
419 * Process the received data, and any subsequent data we
420 * can read without blocking.
421 */
422 for (;;)
423 {
424 if (len > 0)
425 {
426 /*
427 * Something was received from master, so reset
428 * timeout
429 */
430 last_recv_timestamp = GetCurrentTimestamp();
431 ping_sent = false;
432 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
433 }
434 else if (len == 0)
435 break;
436 else if (len < 0)
437 {
438 ereport(LOG,
439 (errmsg("replication terminated by primary server"),
440 errdetail("End of WAL reached on timeline %u at %X/%X.",
441 startpointTLI,
442 (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
443 endofwal = true;
444 break;
445 }
446 len = walrcv_receive(wrconn, &buf, &wait_fd);
447 }
448
449 /* Let the master know that we received some data. */
450 XLogWalRcvSendReply(false, false);
451
452 /*
453 * If we've written some records, flush them to disk and
454 * let the startup process and primary server know about
455 * them.
456 */
457 XLogWalRcvFlush(false);
458 }
459
460 /* Check if we need to exit the streaming loop. */
461 if (endofwal)
462 break;
463
464 /*
465 * Ideally we would reuse a WaitEventSet object repeatedly
466 * here to avoid the overheads of WaitLatchOrSocket on epoll
467 * systems, but we can't be sure that libpq (or any other
468 * walreceiver implementation) has the same socket (even if
469 * the fd is the same number, it may have been closed and
470 * reopened since the last time). In future, if there is a
471 * function for removing sockets from WaitEventSet, then we
472 * could add and remove just the socket each time, potentially
473 * avoiding some system calls.
474 */
475 Assert(wait_fd != PGINVALID_SOCKET);
476 rc = WaitLatchOrSocket(walrcv->latch,
477 WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
478 WL_TIMEOUT | WL_LATCH_SET,
479 wait_fd,
480 NAPTIME_PER_CYCLE,
481 WAIT_EVENT_WAL_RECEIVER_MAIN);
482 if (rc & WL_LATCH_SET)
483 {
484 ResetLatch(walrcv->latch);
485 ProcessWalRcvInterrupts();
486
487 if (walrcv->force_reply)
488 {
489 /*
490 * The recovery process has asked us to send apply
491 * feedback now. Make sure the flag is really set to
492 * false in shared memory before sending the reply, so
493 * we don't miss a new request for a reply.
494 */
495 walrcv->force_reply = false;
496 pg_memory_barrier();
497 XLogWalRcvSendReply(true, false);
498 }
499 }
500 if (rc & WL_POSTMASTER_DEATH)
501 {
502 /*
503 * Emergency bailout if postmaster has died. This is to
504 * avoid the necessity for manual cleanup of all
505 * postmaster children.
506 */
507 exit(1);
508 }
509 if (rc & WL_TIMEOUT)
510 {
511 /*
512 * We didn't receive anything new. If we haven't heard
513 * anything from the server for more than
514 * wal_receiver_timeout / 2, ping the server. Also, if
515 * it's been longer than wal_receiver_status_interval
516 * since the last update we sent, send a status update to
517 * the master anyway, to report any progress in applying
518 * WAL.
519 */
520 bool requestReply = false;
521
522 /*
523 * Check if time since last receive from standby has
524 * reached the configured limit.
525 */
526 if (wal_receiver_timeout > 0)
527 {
528 TimestampTz now = GetCurrentTimestamp();
529 TimestampTz timeout;
530
531 timeout =
532 TimestampTzPlusMilliseconds(last_recv_timestamp,
533 wal_receiver_timeout);
534
535 if (now >= timeout)
536 ereport(ERROR,
537 (errmsg("terminating walreceiver due to timeout")));
538
539 /*
540 * We didn't receive anything new, for half of
541 * receiver replication timeout. Ping the server.
542 */
543 if (!ping_sent)
544 {
545 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
546 (wal_receiver_timeout / 2));
547 if (now >= timeout)
548 {
549 requestReply = true;
550 ping_sent = true;
551 }
552 }
553 }
554
555 XLogWalRcvSendReply(requestReply, requestReply);
556 XLogWalRcvSendHSFeedback(false);
557 }
558 }
559
560 /*
561 * The backend finished streaming. Exit streaming COPY-mode from
562 * our side, too.
563 */
564 walrcv_endstreaming(wrconn, &primaryTLI);
565
566 /*
567 * If the server had switched to a new timeline that we didn't
568 * know about when we began streaming, fetch its timeline history
569 * file now.
570 */
571 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
572 }
573 else
574 ereport(LOG,
575 (errmsg("primary server contains no more WAL on requested timeline %u",
576 startpointTLI)));
577
578 /*
579 * End of WAL reached on the requested timeline. Close the last
580 * segment, and await for new orders from the startup process.
581 */
582 if (recvFile >= 0)
583 {
584 char xlogfname[MAXFNAMELEN];
585
586 XLogWalRcvFlush(false);
587 if (close(recvFile) != 0)
588 ereport(PANIC,
589 (errcode_for_file_access(),
590 errmsg("could not close log segment %s: %m",
591 XLogFileNameP(recvFileTLI, recvSegNo))));
592
593 /*
594 * Create .done file forcibly to prevent the streamed segment from
595 * being archived later.
596 */
597 XLogFileName(xlogfname, recvFileTLI, recvSegNo);
598 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
599 XLogArchiveForceDone(xlogfname);
600 else
601 XLogArchiveNotify(xlogfname);
602 }
603 recvFile = -1;
604
605 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
606 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
607 }
608 /* not reached */
609 }
610
611 /*
612 * Wait for startup process to set receiveStart and receiveStartTLI.
613 */
614 static void
WalRcvWaitForStartPosition(XLogRecPtr * startpoint,TimeLineID * startpointTLI)615 WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
616 {
617 WalRcvData *walrcv = WalRcv;
618 int state;
619
620 SpinLockAcquire(&walrcv->mutex);
621 state = walrcv->walRcvState;
622 if (state != WALRCV_STREAMING)
623 {
624 SpinLockRelease(&walrcv->mutex);
625 if (state == WALRCV_STOPPING)
626 proc_exit(0);
627 else
628 elog(FATAL, "unexpected walreceiver state");
629 }
630 walrcv->walRcvState = WALRCV_WAITING;
631 walrcv->receiveStart = InvalidXLogRecPtr;
632 walrcv->receiveStartTLI = 0;
633 SpinLockRelease(&walrcv->mutex);
634
635 if (update_process_title)
636 set_ps_display("idle", false);
637
638 /*
639 * nudge startup process to notice that we've stopped streaming and are
640 * now waiting for instructions.
641 */
642 WakeupRecovery();
643 for (;;)
644 {
645 ResetLatch(walrcv->latch);
646
647 /*
648 * Emergency bailout if postmaster has died. This is to avoid the
649 * necessity for manual cleanup of all postmaster children.
650 */
651 if (!PostmasterIsAlive())
652 exit(1);
653
654 ProcessWalRcvInterrupts();
655
656 SpinLockAcquire(&walrcv->mutex);
657 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
658 walrcv->walRcvState == WALRCV_WAITING ||
659 walrcv->walRcvState == WALRCV_STOPPING);
660 if (walrcv->walRcvState == WALRCV_RESTARTING)
661 {
662 /* we don't expect primary_conninfo to change */
663 *startpoint = walrcv->receiveStart;
664 *startpointTLI = walrcv->receiveStartTLI;
665 walrcv->walRcvState = WALRCV_STREAMING;
666 SpinLockRelease(&walrcv->mutex);
667 break;
668 }
669 if (walrcv->walRcvState == WALRCV_STOPPING)
670 {
671 /*
672 * We should've received SIGTERM if the startup process wants us
673 * to die, but might as well check it here too.
674 */
675 SpinLockRelease(&walrcv->mutex);
676 exit(1);
677 }
678 SpinLockRelease(&walrcv->mutex);
679
680 WaitLatch(walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
681 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
682 }
683
684 if (update_process_title)
685 {
686 char activitymsg[50];
687
688 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
689 (uint32) (*startpoint >> 32),
690 (uint32) *startpoint);
691 set_ps_display(activitymsg, false);
692 }
693 }
694
695 /*
696 * Fetch any missing timeline history files between 'first' and 'last'
697 * (inclusive) from the server.
698 */
699 static void
WalRcvFetchTimeLineHistoryFiles(TimeLineID first,TimeLineID last)700 WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
701 {
702 TimeLineID tli;
703
704 for (tli = first; tli <= last; tli++)
705 {
706 /* there's no history file for timeline 1 */
707 if (tli != 1 && !existsTimeLineHistory(tli))
708 {
709 char *fname;
710 char *content;
711 int len;
712 char expectedfname[MAXFNAMELEN];
713
714 ereport(LOG,
715 (errmsg("fetching timeline history file for timeline %u from primary server",
716 tli)));
717
718 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
719
720 /*
721 * Check that the filename on the master matches what we
722 * calculated ourselves. This is just a sanity check, it should
723 * always match.
724 */
725 TLHistoryFileName(expectedfname, tli);
726 if (strcmp(fname, expectedfname) != 0)
727 ereport(ERROR,
728 (errcode(ERRCODE_PROTOCOL_VIOLATION),
729 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
730 tli)));
731
732 /*
733 * Write the file to pg_wal.
734 */
735 writeTimeLineHistoryFile(tli, content, len);
736
737 /*
738 * Mark the streamed history file as ready for archiving
739 * if archive_mode is always.
740 */
741 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
742 XLogArchiveForceDone(fname);
743 else
744 XLogArchiveNotify(fname);
745
746 pfree(fname);
747 pfree(content);
748 }
749 }
750 }
751
752 /*
753 * Mark us as STOPPED in shared memory at exit.
754 */
755 static void
WalRcvDie(int code,Datum arg)756 WalRcvDie(int code, Datum arg)
757 {
758 WalRcvData *walrcv = WalRcv;
759
760 /* Ensure that all WAL records received are flushed to disk */
761 XLogWalRcvFlush(true);
762
763 /* Mark ourselves inactive in shared memory */
764 SpinLockAcquire(&walrcv->mutex);
765 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
766 walrcv->walRcvState == WALRCV_RESTARTING ||
767 walrcv->walRcvState == WALRCV_STARTING ||
768 walrcv->walRcvState == WALRCV_WAITING ||
769 walrcv->walRcvState == WALRCV_STOPPING);
770 Assert(walrcv->pid == MyProcPid);
771 walrcv->walRcvState = WALRCV_STOPPED;
772 walrcv->pid = 0;
773 walrcv->ready_to_display = false;
774 walrcv->latch = NULL;
775 SpinLockRelease(&walrcv->mutex);
776
777 /* Terminate the connection gracefully. */
778 if (wrconn != NULL)
779 walrcv_disconnect(wrconn);
780
781 /* Wake up the startup process to notice promptly that we're gone */
782 WakeupRecovery();
783 }
784
785 /* SIGHUP: set flag to re-read config file at next convenient time */
786 static void
WalRcvSigHupHandler(SIGNAL_ARGS)787 WalRcvSigHupHandler(SIGNAL_ARGS)
788 {
789 got_SIGHUP = true;
790 }
791
792
/*
 * SIGUSR1 handler: the signal belongs to the latch mechanism, so simply
 * pass it on to latch_sigusr1_handler().  errno is preserved for the
 * interrupted code.
 */
static void
WalRcvSigUsr1Handler(SIGNAL_ARGS)
{
	const int	errno_on_entry = errno;

	latch_sigusr1_handler();

	errno = errno_on_entry;
}
803
804 /* SIGTERM: set flag for ProcessWalRcvInterrupts */
805 static void
WalRcvShutdownHandler(SIGNAL_ARGS)806 WalRcvShutdownHandler(SIGNAL_ARGS)
807 {
808 int save_errno = errno;
809
810 got_SIGTERM = true;
811
812 if (WalRcv->latch)
813 SetLatch(WalRcv->latch);
814
815 errno = save_errno;
816 }
817
/*
 * SIGQUIT handler, invoked when the postmaster signals an emergency stop.
 *
 * Some other backend has crashed, so abandon whatever is in progress and
 * leave immediately.
 */
static void
WalRcvQuickDieHandler(SIGNAL_ARGS)
{
	/*
	 * Deliberately bypass proc_exit() and any atexit() callbacks: shared
	 * memory may be corrupted, so no cleanup should be attempted, and those
	 * callbacks would not be safe to run from a signal handler in any case.
	 *
	 * The exit status is 2 rather than 0 so that the postmaster goes through
	 * a system reset cycle even when somebody sends SIGQUIT by hand to a
	 * random backend; that is required precisely because shared memory state
	 * is left uncleaned.  (The "dead man switch" mechanism in pmsignal.c
	 * should make the postmaster treat this as a crash as well, but being
	 * doubly sure does no harm.)
	 */
	_exit(2);
}
843
844 /*
845 * Accept the message from XLOG stream, and process it.
846 */
847 static void
XLogWalRcvProcessMsg(unsigned char type,char * buf,Size len)848 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
849 {
850 int hdrlen;
851 XLogRecPtr dataStart;
852 XLogRecPtr walEnd;
853 TimestampTz sendTime;
854 bool replyRequested;
855
856 resetStringInfo(&incoming_message);
857
858 switch (type)
859 {
860 case 'w': /* WAL records */
861 {
862 /* copy message to StringInfo */
863 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
864 if (len < hdrlen)
865 ereport(ERROR,
866 (errcode(ERRCODE_PROTOCOL_VIOLATION),
867 errmsg_internal("invalid WAL message received from primary")));
868 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
869
870 /* read the fields */
871 dataStart = pq_getmsgint64(&incoming_message);
872 walEnd = pq_getmsgint64(&incoming_message);
873 sendTime = pq_getmsgint64(&incoming_message);
874 ProcessWalSndrMessage(walEnd, sendTime);
875
876 buf += hdrlen;
877 len -= hdrlen;
878 XLogWalRcvWrite(buf, len, dataStart);
879 break;
880 }
881 case 'k': /* Keepalive */
882 {
883 /* copy message to StringInfo */
884 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
885 if (len != hdrlen)
886 ereport(ERROR,
887 (errcode(ERRCODE_PROTOCOL_VIOLATION),
888 errmsg_internal("invalid keepalive message received from primary")));
889 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
890
891 /* read the fields */
892 walEnd = pq_getmsgint64(&incoming_message);
893 sendTime = pq_getmsgint64(&incoming_message);
894 replyRequested = pq_getmsgbyte(&incoming_message);
895
896 ProcessWalSndrMessage(walEnd, sendTime);
897
898 /* If the primary requested a reply, send one immediately */
899 if (replyRequested)
900 XLogWalRcvSendReply(true, false);
901 break;
902 }
903 default:
904 ereport(ERROR,
905 (errcode(ERRCODE_PROTOCOL_VIOLATION),
906 errmsg_internal("invalid replication message type %d",
907 type)));
908 }
909 }
910
911 /*
912 * Write XLOG data to disk.
913 */
914 static void
XLogWalRcvWrite(char * buf,Size nbytes,XLogRecPtr recptr)915 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
916 {
917 int startoff;
918 int byteswritten;
919
920 while (nbytes > 0)
921 {
922 int segbytes;
923
924 /* Close the current segment if it's completed */
925 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo))
926 XLogWalRcvClose(recptr);
927
928 if (recvFile < 0)
929 {
930 bool use_existent = true;
931
932 /* Create/use new log file */
933 XLByteToSeg(recptr, recvSegNo);
934 recvFile = XLogFileInit(recvSegNo, &use_existent, true);
935 recvFileTLI = ThisTimeLineID;
936 recvOff = 0;
937 }
938
939 /* Calculate the start offset of the received logs */
940 startoff = recptr % XLogSegSize;
941
942 if (startoff + nbytes > XLogSegSize)
943 segbytes = XLogSegSize - startoff;
944 else
945 segbytes = nbytes;
946
947 /* Need to seek in the file? */
948 if (recvOff != startoff)
949 {
950 if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
951 ereport(PANIC,
952 (errcode_for_file_access(),
953 errmsg("could not seek in log segment %s to offset %u: %m",
954 XLogFileNameP(recvFileTLI, recvSegNo),
955 startoff)));
956 recvOff = startoff;
957 }
958
959 /* OK to write the logs */
960 errno = 0;
961
962 byteswritten = write(recvFile, buf, segbytes);
963 if (byteswritten <= 0)
964 {
965 /* if write didn't set errno, assume no disk space */
966 if (errno == 0)
967 errno = ENOSPC;
968 ereport(PANIC,
969 (errcode_for_file_access(),
970 errmsg("could not write to log segment %s "
971 "at offset %u, length %lu: %m",
972 XLogFileNameP(recvFileTLI, recvSegNo),
973 recvOff, (unsigned long) segbytes)));
974 }
975
976 /* Update state for write */
977 recptr += byteswritten;
978
979 recvOff += byteswritten;
980 nbytes -= byteswritten;
981 buf += byteswritten;
982
983 LogstreamResult.Write = recptr;
984 }
985
986 /*
987 * Close the current segment if it's fully written up in the last cycle of
988 * the loop, to create its archive notification file soon. Otherwise WAL
989 * archiving of the segment will be delayed until any data in the next
990 * segment is received and written.
991 */
992 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo))
993 XLogWalRcvClose(recptr);
994 }
995
996 /*
997 * Flush the log to disk.
998 *
999 * If we're in the midst of dying, it's unwise to do anything that might throw
1000 * an error, so we skip sending a reply in that case.
1001 */
1002 static void
XLogWalRcvFlush(bool dying)1003 XLogWalRcvFlush(bool dying)
1004 {
1005 if (LogstreamResult.Flush < LogstreamResult.Write)
1006 {
1007 WalRcvData *walrcv = WalRcv;
1008
1009 issue_xlog_fsync(recvFile, recvSegNo);
1010
1011 LogstreamResult.Flush = LogstreamResult.Write;
1012
1013 /* Update shared-memory status */
1014 SpinLockAcquire(&walrcv->mutex);
1015 if (walrcv->receivedUpto < LogstreamResult.Flush)
1016 {
1017 walrcv->latestChunkStart = walrcv->receivedUpto;
1018 walrcv->receivedUpto = LogstreamResult.Flush;
1019 walrcv->receivedTLI = ThisTimeLineID;
1020 }
1021 SpinLockRelease(&walrcv->mutex);
1022
1023 /* Signal the startup process and walsender that new WAL has arrived */
1024 WakeupRecovery();
1025 if (AllowCascadeReplication())
1026 WalSndWakeup();
1027
1028 /* Report XLOG streaming progress in PS display */
1029 if (update_process_title)
1030 {
1031 char activitymsg[50];
1032
1033 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1034 (uint32) (LogstreamResult.Write >> 32),
1035 (uint32) LogstreamResult.Write);
1036 set_ps_display(activitymsg, false);
1037 }
1038
1039 /* Also let the master know that we made some progress */
1040 if (!dying)
1041 {
1042 XLogWalRcvSendReply(false, false);
1043 XLogWalRcvSendHSFeedback(false);
1044 }
1045 }
1046 }
1047
1048 /*
1049 * Close the current segment.
1050 *
1051 * Flush the segment to disk before closing it. Otherwise we have to
1052 * reopen and fsync it later.
1053 *
1054 * Create an archive notification file since the segment is known completed.
1055 */
1056 static void
XLogWalRcvClose(XLogRecPtr recptr)1057 XLogWalRcvClose(XLogRecPtr recptr)
1058 {
1059 char xlogfname[MAXFNAMELEN];
1060
1061 Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo));
1062
1063 /*
1064 * fsync() and close current file before we switch to next one. We would
1065 * otherwise have to reopen this file to fsync it later
1066 */
1067 XLogWalRcvFlush(false);
1068
1069 XLogFileName(xlogfname, recvFileTLI, recvSegNo);
1070
1071 /*
1072 * XLOG segment files will be re-read by recovery in startup process soon,
1073 * so we don't advise the OS to release cache pages associated with the
1074 * file like XLogFileClose() does.
1075 */
1076 if (close(recvFile) != 0)
1077 ereport(PANIC,
1078 (errcode_for_file_access(),
1079 errmsg("could not close log segment %s: %m",
1080 xlogfname)));
1081
1082 /*
1083 * Create .done file forcibly to prevent the streamed segment from being
1084 * archived later.
1085 */
1086 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1087 XLogArchiveForceDone(xlogfname);
1088 else
1089 XLogArchiveNotify(xlogfname);
1090
1091 recvFile = -1;
1092 }
1093
1094 /*
1095 * Send reply message to primary, indicating our current WAL locations, oldest
1096 * xmin and the current time.
1097 *
1098 * If 'force' is not set, the message is only sent if enough time has
1099 * passed since last status update to reach wal_receiver_status_interval.
1100 * If wal_receiver_status_interval is disabled altogether and 'force' is
1101 * false, this is a no-op.
1102 *
1103 * If 'requestReply' is true, requests the server to reply immediately upon
1104 * receiving this message. This is used for heartbearts, when approaching
1105 * wal_receiver_timeout.
1106 */
1107 static void
XLogWalRcvSendReply(bool force,bool requestReply)1108 XLogWalRcvSendReply(bool force, bool requestReply)
1109 {
1110 static XLogRecPtr writePtr = 0;
1111 static XLogRecPtr flushPtr = 0;
1112 XLogRecPtr applyPtr;
1113 static TimestampTz sendTime = 0;
1114 TimestampTz now;
1115
1116 /*
1117 * If the user doesn't want status to be reported to the master, be sure
1118 * to exit before doing anything at all.
1119 */
1120 if (!force && wal_receiver_status_interval <= 0)
1121 return;
1122
1123 /* Get current timestamp. */
1124 now = GetCurrentTimestamp();
1125
1126 /*
1127 * We can compare the write and flush positions to the last message we
1128 * sent without taking any lock, but the apply position requires a spin
1129 * lock, so we don't check that unless something else has changed or 10
1130 * seconds have passed. This means that the apply WAL location will
1131 * appear, from the master's point of view, to lag slightly, but since
1132 * this is only for reporting purposes and only on idle systems, that's
1133 * probably OK.
1134 */
1135 if (!force
1136 && writePtr == LogstreamResult.Write
1137 && flushPtr == LogstreamResult.Flush
1138 && !TimestampDifferenceExceeds(sendTime, now,
1139 wal_receiver_status_interval * 1000))
1140 return;
1141 sendTime = now;
1142
1143 /* Construct a new message */
1144 writePtr = LogstreamResult.Write;
1145 flushPtr = LogstreamResult.Flush;
1146 applyPtr = GetXLogReplayRecPtr(NULL);
1147
1148 resetStringInfo(&reply_message);
1149 pq_sendbyte(&reply_message, 'r');
1150 pq_sendint64(&reply_message, writePtr);
1151 pq_sendint64(&reply_message, flushPtr);
1152 pq_sendint64(&reply_message, applyPtr);
1153 pq_sendint64(&reply_message, GetCurrentTimestamp());
1154 pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1155
1156 /* Send it */
1157 elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1158 (uint32) (writePtr >> 32), (uint32) writePtr,
1159 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1160 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1161 requestReply ? " (reply requested)" : "");
1162
1163 walrcv_send(wrconn, reply_message.data, reply_message.len);
1164 }
1165
1166 /*
1167 * Send hot standby feedback message to primary, plus the current time,
1168 * in case they don't have a watch.
1169 *
1170 * If the user disables feedback, send one final message to tell sender
1171 * to forget about the xmin on this standby. We also send this message
1172 * on first connect because a previous connection might have set xmin
1173 * on a replication slot. (If we're not using a slot it's harmless to
1174 * send a feedback message explicitly setting InvalidTransactionId).
1175 */
1176 static void
XLogWalRcvSendHSFeedback(bool immed)1177 XLogWalRcvSendHSFeedback(bool immed)
1178 {
1179 TimestampTz now;
1180 TransactionId nextXid;
1181 uint32 xmin_epoch,
1182 catalog_xmin_epoch;
1183 TransactionId xmin,
1184 catalog_xmin;
1185 static TimestampTz sendTime = 0;
1186
1187 /* initially true so we always send at least one feedback message */
1188 static bool master_has_standby_xmin = true;
1189
1190 /*
1191 * If the user doesn't want status to be reported to the master, be sure
1192 * to exit before doing anything at all.
1193 */
1194 if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1195 !master_has_standby_xmin)
1196 return;
1197
1198 /* Get current timestamp. */
1199 now = GetCurrentTimestamp();
1200
1201 if (!immed)
1202 {
1203 /*
1204 * Send feedback at most once per wal_receiver_status_interval.
1205 */
1206 if (!TimestampDifferenceExceeds(sendTime, now,
1207 wal_receiver_status_interval * 1000))
1208 return;
1209 sendTime = now;
1210 }
1211
1212 /*
1213 * If Hot Standby is not yet accepting connections there is nothing to
1214 * send. Check this after the interval has expired to reduce number of
1215 * calls.
1216 *
1217 * Bailing out here also ensures that we don't send feedback until we've
1218 * read our own replication slot state, so we don't tell the master to
1219 * discard needed xmin or catalog_xmin from any slots that may exist on
1220 * this replica.
1221 */
1222 if (!HotStandbyActive())
1223 return;
1224
1225 /*
1226 * Make the expensive call to get the oldest xmin once we are certain
1227 * everything else has been checked.
1228 */
1229 if (hot_standby_feedback)
1230 {
1231 TransactionId slot_xmin;
1232
1233 /*
1234 * Usually GetOldestXmin() would include both global replication slot
1235 * xmin and catalog_xmin in its calculations, but we want to derive
1236 * separate values for each of those. So we ask for an xmin that
1237 * excludes the catalog_xmin.
1238 */
1239 xmin = GetOldestXmin(NULL,
1240 PROCARRAY_FLAGS_DEFAULT | PROCARRAY_SLOTS_XMIN);
1241
1242 ProcArrayGetReplicationSlotXmin(&slot_xmin, &catalog_xmin);
1243
1244 if (TransactionIdIsValid(slot_xmin) &&
1245 TransactionIdPrecedes(slot_xmin, xmin))
1246 xmin = slot_xmin;
1247 }
1248 else
1249 {
1250 xmin = InvalidTransactionId;
1251 catalog_xmin = InvalidTransactionId;
1252 }
1253
1254 /*
1255 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1256 * the epoch boundary.
1257 */
1258 GetNextXidAndEpoch(&nextXid, &xmin_epoch);
1259 catalog_xmin_epoch = xmin_epoch;
1260 if (nextXid < xmin)
1261 xmin_epoch--;
1262 if (nextXid < catalog_xmin)
1263 catalog_xmin_epoch--;
1264
1265 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1266 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1267
1268 /* Construct the message and send it. */
1269 resetStringInfo(&reply_message);
1270 pq_sendbyte(&reply_message, 'h');
1271 pq_sendint64(&reply_message, GetCurrentTimestamp());
1272 pq_sendint(&reply_message, xmin, 4);
1273 pq_sendint(&reply_message, xmin_epoch, 4);
1274 pq_sendint(&reply_message, catalog_xmin, 4);
1275 pq_sendint(&reply_message, catalog_xmin_epoch, 4);
1276 walrcv_send(wrconn, reply_message.data, reply_message.len);
1277 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1278 master_has_standby_xmin = true;
1279 else
1280 master_has_standby_xmin = false;
1281 }
1282
1283 /*
1284 * Update shared memory status upon receiving a message from primary.
1285 *
1286 * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1287 * message, reported by primary.
1288 */
1289 static void
ProcessWalSndrMessage(XLogRecPtr walEnd,TimestampTz sendTime)1290 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1291 {
1292 WalRcvData *walrcv = WalRcv;
1293
1294 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1295
1296 /* Update shared-memory status */
1297 SpinLockAcquire(&walrcv->mutex);
1298 if (walrcv->latestWalEnd < walEnd)
1299 walrcv->latestWalEndTime = sendTime;
1300 walrcv->latestWalEnd = walEnd;
1301 walrcv->lastMsgSendTime = sendTime;
1302 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1303 SpinLockRelease(&walrcv->mutex);
1304
1305 if (log_min_messages <= DEBUG2)
1306 {
1307 char *sendtime;
1308 char *receipttime;
1309 int applyDelay;
1310
1311 /* Copy because timestamptz_to_str returns a static buffer */
1312 sendtime = pstrdup(timestamptz_to_str(sendTime));
1313 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1314 applyDelay = GetReplicationApplyDelay();
1315
1316 /* apply delay is not available */
1317 if (applyDelay == -1)
1318 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1319 sendtime,
1320 receipttime,
1321 GetReplicationTransferLatency());
1322 else
1323 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1324 sendtime,
1325 receipttime,
1326 applyDelay,
1327 GetReplicationTransferLatency());
1328
1329 pfree(sendtime);
1330 pfree(receipttime);
1331 }
1332 }
1333
1334 /*
1335 * Wake up the walreceiver main loop.
1336 *
1337 * This is called by the startup process whenever interesting xlog records
1338 * are applied, so that walreceiver can check if it needs to send an apply
1339 * notification back to the master which may be waiting in a COMMIT with
1340 * synchronous_commit = remote_apply.
1341 */
1342 void
WalRcvForceReply(void)1343 WalRcvForceReply(void)
1344 {
1345 Latch *latch;
1346
1347 WalRcv->force_reply = true;
1348 /* fetching the latch pointer might not be atomic, so use spinlock */
1349 SpinLockAcquire(&WalRcv->mutex);
1350 latch = WalRcv->latch;
1351 SpinLockRelease(&WalRcv->mutex);
1352 if (latch)
1353 SetLatch(latch);
1354 }
1355
1356 /*
1357 * Return a string constant representing the state. This is used
1358 * in system functions and views, and should *not* be translated.
1359 */
1360 static const char *
WalRcvGetStateString(WalRcvState state)1361 WalRcvGetStateString(WalRcvState state)
1362 {
1363 switch (state)
1364 {
1365 case WALRCV_STOPPED:
1366 return "stopped";
1367 case WALRCV_STARTING:
1368 return "starting";
1369 case WALRCV_STREAMING:
1370 return "streaming";
1371 case WALRCV_WAITING:
1372 return "waiting";
1373 case WALRCV_RESTARTING:
1374 return "restarting";
1375 case WALRCV_STOPPING:
1376 return "stopping";
1377 }
1378 return "UNKNOWN";
1379 }
1380
1381 /*
1382 * Returns activity of WAL receiver, including pid, state and xlog locations
1383 * received from the WAL sender of another server.
1384 */
1385 Datum
pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)1386 pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1387 {
1388 TupleDesc tupdesc;
1389 Datum *values;
1390 bool *nulls;
1391 int pid;
1392 bool ready_to_display;
1393 WalRcvState state;
1394 XLogRecPtr receive_start_lsn;
1395 TimeLineID receive_start_tli;
1396 XLogRecPtr received_lsn;
1397 TimeLineID received_tli;
1398 TimestampTz last_send_time;
1399 TimestampTz last_receipt_time;
1400 XLogRecPtr latest_end_lsn;
1401 TimestampTz latest_end_time;
1402 char slotname[NAMEDATALEN];
1403 char conninfo[MAXCONNINFO];
1404
1405 /* Take a lock to ensure value consistency */
1406 SpinLockAcquire(&WalRcv->mutex);
1407 pid = (int) WalRcv->pid;
1408 ready_to_display = WalRcv->ready_to_display;
1409 state = WalRcv->walRcvState;
1410 receive_start_lsn = WalRcv->receiveStart;
1411 receive_start_tli = WalRcv->receiveStartTLI;
1412 received_lsn = WalRcv->receivedUpto;
1413 received_tli = WalRcv->receivedTLI;
1414 last_send_time = WalRcv->lastMsgSendTime;
1415 last_receipt_time = WalRcv->lastMsgReceiptTime;
1416 latest_end_lsn = WalRcv->latestWalEnd;
1417 latest_end_time = WalRcv->latestWalEndTime;
1418 strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1419 strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1420 SpinLockRelease(&WalRcv->mutex);
1421
1422 /*
1423 * No WAL receiver (or not ready yet), just return a tuple with NULL
1424 * values
1425 */
1426 if (pid == 0 || !ready_to_display)
1427 PG_RETURN_NULL();
1428
1429 /* determine result type */
1430 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1431 elog(ERROR, "return type must be a row type");
1432
1433 values = palloc0(sizeof(Datum) * tupdesc->natts);
1434 nulls = palloc0(sizeof(bool) * tupdesc->natts);
1435
1436 /* Fetch values */
1437 values[0] = Int32GetDatum(pid);
1438
1439 if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
1440 {
1441 /*
1442 * Only superusers and members of pg_read_all_stats can see details.
1443 * Other users only get the pid value
1444 * to know whether it is a WAL receiver, but no details.
1445 */
1446 MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1447 }
1448 else
1449 {
1450 values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1451
1452 if (XLogRecPtrIsInvalid(receive_start_lsn))
1453 nulls[2] = true;
1454 else
1455 values[2] = LSNGetDatum(receive_start_lsn);
1456 values[3] = Int32GetDatum(receive_start_tli);
1457 if (XLogRecPtrIsInvalid(received_lsn))
1458 nulls[4] = true;
1459 else
1460 values[4] = LSNGetDatum(received_lsn);
1461 values[5] = Int32GetDatum(received_tli);
1462 if (last_send_time == 0)
1463 nulls[6] = true;
1464 else
1465 values[6] = TimestampTzGetDatum(last_send_time);
1466 if (last_receipt_time == 0)
1467 nulls[7] = true;
1468 else
1469 values[7] = TimestampTzGetDatum(last_receipt_time);
1470 if (XLogRecPtrIsInvalid(latest_end_lsn))
1471 nulls[8] = true;
1472 else
1473 values[8] = LSNGetDatum(latest_end_lsn);
1474 if (latest_end_time == 0)
1475 nulls[9] = true;
1476 else
1477 values[9] = TimestampTzGetDatum(latest_end_time);
1478 if (*slotname == '\0')
1479 nulls[10] = true;
1480 else
1481 values[10] = CStringGetTextDatum(slotname);
1482 if (*conninfo == '\0')
1483 nulls[11] = true;
1484 else
1485 values[11] = CStringGetTextDatum(conninfo);
1486 }
1487
1488 /* Returns the record as Datum */
1489 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1490 }
1491