1 /*-------------------------------------------------------------------------
2 *
3 * walreceiver.c
4 *
5 * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 * is the process in the standby server that takes charge of receiving
7 * XLOG records from a primary server during streaming replication.
8 *
9 * When the startup process determines that it's time to start streaming,
10 * it instructs postmaster to start walreceiver. Walreceiver first connects
11 * to the primary server (it will be served by a walsender process
12 * in the primary server), and then keeps receiving XLOG records and
13 * writing them to the disk as long as the connection is alive. As XLOG
14 * records are received and flushed to disk, it updates the
15 * WalRcv->flushedUpto variable in shared memory, to inform the startup
16 * process of how far it can proceed with XLOG replay.
17 *
18 * A WAL receiver cannot directly load GUC parameters used when establishing
19 * its connection to the primary. Instead it relies on parameter values
20 * that are passed down by the startup process when streaming is requested.
21 * This applies, for example, to the replication slot and the connection
22 * string to be used for the connection with the primary.
23 *
24 * If the primary server ends streaming, but doesn't disconnect, walreceiver
25 * goes into "waiting" mode, and waits for the startup process to give new
26 * instructions. The startup process will treat that the same as
27 * disconnection, and will rescan the archive/pg_wal directory. But when the
28 * startup process wants to try streaming replication again, it will just
29 * nudge the existing walreceiver process that's waiting, instead of launching
30 * a new one.
31 *
32 * Normal termination is by SIGTERM, which instructs the walreceiver to
33 * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
34 * process, the walreceiver will simply abort and exit on SIGQUIT. A close
35 * of the connection and a FATAL error are treated not as a crash but as
36 * normal operation.
37 *
38 * This file contains the server-facing parts of walreceiver. The libpq-
39 * specific parts are in the libpqwalreceiver module. It's loaded
40 * dynamically to avoid linking the server with libpq.
41 *
42 * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
43 *
44 *
45 * IDENTIFICATION
46 * src/backend/replication/walreceiver.c
47 *
48 *-------------------------------------------------------------------------
49 */
50 #include "postgres.h"
51
52 #include <unistd.h>
53
54 #include "access/htup_details.h"
55 #include "access/timeline.h"
56 #include "access/transam.h"
57 #include "access/xlog_internal.h"
58 #include "access/xlogarchive.h"
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "common/ip.h"
62 #include "funcapi.h"
63 #include "libpq/pqformat.h"
64 #include "libpq/pqsignal.h"
65 #include "miscadmin.h"
66 #include "pgstat.h"
67 #include "postmaster/interrupt.h"
68 #include "replication/walreceiver.h"
69 #include "replication/walsender.h"
70 #include "storage/ipc.h"
71 #include "storage/pmsignal.h"
72 #include "storage/proc.h"
73 #include "storage/procarray.h"
74 #include "storage/procsignal.h"
75 #include "utils/acl.h"
76 #include "utils/builtins.h"
77 #include "utils/guc.h"
78 #include "utils/pg_lsn.h"
79 #include "utils/ps_status.h"
80 #include "utils/resowner.h"
81 #include "utils/timestamp.h"
82
83
/*
 * GUC variables.  (Other variables that affect walreceiver are in xlog.c
 * because they're passed down from the startup process, for better
 * synchronization.)
 */
int			wal_receiver_status_interval;
int			wal_receiver_timeout;	/* ms; values <= 0 disable the timeout
									 * and ping checks in WalReceiverMain */
bool		hot_standby_feedback;

/*
 * libpqwalreceiver connection.  Established by walrcv_connect() in
 * WalReceiverMain and disconnected by WalRcvDie() at process exit.
 */
static WalReceiverConn *wrconn = NULL;

/*
 * Function table of the dynamically loaded walreceiver implementation.
 * WalReceiverMain raises an ERROR if it is still NULL after loading
 * libpqwalreceiver.
 */
WalReceiverFunctionsType *WalReceiverFunctions = NULL;

/* Timeout passed to WaitLatchOrSocket in the streaming loop */
#define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */

/*
 * These variables are used similarly to openLogFile/SegNo, but for
 * walreceiver to write the XLOG.  recvFile is the file descriptor of the
 * currently open segment, or -1 when no segment is open; recvSegNo is that
 * segment's number; recvFileTLI is the TimeLineID corresponding to the
 * filename of recvFile (it is used together with recvSegNo to rebuild the
 * segment's file name).
 */
static int	recvFile = -1;
static TimeLineID recvFileTLI = 0;
static XLogSegNo recvSegNo = 0;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.  Write is advanced by XLogWalRcvWrite and Flush by
 * XLogWalRcvFlush; both are reset to the replay position whenever a new
 * streaming session starts.
 */
static struct
{
	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
}			LogstreamResult;

/*
 * reply_message: buffer for messages sent back to the primary
 * (NOTE(review): presumably built by the XLogWalRcvSend* functions, whose
 * bodies are outside this view -- confirm).
 * incoming_message: holds the fixed-size header of the message currently
 * being parsed by XLogWalRcvProcessMsg.
 */
static StringInfoData reply_message;
static StringInfoData incoming_message;

/* Prototypes for private functions */
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvClose(XLogRecPtr recptr);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
132
133 /*
134 * Process any interrupts the walreceiver process may have received.
135 * This should be called any time the process's latch has become set.
136 *
137 * Currently, only SIGTERM is of interest. We can't just exit(1) within the
138 * SIGTERM signal handler, because the signal might arrive in the middle of
139 * some critical operation, like while we're holding a spinlock. Instead, the
140 * signal handler sets a flag variable as well as setting the process's latch.
141 * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
142 * latch has become set. Operations that could block for a long time, such as
143 * reading from a remote server, must pay attention to the latch too; see
144 * libpqrcv_PQgetResult for example.
145 */
146 void
ProcessWalRcvInterrupts(void)147 ProcessWalRcvInterrupts(void)
148 {
149 /*
150 * Although walreceiver interrupt handling doesn't use the same scheme as
151 * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
152 * any incoming signals on Win32, and also to make sure we process any
153 * barrier events.
154 */
155 CHECK_FOR_INTERRUPTS();
156
157 if (ShutdownRequestPending)
158 {
159 ereport(FATAL,
160 (errcode(ERRCODE_ADMIN_SHUTDOWN),
161 errmsg("terminating walreceiver process due to administrator command")));
162 }
163 }
164
165
166 /* Main entry point for walreceiver process */
167 void
WalReceiverMain(void)168 WalReceiverMain(void)
169 {
170 char conninfo[MAXCONNINFO];
171 char *tmp_conninfo;
172 char slotname[NAMEDATALEN];
173 bool is_temp_slot;
174 XLogRecPtr startpoint;
175 TimeLineID startpointTLI;
176 TimeLineID primaryTLI;
177 bool first_stream;
178 WalRcvData *walrcv = WalRcv;
179 TimestampTz last_recv_timestamp;
180 TimestampTz now;
181 bool ping_sent;
182 char *err;
183 char *sender_host = NULL;
184 int sender_port = 0;
185
186 /*
187 * WalRcv should be set up already (if we are a backend, we inherit this
188 * by fork() or EXEC_BACKEND mechanism from the postmaster).
189 */
190 Assert(walrcv != NULL);
191
192 now = GetCurrentTimestamp();
193
194 /*
195 * Mark walreceiver as running in shared memory.
196 *
197 * Do this as early as possible, so that if we fail later on, we'll set
198 * state to STOPPED. If we die before this, the startup process will keep
199 * waiting for us to start up, until it times out.
200 */
201 SpinLockAcquire(&walrcv->mutex);
202 Assert(walrcv->pid == 0);
203 switch (walrcv->walRcvState)
204 {
205 case WALRCV_STOPPING:
206 /* If we've already been requested to stop, don't start up. */
207 walrcv->walRcvState = WALRCV_STOPPED;
208 /* fall through */
209
210 case WALRCV_STOPPED:
211 SpinLockRelease(&walrcv->mutex);
212 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
213 proc_exit(1);
214 break;
215
216 case WALRCV_STARTING:
217 /* The usual case */
218 break;
219
220 case WALRCV_WAITING:
221 case WALRCV_STREAMING:
222 case WALRCV_RESTARTING:
223 default:
224 /* Shouldn't happen */
225 SpinLockRelease(&walrcv->mutex);
226 elog(PANIC, "walreceiver still running according to shared memory state");
227 }
228 /* Advertise our PID so that the startup process can kill us */
229 walrcv->pid = MyProcPid;
230 walrcv->walRcvState = WALRCV_STREAMING;
231
232 /* Fetch information required to start streaming */
233 walrcv->ready_to_display = false;
234 strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
235 strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
236 is_temp_slot = walrcv->is_temp_slot;
237 startpoint = walrcv->receiveStart;
238 startpointTLI = walrcv->receiveStartTLI;
239
240 /*
241 * At most one of is_temp_slot and slotname can be set; otherwise,
242 * RequestXLogStreaming messed up.
243 */
244 Assert(!is_temp_slot || (slotname[0] == '\0'));
245
246 /* Initialise to a sanish value */
247 walrcv->lastMsgSendTime =
248 walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
249
250 /* Report the latch to use to awaken this process */
251 walrcv->latch = &MyProc->procLatch;
252
253 SpinLockRelease(&walrcv->mutex);
254
255 pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
256
257 /* Arrange to clean up at walreceiver exit */
258 on_shmem_exit(WalRcvDie, 0);
259
260 /* Properly accept or ignore signals the postmaster might send us */
261 pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
262 * file */
263 pqsignal(SIGINT, SIG_IGN);
264 pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
265 /* SIGQUIT handler was already set up by InitPostmasterChild */
266 pqsignal(SIGALRM, SIG_IGN);
267 pqsignal(SIGPIPE, SIG_IGN);
268 pqsignal(SIGUSR1, procsignal_sigusr1_handler);
269 pqsignal(SIGUSR2, SIG_IGN);
270
271 /* Reset some signals that are accepted by postmaster but not here */
272 pqsignal(SIGCHLD, SIG_DFL);
273
274 /* Load the libpq-specific functions */
275 load_file("libpqwalreceiver", false);
276 if (WalReceiverFunctions == NULL)
277 elog(ERROR, "libpqwalreceiver didn't initialize correctly");
278
279 /* Unblock signals (they were blocked when the postmaster forked us) */
280 PG_SETMASK(&UnBlockSig);
281
282 /* Establish the connection to the primary for XLOG streaming */
283 wrconn = walrcv_connect(conninfo, false,
284 cluster_name[0] ? cluster_name : "walreceiver",
285 &err);
286 if (!wrconn)
287 ereport(ERROR,
288 (errcode(ERRCODE_CONNECTION_FAILURE),
289 errmsg("could not connect to the primary server: %s", err)));
290
291 /*
292 * Save user-visible connection string. This clobbers the original
293 * conninfo, for security. Also save host and port of the sender server
294 * this walreceiver is connected to.
295 */
296 tmp_conninfo = walrcv_get_conninfo(wrconn);
297 walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
298 SpinLockAcquire(&walrcv->mutex);
299 memset(walrcv->conninfo, 0, MAXCONNINFO);
300 if (tmp_conninfo)
301 strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
302
303 memset(walrcv->sender_host, 0, NI_MAXHOST);
304 if (sender_host)
305 strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
306
307 walrcv->sender_port = sender_port;
308 walrcv->ready_to_display = true;
309 SpinLockRelease(&walrcv->mutex);
310
311 if (tmp_conninfo)
312 pfree(tmp_conninfo);
313
314 if (sender_host)
315 pfree(sender_host);
316
317 first_stream = true;
318 for (;;)
319 {
320 char *primary_sysid;
321 char standby_sysid[32];
322 WalRcvStreamOptions options;
323
324 /*
325 * Check that we're connected to a valid server using the
326 * IDENTIFY_SYSTEM replication command.
327 */
328 primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
329
330 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
331 GetSystemIdentifier());
332 if (strcmp(primary_sysid, standby_sysid) != 0)
333 {
334 ereport(ERROR,
335 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
336 errmsg("database system identifier differs between the primary and standby"),
337 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
338 primary_sysid, standby_sysid)));
339 }
340
341 /*
342 * Confirm that the current timeline of the primary is the same or
343 * ahead of ours.
344 */
345 if (primaryTLI < startpointTLI)
346 ereport(ERROR,
347 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
348 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
349 primaryTLI, startpointTLI)));
350
351 /*
352 * Get any missing history files. We do this always, even when we're
353 * not interested in that timeline, so that if we're promoted to
354 * become the primary later on, we don't select the same timeline that
355 * was already used in the current primary. This isn't bullet-proof -
356 * you'll need some external software to manage your cluster if you
357 * need to ensure that a unique timeline id is chosen in every case,
358 * but let's avoid the confusion of timeline id collisions where we
359 * can.
360 */
361 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
362
363 /*
364 * Create temporary replication slot if requested, and update slot
365 * name in shared memory. (Note the slot name cannot already be set
366 * in this case.)
367 */
368 if (is_temp_slot)
369 {
370 snprintf(slotname, sizeof(slotname),
371 "pg_walreceiver_%lld",
372 (long long int) walrcv_get_backend_pid(wrconn));
373
374 walrcv_create_slot(wrconn, slotname, true, 0, NULL);
375
376 SpinLockAcquire(&walrcv->mutex);
377 strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
378 SpinLockRelease(&walrcv->mutex);
379 }
380
381 /*
382 * Start streaming.
383 *
384 * We'll try to start at the requested starting point and timeline,
385 * even if it's different from the server's latest timeline. In case
386 * we've already reached the end of the old timeline, the server will
387 * finish the streaming immediately, and we will go back to await
388 * orders from the startup process. If recovery_target_timeline is
389 * 'latest', the startup process will scan pg_wal and find the new
390 * history file, bump recovery target timeline, and ask us to restart
391 * on the new timeline.
392 */
393 options.logical = false;
394 options.startpoint = startpoint;
395 options.slotname = slotname[0] != '\0' ? slotname : NULL;
396 options.proto.physical.startpointTLI = startpointTLI;
397 ThisTimeLineID = startpointTLI;
398 if (walrcv_startstreaming(wrconn, &options))
399 {
400 if (first_stream)
401 ereport(LOG,
402 (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
403 LSN_FORMAT_ARGS(startpoint), startpointTLI)));
404 else
405 ereport(LOG,
406 (errmsg("restarted WAL streaming at %X/%X on timeline %u",
407 LSN_FORMAT_ARGS(startpoint), startpointTLI)));
408 first_stream = false;
409
410 /* Initialize LogstreamResult and buffers for processing messages */
411 LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
412 initStringInfo(&reply_message);
413 initStringInfo(&incoming_message);
414
415 /* Initialize the last recv timestamp */
416 last_recv_timestamp = GetCurrentTimestamp();
417 ping_sent = false;
418
419 /* Loop until end-of-streaming or error */
420 for (;;)
421 {
422 char *buf;
423 int len;
424 bool endofwal = false;
425 pgsocket wait_fd = PGINVALID_SOCKET;
426 int rc;
427
428 /*
429 * Exit walreceiver if we're not in recovery. This should not
430 * happen, but cross-check the status here.
431 */
432 if (!RecoveryInProgress())
433 ereport(FATAL,
434 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
435 errmsg("cannot continue WAL streaming, recovery has already ended")));
436
437 /* Process any requests or signals received recently */
438 ProcessWalRcvInterrupts();
439
440 if (ConfigReloadPending)
441 {
442 ConfigReloadPending = false;
443 ProcessConfigFile(PGC_SIGHUP);
444 XLogWalRcvSendHSFeedback(true);
445 }
446
447 /* See if we can read data immediately */
448 len = walrcv_receive(wrconn, &buf, &wait_fd);
449 if (len != 0)
450 {
451 /*
452 * Process the received data, and any subsequent data we
453 * can read without blocking.
454 */
455 for (;;)
456 {
457 if (len > 0)
458 {
459 /*
460 * Something was received from primary, so reset
461 * timeout
462 */
463 last_recv_timestamp = GetCurrentTimestamp();
464 ping_sent = false;
465 XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
466 }
467 else if (len == 0)
468 break;
469 else if (len < 0)
470 {
471 ereport(LOG,
472 (errmsg("replication terminated by primary server"),
473 errdetail("End of WAL reached on timeline %u at %X/%X.",
474 startpointTLI,
475 LSN_FORMAT_ARGS(LogstreamResult.Write))));
476 endofwal = true;
477 break;
478 }
479 len = walrcv_receive(wrconn, &buf, &wait_fd);
480 }
481
482 /* Let the primary know that we received some data. */
483 XLogWalRcvSendReply(false, false);
484
485 /*
486 * If we've written some records, flush them to disk and
487 * let the startup process and primary server know about
488 * them.
489 */
490 XLogWalRcvFlush(false);
491 }
492
493 /* Check if we need to exit the streaming loop. */
494 if (endofwal)
495 break;
496
497 /*
498 * Ideally we would reuse a WaitEventSet object repeatedly
499 * here to avoid the overheads of WaitLatchOrSocket on epoll
500 * systems, but we can't be sure that libpq (or any other
501 * walreceiver implementation) has the same socket (even if
502 * the fd is the same number, it may have been closed and
503 * reopened since the last time). In future, if there is a
504 * function for removing sockets from WaitEventSet, then we
505 * could add and remove just the socket each time, potentially
506 * avoiding some system calls.
507 */
508 Assert(wait_fd != PGINVALID_SOCKET);
509 rc = WaitLatchOrSocket(MyLatch,
510 WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
511 WL_TIMEOUT | WL_LATCH_SET,
512 wait_fd,
513 NAPTIME_PER_CYCLE,
514 WAIT_EVENT_WAL_RECEIVER_MAIN);
515 if (rc & WL_LATCH_SET)
516 {
517 ResetLatch(MyLatch);
518 ProcessWalRcvInterrupts();
519
520 if (walrcv->force_reply)
521 {
522 /*
523 * The recovery process has asked us to send apply
524 * feedback now. Make sure the flag is really set to
525 * false in shared memory before sending the reply, so
526 * we don't miss a new request for a reply.
527 */
528 walrcv->force_reply = false;
529 pg_memory_barrier();
530 XLogWalRcvSendReply(true, false);
531 }
532 }
533 if (rc & WL_TIMEOUT)
534 {
535 /*
536 * We didn't receive anything new. If we haven't heard
537 * anything from the server for more than
538 * wal_receiver_timeout / 2, ping the server. Also, if
539 * it's been longer than wal_receiver_status_interval
540 * since the last update we sent, send a status update to
541 * the primary anyway, to report any progress in applying
542 * WAL.
543 */
544 bool requestReply = false;
545
546 /*
547 * Check if time since last receive from primary has
548 * reached the configured limit.
549 */
550 if (wal_receiver_timeout > 0)
551 {
552 TimestampTz now = GetCurrentTimestamp();
553 TimestampTz timeout;
554
555 timeout =
556 TimestampTzPlusMilliseconds(last_recv_timestamp,
557 wal_receiver_timeout);
558
559 if (now >= timeout)
560 ereport(ERROR,
561 (errcode(ERRCODE_CONNECTION_FAILURE),
562 errmsg("terminating walreceiver due to timeout")));
563
564 /*
565 * We didn't receive anything new, for half of
566 * receiver replication timeout. Ping the server.
567 */
568 if (!ping_sent)
569 {
570 timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
571 (wal_receiver_timeout / 2));
572 if (now >= timeout)
573 {
574 requestReply = true;
575 ping_sent = true;
576 }
577 }
578 }
579
580 XLogWalRcvSendReply(requestReply, requestReply);
581 XLogWalRcvSendHSFeedback(false);
582 }
583 }
584
585 /*
586 * The backend finished streaming. Exit streaming COPY-mode from
587 * our side, too.
588 */
589 walrcv_endstreaming(wrconn, &primaryTLI);
590
591 /*
592 * If the server had switched to a new timeline that we didn't
593 * know about when we began streaming, fetch its timeline history
594 * file now.
595 */
596 WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
597 }
598 else
599 ereport(LOG,
600 (errmsg("primary server contains no more WAL on requested timeline %u",
601 startpointTLI)));
602
603 /*
604 * End of WAL reached on the requested timeline. Close the last
605 * segment, and await for new orders from the startup process.
606 */
607 if (recvFile >= 0)
608 {
609 char xlogfname[MAXFNAMELEN];
610
611 XLogWalRcvFlush(false);
612 XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
613 if (close(recvFile) != 0)
614 ereport(PANIC,
615 (errcode_for_file_access(),
616 errmsg("could not close log segment %s: %m",
617 xlogfname)));
618
619 /*
620 * Create .done file forcibly to prevent the streamed segment from
621 * being archived later.
622 */
623 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
624 XLogArchiveForceDone(xlogfname);
625 else
626 XLogArchiveNotify(xlogfname);
627 }
628 recvFile = -1;
629
630 elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
631 WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
632 }
633 /* not reached */
634 }
635
636 /*
637 * Wait for startup process to set receiveStart and receiveStartTLI.
638 */
639 static void
WalRcvWaitForStartPosition(XLogRecPtr * startpoint,TimeLineID * startpointTLI)640 WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
641 {
642 WalRcvData *walrcv = WalRcv;
643 int state;
644
645 SpinLockAcquire(&walrcv->mutex);
646 state = walrcv->walRcvState;
647 if (state != WALRCV_STREAMING)
648 {
649 SpinLockRelease(&walrcv->mutex);
650 if (state == WALRCV_STOPPING)
651 proc_exit(0);
652 else
653 elog(FATAL, "unexpected walreceiver state");
654 }
655 walrcv->walRcvState = WALRCV_WAITING;
656 walrcv->receiveStart = InvalidXLogRecPtr;
657 walrcv->receiveStartTLI = 0;
658 SpinLockRelease(&walrcv->mutex);
659
660 set_ps_display("idle");
661
662 /*
663 * nudge startup process to notice that we've stopped streaming and are
664 * now waiting for instructions.
665 */
666 WakeupRecovery();
667 for (;;)
668 {
669 ResetLatch(MyLatch);
670
671 ProcessWalRcvInterrupts();
672
673 SpinLockAcquire(&walrcv->mutex);
674 Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
675 walrcv->walRcvState == WALRCV_WAITING ||
676 walrcv->walRcvState == WALRCV_STOPPING);
677 if (walrcv->walRcvState == WALRCV_RESTARTING)
678 {
679 /*
680 * No need to handle changes in primary_conninfo or
681 * primary_slotname here. Startup process will signal us to
682 * terminate in case those change.
683 */
684 *startpoint = walrcv->receiveStart;
685 *startpointTLI = walrcv->receiveStartTLI;
686 walrcv->walRcvState = WALRCV_STREAMING;
687 SpinLockRelease(&walrcv->mutex);
688 break;
689 }
690 if (walrcv->walRcvState == WALRCV_STOPPING)
691 {
692 /*
693 * We should've received SIGTERM if the startup process wants us
694 * to die, but might as well check it here too.
695 */
696 SpinLockRelease(&walrcv->mutex);
697 exit(1);
698 }
699 SpinLockRelease(&walrcv->mutex);
700
701 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
702 WAIT_EVENT_WAL_RECEIVER_WAIT_START);
703 }
704
705 if (update_process_title)
706 {
707 char activitymsg[50];
708
709 snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
710 LSN_FORMAT_ARGS(*startpoint));
711 set_ps_display(activitymsg);
712 }
713 }
714
715 /*
716 * Fetch any missing timeline history files between 'first' and 'last'
717 * (inclusive) from the server.
718 */
719 static void
WalRcvFetchTimeLineHistoryFiles(TimeLineID first,TimeLineID last)720 WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
721 {
722 TimeLineID tli;
723
724 for (tli = first; tli <= last; tli++)
725 {
726 /* there's no history file for timeline 1 */
727 if (tli != 1 && !existsTimeLineHistory(tli))
728 {
729 char *fname;
730 char *content;
731 int len;
732 char expectedfname[MAXFNAMELEN];
733
734 ereport(LOG,
735 (errmsg("fetching timeline history file for timeline %u from primary server",
736 tli)));
737
738 walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
739
740 /*
741 * Check that the filename on the primary matches what we
742 * calculated ourselves. This is just a sanity check, it should
743 * always match.
744 */
745 TLHistoryFileName(expectedfname, tli);
746 if (strcmp(fname, expectedfname) != 0)
747 ereport(ERROR,
748 (errcode(ERRCODE_PROTOCOL_VIOLATION),
749 errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
750 tli)));
751
752 /*
753 * Write the file to pg_wal.
754 */
755 writeTimeLineHistoryFile(tli, content, len);
756
757 /*
758 * Mark the streamed history file as ready for archiving if
759 * archive_mode is always.
760 */
761 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
762 XLogArchiveForceDone(fname);
763 else
764 XLogArchiveNotify(fname);
765
766 pfree(fname);
767 pfree(content);
768 }
769 }
770 }
771
772 /*
773 * Mark us as STOPPED in shared memory at exit.
774 */
775 static void
WalRcvDie(int code,Datum arg)776 WalRcvDie(int code, Datum arg)
777 {
778 WalRcvData *walrcv = WalRcv;
779
780 /* Ensure that all WAL records received are flushed to disk */
781 XLogWalRcvFlush(true);
782
783 /* Mark ourselves inactive in shared memory */
784 SpinLockAcquire(&walrcv->mutex);
785 Assert(walrcv->walRcvState == WALRCV_STREAMING ||
786 walrcv->walRcvState == WALRCV_RESTARTING ||
787 walrcv->walRcvState == WALRCV_STARTING ||
788 walrcv->walRcvState == WALRCV_WAITING ||
789 walrcv->walRcvState == WALRCV_STOPPING);
790 Assert(walrcv->pid == MyProcPid);
791 walrcv->walRcvState = WALRCV_STOPPED;
792 walrcv->pid = 0;
793 walrcv->ready_to_display = false;
794 walrcv->latch = NULL;
795 SpinLockRelease(&walrcv->mutex);
796
797 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
798
799 /* Terminate the connection gracefully. */
800 if (wrconn != NULL)
801 walrcv_disconnect(wrconn);
802
803 /* Wake up the startup process to notice promptly that we're gone */
804 WakeupRecovery();
805 }
806
807 /*
808 * Accept the message from XLOG stream, and process it.
809 */
810 static void
XLogWalRcvProcessMsg(unsigned char type,char * buf,Size len)811 XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
812 {
813 int hdrlen;
814 XLogRecPtr dataStart;
815 XLogRecPtr walEnd;
816 TimestampTz sendTime;
817 bool replyRequested;
818
819 resetStringInfo(&incoming_message);
820
821 switch (type)
822 {
823 case 'w': /* WAL records */
824 {
825 /* copy message to StringInfo */
826 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
827 if (len < hdrlen)
828 ereport(ERROR,
829 (errcode(ERRCODE_PROTOCOL_VIOLATION),
830 errmsg_internal("invalid WAL message received from primary")));
831 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
832
833 /* read the fields */
834 dataStart = pq_getmsgint64(&incoming_message);
835 walEnd = pq_getmsgint64(&incoming_message);
836 sendTime = pq_getmsgint64(&incoming_message);
837 ProcessWalSndrMessage(walEnd, sendTime);
838
839 buf += hdrlen;
840 len -= hdrlen;
841 XLogWalRcvWrite(buf, len, dataStart);
842 break;
843 }
844 case 'k': /* Keepalive */
845 {
846 /* copy message to StringInfo */
847 hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
848 if (len != hdrlen)
849 ereport(ERROR,
850 (errcode(ERRCODE_PROTOCOL_VIOLATION),
851 errmsg_internal("invalid keepalive message received from primary")));
852 appendBinaryStringInfo(&incoming_message, buf, hdrlen);
853
854 /* read the fields */
855 walEnd = pq_getmsgint64(&incoming_message);
856 sendTime = pq_getmsgint64(&incoming_message);
857 replyRequested = pq_getmsgbyte(&incoming_message);
858
859 ProcessWalSndrMessage(walEnd, sendTime);
860
861 /* If the primary requested a reply, send one immediately */
862 if (replyRequested)
863 XLogWalRcvSendReply(true, false);
864 break;
865 }
866 default:
867 ereport(ERROR,
868 (errcode(ERRCODE_PROTOCOL_VIOLATION),
869 errmsg_internal("invalid replication message type %d",
870 type)));
871 }
872 }
873
874 /*
875 * Write XLOG data to disk.
876 */
877 static void
XLogWalRcvWrite(char * buf,Size nbytes,XLogRecPtr recptr)878 XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
879 {
880 int startoff;
881 int byteswritten;
882
883 while (nbytes > 0)
884 {
885 int segbytes;
886
887 /* Close the current segment if it's completed */
888 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
889 XLogWalRcvClose(recptr);
890
891 if (recvFile < 0)
892 {
893 bool use_existent = true;
894
895 /* Create/use new log file */
896 XLByteToSeg(recptr, recvSegNo, wal_segment_size);
897 recvFile = XLogFileInit(recvSegNo, &use_existent, true);
898 recvFileTLI = ThisTimeLineID;
899 }
900
901 /* Calculate the start offset of the received logs */
902 startoff = XLogSegmentOffset(recptr, wal_segment_size);
903
904 if (startoff + nbytes > wal_segment_size)
905 segbytes = wal_segment_size - startoff;
906 else
907 segbytes = nbytes;
908
909 /* OK to write the logs */
910 errno = 0;
911
912 byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
913 if (byteswritten <= 0)
914 {
915 char xlogfname[MAXFNAMELEN];
916 int save_errno;
917
918 /* if write didn't set errno, assume no disk space */
919 if (errno == 0)
920 errno = ENOSPC;
921
922 save_errno = errno;
923 XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
924 errno = save_errno;
925 ereport(PANIC,
926 (errcode_for_file_access(),
927 errmsg("could not write to log segment %s "
928 "at offset %u, length %lu: %m",
929 xlogfname, startoff, (unsigned long) segbytes)));
930 }
931
932 /* Update state for write */
933 recptr += byteswritten;
934
935 nbytes -= byteswritten;
936 buf += byteswritten;
937
938 LogstreamResult.Write = recptr;
939 }
940
941 /* Update shared-memory status */
942 pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
943
944 /*
945 * Close the current segment if it's fully written up in the last cycle of
946 * the loop, to create its archive notification file soon. Otherwise WAL
947 * archiving of the segment will be delayed until any data in the next
948 * segment is received and written.
949 */
950 if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
951 XLogWalRcvClose(recptr);
952 }
953
954 /*
955 * Flush the log to disk.
956 *
957 * If we're in the midst of dying, it's unwise to do anything that might throw
958 * an error, so we skip sending a reply in that case.
959 */
960 static void
XLogWalRcvFlush(bool dying)961 XLogWalRcvFlush(bool dying)
962 {
963 if (LogstreamResult.Flush < LogstreamResult.Write)
964 {
965 WalRcvData *walrcv = WalRcv;
966
967 issue_xlog_fsync(recvFile, recvSegNo);
968
969 LogstreamResult.Flush = LogstreamResult.Write;
970
971 /* Update shared-memory status */
972 SpinLockAcquire(&walrcv->mutex);
973 if (walrcv->flushedUpto < LogstreamResult.Flush)
974 {
975 walrcv->latestChunkStart = walrcv->flushedUpto;
976 walrcv->flushedUpto = LogstreamResult.Flush;
977 walrcv->receivedTLI = ThisTimeLineID;
978 }
979 SpinLockRelease(&walrcv->mutex);
980
981 /* Signal the startup process and walsender that new WAL has arrived */
982 WakeupRecovery();
983 if (AllowCascadeReplication())
984 WalSndWakeup();
985
986 /* Report XLOG streaming progress in PS display */
987 if (update_process_title)
988 {
989 char activitymsg[50];
990
991 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
992 LSN_FORMAT_ARGS(LogstreamResult.Write));
993 set_ps_display(activitymsg);
994 }
995
996 /* Also let the primary know that we made some progress */
997 if (!dying)
998 {
999 XLogWalRcvSendReply(false, false);
1000 XLogWalRcvSendHSFeedback(false);
1001 }
1002 }
1003 }
1004
1005 /*
1006 * Close the current segment.
1007 *
1008 * Flush the segment to disk before closing it. Otherwise we have to
1009 * reopen and fsync it later.
1010 *
1011 * Create an archive notification file since the segment is known completed.
1012 */
1013 static void
XLogWalRcvClose(XLogRecPtr recptr)1014 XLogWalRcvClose(XLogRecPtr recptr)
1015 {
1016 char xlogfname[MAXFNAMELEN];
1017
1018 Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1019
1020 /*
1021 * fsync() and close current file before we switch to next one. We would
1022 * otherwise have to reopen this file to fsync it later
1023 */
1024 XLogWalRcvFlush(false);
1025
1026 XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1027
1028 /*
1029 * XLOG segment files will be re-read by recovery in startup process soon,
1030 * so we don't advise the OS to release cache pages associated with the
1031 * file like XLogFileClose() does.
1032 */
1033 if (close(recvFile) != 0)
1034 ereport(PANIC,
1035 (errcode_for_file_access(),
1036 errmsg("could not close log segment %s: %m",
1037 xlogfname)));
1038
1039 /*
1040 * Create .done file forcibly to prevent the streamed segment from being
1041 * archived later.
1042 */
1043 if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1044 XLogArchiveForceDone(xlogfname);
1045 else
1046 XLogArchiveNotify(xlogfname);
1047
1048 recvFile = -1;
1049 }
1050
1051 /*
1052 * Send reply message to primary, indicating our current WAL locations, oldest
1053 * xmin and the current time.
1054 *
1055 * If 'force' is not set, the message is only sent if enough time has
1056 * passed since last status update to reach wal_receiver_status_interval.
1057 * If wal_receiver_status_interval is disabled altogether and 'force' is
1058 * false, this is a no-op.
1059 *
1060 * If 'requestReply' is true, requests the server to reply immediately upon
1061 * receiving this message. This is used for heartbeats, when approaching
1062 * wal_receiver_timeout.
1063 */
1064 static void
XLogWalRcvSendReply(bool force,bool requestReply)1065 XLogWalRcvSendReply(bool force, bool requestReply)
1066 {
1067 static XLogRecPtr writePtr = 0;
1068 static XLogRecPtr flushPtr = 0;
1069 XLogRecPtr applyPtr;
1070 static TimestampTz sendTime = 0;
1071 TimestampTz now;
1072
1073 /*
1074 * If the user doesn't want status to be reported to the primary, be sure
1075 * to exit before doing anything at all.
1076 */
1077 if (!force && wal_receiver_status_interval <= 0)
1078 return;
1079
1080 /* Get current timestamp. */
1081 now = GetCurrentTimestamp();
1082
1083 /*
1084 * We can compare the write and flush positions to the last message we
1085 * sent without taking any lock, but the apply position requires a spin
1086 * lock, so we don't check that unless something else has changed or 10
1087 * seconds have passed. This means that the apply WAL location will
1088 * appear, from the primary's point of view, to lag slightly, but since
1089 * this is only for reporting purposes and only on idle systems, that's
1090 * probably OK.
1091 */
1092 if (!force
1093 && writePtr == LogstreamResult.Write
1094 && flushPtr == LogstreamResult.Flush
1095 && !TimestampDifferenceExceeds(sendTime, now,
1096 wal_receiver_status_interval * 1000))
1097 return;
1098 sendTime = now;
1099
1100 /* Construct a new message */
1101 writePtr = LogstreamResult.Write;
1102 flushPtr = LogstreamResult.Flush;
1103 applyPtr = GetXLogReplayRecPtr(NULL);
1104
1105 resetStringInfo(&reply_message);
1106 pq_sendbyte(&reply_message, 'r');
1107 pq_sendint64(&reply_message, writePtr);
1108 pq_sendint64(&reply_message, flushPtr);
1109 pq_sendint64(&reply_message, applyPtr);
1110 pq_sendint64(&reply_message, GetCurrentTimestamp());
1111 pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1112
1113 /* Send it */
1114 elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1115 LSN_FORMAT_ARGS(writePtr),
1116 LSN_FORMAT_ARGS(flushPtr),
1117 LSN_FORMAT_ARGS(applyPtr),
1118 requestReply ? " (reply requested)" : "");
1119
1120 walrcv_send(wrconn, reply_message.data, reply_message.len);
1121 }
1122
1123 /*
1124 * Send hot standby feedback message to primary, plus the current time,
1125 * in case they don't have a watch.
1126 *
1127 * If the user disables feedback, send one final message to tell sender
1128 * to forget about the xmin on this standby. We also send this message
1129 * on first connect because a previous connection might have set xmin
1130 * on a replication slot. (If we're not using a slot it's harmless to
1131 * send a feedback message explicitly setting InvalidTransactionId).
1132 */
1133 static void
XLogWalRcvSendHSFeedback(bool immed)1134 XLogWalRcvSendHSFeedback(bool immed)
1135 {
1136 TimestampTz now;
1137 FullTransactionId nextFullXid;
1138 TransactionId nextXid;
1139 uint32 xmin_epoch,
1140 catalog_xmin_epoch;
1141 TransactionId xmin,
1142 catalog_xmin;
1143 static TimestampTz sendTime = 0;
1144
1145 /* initially true so we always send at least one feedback message */
1146 static bool primary_has_standby_xmin = true;
1147
1148 /*
1149 * If the user doesn't want status to be reported to the primary, be sure
1150 * to exit before doing anything at all.
1151 */
1152 if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1153 !primary_has_standby_xmin)
1154 return;
1155
1156 /* Get current timestamp. */
1157 now = GetCurrentTimestamp();
1158
1159 if (!immed)
1160 {
1161 /*
1162 * Send feedback at most once per wal_receiver_status_interval.
1163 */
1164 if (!TimestampDifferenceExceeds(sendTime, now,
1165 wal_receiver_status_interval * 1000))
1166 return;
1167 sendTime = now;
1168 }
1169
1170 /*
1171 * If Hot Standby is not yet accepting connections there is nothing to
1172 * send. Check this after the interval has expired to reduce number of
1173 * calls.
1174 *
1175 * Bailing out here also ensures that we don't send feedback until we've
1176 * read our own replication slot state, so we don't tell the primary to
1177 * discard needed xmin or catalog_xmin from any slots that may exist on
1178 * this replica.
1179 */
1180 if (!HotStandbyActive())
1181 return;
1182
1183 /*
1184 * Make the expensive call to get the oldest xmin once we are certain
1185 * everything else has been checked.
1186 */
1187 if (hot_standby_feedback)
1188 {
1189 GetReplicationHorizons(&xmin, &catalog_xmin);
1190 }
1191 else
1192 {
1193 xmin = InvalidTransactionId;
1194 catalog_xmin = InvalidTransactionId;
1195 }
1196
1197 /*
1198 * Get epoch and adjust if nextXid and oldestXmin are different sides of
1199 * the epoch boundary.
1200 */
1201 nextFullXid = ReadNextFullTransactionId();
1202 nextXid = XidFromFullTransactionId(nextFullXid);
1203 xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1204 catalog_xmin_epoch = xmin_epoch;
1205 if (nextXid < xmin)
1206 xmin_epoch--;
1207 if (nextXid < catalog_xmin)
1208 catalog_xmin_epoch--;
1209
1210 elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1211 xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1212
1213 /* Construct the message and send it. */
1214 resetStringInfo(&reply_message);
1215 pq_sendbyte(&reply_message, 'h');
1216 pq_sendint64(&reply_message, GetCurrentTimestamp());
1217 pq_sendint32(&reply_message, xmin);
1218 pq_sendint32(&reply_message, xmin_epoch);
1219 pq_sendint32(&reply_message, catalog_xmin);
1220 pq_sendint32(&reply_message, catalog_xmin_epoch);
1221 walrcv_send(wrconn, reply_message.data, reply_message.len);
1222 if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1223 primary_has_standby_xmin = true;
1224 else
1225 primary_has_standby_xmin = false;
1226 }
1227
1228 /*
1229 * Update shared memory status upon receiving a message from primary.
1230 *
1231 * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1232 * message, reported by primary.
1233 */
1234 static void
ProcessWalSndrMessage(XLogRecPtr walEnd,TimestampTz sendTime)1235 ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1236 {
1237 WalRcvData *walrcv = WalRcv;
1238
1239 TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1240
1241 /* Update shared-memory status */
1242 SpinLockAcquire(&walrcv->mutex);
1243 if (walrcv->latestWalEnd < walEnd)
1244 walrcv->latestWalEndTime = sendTime;
1245 walrcv->latestWalEnd = walEnd;
1246 walrcv->lastMsgSendTime = sendTime;
1247 walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1248 SpinLockRelease(&walrcv->mutex);
1249
1250 if (message_level_is_interesting(DEBUG2))
1251 {
1252 char *sendtime;
1253 char *receipttime;
1254 int applyDelay;
1255
1256 /* Copy because timestamptz_to_str returns a static buffer */
1257 sendtime = pstrdup(timestamptz_to_str(sendTime));
1258 receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1259 applyDelay = GetReplicationApplyDelay();
1260
1261 /* apply delay is not available */
1262 if (applyDelay == -1)
1263 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1264 sendtime,
1265 receipttime,
1266 GetReplicationTransferLatency());
1267 else
1268 elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1269 sendtime,
1270 receipttime,
1271 applyDelay,
1272 GetReplicationTransferLatency());
1273
1274 pfree(sendtime);
1275 pfree(receipttime);
1276 }
1277 }
1278
1279 /*
1280 * Wake up the walreceiver main loop.
1281 *
1282 * This is called by the startup process whenever interesting xlog records
1283 * are applied, so that walreceiver can check if it needs to send an apply
1284 * notification back to the primary which may be waiting in a COMMIT with
1285 * synchronous_commit = remote_apply.
1286 */
1287 void
WalRcvForceReply(void)1288 WalRcvForceReply(void)
1289 {
1290 Latch *latch;
1291
1292 WalRcv->force_reply = true;
1293 /* fetching the latch pointer might not be atomic, so use spinlock */
1294 SpinLockAcquire(&WalRcv->mutex);
1295 latch = WalRcv->latch;
1296 SpinLockRelease(&WalRcv->mutex);
1297 if (latch)
1298 SetLatch(latch);
1299 }
1300
1301 /*
1302 * Return a string constant representing the state. This is used
1303 * in system functions and views, and should *not* be translated.
1304 */
1305 static const char *
WalRcvGetStateString(WalRcvState state)1306 WalRcvGetStateString(WalRcvState state)
1307 {
1308 switch (state)
1309 {
1310 case WALRCV_STOPPED:
1311 return "stopped";
1312 case WALRCV_STARTING:
1313 return "starting";
1314 case WALRCV_STREAMING:
1315 return "streaming";
1316 case WALRCV_WAITING:
1317 return "waiting";
1318 case WALRCV_RESTARTING:
1319 return "restarting";
1320 case WALRCV_STOPPING:
1321 return "stopping";
1322 }
1323 return "UNKNOWN";
1324 }
1325
1326 /*
1327 * Returns activity of WAL receiver, including pid, state and xlog locations
1328 * received from the WAL sender of another server.
1329 */
1330 Datum
pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)1331 pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1332 {
1333 TupleDesc tupdesc;
1334 Datum *values;
1335 bool *nulls;
1336 int pid;
1337 bool ready_to_display;
1338 WalRcvState state;
1339 XLogRecPtr receive_start_lsn;
1340 TimeLineID receive_start_tli;
1341 XLogRecPtr written_lsn;
1342 XLogRecPtr flushed_lsn;
1343 TimeLineID received_tli;
1344 TimestampTz last_send_time;
1345 TimestampTz last_receipt_time;
1346 XLogRecPtr latest_end_lsn;
1347 TimestampTz latest_end_time;
1348 char sender_host[NI_MAXHOST];
1349 int sender_port = 0;
1350 char slotname[NAMEDATALEN];
1351 char conninfo[MAXCONNINFO];
1352
1353 /* Take a lock to ensure value consistency */
1354 SpinLockAcquire(&WalRcv->mutex);
1355 pid = (int) WalRcv->pid;
1356 ready_to_display = WalRcv->ready_to_display;
1357 state = WalRcv->walRcvState;
1358 receive_start_lsn = WalRcv->receiveStart;
1359 receive_start_tli = WalRcv->receiveStartTLI;
1360 flushed_lsn = WalRcv->flushedUpto;
1361 received_tli = WalRcv->receivedTLI;
1362 last_send_time = WalRcv->lastMsgSendTime;
1363 last_receipt_time = WalRcv->lastMsgReceiptTime;
1364 latest_end_lsn = WalRcv->latestWalEnd;
1365 latest_end_time = WalRcv->latestWalEndTime;
1366 strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1367 strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1368 sender_port = WalRcv->sender_port;
1369 strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1370 SpinLockRelease(&WalRcv->mutex);
1371
1372 /*
1373 * No WAL receiver (or not ready yet), just return a tuple with NULL
1374 * values
1375 */
1376 if (pid == 0 || !ready_to_display)
1377 PG_RETURN_NULL();
1378
1379 /*
1380 * Read "writtenUpto" without holding a spinlock. Note that it may not be
1381 * consistent with the other shared variables of the WAL receiver
1382 * protected by a spinlock, but this should not be used for data integrity
1383 * checks.
1384 */
1385 written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1386
1387 /* determine result type */
1388 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1389 elog(ERROR, "return type must be a row type");
1390
1391 values = palloc0(sizeof(Datum) * tupdesc->natts);
1392 nulls = palloc0(sizeof(bool) * tupdesc->natts);
1393
1394 /* Fetch values */
1395 values[0] = Int32GetDatum(pid);
1396
1397 if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1398 {
1399 /*
1400 * Only superusers and members of pg_read_all_stats can see details.
1401 * Other users only get the pid value to know whether it is a WAL
1402 * receiver, but no details.
1403 */
1404 MemSet(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1405 }
1406 else
1407 {
1408 values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1409
1410 if (XLogRecPtrIsInvalid(receive_start_lsn))
1411 nulls[2] = true;
1412 else
1413 values[2] = LSNGetDatum(receive_start_lsn);
1414 values[3] = Int32GetDatum(receive_start_tli);
1415 if (XLogRecPtrIsInvalid(written_lsn))
1416 nulls[4] = true;
1417 else
1418 values[4] = LSNGetDatum(written_lsn);
1419 if (XLogRecPtrIsInvalid(flushed_lsn))
1420 nulls[5] = true;
1421 else
1422 values[5] = LSNGetDatum(flushed_lsn);
1423 values[6] = Int32GetDatum(received_tli);
1424 if (last_send_time == 0)
1425 nulls[7] = true;
1426 else
1427 values[7] = TimestampTzGetDatum(last_send_time);
1428 if (last_receipt_time == 0)
1429 nulls[8] = true;
1430 else
1431 values[8] = TimestampTzGetDatum(last_receipt_time);
1432 if (XLogRecPtrIsInvalid(latest_end_lsn))
1433 nulls[9] = true;
1434 else
1435 values[9] = LSNGetDatum(latest_end_lsn);
1436 if (latest_end_time == 0)
1437 nulls[10] = true;
1438 else
1439 values[10] = TimestampTzGetDatum(latest_end_time);
1440 if (*slotname == '\0')
1441 nulls[11] = true;
1442 else
1443 values[11] = CStringGetTextDatum(slotname);
1444 if (*sender_host == '\0')
1445 nulls[12] = true;
1446 else
1447 values[12] = CStringGetTextDatum(sender_host);
1448 if (sender_port == 0)
1449 nulls[13] = true;
1450 else
1451 values[13] = Int32GetDatum(sender_port);
1452 if (*conninfo == '\0')
1453 nulls[14] = true;
1454 else
1455 values[14] = CStringGetTextDatum(conninfo);
1456 }
1457
1458 /* Returns the record as Datum */
1459 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1460 }
1461