1 /*-------------------------------------------------------------------------
2 *
3 * walsender.c
4 *
5 * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 * care of sending XLOG from the primary server to a single recipient.
7 * (Note that there can be more than one walsender process concurrently.)
8 * It is started by the postmaster when the walreceiver of a standby server
9 * connects to the primary server and requests XLOG streaming replication.
10 *
11 * A walsender is similar to a regular backend, ie. there is a one-to-one
12 * relationship between a connection and a walsender process, but instead
13 * of processing SQL queries, it understands a small set of special
14 * replication-mode commands. The START_REPLICATION command begins streaming
15 * WAL to the client. While streaming, the walsender keeps reading XLOG
16 * records from the disk and sends them to the standby server over the
17 * COPY protocol, until either side ends the replication by exiting COPY
18 * mode (or until the connection is closed).
19 *
20 * Normal termination is by SIGTERM, which instructs the walsender to
21 * close the connection and exit(0) at the next convenient moment. Emergency
22 * termination is by SIGQUIT; like any backend, the walsender will simply
23 * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24 * are treated as not a crash but approximately normal termination;
25 * the walsender will exit quickly without sending any more XLOG records.
26 *
 * If the server is shut down, checkpointer sends us
 * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited.  If
 * the backend is idle or running an SQL query, this causes the backend to
 * shut down.  If logical replication is in progress, all existing WAL
 * records are processed, followed by a shutdown.  Otherwise this causes the
 * walsender to switch to the "stopping" state.  In this state, the walsender
 * will reject any further replication commands.  The checkpointer begins the
 * shutdown checkpoint once all walsenders are confirmed as stopping.  When
 * the shutdown checkpoint finishes, the postmaster sends us SIGUSR2.  This
 * instructs walsender to send any outstanding WAL, including the shutdown
 * checkpoint record, wait for it to be replicated to the standby, and then
 * exit.
38 *
39 *
40 * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
41 *
42 * IDENTIFICATION
43 * src/backend/replication/walsender.c
44 *
45 *-------------------------------------------------------------------------
46 */
47 #include "postgres.h"
48
49 #include <signal.h>
50 #include <unistd.h>
51
52 #include "access/printtup.h"
53 #include "access/timeline.h"
54 #include "access/transam.h"
55 #include "access/xact.h"
56 #include "access/xlog_internal.h"
57 #include "access/xlogreader.h"
58 #include "access/xlogutils.h"
59 #include "catalog/pg_authid.h"
60 #include "catalog/pg_type.h"
61 #include "commands/dbcommands.h"
62 #include "commands/defrem.h"
63 #include "funcapi.h"
64 #include "libpq/libpq.h"
65 #include "libpq/pqformat.h"
66 #include "miscadmin.h"
67 #include "nodes/replnodes.h"
68 #include "pgstat.h"
69 #include "postmaster/interrupt.h"
70 #include "replication/basebackup.h"
71 #include "replication/decode.h"
72 #include "replication/logical.h"
73 #include "replication/slot.h"
74 #include "replication/snapbuild.h"
75 #include "replication/syncrep.h"
76 #include "replication/walreceiver.h"
77 #include "replication/walsender.h"
78 #include "replication/walsender_private.h"
79 #include "storage/condition_variable.h"
80 #include "storage/fd.h"
81 #include "storage/ipc.h"
82 #include "storage/pmsignal.h"
83 #include "storage/proc.h"
84 #include "storage/procarray.h"
85 #include "tcop/dest.h"
86 #include "tcop/tcopprot.h"
87 #include "utils/acl.h"
88 #include "utils/builtins.h"
89 #include "utils/guc.h"
90 #include "utils/memutils.h"
91 #include "utils/pg_lsn.h"
92 #include "utils/portal.h"
93 #include "utils/ps_status.h"
94 #include "utils/timeout.h"
95 #include "utils/timestamp.h"
96
97 /*
98 * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
99 *
100 * We don't have a good idea of what a good value would be; there's some
101 * overhead per message in both walsender and walreceiver, but on the other
102 * hand sending large batches makes walsender less responsive to signals
103 * because signals are checked only between messages. 128kB (with
104 * default 8k blocks) seems like a reasonable guess for now.
105 */
106 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
107
108 /* Array of WalSnds in shared memory */
109 WalSndCtlData *WalSndCtl = NULL;
110
111 /* My slot in the shared memory array */
112 WalSnd *MyWalSnd = NULL;
113
114 /* Global state */
115 bool am_walsender = false; /* Am I a walsender process? */
116 bool am_cascading_walsender = false; /* Am I cascading WAL to another
117 * standby? */
118 bool am_db_walsender = false; /* Connected to a database? */
119
120 /* User-settable parameters for walsender */
121 int max_wal_senders = 0; /* the maximum number of concurrent
122 * walsenders */
123 int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
124 * data message */
125 bool log_replication_commands = false;
126
127 /*
128 * State for WalSndWakeupRequest
129 */
130 bool wake_wal_senders = false;
131
132 /*
133 * xlogreader used for replication. Note that a WAL sender doing physical
134 * replication does not need xlogreader to read WAL, but it needs one to
135 * keep a state of its work.
136 */
137 static XLogReaderState *xlogreader = NULL;
138
139 /*
140 * These variables keep track of the state of the timeline we're currently
141 * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
142 * the timeline is not the latest timeline on this server, and the server's
143 * history forked off from that timeline at sendTimeLineValidUpto.
144 */
145 static TimeLineID sendTimeLine = 0;
146 static TimeLineID sendTimeLineNextTLI = 0;
147 static bool sendTimeLineIsHistoric = false;
148 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
149
150 /*
151 * How far have we sent WAL already? This is also advertised in
152 * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
153 */
154 static XLogRecPtr sentPtr = InvalidXLogRecPtr;
155
156 /* Buffers for constructing outgoing messages and processing reply messages. */
157 static StringInfoData output_message;
158 static StringInfoData reply_message;
159 static StringInfoData tmpbuf;
160
161 /* Timestamp of last ProcessRepliesIfAny(). */
162 static TimestampTz last_processing = 0;
163
164 /*
165 * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
166 * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
167 */
168 static TimestampTz last_reply_timestamp = 0;
169
170 /* Have we sent a heartbeat message asking for reply, since last reply? */
171 static bool waiting_for_ping_response = false;
172
173 /*
174 * While streaming WAL in Copy mode, streamingDoneSending is set to true
175 * after we have sent CopyDone. We should not send any more CopyData messages
176 * after that. streamingDoneReceiving is set to true when we receive CopyDone
177 * from the other end. When both become true, it's time to exit Copy mode.
178 */
179 static bool streamingDoneSending;
180 static bool streamingDoneReceiving;
181
182 /* Are we there yet? */
183 static bool WalSndCaughtUp = false;
184
185 /* Flags set by signal handlers for later service in main loop */
186 static volatile sig_atomic_t got_SIGUSR2 = false;
187 static volatile sig_atomic_t got_STOPPING = false;
188
189 /*
190 * This is set while we are streaming. When not set
191 * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
192 * the main loop is responsible for checking got_STOPPING and terminating when
193 * it's set (after streaming any remaining WAL).
194 */
195 static volatile sig_atomic_t replication_active = false;
196
197 static LogicalDecodingContext *logical_decoding_ctx = NULL;
198
/* A sample associating a WAL location with the time it was written. */
typedef struct
{
	XLogRecPtr	lsn;			/* WAL location this sample refers to */
	TimestampTz time;			/* timestamp recorded for that location */
} WalTimeSample;

/* The size of our buffer of time samples. */
#define LAG_TRACKER_BUFFER_SIZE 8192

/*
 * A mechanism for tracking replication lag.
 *
 * A single zero-initialized instance is allocated in TopMemoryContext by
 * InitWalSender() and reached through lag_tracker below.  Samples are fed in
 * and consumed by LagTrackerWrite() and LagTrackerRead().
 */
typedef struct
{
	/* NOTE(review): presumably the LSN of the most recent sample -- confirm */
	XLogRecPtr	last_lsn;
	/* fixed-size storage for (lsn, time) samples */
	WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
	/* NOTE(review): presumably index in buffer of the next write -- confirm */
	int			write_head;
	/* one read position per synchronous-replication wait mode */
	int			read_heads[NUM_SYNC_REP_WAIT_MODE];
	/* one remembered sample per synchronous-replication wait mode */
	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
} LagTracker;

/* This process's lag tracker; NULL until InitWalSender() has run. */
static LagTracker *lag_tracker;
220
221 /* Signal handlers */
222 static void WalSndLastCycleHandler(SIGNAL_ARGS);
223
224 /* Prototypes for private functions */
225 typedef void (*WalSndSendDataCallback) (void);
226 static void WalSndLoop(WalSndSendDataCallback send_data);
227 static void InitWalSenderSlot(void);
228 static void WalSndKill(int code, Datum arg);
229 static void WalSndShutdown(void) pg_attribute_noreturn();
230 static void XLogSendPhysical(void);
231 static void XLogSendLogical(void);
232 static void WalSndDone(WalSndSendDataCallback send_data);
233 static XLogRecPtr GetStandbyFlushRecPtr(void);
234 static void IdentifySystem(void);
235 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
236 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
237 static void StartReplication(StartReplicationCmd *cmd);
238 static void StartLogicalReplication(StartReplicationCmd *cmd);
239 static void ProcessStandbyMessage(void);
240 static void ProcessStandbyReplyMessage(void);
241 static void ProcessStandbyHSFeedbackMessage(void);
242 static void ProcessRepliesIfAny(void);
243 static void WalSndKeepalive(bool requestReply);
244 static void WalSndKeepaliveIfNecessary(void);
245 static void WalSndCheckTimeOut(void);
246 static long WalSndComputeSleeptime(TimestampTz now);
247 static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
248 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
249 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
250 static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
251 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
252 static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
253 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
254 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
255
256 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
257 TimeLineID *tli_p);
258
259
/*
 * Initialize walsender process before entering the main command loop.
 *
 * Records whether the server is in recovery (in which case we are a
 * cascading walsender), sets up our per-walsender structure in shared
 * memory, announces ourselves to the postmaster as a WAL sender, and
 * allocates the zero-filled lag tracker in TopMemoryContext.
 */
void
InitWalSender(void)
{
	/* In recovery we can only be relaying WAL received from upstream. */
	am_cascading_walsender = RecoveryInProgress();

	/* Create a per-walsender data structure in shared memory */
	InitWalSenderSlot();

	/*
	 * We don't currently need any ResourceOwner in a walsender process, but
	 * if we did, we could call CreateAuxProcessResourceOwner here.
	 */

	/*
	 * Let postmaster know that we're a WAL sender. Once we've declared us as
	 * a WAL sender process, postmaster will let us outlive the bgwriter and
	 * kill us last in the shutdown sequence, so we get a chance to stream all
	 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
	 * there's no going back, and we mustn't write any WAL records after this.
	 */
	MarkPostmasterChildWalSender();
	SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);

	/* Initialize empty timestamp buffer for lag tracking. */
	lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
}
287
/*
 * Clean up after an error.
 *
 * WAL sender processes don't use transactions like regular backends do.
 * This function does any cleanup required after an error in a WAL sender
 * process, similar to what transaction abort does in a regular backend.
 *
 * If a shutdown has been requested in the meantime (got_STOPPING or
 * got_SIGUSR2 is set), this does not return: the process exits with
 * status 0.  Otherwise the walsender is put back into the STARTUP state.
 */
void
WalSndErrorCleanup(void)
{
	/* Drop anything the failed command may still be holding or waiting on. */
	LWLockReleaseAll();
	ConditionVariableCancelSleep();
	pgstat_report_wait_end();

	/* Close the WAL segment file if the reader still has one open. */
	if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
		wal_segment_close(xlogreader);

	/* Give up the slot we had acquired, if any. */
	if (MyReplicationSlot != NULL)
		ReplicationSlotRelease();

	ReplicationSlotCleanup();

	/* We are no longer streaming, whatever we were doing before. */
	replication_active = false;

	/*
	 * If there is a transaction in progress, it will clean up our
	 * ResourceOwner, but if a replication command set up a resource owner
	 * without a transaction, we've got to clean that up now.
	 */
	if (!IsTransactionOrTransactionBlock())
		WalSndResourceCleanup(false);

	/* A pending shutdown request means there is no point in carrying on. */
	if (got_STOPPING || got_SIGUSR2)
		proc_exit(0);

	/* Revert back to startup state */
	WalSndSetState(WALSNDSTATE_STARTUP);
}
326
327 /*
328 * Clean up any ResourceOwner we created.
329 */
330 void
WalSndResourceCleanup(bool isCommit)331 WalSndResourceCleanup(bool isCommit)
332 {
333 ResourceOwner resowner;
334
335 if (CurrentResourceOwner == NULL)
336 return;
337
338 /*
339 * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
340 * in a local variable and clear it first.
341 */
342 resowner = CurrentResourceOwner;
343 CurrentResourceOwner = NULL;
344
345 /* Now we can release resources and delete it. */
346 ResourceOwnerRelease(resowner,
347 RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
348 ResourceOwnerRelease(resowner,
349 RESOURCE_RELEASE_LOCKS, isCommit, true);
350 ResourceOwnerRelease(resowner,
351 RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
352 ResourceOwnerDelete(resowner);
353 }
354
/*
 * Handle a client's connection abort in an orderly manner.
 *
 * Never returns: the process exits with status 0.
 */
static void
WalSndShutdown(void)
{
	/*
	 * Reset whereToSendOutput to prevent ereport from attempting to send any
	 * more messages to the standby.
	 */
	if (whereToSendOutput == DestRemote)
		whereToSendOutput = DestNone;

	proc_exit(0);
	abort();					/* keep the compiler quiet */
}
371
372 /*
373 * Handle the IDENTIFY_SYSTEM command.
374 */
375 static void
IdentifySystem(void)376 IdentifySystem(void)
377 {
378 char sysid[32];
379 char xloc[MAXFNAMELEN];
380 XLogRecPtr logptr;
381 char *dbname = NULL;
382 DestReceiver *dest;
383 TupOutputState *tstate;
384 TupleDesc tupdesc;
385 Datum values[4];
386 bool nulls[4];
387
388 /*
389 * Reply with a result set with one row, four columns. First col is system
390 * ID, second is timeline ID, third is current xlog location and the
391 * fourth contains the database name if we are connected to one.
392 */
393
394 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
395 GetSystemIdentifier());
396
397 am_cascading_walsender = RecoveryInProgress();
398 if (am_cascading_walsender)
399 {
400 /* this also updates ThisTimeLineID */
401 logptr = GetStandbyFlushRecPtr();
402 }
403 else
404 logptr = GetFlushRecPtr();
405
406 snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
407
408 if (MyDatabaseId != InvalidOid)
409 {
410 MemoryContext cur = CurrentMemoryContext;
411
412 /* syscache access needs a transaction env. */
413 StartTransactionCommand();
414 /* make dbname live outside TX context */
415 MemoryContextSwitchTo(cur);
416 dbname = get_database_name(MyDatabaseId);
417 CommitTransactionCommand();
418 /* CommitTransactionCommand switches to TopMemoryContext */
419 MemoryContextSwitchTo(cur);
420 }
421
422 dest = CreateDestReceiver(DestRemoteSimple);
423 MemSet(nulls, false, sizeof(nulls));
424
425 /* need a tuple descriptor representing four columns */
426 tupdesc = CreateTemplateTupleDesc(4);
427 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
428 TEXTOID, -1, 0);
429 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
430 INT4OID, -1, 0);
431 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
432 TEXTOID, -1, 0);
433 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
434 TEXTOID, -1, 0);
435
436 /* prepare for projection of tuples */
437 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
438
439 /* column 1: system identifier */
440 values[0] = CStringGetTextDatum(sysid);
441
442 /* column 2: timeline */
443 values[1] = Int32GetDatum(ThisTimeLineID);
444
445 /* column 3: wal location */
446 values[2] = CStringGetTextDatum(xloc);
447
448 /* column 4: database name, or NULL if none */
449 if (dbname)
450 values[3] = CStringGetTextDatum(dbname);
451 else
452 nulls[3] = true;
453
454 /* send it to dest */
455 do_tup_output(tstate, values, nulls);
456
457 end_tup_output(tstate);
458 }
459
460
461 /*
462 * Handle TIMELINE_HISTORY command.
463 */
464 static void
SendTimeLineHistory(TimeLineHistoryCmd * cmd)465 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
466 {
467 StringInfoData buf;
468 char histfname[MAXFNAMELEN];
469 char path[MAXPGPATH];
470 int fd;
471 off_t histfilelen;
472 off_t bytesleft;
473 Size len;
474
475 /*
476 * Reply with a result set with one row, and two columns. The first col is
477 * the name of the history file, 2nd is the contents.
478 */
479
480 TLHistoryFileName(histfname, cmd->timeline);
481 TLHistoryFilePath(path, cmd->timeline);
482
483 /* Send a RowDescription message */
484 pq_beginmessage(&buf, 'T');
485 pq_sendint16(&buf, 2); /* 2 fields */
486
487 /* first field */
488 pq_sendstring(&buf, "filename"); /* col name */
489 pq_sendint32(&buf, 0); /* table oid */
490 pq_sendint16(&buf, 0); /* attnum */
491 pq_sendint32(&buf, TEXTOID); /* type oid */
492 pq_sendint16(&buf, -1); /* typlen */
493 pq_sendint32(&buf, 0); /* typmod */
494 pq_sendint16(&buf, 0); /* format code */
495
496 /* second field */
497 pq_sendstring(&buf, "content"); /* col name */
498 pq_sendint32(&buf, 0); /* table oid */
499 pq_sendint16(&buf, 0); /* attnum */
500 pq_sendint32(&buf, TEXTOID); /* type oid */
501 pq_sendint16(&buf, -1); /* typlen */
502 pq_sendint32(&buf, 0); /* typmod */
503 pq_sendint16(&buf, 0); /* format code */
504 pq_endmessage(&buf);
505
506 /* Send a DataRow message */
507 pq_beginmessage(&buf, 'D');
508 pq_sendint16(&buf, 2); /* # of columns */
509 len = strlen(histfname);
510 pq_sendint32(&buf, len); /* col1 len */
511 pq_sendbytes(&buf, histfname, len);
512
513 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
514 if (fd < 0)
515 ereport(ERROR,
516 (errcode_for_file_access(),
517 errmsg("could not open file \"%s\": %m", path)));
518
519 /* Determine file length and send it to client */
520 histfilelen = lseek(fd, 0, SEEK_END);
521 if (histfilelen < 0)
522 ereport(ERROR,
523 (errcode_for_file_access(),
524 errmsg("could not seek to end of file \"%s\": %m", path)));
525 if (lseek(fd, 0, SEEK_SET) != 0)
526 ereport(ERROR,
527 (errcode_for_file_access(),
528 errmsg("could not seek to beginning of file \"%s\": %m", path)));
529
530 pq_sendint32(&buf, histfilelen); /* col2 len */
531
532 bytesleft = histfilelen;
533 while (bytesleft > 0)
534 {
535 PGAlignedBlock rbuf;
536 int nread;
537
538 pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
539 nread = read(fd, rbuf.data, sizeof(rbuf));
540 pgstat_report_wait_end();
541 if (nread < 0)
542 ereport(ERROR,
543 (errcode_for_file_access(),
544 errmsg("could not read file \"%s\": %m",
545 path)));
546 else if (nread == 0)
547 ereport(ERROR,
548 (errcode(ERRCODE_DATA_CORRUPTED),
549 errmsg("could not read file \"%s\": read %d of %zu",
550 path, nread, (Size) bytesleft)));
551
552 pq_sendbytes(&buf, rbuf.data, nread);
553 bytesleft -= nread;
554 }
555
556 if (CloseTransientFile(fd) != 0)
557 ereport(ERROR,
558 (errcode_for_file_access(),
559 errmsg("could not close file \"%s\": %m", path)));
560
561 pq_endmessage(&buf);
562 }
563
/*
 * Handle START_REPLICATION command (physical replication).
 *
 * Validates the request, optionally acquires the named physical slot,
 * selects the timeline to send, and then streams WAL to the client in
 * COPY-both mode by running WalSndLoop(XLogSendPhysical).
 *
 * When streaming ends in an orderly way, control comes back here: the slot
 * (if any) is released, a single-row result set naming the next timeline
 * and its start position is sent if a historic timeline was requested, and
 * the command is completed.  If a stop request arrived while streaming
 * (got_STOPPING), the process exits instead.  An ereport(ERROR) anywhere
 * along the way takes us back to the main loop.
 */
static void
StartReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;
	XLogRecPtr	FlushPtr;

	/* ThisTimeLineID is still zero if it has not been set up yet. */
	if (ThisTimeLineID == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));

	/* create xlogreader for physical replication */
	xlogreader =
		XLogReaderAllocate(wal_segment_size, NULL,
						   XL_ROUTINE(.segment_open = WalSndSegmentOpen,
									  .segment_close = wal_segment_close),
						   NULL);

	if (!xlogreader)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("out of memory")));

	/*
	 * We assume here that we're logging enough information in the WAL for
	 * log-shipping, since this is checked in PostmasterMain().
	 *
	 * NOTE: wal_level can only change at shutdown, so in most cases it is
	 * difficult for there to be WAL data that we can still see that was
	 * written at wal_level='minimal'.
	 */

	if (cmd->slotname)
	{
		ReplicationSlotAcquire(cmd->slotname, true);
		if (SlotIsLogical(MyReplicationSlot))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cannot use a logical replication slot for physical replication")));

		/*
		 * We don't need to verify the slot's restart_lsn here; instead we
		 * rely on the caller requesting the starting point to use.  If the
		 * WAL segment doesn't exist, we'll fail later.
		 */
	}

	/*
	 * Select the timeline. If it was given explicitly by the client, use
	 * that. Otherwise use the timeline of the last replayed record, which is
	 * kept in ThisTimeLineID.
	 */
	if (am_cascading_walsender)
	{
		/* this also updates ThisTimeLineID */
		FlushPtr = GetStandbyFlushRecPtr();
	}
	else
		FlushPtr = GetFlushRecPtr();

	if (cmd->timeline != 0)
	{
		XLogRecPtr	switchpoint;

		sendTimeLine = cmd->timeline;
		if (sendTimeLine == ThisTimeLineID)
		{
			/* Client asked for the current timeline: nothing historic. */
			sendTimeLineIsHistoric = false;
			sendTimeLineValidUpto = InvalidXLogRecPtr;
		}
		else
		{
			List	   *timeLineHistory;

			sendTimeLineIsHistoric = true;

			/*
			 * Check that the timeline the client requested exists, and the
			 * requested start location is on that timeline.
			 */
			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
										 &sendTimeLineNextTLI);
			list_free_deep(timeLineHistory);

			/*
			 * Found the requested timeline in the history. Check that
			 * requested startpoint is on that timeline in our history.
			 *
			 * This is quite loose on purpose. We only check that we didn't
			 * fork off the requested timeline before the switchpoint. We
			 * don't check that we switched *to* it before the requested
			 * starting point. This is because the client can legitimately
			 * request to start replication from the beginning of the WAL
			 * segment that contains switchpoint, but on the new timeline, so
			 * that it doesn't end up with a partial segment. If you ask for
			 * too old a starting point, you'll get an error later when we
			 * fail to find the requested WAL segment in pg_wal.
			 *
			 * XXX: we could be more strict here and only allow a startpoint
			 * that's older than the switchpoint, if it's still in the same
			 * WAL segment.
			 */
			if (!XLogRecPtrIsInvalid(switchpoint) &&
				switchpoint < cmd->startpoint)
			{
				ereport(ERROR,
						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
								LSN_FORMAT_ARGS(cmd->startpoint),
								cmd->timeline),
						 errdetail("This server's history forked from timeline %u at %X/%X.",
								   cmd->timeline,
								   LSN_FORMAT_ARGS(switchpoint))));
			}
			/* The historic timeline can be streamed only up to the fork. */
			sendTimeLineValidUpto = switchpoint;
		}
	}
	else
	{
		/* No timeline given: stream the current one. */
		sendTimeLine = ThisTimeLineID;
		sendTimeLineValidUpto = InvalidXLogRecPtr;
		sendTimeLineIsHistoric = false;
	}

	streamingDoneSending = streamingDoneReceiving = false;

	/* If there is nothing to stream, don't even enter COPY mode */
	if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
	{
		/*
		 * When we first start replication the standby will be behind the
		 * primary. For some applications, for example synchronous
		 * replication, it is important to have a clear state for this initial
		 * catchup mode, so we can trigger actions when we change streaming
		 * state later. We may stay in this state for a long time, which is
		 * exactly why we want to be able to monitor whether or not we are
		 * still here.
		 */
		WalSndSetState(WALSNDSTATE_CATCHUP);

		/* Send a CopyBothResponse message, and start streaming */
		pq_beginmessage(&buf, 'W');
		pq_sendbyte(&buf, 0);
		pq_sendint16(&buf, 0);
		pq_endmessage(&buf);
		pq_flush();

		/*
		 * Don't allow a request to stream from a future point in WAL that
		 * hasn't been flushed to disk in this server yet.
		 *
		 * Note that by the time this check runs, the CopyBothResponse above
		 * has already been flushed to the client.
		 */
		if (FlushPtr < cmd->startpoint)
		{
			ereport(ERROR,
					(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
							LSN_FORMAT_ARGS(cmd->startpoint),
							LSN_FORMAT_ARGS(FlushPtr))));
		}

		/* Start streaming from the requested point */
		sentPtr = cmd->startpoint;

		/* Initialize shared memory status, too */
		SpinLockAcquire(&MyWalSnd->mutex);
		MyWalSnd->sentPtr = sentPtr;
		SpinLockRelease(&MyWalSnd->mutex);

		SyncRepInitConfig();

		/* Main loop of walsender */
		replication_active = true;

		WalSndLoop(XLogSendPhysical);

		/* Streaming is over; honor a stop request that arrived meanwhile. */
		replication_active = false;
		if (got_STOPPING)
			proc_exit(0);
		WalSndSetState(WALSNDSTATE_STARTUP);

		/* The loop only returns once both directions have sent CopyDone. */
		Assert(streamingDoneSending && streamingDoneReceiving);
	}

	if (cmd->slotname)
		ReplicationSlotRelease();

	/*
	 * Copy is finished now. Send a single-row result set indicating the next
	 * timeline.
	 */
	if (sendTimeLineIsHistoric)
	{
		/* room for two 8-digit hex halves, the '/' and the terminator */
		char		startpos_str[8 + 1 + 8 + 1];
		DestReceiver *dest;
		TupOutputState *tstate;
		TupleDesc	tupdesc;
		Datum		values[2];
		bool		nulls[2];

		snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
				 LSN_FORMAT_ARGS(sendTimeLineValidUpto));

		dest = CreateDestReceiver(DestRemoteSimple);
		MemSet(nulls, false, sizeof(nulls));

		/*
		 * Need a tuple descriptor representing two columns. int8 may seem
		 * like a surprising data type for this, but in theory int4 would not
		 * be wide enough for this, as TimeLineID is unsigned.
		 */
		tupdesc = CreateTemplateTupleDesc(2);
		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
								  INT8OID, -1, 0);
		TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
								  TEXTOID, -1, 0);

		/* prepare for projection of tuple */
		tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);

		values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
		values[1] = CStringGetTextDatum(startpos_str);

		/* send it to dest */
		do_tup_output(tstate, values, nulls);

		end_tup_output(tstate);
	}

	/* Send CommandComplete message */
	EndReplicationCommand("START_STREAMING");
}
800
801 /*
802 * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
803 * walsender process.
804 *
805 * Inside the walsender we can do better than read_local_xlog_page,
806 * which has to do a plain sleep/busy loop, because the walsender's latch gets
807 * set every time WAL is flushed.
808 */
809 static int
logical_read_xlog_page(XLogReaderState * state,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char * cur_page)810 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
811 XLogRecPtr targetRecPtr, char *cur_page)
812 {
813 XLogRecPtr flushptr;
814 int count;
815 WALReadError errinfo;
816 XLogSegNo segno;
817
818 XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
819 sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
820 sendTimeLine = state->currTLI;
821 sendTimeLineValidUpto = state->currTLIValidUntil;
822 sendTimeLineNextTLI = state->nextTLI;
823
824 /* make sure we have enough WAL available */
825 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
826
827 /* fail if not (implies we are going to shut down) */
828 if (flushptr < targetPagePtr + reqLen)
829 return -1;
830
831 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
832 count = XLOG_BLCKSZ; /* more than one block available */
833 else
834 count = flushptr - targetPagePtr; /* part of the page available */
835
836 /* now actually read the data, we know it's there */
837 if (!WALRead(state,
838 cur_page,
839 targetPagePtr,
840 XLOG_BLCKSZ,
841 state->seg.ws_tli, /* Pass the current TLI because only
842 * WalSndSegmentOpen controls whether new
843 * TLI is needed. */
844 &errinfo))
845 WALReadRaiseError(&errinfo);
846
847 /*
848 * After reading into the buffer, check that what we read was valid. We do
849 * this after reading, because even though the segment was present when we
850 * opened it, it might get recycled or removed while we read it. The
851 * read() succeeds in that case, but the data we tried to read might
852 * already have been overwritten with new WAL records.
853 */
854 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
855 CheckXLogRemoved(segno, state->seg.ws_tli);
856
857 return count;
858 }
859
860 /*
861 * Process extra options given to CREATE_REPLICATION_SLOT.
862 */
863 static void
parseCreateReplSlotOptions(CreateReplicationSlotCmd * cmd,bool * reserve_wal,CRSSnapshotAction * snapshot_action)864 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
865 bool *reserve_wal,
866 CRSSnapshotAction *snapshot_action)
867 {
868 ListCell *lc;
869 bool snapshot_action_given = false;
870 bool reserve_wal_given = false;
871
872 /* Parse options */
873 foreach(lc, cmd->options)
874 {
875 DefElem *defel = (DefElem *) lfirst(lc);
876
877 if (strcmp(defel->defname, "export_snapshot") == 0)
878 {
879 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
880 ereport(ERROR,
881 (errcode(ERRCODE_SYNTAX_ERROR),
882 errmsg("conflicting or redundant options")));
883
884 snapshot_action_given = true;
885 *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
886 CRS_NOEXPORT_SNAPSHOT;
887 }
888 else if (strcmp(defel->defname, "use_snapshot") == 0)
889 {
890 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
891 ereport(ERROR,
892 (errcode(ERRCODE_SYNTAX_ERROR),
893 errmsg("conflicting or redundant options")));
894
895 snapshot_action_given = true;
896 *snapshot_action = CRS_USE_SNAPSHOT;
897 }
898 else if (strcmp(defel->defname, "reserve_wal") == 0)
899 {
900 if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
901 ereport(ERROR,
902 (errcode(ERRCODE_SYNTAX_ERROR),
903 errmsg("conflicting or redundant options")));
904
905 reserve_wal_given = true;
906 *reserve_wal = true;
907 }
908 else
909 elog(ERROR, "unrecognized option: %s", defel->defname);
910 }
911 }
912
913 /*
914 * Create a new replication slot.
915 */
916 static void
CreateReplicationSlot(CreateReplicationSlotCmd * cmd)917 CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
918 {
919 const char *snapshot_name = NULL;
920 char xloc[MAXFNAMELEN];
921 char *slot_name;
922 bool reserve_wal = false;
923 CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
924 DestReceiver *dest;
925 TupOutputState *tstate;
926 TupleDesc tupdesc;
927 Datum values[4];
928 bool nulls[4];
929
930 Assert(!MyReplicationSlot);
931
932 parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
933
934 /* setup state for WalSndSegmentOpen */
935 sendTimeLineIsHistoric = false;
936 sendTimeLine = ThisTimeLineID;
937
938 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
939 {
940 ReplicationSlotCreate(cmd->slotname, false,
941 cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
942 false);
943 }
944 else
945 {
946 CheckLogicalDecodingRequirements();
947
948 /*
949 * Initially create persistent slot as ephemeral - that allows us to
950 * nicely handle errors during initialization because it'll get
951 * dropped if this transaction fails. We'll make it persistent at the
952 * end. Temporary slots can be created as temporary from beginning as
953 * they get dropped on error as well.
954 */
955 ReplicationSlotCreate(cmd->slotname, true,
956 cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
957 false);
958 }
959
960 if (cmd->kind == REPLICATION_KIND_LOGICAL)
961 {
962 LogicalDecodingContext *ctx;
963 bool need_full_snapshot = false;
964
965 /*
966 * Do options check early so that we can bail before calling the
967 * DecodingContextFindStartpoint which can take long time.
968 */
969 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
970 {
971 if (IsTransactionBlock())
972 ereport(ERROR,
973 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
974 (errmsg("%s must not be called inside a transaction",
975 "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
976
977 need_full_snapshot = true;
978 }
979 else if (snapshot_action == CRS_USE_SNAPSHOT)
980 {
981 if (!IsTransactionBlock())
982 ereport(ERROR,
983 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
984 (errmsg("%s must be called inside a transaction",
985 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
986
987 if (XactIsoLevel != XACT_REPEATABLE_READ)
988 ereport(ERROR,
989 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
990 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
991 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
992
993 if (FirstSnapshotSet)
994 ereport(ERROR,
995 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
996 (errmsg("%s must be called before any query",
997 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
998
999 if (IsSubTransaction())
1000 ereport(ERROR,
1001 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1002 (errmsg("%s must not be called in a subtransaction",
1003 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
1004
1005 need_full_snapshot = true;
1006 }
1007
1008 ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1009 InvalidXLogRecPtr,
1010 XL_ROUTINE(.page_read = logical_read_xlog_page,
1011 .segment_open = WalSndSegmentOpen,
1012 .segment_close = wal_segment_close),
1013 WalSndPrepareWrite, WalSndWriteData,
1014 WalSndUpdateProgress);
1015
1016 /*
1017 * Signal that we don't need the timeout mechanism. We're just
1018 * creating the replication slot and don't yet accept feedback
1019 * messages or send keepalives. As we possibly need to wait for
1020 * further WAL the walsender would otherwise possibly be killed too
1021 * soon.
1022 */
1023 last_reply_timestamp = 0;
1024
1025 /* build initial snapshot, might take a while */
1026 DecodingContextFindStartpoint(ctx);
1027
1028 /*
1029 * Export or use the snapshot if we've been asked to do so.
1030 *
1031 * NB. We will convert the snapbuild.c kind of snapshot to normal
1032 * snapshot when doing this.
1033 */
1034 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1035 {
1036 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1037 }
1038 else if (snapshot_action == CRS_USE_SNAPSHOT)
1039 {
1040 Snapshot snap;
1041
1042 snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1043 RestoreTransactionSnapshot(snap, MyProc);
1044 }
1045
1046 /* don't need the decoding context anymore */
1047 FreeDecodingContext(ctx);
1048
1049 if (!cmd->temporary)
1050 ReplicationSlotPersist();
1051 }
1052 else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1053 {
1054 ReplicationSlotReserveWal();
1055
1056 ReplicationSlotMarkDirty();
1057
1058 /* Write this slot to disk if it's a permanent one. */
1059 if (!cmd->temporary)
1060 ReplicationSlotSave();
1061 }
1062
1063 snprintf(xloc, sizeof(xloc), "%X/%X",
1064 LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1065
1066 dest = CreateDestReceiver(DestRemoteSimple);
1067 MemSet(nulls, false, sizeof(nulls));
1068
1069 /*----------
1070 * Need a tuple descriptor representing four columns:
1071 * - first field: the slot name
1072 * - second field: LSN at which we became consistent
1073 * - third field: exported snapshot's name
1074 * - fourth field: output plugin
1075 *----------
1076 */
1077 tupdesc = CreateTemplateTupleDesc(4);
1078 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1079 TEXTOID, -1, 0);
1080 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1081 TEXTOID, -1, 0);
1082 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1083 TEXTOID, -1, 0);
1084 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1085 TEXTOID, -1, 0);
1086
1087 /* prepare for projection of tuples */
1088 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1089
1090 /* slot_name */
1091 slot_name = NameStr(MyReplicationSlot->data.name);
1092 values[0] = CStringGetTextDatum(slot_name);
1093
1094 /* consistent wal location */
1095 values[1] = CStringGetTextDatum(xloc);
1096
1097 /* snapshot name, or NULL if none */
1098 if (snapshot_name != NULL)
1099 values[2] = CStringGetTextDatum(snapshot_name);
1100 else
1101 nulls[2] = true;
1102
1103 /* plugin, or NULL if none */
1104 if (cmd->plugin != NULL)
1105 values[3] = CStringGetTextDatum(cmd->plugin);
1106 else
1107 nulls[3] = true;
1108
1109 /* send it to dest */
1110 do_tup_output(tstate, values, nulls);
1111 end_tup_output(tstate);
1112
1113 ReplicationSlotRelease();
1114 }
1115
1116 /*
1117 * Get rid of a replication slot that is no longer wanted.
1118 */
1119 static void
DropReplicationSlot(DropReplicationSlotCmd * cmd)1120 DropReplicationSlot(DropReplicationSlotCmd *cmd)
1121 {
1122 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1123 }
1124
1125 /*
1126 * Load previously initiated logical slot and prepare for sending data (via
1127 * WalSndLoop).
1128 */
1129 static void
StartLogicalReplication(StartReplicationCmd * cmd)1130 StartLogicalReplication(StartReplicationCmd *cmd)
1131 {
1132 StringInfoData buf;
1133 QueryCompletion qc;
1134
1135 /* make sure that our requirements are still fulfilled */
1136 CheckLogicalDecodingRequirements();
1137
1138 Assert(!MyReplicationSlot);
1139
1140 ReplicationSlotAcquire(cmd->slotname, true);
1141
1142 if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
1143 ereport(ERROR,
1144 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1145 errmsg("cannot read from logical replication slot \"%s\"",
1146 cmd->slotname),
1147 errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
1148
1149 /*
1150 * Force a disconnect, so that the decoding code doesn't need to care
1151 * about an eventual switch from running in recovery, to running in a
1152 * normal environment. Client code is expected to handle reconnects.
1153 */
1154 if (am_cascading_walsender && !RecoveryInProgress())
1155 {
1156 ereport(LOG,
1157 (errmsg("terminating walsender process after promotion")));
1158 got_STOPPING = true;
1159 }
1160
1161 /*
1162 * Create our decoding context, making it start at the previously ack'ed
1163 * position.
1164 *
1165 * Do this before sending a CopyBothResponse message, so that any errors
1166 * are reported early.
1167 */
1168 logical_decoding_ctx =
1169 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1170 XL_ROUTINE(.page_read = logical_read_xlog_page,
1171 .segment_open = WalSndSegmentOpen,
1172 .segment_close = wal_segment_close),
1173 WalSndPrepareWrite, WalSndWriteData,
1174 WalSndUpdateProgress);
1175 xlogreader = logical_decoding_ctx->reader;
1176
1177 WalSndSetState(WALSNDSTATE_CATCHUP);
1178
1179 /* Send a CopyBothResponse message, and start streaming */
1180 pq_beginmessage(&buf, 'W');
1181 pq_sendbyte(&buf, 0);
1182 pq_sendint16(&buf, 0);
1183 pq_endmessage(&buf);
1184 pq_flush();
1185
1186 /* Start reading WAL from the oldest required WAL. */
1187 XLogBeginRead(logical_decoding_ctx->reader,
1188 MyReplicationSlot->data.restart_lsn);
1189
1190 /*
1191 * Report the location after which we'll send out further commits as the
1192 * current sentPtr.
1193 */
1194 sentPtr = MyReplicationSlot->data.confirmed_flush;
1195
1196 /* Also update the sent position status in shared memory */
1197 SpinLockAcquire(&MyWalSnd->mutex);
1198 MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1199 SpinLockRelease(&MyWalSnd->mutex);
1200
1201 replication_active = true;
1202
1203 SyncRepInitConfig();
1204
1205 /* Main loop of walsender */
1206 WalSndLoop(XLogSendLogical);
1207
1208 FreeDecodingContext(logical_decoding_ctx);
1209 ReplicationSlotRelease();
1210
1211 replication_active = false;
1212 if (got_STOPPING)
1213 proc_exit(0);
1214 WalSndSetState(WALSNDSTATE_STARTUP);
1215
1216 /* Get out of COPY mode (CommandComplete). */
1217 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1218 EndCommand(&qc, DestRemote, false);
1219 }
1220
1221 /*
1222 * LogicalDecodingContext 'prepare_write' callback.
1223 *
1224 * Prepare a write into a StringInfo.
1225 *
1226 * Don't do anything lasting in here, it's quite possible that nothing will be done
1227 * with the data.
1228 */
1229 static void
WalSndPrepareWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1230 WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1231 {
1232 /* can't have sync rep confused by sending the same LSN several times */
1233 if (!last_write)
1234 lsn = InvalidXLogRecPtr;
1235
1236 resetStringInfo(ctx->out);
1237
1238 pq_sendbyte(ctx->out, 'w');
1239 pq_sendint64(ctx->out, lsn); /* dataStart */
1240 pq_sendint64(ctx->out, lsn); /* walEnd */
1241
1242 /*
1243 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1244 * reserve space here.
1245 */
1246 pq_sendint64(ctx->out, 0); /* sendtime */
1247 }
1248
1249 /*
1250 * LogicalDecodingContext 'write' callback.
1251 *
1252 * Actually write out data previously prepared by WalSndPrepareWrite out to
1253 * the network. Take as long as needed, but process replies from the other
1254 * side and check timeouts during that.
1255 */
1256 static void
WalSndWriteData(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1257 WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1258 bool last_write)
1259 {
1260 TimestampTz now;
1261
1262 /*
1263 * Fill the send timestamp last, so that it is taken as late as possible.
1264 * This is somewhat ugly, but the protocol is set as it's already used for
1265 * several releases by streaming physical replication.
1266 */
1267 resetStringInfo(&tmpbuf);
1268 now = GetCurrentTimestamp();
1269 pq_sendint64(&tmpbuf, now);
1270 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1271 tmpbuf.data, sizeof(int64));
1272
1273 /* output previously gathered data in a CopyData packet */
1274 pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1275
1276 CHECK_FOR_INTERRUPTS();
1277
1278 /* Try to flush pending output to the client */
1279 if (pq_flush_if_writable() != 0)
1280 WalSndShutdown();
1281
1282 /* Try taking fast path unless we get too close to walsender timeout. */
1283 if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1284 wal_sender_timeout / 2) &&
1285 !pq_is_send_pending())
1286 {
1287 return;
1288 }
1289
1290 /* If we have pending write here, go to slow path */
1291 for (;;)
1292 {
1293 long sleeptime;
1294
1295 /* Check for input from the client */
1296 ProcessRepliesIfAny();
1297
1298 /* die if timeout was reached */
1299 WalSndCheckTimeOut();
1300
1301 /* Send keepalive if the time has come */
1302 WalSndKeepaliveIfNecessary();
1303
1304 if (!pq_is_send_pending())
1305 break;
1306
1307 sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1308
1309 /* Sleep until something happens or we time out */
1310 WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
1311 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1312
1313 /* Clear any already-pending wakeups */
1314 ResetLatch(MyLatch);
1315
1316 CHECK_FOR_INTERRUPTS();
1317
1318 /* Process any requests or signals received recently */
1319 if (ConfigReloadPending)
1320 {
1321 ConfigReloadPending = false;
1322 ProcessConfigFile(PGC_SIGHUP);
1323 SyncRepInitConfig();
1324 }
1325
1326 /* Try to flush pending output to the client */
1327 if (pq_flush_if_writable() != 0)
1328 WalSndShutdown();
1329 }
1330
1331 /* reactivate latch so WalSndLoop knows to continue */
1332 SetLatch(MyLatch);
1333 }
1334
1335 /*
1336 * LogicalDecodingContext 'update_progress' callback.
1337 *
1338 * Write the current position to the lag tracker (see XLogSendPhysical).
1339 */
1340 static void
WalSndUpdateProgress(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid)1341 WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
1342 {
1343 static TimestampTz sendTime = 0;
1344 TimestampTz now = GetCurrentTimestamp();
1345
1346 /*
1347 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1348 * avoid flooding the lag tracker when we commit frequently.
1349 */
1350 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1351 if (!TimestampDifferenceExceeds(sendTime, now,
1352 WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1353 return;
1354
1355 LagTrackerWrite(lsn, now);
1356 sendTime = now;
1357 }
1358
1359 /*
1360 * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1361 *
1362 * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1363 * if we detect a shutdown request (either from postmaster or client)
1364 * we will return early, so caller must always check.
1365 */
1366 static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)1367 WalSndWaitForWal(XLogRecPtr loc)
1368 {
1369 int wakeEvents;
1370 static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1371
1372 /*
1373 * Fast path to avoid acquiring the spinlock in case we already know we
1374 * have enough WAL available. This is particularly interesting if we're
1375 * far behind.
1376 */
1377 if (RecentFlushPtr != InvalidXLogRecPtr &&
1378 loc <= RecentFlushPtr)
1379 return RecentFlushPtr;
1380
1381 /* Get a more recent flush pointer. */
1382 if (!RecoveryInProgress())
1383 RecentFlushPtr = GetFlushRecPtr();
1384 else
1385 RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1386
1387 for (;;)
1388 {
1389 long sleeptime;
1390
1391 /* Clear any already-pending wakeups */
1392 ResetLatch(MyLatch);
1393
1394 CHECK_FOR_INTERRUPTS();
1395
1396 /* Process any requests or signals received recently */
1397 if (ConfigReloadPending)
1398 {
1399 ConfigReloadPending = false;
1400 ProcessConfigFile(PGC_SIGHUP);
1401 SyncRepInitConfig();
1402 }
1403
1404 /* Check for input from the client */
1405 ProcessRepliesIfAny();
1406
1407 /*
1408 * If we're shutting down, trigger pending WAL to be written out,
1409 * otherwise we'd possibly end up waiting for WAL that never gets
1410 * written, because walwriter has shut down already.
1411 */
1412 if (got_STOPPING)
1413 XLogBackgroundFlush();
1414
1415 /* Update our idea of the currently flushed position. */
1416 if (!RecoveryInProgress())
1417 RecentFlushPtr = GetFlushRecPtr();
1418 else
1419 RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1420
1421 /*
1422 * If postmaster asked us to stop, don't wait anymore.
1423 *
1424 * It's important to do this check after the recomputation of
1425 * RecentFlushPtr, so we can send all remaining data before shutting
1426 * down.
1427 */
1428 if (got_STOPPING)
1429 break;
1430
1431 /*
1432 * We only send regular messages to the client for full decoded
1433 * transactions, but a synchronous replication and walsender shutdown
1434 * possibly are waiting for a later location. So, before sleeping, we
1435 * send a ping containing the flush location. If the receiver is
1436 * otherwise idle, this keepalive will trigger a reply. Processing the
1437 * reply will update these MyWalSnd locations.
1438 */
1439 if (MyWalSnd->flush < sentPtr &&
1440 MyWalSnd->write < sentPtr &&
1441 !waiting_for_ping_response)
1442 WalSndKeepalive(false);
1443
1444 /* check whether we're done */
1445 if (loc <= RecentFlushPtr)
1446 break;
1447
1448 /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1449 WalSndCaughtUp = true;
1450
1451 /*
1452 * Try to flush any pending output to the client.
1453 */
1454 if (pq_flush_if_writable() != 0)
1455 WalSndShutdown();
1456
1457 /*
1458 * If we have received CopyDone from the client, sent CopyDone
1459 * ourselves, and the output buffer is empty, it's time to exit
1460 * streaming, so fail the current WAL fetch request.
1461 */
1462 if (streamingDoneReceiving && streamingDoneSending &&
1463 !pq_is_send_pending())
1464 break;
1465
1466 /* die if timeout was reached */
1467 WalSndCheckTimeOut();
1468
1469 /* Send keepalive if the time has come */
1470 WalSndKeepaliveIfNecessary();
1471
1472 /*
1473 * Sleep until something happens or we time out. Also wait for the
1474 * socket becoming writable, if there's still pending output.
1475 * Otherwise we might sit on sendable output data while waiting for
1476 * new WAL to be generated. (But if we have nothing to send, we don't
1477 * want to wake on socket-writable.)
1478 */
1479 sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1480
1481 wakeEvents = WL_SOCKET_READABLE;
1482
1483 if (pq_is_send_pending())
1484 wakeEvents |= WL_SOCKET_WRITEABLE;
1485
1486 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1487 }
1488
1489 /* reactivate latch so WalSndLoop knows to continue */
1490 SetLatch(MyLatch);
1491 return RecentFlushPtr;
1492 }
1493
1494 /*
1495 * Execute an incoming replication command.
1496 *
1497 * Returns true if the cmd_string was recognized as WalSender command, false
1498 * if not.
1499 */
1500 bool
exec_replication_command(const char * cmd_string)1501 exec_replication_command(const char *cmd_string)
1502 {
1503 int parse_rc;
1504 Node *cmd_node;
1505 const char *cmdtag;
1506 MemoryContext cmd_context;
1507 MemoryContext old_context;
1508
1509 /*
1510 * If WAL sender has been told that shutdown is getting close, switch its
1511 * status accordingly to handle the next replication commands correctly.
1512 */
1513 if (got_STOPPING)
1514 WalSndSetState(WALSNDSTATE_STOPPING);
1515
1516 /*
1517 * Throw error if in stopping mode. We need prevent commands that could
1518 * generate WAL while the shutdown checkpoint is being written. To be
1519 * safe, we just prohibit all new commands.
1520 */
1521 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1522 ereport(ERROR,
1523 (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1524
1525 /*
1526 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1527 * command arrives. Clean up the old stuff if there's anything.
1528 */
1529 SnapBuildClearExportedSnapshot();
1530
1531 CHECK_FOR_INTERRUPTS();
1532
1533 /*
1534 * Parse the command.
1535 */
1536 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1537 "Replication command context",
1538 ALLOCSET_DEFAULT_SIZES);
1539 old_context = MemoryContextSwitchTo(cmd_context);
1540
1541 replication_scanner_init(cmd_string);
1542 parse_rc = replication_yyparse();
1543 if (parse_rc != 0)
1544 ereport(ERROR,
1545 (errcode(ERRCODE_SYNTAX_ERROR),
1546 errmsg_internal("replication command parser returned %d",
1547 parse_rc)));
1548 replication_scanner_finish();
1549
1550 cmd_node = replication_parse_result;
1551
1552 /*
1553 * If it's a SQL command, just clean up our mess and return false; the
1554 * caller will take care of executing it.
1555 */
1556 if (IsA(cmd_node, SQLCmd))
1557 {
1558 if (MyDatabaseId == InvalidOid)
1559 ereport(ERROR,
1560 (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1561
1562 MemoryContextSwitchTo(old_context);
1563 MemoryContextDelete(cmd_context);
1564
1565 /* Tell the caller that this wasn't a WalSender command. */
1566 return false;
1567 }
1568
1569 /*
1570 * Report query to various monitoring facilities. For this purpose, we
1571 * report replication commands just like SQL commands.
1572 */
1573 debug_query_string = cmd_string;
1574
1575 pgstat_report_activity(STATE_RUNNING, cmd_string);
1576
1577 /*
1578 * Log replication command if log_replication_commands is enabled. Even
1579 * when it's disabled, log the command with DEBUG1 level for backward
1580 * compatibility.
1581 */
1582 ereport(log_replication_commands ? LOG : DEBUG1,
1583 (errmsg("received replication command: %s", cmd_string)));
1584
1585 /*
1586 * Disallow replication commands in aborted transaction blocks.
1587 */
1588 if (IsAbortedTransactionBlockState())
1589 ereport(ERROR,
1590 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1591 errmsg("current transaction is aborted, "
1592 "commands ignored until end of transaction block")));
1593
1594 CHECK_FOR_INTERRUPTS();
1595
1596 /*
1597 * Allocate buffers that will be used for each outgoing and incoming
1598 * message. We do this just once per command to reduce palloc overhead.
1599 */
1600 initStringInfo(&output_message);
1601 initStringInfo(&reply_message);
1602 initStringInfo(&tmpbuf);
1603
1604 switch (cmd_node->type)
1605 {
1606 case T_IdentifySystemCmd:
1607 cmdtag = "IDENTIFY_SYSTEM";
1608 set_ps_display(cmdtag);
1609 IdentifySystem();
1610 EndReplicationCommand(cmdtag);
1611 break;
1612
1613 case T_BaseBackupCmd:
1614 cmdtag = "BASE_BACKUP";
1615 set_ps_display(cmdtag);
1616 PreventInTransactionBlock(true, cmdtag);
1617 SendBaseBackup((BaseBackupCmd *) cmd_node);
1618 EndReplicationCommand(cmdtag);
1619 break;
1620
1621 case T_CreateReplicationSlotCmd:
1622 cmdtag = "CREATE_REPLICATION_SLOT";
1623 set_ps_display(cmdtag);
1624 CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1625 EndReplicationCommand(cmdtag);
1626 break;
1627
1628 case T_DropReplicationSlotCmd:
1629 cmdtag = "DROP_REPLICATION_SLOT";
1630 set_ps_display(cmdtag);
1631 DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1632 EndReplicationCommand(cmdtag);
1633 break;
1634
1635 case T_StartReplicationCmd:
1636 {
1637 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1638
1639 cmdtag = "START_REPLICATION";
1640 set_ps_display(cmdtag);
1641 PreventInTransactionBlock(true, cmdtag);
1642
1643 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1644 StartReplication(cmd);
1645 else
1646 StartLogicalReplication(cmd);
1647
1648 /* dupe, but necessary per libpqrcv_endstreaming */
1649 EndReplicationCommand(cmdtag);
1650
1651 Assert(xlogreader != NULL);
1652 break;
1653 }
1654
1655 case T_TimeLineHistoryCmd:
1656 cmdtag = "TIMELINE_HISTORY";
1657 set_ps_display(cmdtag);
1658 PreventInTransactionBlock(true, cmdtag);
1659 SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1660 EndReplicationCommand(cmdtag);
1661 break;
1662
1663 case T_VariableShowStmt:
1664 {
1665 DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1666 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1667
1668 cmdtag = "SHOW";
1669 set_ps_display(cmdtag);
1670
1671 /* syscache access needs a transaction environment */
1672 StartTransactionCommand();
1673 GetPGVariable(n->name, dest);
1674 CommitTransactionCommand();
1675 EndReplicationCommand(cmdtag);
1676 }
1677 break;
1678
1679 default:
1680 elog(ERROR, "unrecognized replication command node tag: %u",
1681 cmd_node->type);
1682 }
1683
1684 /* done */
1685 MemoryContextSwitchTo(old_context);
1686 MemoryContextDelete(cmd_context);
1687
1688 /*
1689 * We need not update ps display or pg_stat_activity, because PostgresMain
1690 * will reset those to "idle". But we must reset debug_query_string to
1691 * ensure it doesn't become a dangling pointer.
1692 */
1693 debug_query_string = NULL;
1694
1695 return true;
1696 }
1697
1698 /*
1699 * Process any incoming messages while streaming. Also checks if the remote
1700 * end has closed the connection.
1701 */
1702 static void
ProcessRepliesIfAny(void)1703 ProcessRepliesIfAny(void)
1704 {
1705 unsigned char firstchar;
1706 int maxmsglen;
1707 int r;
1708 bool received = false;
1709
1710 last_processing = GetCurrentTimestamp();
1711
1712 /*
1713 * If we already received a CopyDone from the frontend, any subsequent
1714 * message is the beginning of a new command, and should be processed in
1715 * the main processing loop.
1716 */
1717 while (!streamingDoneReceiving)
1718 {
1719 pq_startmsgread();
1720 r = pq_getbyte_if_available(&firstchar);
1721 if (r < 0)
1722 {
1723 /* unexpected error or EOF */
1724 ereport(COMMERROR,
1725 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1726 errmsg("unexpected EOF on standby connection")));
1727 proc_exit(0);
1728 }
1729 if (r == 0)
1730 {
1731 /* no data available without blocking */
1732 pq_endmsgread();
1733 break;
1734 }
1735
1736 /* Validate message type and set packet size limit */
1737 switch (firstchar)
1738 {
1739 case 'd':
1740 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1741 break;
1742 case 'c':
1743 case 'X':
1744 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1745 break;
1746 default:
1747 ereport(FATAL,
1748 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1749 errmsg("invalid standby message type \"%c\"",
1750 firstchar)));
1751 maxmsglen = 0; /* keep compiler quiet */
1752 break;
1753 }
1754
1755 /* Read the message contents */
1756 resetStringInfo(&reply_message);
1757 if (pq_getmessage(&reply_message, maxmsglen))
1758 {
1759 ereport(COMMERROR,
1760 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1761 errmsg("unexpected EOF on standby connection")));
1762 proc_exit(0);
1763 }
1764
1765 /* ... and process it */
1766 switch (firstchar)
1767 {
1768 /*
1769 * 'd' means a standby reply wrapped in a CopyData packet.
1770 */
1771 case 'd':
1772 ProcessStandbyMessage();
1773 received = true;
1774 break;
1775
1776 /*
1777 * CopyDone means the standby requested to finish streaming.
1778 * Reply with CopyDone, if we had not sent that already.
1779 */
1780 case 'c':
1781 if (!streamingDoneSending)
1782 {
1783 pq_putmessage_noblock('c', NULL, 0);
1784 streamingDoneSending = true;
1785 }
1786
1787 streamingDoneReceiving = true;
1788 received = true;
1789 break;
1790
1791 /*
1792 * 'X' means that the standby is closing down the socket.
1793 */
1794 case 'X':
1795 proc_exit(0);
1796
1797 default:
1798 Assert(false); /* NOT REACHED */
1799 }
1800 }
1801
1802 /*
1803 * Save the last reply timestamp if we've received at least one reply.
1804 */
1805 if (received)
1806 {
1807 last_reply_timestamp = last_processing;
1808 waiting_for_ping_response = false;
1809 }
1810 }
1811
1812 /*
1813 * Process a status update message received from standby.
1814 */
1815 static void
ProcessStandbyMessage(void)1816 ProcessStandbyMessage(void)
1817 {
1818 char msgtype;
1819
1820 /*
1821 * Check message type from the first byte.
1822 */
1823 msgtype = pq_getmsgbyte(&reply_message);
1824
1825 switch (msgtype)
1826 {
1827 case 'r':
1828 ProcessStandbyReplyMessage();
1829 break;
1830
1831 case 'h':
1832 ProcessStandbyHSFeedbackMessage();
1833 break;
1834
1835 default:
1836 ereport(COMMERROR,
1837 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1838 errmsg("unexpected message type \"%c\"", msgtype)));
1839 proc_exit(0);
1840 }
1841 }
1842
1843 /*
1844 * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1845 */
1846 static void
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)1847 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1848 {
1849 bool changed = false;
1850 ReplicationSlot *slot = MyReplicationSlot;
1851
1852 Assert(lsn != InvalidXLogRecPtr);
1853 SpinLockAcquire(&slot->mutex);
1854 if (slot->data.restart_lsn != lsn)
1855 {
1856 changed = true;
1857 slot->data.restart_lsn = lsn;
1858 }
1859 SpinLockRelease(&slot->mutex);
1860
1861 if (changed)
1862 {
1863 ReplicationSlotMarkDirty();
1864 ReplicationSlotsComputeRequiredLSN();
1865 }
1866
1867 /*
1868 * One could argue that the slot should be saved to disk now, but that'd
1869 * be energy wasted - the worst lost information can do here is give us
1870 * wrong information in a statistics view - we'll just potentially be more
1871 * conservative in removing files.
1872 */
1873 }
1874
1875 /*
1876 * Regular reply from standby advising of WAL locations on standby server.
1877 */
1878 static void
ProcessStandbyReplyMessage(void)1879 ProcessStandbyReplyMessage(void)
1880 {
1881 XLogRecPtr writePtr,
1882 flushPtr,
1883 applyPtr;
1884 bool replyRequested;
1885 TimeOffset writeLag,
1886 flushLag,
1887 applyLag;
1888 bool clearLagTimes;
1889 TimestampTz now;
1890 TimestampTz replyTime;
1891
1892 static bool fullyAppliedLastTime = false;
1893
1894 /* the caller already consumed the msgtype byte */
1895 writePtr = pq_getmsgint64(&reply_message);
1896 flushPtr = pq_getmsgint64(&reply_message);
1897 applyPtr = pq_getmsgint64(&reply_message);
1898 replyTime = pq_getmsgint64(&reply_message);
1899 replyRequested = pq_getmsgbyte(&reply_message);
1900
1901 if (message_level_is_interesting(DEBUG2))
1902 {
1903 char *replyTimeStr;
1904
1905 /* Copy because timestamptz_to_str returns a static buffer */
1906 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1907
1908 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
1909 LSN_FORMAT_ARGS(writePtr),
1910 LSN_FORMAT_ARGS(flushPtr),
1911 LSN_FORMAT_ARGS(applyPtr),
1912 replyRequested ? " (reply requested)" : "",
1913 replyTimeStr);
1914
1915 pfree(replyTimeStr);
1916 }
1917
1918 /* See if we can compute the round-trip lag for these positions. */
1919 now = GetCurrentTimestamp();
1920 writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
1921 flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
1922 applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
1923
1924 /*
1925 * If the standby reports that it has fully replayed the WAL in two
1926 * consecutive reply messages, then the second such message must result
1927 * from wal_receiver_status_interval expiring on the standby. This is a
1928 * convenient time to forget the lag times measured when it last
1929 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
1930 * until more WAL traffic arrives.
1931 */
1932 clearLagTimes = false;
1933 if (applyPtr == sentPtr)
1934 {
1935 if (fullyAppliedLastTime)
1936 clearLagTimes = true;
1937 fullyAppliedLastTime = true;
1938 }
1939 else
1940 fullyAppliedLastTime = false;
1941
1942 /* Send a reply if the standby requested one. */
1943 if (replyRequested)
1944 WalSndKeepalive(false);
1945
1946 /*
1947 * Update shared state for this WalSender process based on reply data from
1948 * standby.
1949 */
1950 {
1951 WalSnd *walsnd = MyWalSnd;
1952
1953 SpinLockAcquire(&walsnd->mutex);
1954 walsnd->write = writePtr;
1955 walsnd->flush = flushPtr;
1956 walsnd->apply = applyPtr;
1957 if (writeLag != -1 || clearLagTimes)
1958 walsnd->writeLag = writeLag;
1959 if (flushLag != -1 || clearLagTimes)
1960 walsnd->flushLag = flushLag;
1961 if (applyLag != -1 || clearLagTimes)
1962 walsnd->applyLag = applyLag;
1963 walsnd->replyTime = replyTime;
1964 SpinLockRelease(&walsnd->mutex);
1965 }
1966
1967 if (!am_cascading_walsender)
1968 SyncRepReleaseWaiters();
1969
1970 /*
1971 * Advance our local xmin horizon when the client confirmed a flush.
1972 */
1973 if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1974 {
1975 if (SlotIsLogical(MyReplicationSlot))
1976 LogicalConfirmReceivedLocation(flushPtr);
1977 else
1978 PhysicalConfirmReceivedLocation(flushPtr);
1979 }
1980 }
1981
1982 /* compute new replication slot xmin horizon if needed */
1983 static void
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin,TransactionId feedbackCatalogXmin)1984 PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
1985 {
1986 bool changed = false;
1987 ReplicationSlot *slot = MyReplicationSlot;
1988
1989 SpinLockAcquire(&slot->mutex);
1990 MyProc->xmin = InvalidTransactionId;
1991
1992 /*
1993 * For physical replication we don't need the interlock provided by xmin
1994 * and effective_xmin since the consequences of a missed increase are
1995 * limited to query cancellations, so set both at once.
1996 */
1997 if (!TransactionIdIsNormal(slot->data.xmin) ||
1998 !TransactionIdIsNormal(feedbackXmin) ||
1999 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2000 {
2001 changed = true;
2002 slot->data.xmin = feedbackXmin;
2003 slot->effective_xmin = feedbackXmin;
2004 }
2005 if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2006 !TransactionIdIsNormal(feedbackCatalogXmin) ||
2007 TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2008 {
2009 changed = true;
2010 slot->data.catalog_xmin = feedbackCatalogXmin;
2011 slot->effective_catalog_xmin = feedbackCatalogXmin;
2012 }
2013 SpinLockRelease(&slot->mutex);
2014
2015 if (changed)
2016 {
2017 ReplicationSlotMarkDirty();
2018 ReplicationSlotsComputeRequiredXmin(false);
2019 }
2020 }
2021
2022 /*
2023 * Check that the provided xmin/epoch are sane, that is, not in the future
2024 * and not so far back as to be already wrapped around.
2025 *
2026 * Epoch of nextXid should be same as standby, or if the counter has
2027 * wrapped, then one greater than standby.
2028 *
2029 * This check doesn't care about whether clog exists for these xids
2030 * at all.
2031 */
2032 static bool
TransactionIdInRecentPast(TransactionId xid,uint32 epoch)2033 TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
2034 {
2035 FullTransactionId nextFullXid;
2036 TransactionId nextXid;
2037 uint32 nextEpoch;
2038
2039 nextFullXid = ReadNextFullTransactionId();
2040 nextXid = XidFromFullTransactionId(nextFullXid);
2041 nextEpoch = EpochFromFullTransactionId(nextFullXid);
2042
2043 if (xid <= nextXid)
2044 {
2045 if (epoch != nextEpoch)
2046 return false;
2047 }
2048 else
2049 {
2050 if (epoch + 1 != nextEpoch)
2051 return false;
2052 }
2053
2054 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2055 return false; /* epoch OK, but it's wrapped around */
2056
2057 return true;
2058 }
2059
2060 /*
2061 * Hot Standby feedback
2062 */
2063 static void
ProcessStandbyHSFeedbackMessage(void)2064 ProcessStandbyHSFeedbackMessage(void)
2065 {
2066 TransactionId feedbackXmin;
2067 uint32 feedbackEpoch;
2068 TransactionId feedbackCatalogXmin;
2069 uint32 feedbackCatalogEpoch;
2070 TimestampTz replyTime;
2071
2072 /*
2073 * Decipher the reply message. The caller already consumed the msgtype
2074 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2075 * of this message.
2076 */
2077 replyTime = pq_getmsgint64(&reply_message);
2078 feedbackXmin = pq_getmsgint(&reply_message, 4);
2079 feedbackEpoch = pq_getmsgint(&reply_message, 4);
2080 feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2081 feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2082
2083 if (message_level_is_interesting(DEBUG2))
2084 {
2085 char *replyTimeStr;
2086
2087 /* Copy because timestamptz_to_str returns a static buffer */
2088 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2089
2090 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2091 feedbackXmin,
2092 feedbackEpoch,
2093 feedbackCatalogXmin,
2094 feedbackCatalogEpoch,
2095 replyTimeStr);
2096
2097 pfree(replyTimeStr);
2098 }
2099
2100 /*
2101 * Update shared state for this WalSender process based on reply data from
2102 * standby.
2103 */
2104 {
2105 WalSnd *walsnd = MyWalSnd;
2106
2107 SpinLockAcquire(&walsnd->mutex);
2108 walsnd->replyTime = replyTime;
2109 SpinLockRelease(&walsnd->mutex);
2110 }
2111
2112 /*
2113 * Unset WalSender's xmins if the feedback message values are invalid.
2114 * This happens when the downstream turned hot_standby_feedback off.
2115 */
2116 if (!TransactionIdIsNormal(feedbackXmin)
2117 && !TransactionIdIsNormal(feedbackCatalogXmin))
2118 {
2119 MyProc->xmin = InvalidTransactionId;
2120 if (MyReplicationSlot != NULL)
2121 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2122 return;
2123 }
2124
2125 /*
2126 * Check that the provided xmin/epoch are sane, that is, not in the future
2127 * and not so far back as to be already wrapped around. Ignore if not.
2128 */
2129 if (TransactionIdIsNormal(feedbackXmin) &&
2130 !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2131 return;
2132
2133 if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2134 !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2135 return;
2136
2137 /*
2138 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2139 * the xmin will be taken into account by GetSnapshotData() /
2140 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2141 * thereby prevent the generation of cleanup conflicts on the standby
2142 * server.
2143 *
2144 * There is a small window for a race condition here: although we just
2145 * checked that feedbackXmin precedes nextXid, the nextXid could have
2146 * gotten advanced between our fetching it and applying the xmin below,
2147 * perhaps far enough to make feedbackXmin wrap around. In that case the
2148 * xmin we set here would be "in the future" and have no effect. No point
2149 * in worrying about this since it's too late to save the desired data
2150 * anyway. Assuming that the standby sends us an increasing sequence of
2151 * xmins, this could only happen during the first reply cycle, else our
2152 * own xmin would prevent nextXid from advancing so far.
2153 *
2154 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2155 * is assumed atomic, and there's no real need to prevent concurrent
2156 * horizon determinations. (If we're moving our xmin forward, this is
2157 * obviously safe, and if we're moving it backwards, well, the data is at
2158 * risk already since a VACUUM could already have determined the horizon.)
2159 *
2160 * If we're using a replication slot we reserve the xmin via that,
2161 * otherwise via the walsender's PGPROC entry. We can only track the
2162 * catalog xmin separately when using a slot, so we store the least of the
2163 * two provided when not using a slot.
2164 *
2165 * XXX: It might make sense to generalize the ephemeral slot concept and
2166 * always use the slot mechanism to handle the feedback xmin.
2167 */
2168 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2169 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2170 else
2171 {
2172 if (TransactionIdIsNormal(feedbackCatalogXmin)
2173 && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2174 MyProc->xmin = feedbackCatalogXmin;
2175 else
2176 MyProc->xmin = feedbackXmin;
2177 }
2178 }
2179
2180 /*
2181 * Compute how long send/receive loops should sleep.
2182 *
2183 * If wal_sender_timeout is enabled we want to wake up in time to send
2184 * keepalives and to abort the connection if wal_sender_timeout has been
2185 * reached.
2186 */
2187 static long
WalSndComputeSleeptime(TimestampTz now)2188 WalSndComputeSleeptime(TimestampTz now)
2189 {
2190 long sleeptime = 10000; /* 10 s */
2191
2192 if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2193 {
2194 TimestampTz wakeup_time;
2195
2196 /*
2197 * At the latest stop sleeping once wal_sender_timeout has been
2198 * reached.
2199 */
2200 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2201 wal_sender_timeout);
2202
2203 /*
2204 * If no ping has been sent yet, wakeup when it's time to do so.
2205 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2206 * the timeout passed without a response.
2207 */
2208 if (!waiting_for_ping_response)
2209 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2210 wal_sender_timeout / 2);
2211
2212 /* Compute relative time until wakeup. */
2213 sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2214 }
2215
2216 return sleeptime;
2217 }
2218
2219 /*
2220 * Check whether there have been responses by the client within
2221 * wal_sender_timeout and shutdown if not. Using last_processing as the
2222 * reference point avoids counting server-side stalls against the client.
2223 * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2224 * postdate last_processing by more than wal_sender_timeout. If that happens,
2225 * the client must reply almost immediately to avoid a timeout. This rarely
2226 * affects the default configuration, under which clients spontaneously send a
2227 * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2228 * could eliminate that problem by recognizing timeout expiration at
2229 * wal_sender_timeout/2 after the keepalive.
2230 */
2231 static void
WalSndCheckTimeOut(void)2232 WalSndCheckTimeOut(void)
2233 {
2234 TimestampTz timeout;
2235
2236 /* don't bail out if we're doing something that doesn't require timeouts */
2237 if (last_reply_timestamp <= 0)
2238 return;
2239
2240 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2241 wal_sender_timeout);
2242
2243 if (wal_sender_timeout > 0 && last_processing >= timeout)
2244 {
2245 /*
2246 * Since typically expiration of replication timeout means
2247 * communication problem, we don't send the error message to the
2248 * standby.
2249 */
2250 ereport(COMMERROR,
2251 (errmsg("terminating walsender process due to replication timeout")));
2252
2253 WalSndShutdown();
2254 }
2255 }
2256
2257 /* Main loop of walsender process that streams the WAL over Copy messages. */
2258 static void
WalSndLoop(WalSndSendDataCallback send_data)2259 WalSndLoop(WalSndSendDataCallback send_data)
2260 {
2261 /*
2262 * Initialize the last reply timestamp. That enables timeout processing
2263 * from hereon.
2264 */
2265 last_reply_timestamp = GetCurrentTimestamp();
2266 waiting_for_ping_response = false;
2267
2268 /*
2269 * Loop until we reach the end of this timeline or the client requests to
2270 * stop streaming.
2271 */
2272 for (;;)
2273 {
2274 /* Clear any already-pending wakeups */
2275 ResetLatch(MyLatch);
2276
2277 CHECK_FOR_INTERRUPTS();
2278
2279 /* Process any requests or signals received recently */
2280 if (ConfigReloadPending)
2281 {
2282 ConfigReloadPending = false;
2283 ProcessConfigFile(PGC_SIGHUP);
2284 SyncRepInitConfig();
2285 }
2286
2287 /* Check for input from the client */
2288 ProcessRepliesIfAny();
2289
2290 /*
2291 * If we have received CopyDone from the client, sent CopyDone
2292 * ourselves, and the output buffer is empty, it's time to exit
2293 * streaming.
2294 */
2295 if (streamingDoneReceiving && streamingDoneSending &&
2296 !pq_is_send_pending())
2297 break;
2298
2299 /*
2300 * If we don't have any pending data in the output buffer, try to send
2301 * some more. If there is some, we don't bother to call send_data
2302 * again until we've flushed it ... but we'd better assume we are not
2303 * caught up.
2304 */
2305 if (!pq_is_send_pending())
2306 send_data();
2307 else
2308 WalSndCaughtUp = false;
2309
2310 /* Try to flush pending output to the client */
2311 if (pq_flush_if_writable() != 0)
2312 WalSndShutdown();
2313
2314 /* If nothing remains to be sent right now ... */
2315 if (WalSndCaughtUp && !pq_is_send_pending())
2316 {
2317 /*
2318 * If we're in catchup state, move to streaming. This is an
2319 * important state change for users to know about, since before
2320 * this point data loss might occur if the primary dies and we
2321 * need to failover to the standby. The state change is also
2322 * important for synchronous replication, since commits that
2323 * started to wait at that point might wait for some time.
2324 */
2325 if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2326 {
2327 ereport(DEBUG1,
2328 (errmsg_internal("\"%s\" has now caught up with upstream server",
2329 application_name)));
2330 WalSndSetState(WALSNDSTATE_STREAMING);
2331 }
2332
2333 /*
2334 * When SIGUSR2 arrives, we send any outstanding logs up to the
2335 * shutdown checkpoint record (i.e., the latest record), wait for
2336 * them to be replicated to the standby, and exit. This may be a
2337 * normal termination at shutdown, or a promotion, the walsender
2338 * is not sure which.
2339 */
2340 if (got_SIGUSR2)
2341 WalSndDone(send_data);
2342 }
2343
2344 /* Check for replication timeout. */
2345 WalSndCheckTimeOut();
2346
2347 /* Send keepalive if the time has come */
2348 WalSndKeepaliveIfNecessary();
2349
2350 /*
2351 * Block if we have unsent data. XXX For logical replication, let
2352 * WalSndWaitForWal() handle any other blocking; idle receivers need
2353 * its additional actions. For physical replication, also block if
2354 * caught up; its send_data does not block.
2355 */
2356 if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2357 !streamingDoneSending) ||
2358 pq_is_send_pending())
2359 {
2360 long sleeptime;
2361 int wakeEvents;
2362
2363 if (!streamingDoneReceiving)
2364 wakeEvents = WL_SOCKET_READABLE;
2365 else
2366 wakeEvents = 0;
2367
2368 /*
2369 * Use fresh timestamp, not last_processing, to reduce the chance
2370 * of reaching wal_sender_timeout before sending a keepalive.
2371 */
2372 sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
2373
2374 if (pq_is_send_pending())
2375 wakeEvents |= WL_SOCKET_WRITEABLE;
2376
2377 /* Sleep until something happens or we time out */
2378 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2379 }
2380 }
2381 }
2382
2383 /* Initialize a per-walsender data structure for this walsender process */
2384 static void
InitWalSenderSlot(void)2385 InitWalSenderSlot(void)
2386 {
2387 int i;
2388
2389 /*
2390 * WalSndCtl should be set up already (we inherit this by fork() or
2391 * EXEC_BACKEND mechanism from the postmaster).
2392 */
2393 Assert(WalSndCtl != NULL);
2394 Assert(MyWalSnd == NULL);
2395
2396 /*
2397 * Find a free walsender slot and reserve it. This must not fail due to
2398 * the prior check for free WAL senders in InitProcess().
2399 */
2400 for (i = 0; i < max_wal_senders; i++)
2401 {
2402 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2403
2404 SpinLockAcquire(&walsnd->mutex);
2405
2406 if (walsnd->pid != 0)
2407 {
2408 SpinLockRelease(&walsnd->mutex);
2409 continue;
2410 }
2411 else
2412 {
2413 /*
2414 * Found a free slot. Reserve it for us.
2415 */
2416 walsnd->pid = MyProcPid;
2417 walsnd->state = WALSNDSTATE_STARTUP;
2418 walsnd->sentPtr = InvalidXLogRecPtr;
2419 walsnd->needreload = false;
2420 walsnd->write = InvalidXLogRecPtr;
2421 walsnd->flush = InvalidXLogRecPtr;
2422 walsnd->apply = InvalidXLogRecPtr;
2423 walsnd->writeLag = -1;
2424 walsnd->flushLag = -1;
2425 walsnd->applyLag = -1;
2426 walsnd->sync_standby_priority = 0;
2427 walsnd->latch = &MyProc->procLatch;
2428 walsnd->replyTime = 0;
2429 SpinLockRelease(&walsnd->mutex);
2430 /* don't need the lock anymore */
2431 MyWalSnd = (WalSnd *) walsnd;
2432
2433 break;
2434 }
2435 }
2436
2437 Assert(MyWalSnd != NULL);
2438
2439 /* Arrange to clean up at walsender exit */
2440 on_shmem_exit(WalSndKill, 0);
2441 }
2442
2443 /* Destroy the per-walsender data structure for this walsender process */
2444 static void
WalSndKill(int code,Datum arg)2445 WalSndKill(int code, Datum arg)
2446 {
2447 WalSnd *walsnd = MyWalSnd;
2448
2449 Assert(walsnd != NULL);
2450
2451 MyWalSnd = NULL;
2452
2453 SpinLockAcquire(&walsnd->mutex);
2454 /* clear latch while holding the spinlock, so it can safely be read */
2455 walsnd->latch = NULL;
2456 /* Mark WalSnd struct as no longer being in use. */
2457 walsnd->pid = 0;
2458 SpinLockRelease(&walsnd->mutex);
2459 }
2460
2461 /* XLogReaderRoutine->segment_open callback */
2462 static void
WalSndSegmentOpen(XLogReaderState * state,XLogSegNo nextSegNo,TimeLineID * tli_p)2463 WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
2464 TimeLineID *tli_p)
2465 {
2466 char path[MAXPGPATH];
2467
2468 /*-------
2469 * When reading from a historic timeline, and there is a timeline switch
2470 * within this segment, read from the WAL segment belonging to the new
2471 * timeline.
2472 *
2473 * For example, imagine that this server is currently on timeline 5, and
2474 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2475 * 0/13002088. In pg_wal, we have these files:
2476 *
2477 * ...
2478 * 000000040000000000000012
2479 * 000000040000000000000013
2480 * 000000050000000000000013
2481 * 000000050000000000000014
2482 * ...
2483 *
2484 * In this situation, when requested to send the WAL from segment 0x13, on
2485 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2486 * recovery prefers files from newer timelines, so if the segment was
2487 * restored from the archive on this server, the file belonging to the old
2488 * timeline, 000000040000000000000013, might not exist. Their contents are
2489 * equal up to the switchpoint, because at a timeline switch, the used
2490 * portion of the old segment is copied to the new file. -------
2491 */
2492 *tli_p = sendTimeLine;
2493 if (sendTimeLineIsHistoric)
2494 {
2495 XLogSegNo endSegNo;
2496
2497 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2498 if (nextSegNo == endSegNo)
2499 *tli_p = sendTimeLineNextTLI;
2500 }
2501
2502 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2503 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2504 if (state->seg.ws_file >= 0)
2505 return;
2506
2507 /*
2508 * If the file is not found, assume it's because the standby asked for a
2509 * too old WAL segment that has already been removed or recycled.
2510 */
2511 if (errno == ENOENT)
2512 {
2513 char xlogfname[MAXFNAMELEN];
2514 int save_errno = errno;
2515
2516 XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2517 errno = save_errno;
2518 ereport(ERROR,
2519 (errcode_for_file_access(),
2520 errmsg("requested WAL segment %s has already been removed",
2521 xlogfname)));
2522 }
2523 else
2524 ereport(ERROR,
2525 (errcode_for_file_access(),
2526 errmsg("could not open file \"%s\": %m",
2527 path)));
2528 }
2529
2530 /*
2531 * Send out the WAL in its normal physical/stored form.
2532 *
2533 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2534 * but not yet sent to the client, and buffer it in the libpq output
2535 * buffer.
2536 *
2537 * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2538 * otherwise WalSndCaughtUp is set to false.
2539 */
2540 static void
XLogSendPhysical(void)2541 XLogSendPhysical(void)
2542 {
2543 XLogRecPtr SendRqstPtr;
2544 XLogRecPtr startptr;
2545 XLogRecPtr endptr;
2546 Size nbytes;
2547 XLogSegNo segno;
2548 WALReadError errinfo;
2549
2550 /* If requested switch the WAL sender to the stopping state. */
2551 if (got_STOPPING)
2552 WalSndSetState(WALSNDSTATE_STOPPING);
2553
2554 if (streamingDoneSending)
2555 {
2556 WalSndCaughtUp = true;
2557 return;
2558 }
2559
2560 /* Figure out how far we can safely send the WAL. */
2561 if (sendTimeLineIsHistoric)
2562 {
2563 /*
2564 * Streaming an old timeline that's in this server's history, but is
2565 * not the one we're currently inserting or replaying. It can be
2566 * streamed up to the point where we switched off that timeline.
2567 */
2568 SendRqstPtr = sendTimeLineValidUpto;
2569 }
2570 else if (am_cascading_walsender)
2571 {
2572 /*
2573 * Streaming the latest timeline on a standby.
2574 *
2575 * Attempt to send all WAL that has already been replayed, so that we
2576 * know it's valid. If we're receiving WAL through streaming
2577 * replication, it's also OK to send any WAL that has been received
2578 * but not replayed.
2579 *
2580 * The timeline we're recovering from can change, or we can be
2581 * promoted. In either case, the current timeline becomes historic. We
2582 * need to detect that so that we don't try to stream past the point
2583 * where we switched to another timeline. We check for promotion or
2584 * timeline switch after calculating FlushPtr, to avoid a race
2585 * condition: if the timeline becomes historic just after we checked
2586 * that it was still current, it's still be OK to stream it up to the
2587 * FlushPtr that was calculated before it became historic.
2588 */
2589 bool becameHistoric = false;
2590
2591 SendRqstPtr = GetStandbyFlushRecPtr();
2592
2593 if (!RecoveryInProgress())
2594 {
2595 /*
2596 * We have been promoted. RecoveryInProgress() updated
2597 * ThisTimeLineID to the new current timeline.
2598 */
2599 am_cascading_walsender = false;
2600 becameHistoric = true;
2601 }
2602 else
2603 {
2604 /*
2605 * Still a cascading standby. But is the timeline we're sending
2606 * still the one recovery is recovering from? ThisTimeLineID was
2607 * updated by the GetStandbyFlushRecPtr() call above.
2608 */
2609 if (sendTimeLine != ThisTimeLineID)
2610 becameHistoric = true;
2611 }
2612
2613 if (becameHistoric)
2614 {
2615 /*
2616 * The timeline we were sending has become historic. Read the
2617 * timeline history file of the new timeline to see where exactly
2618 * we forked off from the timeline we were sending.
2619 */
2620 List *history;
2621
2622 history = readTimeLineHistory(ThisTimeLineID);
2623 sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
2624
2625 Assert(sendTimeLine < sendTimeLineNextTLI);
2626 list_free_deep(history);
2627
2628 sendTimeLineIsHistoric = true;
2629
2630 SendRqstPtr = sendTimeLineValidUpto;
2631 }
2632 }
2633 else
2634 {
2635 /*
2636 * Streaming the current timeline on a primary.
2637 *
2638 * Attempt to send all data that's already been written out and
2639 * fsync'd to disk. We cannot go further than what's been written out
2640 * given the current implementation of WALRead(). And in any case
2641 * it's unsafe to send WAL that is not securely down to disk on the
2642 * primary: if the primary subsequently crashes and restarts, standbys
2643 * must not have applied any WAL that got lost on the primary.
2644 */
2645 SendRqstPtr = GetFlushRecPtr();
2646 }
2647
2648 /*
2649 * Record the current system time as an approximation of the time at which
2650 * this WAL location was written for the purposes of lag tracking.
2651 *
2652 * In theory we could make XLogFlush() record a time in shmem whenever WAL
2653 * is flushed and we could get that time as well as the LSN when we call
2654 * GetFlushRecPtr() above (and likewise for the cascading standby
2655 * equivalent), but rather than putting any new code into the hot WAL path
2656 * it seems good enough to capture the time here. We should reach this
2657 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2658 * may take some time, we read the WAL flush pointer and take the time
2659 * very close to together here so that we'll get a later position if it is
2660 * still moving.
2661 *
2662 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2663 * this gives us a cheap approximation for the WAL flush time for this
2664 * LSN.
2665 *
2666 * Note that the LSN is not necessarily the LSN for the data contained in
2667 * the present message; it's the end of the WAL, which might be further
2668 * ahead. All the lag tracking machinery cares about is finding out when
2669 * that arbitrary LSN is eventually reported as written, flushed and
2670 * applied, so that it can measure the elapsed time.
2671 */
2672 LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2673
2674 /*
2675 * If this is a historic timeline and we've reached the point where we
2676 * forked to the next timeline, stop streaming.
2677 *
2678 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2679 * startup process will normally replay all WAL that has been received
2680 * from the primary, before promoting, but if the WAL streaming is
2681 * terminated at a WAL page boundary, the valid portion of the timeline
2682 * might end in the middle of a WAL record. We might've already sent the
2683 * first half of that partial WAL record to the cascading standby, so that
2684 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2685 * replay the partial WAL record either, so it can still follow our
2686 * timeline switch.
2687 */
2688 if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
2689 {
2690 /* close the current file. */
2691 if (xlogreader->seg.ws_file >= 0)
2692 wal_segment_close(xlogreader);
2693
2694 /* Send CopyDone */
2695 pq_putmessage_noblock('c', NULL, 0);
2696 streamingDoneSending = true;
2697
2698 WalSndCaughtUp = true;
2699
2700 elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2701 LSN_FORMAT_ARGS(sendTimeLineValidUpto),
2702 LSN_FORMAT_ARGS(sentPtr));
2703 return;
2704 }
2705
2706 /* Do we have any work to do? */
2707 Assert(sentPtr <= SendRqstPtr);
2708 if (SendRqstPtr <= sentPtr)
2709 {
2710 WalSndCaughtUp = true;
2711 return;
2712 }
2713
2714 /*
2715 * Figure out how much to send in one message. If there's no more than
2716 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2717 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2718 *
2719 * The rounding is not only for performance reasons. Walreceiver relies on
2720 * the fact that we never split a WAL record across two messages. Since a
2721 * long WAL record is split at page boundary into continuation records,
2722 * page boundary is always a safe cut-off point. We also assume that
2723 * SendRqstPtr never points to the middle of a WAL record.
2724 */
2725 startptr = sentPtr;
2726 endptr = startptr;
2727 endptr += MAX_SEND_SIZE;
2728
2729 /* if we went beyond SendRqstPtr, back off */
2730 if (SendRqstPtr <= endptr)
2731 {
2732 endptr = SendRqstPtr;
2733 if (sendTimeLineIsHistoric)
2734 WalSndCaughtUp = false;
2735 else
2736 WalSndCaughtUp = true;
2737 }
2738 else
2739 {
2740 /* round down to page boundary. */
2741 endptr -= (endptr % XLOG_BLCKSZ);
2742 WalSndCaughtUp = false;
2743 }
2744
2745 nbytes = endptr - startptr;
2746 Assert(nbytes <= MAX_SEND_SIZE);
2747
2748 /*
2749 * OK to read and send the slice.
2750 */
2751 resetStringInfo(&output_message);
2752 pq_sendbyte(&output_message, 'w');
2753
2754 pq_sendint64(&output_message, startptr); /* dataStart */
2755 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2756 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2757
2758 /*
2759 * Read the log directly into the output buffer to avoid extra memcpy
2760 * calls.
2761 */
2762 enlargeStringInfo(&output_message, nbytes);
2763
2764 retry:
2765 if (!WALRead(xlogreader,
2766 &output_message.data[output_message.len],
2767 startptr,
2768 nbytes,
2769 xlogreader->seg.ws_tli, /* Pass the current TLI because
2770 * only WalSndSegmentOpen controls
2771 * whether new TLI is needed. */
2772 &errinfo))
2773 WALReadRaiseError(&errinfo);
2774
2775 /* See logical_read_xlog_page(). */
2776 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2777 CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
2778
2779 /*
2780 * During recovery, the currently-open WAL file might be replaced with the
2781 * file of the same name retrieved from archive. So we always need to
2782 * check what we read was valid after reading into the buffer. If it's
2783 * invalid, we try to open and read the file again.
2784 */
2785 if (am_cascading_walsender)
2786 {
2787 WalSnd *walsnd = MyWalSnd;
2788 bool reload;
2789
2790 SpinLockAcquire(&walsnd->mutex);
2791 reload = walsnd->needreload;
2792 walsnd->needreload = false;
2793 SpinLockRelease(&walsnd->mutex);
2794
2795 if (reload && xlogreader->seg.ws_file >= 0)
2796 {
2797 wal_segment_close(xlogreader);
2798
2799 goto retry;
2800 }
2801 }
2802
2803 output_message.len += nbytes;
2804 output_message.data[output_message.len] = '\0';
2805
2806 /*
2807 * Fill the send timestamp last, so that it is taken as late as possible.
2808 */
2809 resetStringInfo(&tmpbuf);
2810 pq_sendint64(&tmpbuf, GetCurrentTimestamp());
2811 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2812 tmpbuf.data, sizeof(int64));
2813
2814 pq_putmessage_noblock('d', output_message.data, output_message.len);
2815
2816 sentPtr = endptr;
2817
2818 /* Update shared memory status */
2819 {
2820 WalSnd *walsnd = MyWalSnd;
2821
2822 SpinLockAcquire(&walsnd->mutex);
2823 walsnd->sentPtr = sentPtr;
2824 SpinLockRelease(&walsnd->mutex);
2825 }
2826
2827 /* Report progress of XLOG streaming in PS display */
2828 if (update_process_title)
2829 {
2830 char activitymsg[50];
2831
2832 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2833 LSN_FORMAT_ARGS(sentPtr));
2834 set_ps_display(activitymsg);
2835 }
2836 }
2837
2838 /*
2839 * Stream out logically decoded data.
2840 */
2841 static void
XLogSendLogical(void)2842 XLogSendLogical(void)
2843 {
2844 XLogRecord *record;
2845 char *errm;
2846
2847 /*
2848 * We'll use the current flush point to determine whether we've caught up.
2849 * This variable is static in order to cache it across calls. Caching is
2850 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
2851 * spinlock.
2852 */
2853 static XLogRecPtr flushPtr = InvalidXLogRecPtr;
2854
2855 /*
2856 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2857 * true in WalSndWaitForWal, if we're actually waiting. We also set to
2858 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2859 * didn't wait - i.e. when we're shutting down.
2860 */
2861 WalSndCaughtUp = false;
2862
2863 record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
2864
2865 /* xlog record was invalid */
2866 if (errm != NULL)
2867 elog(ERROR, "%s", errm);
2868
2869 if (record != NULL)
2870 {
2871 /*
2872 * Note the lack of any call to LagTrackerWrite() which is handled by
2873 * WalSndUpdateProgress which is called by output plugin through
2874 * logical decoding write api.
2875 */
2876 LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2877
2878 sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2879 }
2880
2881 /*
2882 * If first time through in this session, initialize flushPtr. Otherwise,
2883 * we only need to update flushPtr if EndRecPtr is past it.
2884 */
2885 if (flushPtr == InvalidXLogRecPtr)
2886 flushPtr = GetFlushRecPtr();
2887 else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2888 flushPtr = GetFlushRecPtr();
2889
2890 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
2891 if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2892 WalSndCaughtUp = true;
2893
2894 /*
2895 * If we're caught up and have been requested to stop, have WalSndLoop()
2896 * terminate the connection in an orderly manner, after writing out all
2897 * the pending data.
2898 */
2899 if (WalSndCaughtUp && got_STOPPING)
2900 got_SIGUSR2 = true;
2901
2902 /* Update shared memory status */
2903 {
2904 WalSnd *walsnd = MyWalSnd;
2905
2906 SpinLockAcquire(&walsnd->mutex);
2907 walsnd->sentPtr = sentPtr;
2908 SpinLockRelease(&walsnd->mutex);
2909 }
2910 }
2911
2912 /*
2913 * Shutdown if the sender is caught up.
2914 *
2915 * NB: This should only be called when the shutdown signal has been received
2916 * from postmaster.
2917 *
2918 * Note that if we determine that there's still more data to send, this
2919 * function will return control to the caller.
2920 */
2921 static void
WalSndDone(WalSndSendDataCallback send_data)2922 WalSndDone(WalSndSendDataCallback send_data)
2923 {
2924 XLogRecPtr replicatedPtr;
2925
2926 /* ... let's just be real sure we're caught up ... */
2927 send_data();
2928
2929 /*
2930 * To figure out whether all WAL has successfully been replicated, check
2931 * flush location if valid, write otherwise. Tools like pg_receivewal will
2932 * usually (unless in synchronous mode) return an invalid flush location.
2933 */
2934 replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2935 MyWalSnd->write : MyWalSnd->flush;
2936
2937 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2938 !pq_is_send_pending())
2939 {
2940 QueryCompletion qc;
2941
2942 /* Inform the standby that XLOG streaming is done */
2943 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
2944 EndCommand(&qc, DestRemote, false);
2945 pq_flush();
2946
2947 proc_exit(0);
2948 }
2949 if (!waiting_for_ping_response)
2950 WalSndKeepalive(true);
2951 }
2952
2953 /*
2954 * Returns the latest point in WAL that has been safely flushed to disk, and
2955 * can be sent to the standby. This should only be called when in recovery,
2956 * ie. we're streaming to a cascaded standby.
2957 *
2958 * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2959 * replayed WAL record.
2960 */
2961 static XLogRecPtr
GetStandbyFlushRecPtr(void)2962 GetStandbyFlushRecPtr(void)
2963 {
2964 XLogRecPtr replayPtr;
2965 TimeLineID replayTLI;
2966 XLogRecPtr receivePtr;
2967 TimeLineID receiveTLI;
2968 XLogRecPtr result;
2969
2970 /*
2971 * We can safely send what's already been replayed. Also, if walreceiver
2972 * is streaming WAL from the same timeline, we can send anything that it
2973 * has streamed, but hasn't been replayed yet.
2974 */
2975
2976 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
2977 replayPtr = GetXLogReplayRecPtr(&replayTLI);
2978
2979 ThisTimeLineID = replayTLI;
2980
2981 result = replayPtr;
2982 if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2983 result = receivePtr;
2984
2985 return result;
2986 }
2987
2988 /*
2989 * Request walsenders to reload the currently-open WAL file
2990 */
2991 void
WalSndRqstFileReload(void)2992 WalSndRqstFileReload(void)
2993 {
2994 int i;
2995
2996 for (i = 0; i < max_wal_senders; i++)
2997 {
2998 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2999
3000 SpinLockAcquire(&walsnd->mutex);
3001 if (walsnd->pid == 0)
3002 {
3003 SpinLockRelease(&walsnd->mutex);
3004 continue;
3005 }
3006 walsnd->needreload = true;
3007 SpinLockRelease(&walsnd->mutex);
3008 }
3009 }
3010
3011 /*
3012 * Handle PROCSIG_WALSND_INIT_STOPPING signal.
3013 */
3014 void
HandleWalSndInitStopping(void)3015 HandleWalSndInitStopping(void)
3016 {
3017 Assert(am_walsender);
3018
3019 /*
3020 * If replication has not yet started, die like with SIGTERM. If
3021 * replication is active, only set a flag and wake up the main loop. It
3022 * will send any outstanding WAL, wait for it to be replicated to the
3023 * standby, and then exit gracefully.
3024 */
3025 if (!replication_active)
3026 kill(MyProcPid, SIGTERM);
3027 else
3028 got_STOPPING = true;
3029 }
3030
3031 /*
3032 * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3033 * sender should already have been switched to WALSNDSTATE_STOPPING at
3034 * this point.
3035 */
3036 static void
WalSndLastCycleHandler(SIGNAL_ARGS)3037 WalSndLastCycleHandler(SIGNAL_ARGS)
3038 {
3039 int save_errno = errno;
3040
3041 got_SIGUSR2 = true;
3042 SetLatch(MyLatch);
3043
3044 errno = save_errno;
3045 }
3046
3047 /* Set up signal handlers */
3048 void
WalSndSignals(void)3049 WalSndSignals(void)
3050 {
3051 /* Set up signal handlers */
3052 pqsignal(SIGHUP, SignalHandlerForConfigReload);
3053 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3054 pqsignal(SIGTERM, die); /* request shutdown */
3055 /* SIGQUIT handler was already set up by InitPostmasterChild */
3056 InitializeTimeouts(); /* establishes SIGALRM handler */
3057 pqsignal(SIGPIPE, SIG_IGN);
3058 pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3059 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3060 * shutdown */
3061
3062 /* Reset some signals that are accepted by postmaster but not here */
3063 pqsignal(SIGCHLD, SIG_DFL);
3064 }
3065
3066 /* Report shared-memory space needed by WalSndShmemInit */
3067 Size
WalSndShmemSize(void)3068 WalSndShmemSize(void)
3069 {
3070 Size size = 0;
3071
3072 size = offsetof(WalSndCtlData, walsnds);
3073 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3074
3075 return size;
3076 }
3077
3078 /* Allocate and initialize walsender-related shared memory */
3079 void
WalSndShmemInit(void)3080 WalSndShmemInit(void)
3081 {
3082 bool found;
3083 int i;
3084
3085 WalSndCtl = (WalSndCtlData *)
3086 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3087
3088 if (!found)
3089 {
3090 /* First time through, so initialize */
3091 MemSet(WalSndCtl, 0, WalSndShmemSize());
3092
3093 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3094 SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
3095
3096 for (i = 0; i < max_wal_senders; i++)
3097 {
3098 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3099
3100 SpinLockInit(&walsnd->mutex);
3101 }
3102 }
3103 }
3104
3105 /*
3106 * Wake up all walsenders
3107 *
3108 * This will be called inside critical sections, so throwing an error is not
3109 * advisable.
3110 */
3111 void
WalSndWakeup(void)3112 WalSndWakeup(void)
3113 {
3114 int i;
3115
3116 for (i = 0; i < max_wal_senders; i++)
3117 {
3118 Latch *latch;
3119 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3120
3121 /*
3122 * Get latch pointer with spinlock held, for the unlikely case that
3123 * pointer reads aren't atomic (as they're 8 bytes).
3124 */
3125 SpinLockAcquire(&walsnd->mutex);
3126 latch = walsnd->latch;
3127 SpinLockRelease(&walsnd->mutex);
3128
3129 if (latch != NULL)
3130 SetLatch(latch);
3131 }
3132 }
3133
3134 /*
3135 * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3136 * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3137 * on postmaster death.
3138 */
3139 static void
WalSndWait(uint32 socket_events,long timeout,uint32 wait_event)3140 WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3141 {
3142 WaitEvent event;
3143
3144 ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3145 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3146 (event.events & WL_POSTMASTER_DEATH))
3147 proc_exit(1);
3148 }
3149
3150 /*
3151 * Signal all walsenders to move to stopping state.
3152 *
3153 * This will trigger walsenders to move to a state where no further WAL can be
3154 * generated. See this file's header for details.
3155 */
3156 void
WalSndInitStopping(void)3157 WalSndInitStopping(void)
3158 {
3159 int i;
3160
3161 for (i = 0; i < max_wal_senders; i++)
3162 {
3163 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3164 pid_t pid;
3165
3166 SpinLockAcquire(&walsnd->mutex);
3167 pid = walsnd->pid;
3168 SpinLockRelease(&walsnd->mutex);
3169
3170 if (pid == 0)
3171 continue;
3172
3173 SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3174 }
3175 }
3176
3177 /*
3178 * Wait that all the WAL senders have quit or reached the stopping state. This
3179 * is used by the checkpointer to control when the shutdown checkpoint can
3180 * safely be performed.
3181 */
3182 void
WalSndWaitStopping(void)3183 WalSndWaitStopping(void)
3184 {
3185 for (;;)
3186 {
3187 int i;
3188 bool all_stopped = true;
3189
3190 for (i = 0; i < max_wal_senders; i++)
3191 {
3192 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3193
3194 SpinLockAcquire(&walsnd->mutex);
3195
3196 if (walsnd->pid == 0)
3197 {
3198 SpinLockRelease(&walsnd->mutex);
3199 continue;
3200 }
3201
3202 if (walsnd->state != WALSNDSTATE_STOPPING)
3203 {
3204 all_stopped = false;
3205 SpinLockRelease(&walsnd->mutex);
3206 break;
3207 }
3208 SpinLockRelease(&walsnd->mutex);
3209 }
3210
3211 /* safe to leave if confirmation is done for all WAL senders */
3212 if (all_stopped)
3213 return;
3214
3215 pg_usleep(10000L); /* wait for 10 msec */
3216 }
3217 }
3218
3219 /* Set state for current walsender (only called in walsender) */
3220 void
WalSndSetState(WalSndState state)3221 WalSndSetState(WalSndState state)
3222 {
3223 WalSnd *walsnd = MyWalSnd;
3224
3225 Assert(am_walsender);
3226
3227 if (walsnd->state == state)
3228 return;
3229
3230 SpinLockAcquire(&walsnd->mutex);
3231 walsnd->state = state;
3232 SpinLockRelease(&walsnd->mutex);
3233 }
3234
3235 /*
3236 * Return a string constant representing the state. This is used
3237 * in system views, and should *not* be translated.
3238 */
3239 static const char *
WalSndGetStateString(WalSndState state)3240 WalSndGetStateString(WalSndState state)
3241 {
3242 switch (state)
3243 {
3244 case WALSNDSTATE_STARTUP:
3245 return "startup";
3246 case WALSNDSTATE_BACKUP:
3247 return "backup";
3248 case WALSNDSTATE_CATCHUP:
3249 return "catchup";
3250 case WALSNDSTATE_STREAMING:
3251 return "streaming";
3252 case WALSNDSTATE_STOPPING:
3253 return "stopping";
3254 }
3255 return "UNKNOWN";
3256 }
3257
3258 static Interval *
offset_to_interval(TimeOffset offset)3259 offset_to_interval(TimeOffset offset)
3260 {
3261 Interval *result = palloc(sizeof(Interval));
3262
3263 result->month = 0;
3264 result->day = 0;
3265 result->time = offset;
3266
3267 return result;
3268 }
3269
3270 /*
3271 * Returns activity of walsenders, including pids and xlog locations sent to
3272 * standby servers.
3273 */
3274 Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)3275 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3276 {
3277 #define PG_STAT_GET_WAL_SENDERS_COLS 12
3278 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3279 TupleDesc tupdesc;
3280 Tuplestorestate *tupstore;
3281 MemoryContext per_query_ctx;
3282 MemoryContext oldcontext;
3283 SyncRepStandbyData *sync_standbys;
3284 int num_standbys;
3285 int i;
3286
3287 /* check to see if caller supports us returning a tuplestore */
3288 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
3289 ereport(ERROR,
3290 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3291 errmsg("set-valued function called in context that cannot accept a set")));
3292 if (!(rsinfo->allowedModes & SFRM_Materialize))
3293 ereport(ERROR,
3294 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3295 errmsg("materialize mode required, but it is not allowed in this context")));
3296
3297 /* Build a tuple descriptor for our result type */
3298 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
3299 elog(ERROR, "return type must be a row type");
3300
3301 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
3302 oldcontext = MemoryContextSwitchTo(per_query_ctx);
3303
3304 tupstore = tuplestore_begin_heap(true, false, work_mem);
3305 rsinfo->returnMode = SFRM_Materialize;
3306 rsinfo->setResult = tupstore;
3307 rsinfo->setDesc = tupdesc;
3308
3309 MemoryContextSwitchTo(oldcontext);
3310
3311 /*
3312 * Get the currently active synchronous standbys. This could be out of
3313 * date before we're done, but we'll use the data anyway.
3314 */
3315 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3316
3317 for (i = 0; i < max_wal_senders; i++)
3318 {
3319 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3320 XLogRecPtr sentPtr;
3321 XLogRecPtr write;
3322 XLogRecPtr flush;
3323 XLogRecPtr apply;
3324 TimeOffset writeLag;
3325 TimeOffset flushLag;
3326 TimeOffset applyLag;
3327 int priority;
3328 int pid;
3329 WalSndState state;
3330 TimestampTz replyTime;
3331 bool is_sync_standby;
3332 Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3333 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
3334 int j;
3335
3336 /* Collect data from shared memory */
3337 SpinLockAcquire(&walsnd->mutex);
3338 if (walsnd->pid == 0)
3339 {
3340 SpinLockRelease(&walsnd->mutex);
3341 continue;
3342 }
3343 pid = walsnd->pid;
3344 sentPtr = walsnd->sentPtr;
3345 state = walsnd->state;
3346 write = walsnd->write;
3347 flush = walsnd->flush;
3348 apply = walsnd->apply;
3349 writeLag = walsnd->writeLag;
3350 flushLag = walsnd->flushLag;
3351 applyLag = walsnd->applyLag;
3352 priority = walsnd->sync_standby_priority;
3353 replyTime = walsnd->replyTime;
3354 SpinLockRelease(&walsnd->mutex);
3355
3356 /*
3357 * Detect whether walsender is/was considered synchronous. We can
3358 * provide some protection against stale data by checking the PID
3359 * along with walsnd_index.
3360 */
3361 is_sync_standby = false;
3362 for (j = 0; j < num_standbys; j++)
3363 {
3364 if (sync_standbys[j].walsnd_index == i &&
3365 sync_standbys[j].pid == pid)
3366 {
3367 is_sync_standby = true;
3368 break;
3369 }
3370 }
3371
3372 memset(nulls, 0, sizeof(nulls));
3373 values[0] = Int32GetDatum(pid);
3374
3375 if (!is_member_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3376 {
3377 /*
3378 * Only superusers and members of pg_read_all_stats can see
3379 * details. Other users only get the pid value to know it's a
3380 * walsender, but no details.
3381 */
3382 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3383 }
3384 else
3385 {
3386 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3387
3388 if (XLogRecPtrIsInvalid(sentPtr))
3389 nulls[2] = true;
3390 values[2] = LSNGetDatum(sentPtr);
3391
3392 if (XLogRecPtrIsInvalid(write))
3393 nulls[3] = true;
3394 values[3] = LSNGetDatum(write);
3395
3396 if (XLogRecPtrIsInvalid(flush))
3397 nulls[4] = true;
3398 values[4] = LSNGetDatum(flush);
3399
3400 if (XLogRecPtrIsInvalid(apply))
3401 nulls[5] = true;
3402 values[5] = LSNGetDatum(apply);
3403
3404 /*
3405 * Treat a standby such as a pg_basebackup background process
3406 * which always returns an invalid flush location, as an
3407 * asynchronous standby.
3408 */
3409 priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3410
3411 if (writeLag < 0)
3412 nulls[6] = true;
3413 else
3414 values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3415
3416 if (flushLag < 0)
3417 nulls[7] = true;
3418 else
3419 values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3420
3421 if (applyLag < 0)
3422 nulls[8] = true;
3423 else
3424 values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3425
3426 values[9] = Int32GetDatum(priority);
3427
3428 /*
3429 * More easily understood version of standby state. This is purely
3430 * informational.
3431 *
3432 * In quorum-based sync replication, the role of each standby
3433 * listed in synchronous_standby_names can be changing very
3434 * frequently. Any standbys considered as "sync" at one moment can
3435 * be switched to "potential" ones at the next moment. So, it's
3436 * basically useless to report "sync" or "potential" as their sync
3437 * states. We report just "quorum" for them.
3438 */
3439 if (priority == 0)
3440 values[10] = CStringGetTextDatum("async");
3441 else if (is_sync_standby)
3442 values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3443 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3444 else
3445 values[10] = CStringGetTextDatum("potential");
3446
3447 if (replyTime == 0)
3448 nulls[11] = true;
3449 else
3450 values[11] = TimestampTzGetDatum(replyTime);
3451 }
3452
3453 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
3454 }
3455
3456 /* clean up and return the tuplestore */
3457 tuplestore_donestoring(tupstore);
3458
3459 return (Datum) 0;
3460 }
3461
3462 /*
3463 * Send a keepalive message to standby.
3464 *
3465 * If requestReply is set, the message requests the other party to send
3466 * a message back to us, for heartbeat purposes. We also set a flag to
3467 * let nearby code that we're waiting for that response, to avoid
3468 * repeated requests.
3469 */
3470 static void
WalSndKeepalive(bool requestReply)3471 WalSndKeepalive(bool requestReply)
3472 {
3473 elog(DEBUG2, "sending replication keepalive");
3474
3475 /* construct the message... */
3476 resetStringInfo(&output_message);
3477 pq_sendbyte(&output_message, 'k');
3478 pq_sendint64(&output_message, sentPtr);
3479 pq_sendint64(&output_message, GetCurrentTimestamp());
3480 pq_sendbyte(&output_message, requestReply ? 1 : 0);
3481
3482 /* ... and send it wrapped in CopyData */
3483 pq_putmessage_noblock('d', output_message.data, output_message.len);
3484
3485 /* Set local flag */
3486 if (requestReply)
3487 waiting_for_ping_response = true;
3488 }
3489
3490 /*
3491 * Send keepalive message if too much time has elapsed.
3492 */
3493 static void
WalSndKeepaliveIfNecessary(void)3494 WalSndKeepaliveIfNecessary(void)
3495 {
3496 TimestampTz ping_time;
3497
3498 /*
3499 * Don't send keepalive messages if timeouts are globally disabled or
3500 * we're doing something not partaking in timeouts.
3501 */
3502 if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3503 return;
3504
3505 if (waiting_for_ping_response)
3506 return;
3507
3508 /*
3509 * If half of wal_sender_timeout has lapsed without receiving any reply
3510 * from the standby, send a keep-alive message to the standby requesting
3511 * an immediate reply.
3512 */
3513 ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3514 wal_sender_timeout / 2);
3515 if (last_processing >= ping_time)
3516 {
3517 WalSndKeepalive(true);
3518
3519 /* Try to flush pending output to the client */
3520 if (pq_flush_if_writable() != 0)
3521 WalSndShutdown();
3522 }
3523 }
3524
3525 /*
3526 * Record the end of the WAL and the time it was flushed locally, so that
3527 * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3528 * eventually reported to have been written, flushed and applied by the
3529 * standby in a reply message.
3530 */
3531 static void
LagTrackerWrite(XLogRecPtr lsn,TimestampTz local_flush_time)3532 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3533 {
3534 bool buffer_full;
3535 int new_write_head;
3536 int i;
3537
3538 if (!am_walsender)
3539 return;
3540
3541 /*
3542 * If the lsn hasn't advanced since last time, then do nothing. This way
3543 * we only record a new sample when new WAL has been written.
3544 */
3545 if (lag_tracker->last_lsn == lsn)
3546 return;
3547 lag_tracker->last_lsn = lsn;
3548
3549 /*
3550 * If advancing the write head of the circular buffer would crash into any
3551 * of the read heads, then the buffer is full. In other words, the
3552 * slowest reader (presumably apply) is the one that controls the release
3553 * of space.
3554 */
3555 new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3556 buffer_full = false;
3557 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3558 {
3559 if (new_write_head == lag_tracker->read_heads[i])
3560 buffer_full = true;
3561 }
3562
3563 /*
3564 * If the buffer is full, for now we just rewind by one slot and overwrite
3565 * the last sample, as a simple (if somewhat uneven) way to lower the
3566 * sampling rate. There may be better adaptive compaction algorithms.
3567 */
3568 if (buffer_full)
3569 {
3570 new_write_head = lag_tracker->write_head;
3571 if (lag_tracker->write_head > 0)
3572 lag_tracker->write_head--;
3573 else
3574 lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3575 }
3576
3577 /* Store a sample at the current write head position. */
3578 lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3579 lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3580 lag_tracker->write_head = new_write_head;
3581 }
3582
3583 /*
3584 * Find out how much time has elapsed between the moment WAL location 'lsn'
3585 * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3586 * We have a separate read head for each of the reported LSN locations we
3587 * receive in replies from standby; 'head' controls which read head is
3588 * used. Whenever a read head crosses an LSN which was written into the
3589 * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3590 * find out the time this LSN (or an earlier one) was flushed locally, and
3591 * therefore compute the lag.
3592 *
3593 * Return -1 if no new sample data is available, and otherwise the elapsed
3594 * time in microseconds.
3595 */
3596 static TimeOffset
LagTrackerRead(int head,XLogRecPtr lsn,TimestampTz now)3597 LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
3598 {
3599 TimestampTz time = 0;
3600
3601 /* Read all unread samples up to this LSN or end of buffer. */
3602 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
3603 lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3604 {
3605 time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3606 lag_tracker->last_read[head] =
3607 lag_tracker->buffer[lag_tracker->read_heads[head]];
3608 lag_tracker->read_heads[head] =
3609 (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3610 }
3611
3612 /*
3613 * If the lag tracker is empty, that means the standby has processed
3614 * everything we've ever sent so we should now clear 'last_read'. If we
3615 * didn't do that, we'd risk using a stale and irrelevant sample for
3616 * interpolation at the beginning of the next burst of WAL after a period
3617 * of idleness.
3618 */
3619 if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3620 lag_tracker->last_read[head].time = 0;
3621
3622 if (time > now)
3623 {
3624 /* If the clock somehow went backwards, treat as not found. */
3625 return -1;
3626 }
3627 else if (time == 0)
3628 {
3629 /*
3630 * We didn't cross a time. If there is a future sample that we
3631 * haven't reached yet, and we've already reached at least one sample,
3632 * let's interpolate the local flushed time. This is mainly useful
3633 * for reporting a completely stuck apply position as having
3634 * increasing lag, since otherwise we'd have to wait for it to
3635 * eventually start moving again and cross one of our samples before
3636 * we can show the lag increasing.
3637 */
3638 if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3639 {
3640 /* There are no future samples, so we can't interpolate. */
3641 return -1;
3642 }
3643 else if (lag_tracker->last_read[head].time != 0)
3644 {
3645 /* We can interpolate between last_read and the next sample. */
3646 double fraction;
3647 WalTimeSample prev = lag_tracker->last_read[head];
3648 WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
3649
3650 if (lsn < prev.lsn)
3651 {
3652 /*
3653 * Reported LSNs shouldn't normally go backwards, but it's
3654 * possible when there is a timeline change. Treat as not
3655 * found.
3656 */
3657 return -1;
3658 }
3659
3660 Assert(prev.lsn < next.lsn);
3661
3662 if (prev.time > next.time)
3663 {
3664 /* If the clock somehow went backwards, treat as not found. */
3665 return -1;
3666 }
3667
3668 /* See how far we are between the previous and next samples. */
3669 fraction =
3670 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3671
3672 /* Scale the local flush time proportionally. */
3673 time = (TimestampTz)
3674 ((double) prev.time + (next.time - prev.time) * fraction);
3675 }
3676 else
3677 {
3678 /*
3679 * We have only a future sample, implying that we were entirely
3680 * caught up but and now there is a new burst of WAL and the
3681 * standby hasn't processed the first sample yet. Until the
3682 * standby reaches the future sample the best we can do is report
3683 * the hypothetical lag if that sample were to be replayed now.
3684 */
3685 time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3686 }
3687 }
3688
3689 /* Return the elapsed time since local flush time in microseconds. */
3690 Assert(time != 0);
3691 return now - time;
3692 }
3693