1 /*-------------------------------------------------------------------------
2 *
3 * walsender.c
4 *
5 * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 * care of sending XLOG from the primary server to a single recipient.
7 * (Note that there can be more than one walsender process concurrently.)
8 * It is started by the postmaster when the walreceiver of a standby server
9 * connects to the primary server and requests XLOG streaming replication.
10 *
11 * A walsender is similar to a regular backend, ie. there is a one-to-one
12 * relationship between a connection and a walsender process, but instead
13 * of processing SQL queries, it understands a small set of special
14 * replication-mode commands. The START_REPLICATION command begins streaming
15 * WAL to the client. While streaming, the walsender keeps reading XLOG
16 * records from the disk and sends them to the standby server over the
 * COPY protocol, until either side ends the replication by exiting COPY
18 * mode (or until the connection is closed).
19 *
20 * Normal termination is by SIGTERM, which instructs the walsender to
21 * close the connection and exit(0) at next convenient moment. Emergency
22 * termination is by SIGQUIT; like any backend, the walsender will simply
23 * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24 * are treated as not a crash but approximately normal termination;
25 * the walsender will exit quickly without sending any more XLOG records.
26 *
27 * If the server is shut down, checkpointer sends us
 * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
 * the backend is idle or running an SQL query, this causes it to shut down;
 * if logical replication is in progress, all existing WAL records are
 * processed first and then it shuts down. Otherwise this causes the walsender
32 * to switch to the "stopping" state. In this state, the walsender will reject
33 * any further replication commands. The checkpointer begins the shutdown
34 * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35 * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36 * walsender to send any outstanding WAL, including the shutdown checkpoint
37 * record, wait for it to be replicated to the standby, and then exit.
38 *
39 *
40 * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
41 *
42 * IDENTIFICATION
43 * src/backend/replication/walsender.c
44 *
45 *-------------------------------------------------------------------------
46 */
47 #include "postgres.h"
48
49 #include <signal.h>
50 #include <unistd.h>
51
52 #include "access/timeline.h"
53 #include "access/transam.h"
54 #include "access/xact.h"
55 #include "access/xlog_internal.h"
56
57 #include "catalog/pg_type.h"
58 #include "commands/dbcommands.h"
59 #include "funcapi.h"
60 #include "libpq/libpq.h"
61 #include "libpq/pqformat.h"
62 #include "miscadmin.h"
63 #include "nodes/replnodes.h"
64 #include "pgstat.h"
65 #include "replication/basebackup.h"
66 #include "replication/decode.h"
67 #include "replication/logical.h"
68 #include "replication/logicalfuncs.h"
69 #include "replication/slot.h"
70 #include "replication/snapbuild.h"
71 #include "replication/syncrep.h"
72 #include "replication/walreceiver.h"
73 #include "replication/walsender.h"
74 #include "replication/walsender_private.h"
75 #include "storage/fd.h"
76 #include "storage/ipc.h"
77 #include "storage/pmsignal.h"
78 #include "storage/proc.h"
79 #include "storage/procarray.h"
80 #include "tcop/tcopprot.h"
81 #include "utils/builtins.h"
82 #include "utils/guc.h"
83 #include "utils/memutils.h"
84 #include "utils/pg_lsn.h"
85 #include "utils/ps_status.h"
86 #include "utils/resowner.h"
87 #include "utils/timeout.h"
88 #include "utils/timestamp.h"
89
/*
 * MAX_SEND_SIZE: upper bound on the WAL payload carried by a single WAL data
 * message.  It must be at least XLOG_BLCKSZ.
 *
 * There is no well-established optimum.  Every message carries some fixed
 * overhead in both walsender and walreceiver, which favors large batches;
 * but signals are only checked between messages, so very large batches make
 * the walsender slow to react to them.  Sixteen blocks (128kB with the
 * default 8k blocks) seems like a reasonable compromise for now.
 */
#define MAX_SEND_SIZE (16 * XLOG_BLCKSZ)
100
101 /* Array of WalSnds in shared memory */
102 WalSndCtlData *WalSndCtl = NULL;
103
104 /* My slot in the shared memory array */
105 WalSnd *MyWalSnd = NULL;
106
107 /* Global state */
108 bool am_walsender = false; /* Am I a walsender process? */
109 bool am_cascading_walsender = false; /* Am I cascading WAL to
110 * another standby? */
111 bool am_db_walsender = false; /* Connected to a database? */
112
113 /* User-settable parameters for walsender */
114 int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
115 int wal_sender_timeout = 60 * 1000; /* maximum time to send one
116 * WAL data message */
117 bool log_replication_commands = false;
118
119 /*
120 * State for WalSndWakeupRequest
121 */
122 bool wake_wal_senders = false;
123
124 /*
125 * These variables are used similarly to openLogFile/SegNo/Off,
126 * but for walsender to read the XLOG.
127 */
128 static int sendFile = -1;
129 static XLogSegNo sendSegNo = 0;
130 static uint32 sendOff = 0;
131
132 /* Timeline ID of the currently open file */
133 static TimeLineID curFileTimeLine = 0;
134
135 /*
136 * These variables keep track of the state of the timeline we're currently
137 * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
138 * the timeline is not the latest timeline on this server, and the server's
139 * history forked off from that timeline at sendTimeLineValidUpto.
140 */
141 static TimeLineID sendTimeLine = 0;
142 static TimeLineID sendTimeLineNextTLI = 0;
143 static bool sendTimeLineIsHistoric = false;
144 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
145
146 /*
147 * How far have we sent WAL already? This is also advertised in
148 * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
149 */
150 static XLogRecPtr sentPtr = InvalidXLogRecPtr;
151
152 /* Buffers for constructing outgoing messages and processing reply messages. */
153 static StringInfoData output_message;
154 static StringInfoData reply_message;
155 static StringInfoData tmpbuf;
156
157 /* Timestamp of last ProcessRepliesIfAny(). */
158 static TimestampTz last_processing = 0;
159
160 /*
161 * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
162 * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
163 */
164 static TimestampTz last_reply_timestamp = 0;
165
166 /* Have we sent a heartbeat message asking for reply, since last reply? */
167 static bool waiting_for_ping_response = false;
168
169 /*
170 * While streaming WAL in Copy mode, streamingDoneSending is set to true
171 * after we have sent CopyDone. We should not send any more CopyData messages
172 * after that. streamingDoneReceiving is set to true when we receive CopyDone
173 * from the other end. When both become true, it's time to exit Copy mode.
174 */
175 static bool streamingDoneSending;
176 static bool streamingDoneReceiving;
177
178 /* Are we there yet? */
179 static bool WalSndCaughtUp = false;
180
181 /* Flags set by signal handlers for later service in main loop */
182 static volatile sig_atomic_t got_SIGUSR2 = false;
183 static volatile sig_atomic_t got_STOPPING = false;
184
185 /*
186 * This is set while we are streaming. When not set
187 * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
188 * the main loop is responsible for checking got_STOPPING and terminating when
189 * it's set (after streaming any remaining WAL).
190 */
191 static volatile sig_atomic_t replication_active = false;
192
193 static LogicalDecodingContext *logical_decoding_ctx = NULL;
194 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
195
196 /* Signal handlers */
197 static void WalSndLastCycleHandler(SIGNAL_ARGS);
198
199 /* Prototypes for private functions */
200 typedef void (*WalSndSendDataCallback) (void);
201 static void WalSndLoop(WalSndSendDataCallback send_data);
202 static void InitWalSenderSlot(void);
203 static void WalSndKill(int code, Datum arg);
204 static void WalSndShutdown(void) pg_attribute_noreturn();
205 static void XLogSendPhysical(void);
206 static void XLogSendLogical(void);
207 static void WalSndDone(WalSndSendDataCallback send_data);
208 static XLogRecPtr GetStandbyFlushRecPtr(void);
209 static void IdentifySystem(void);
210 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
211 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
212 static void StartReplication(StartReplicationCmd *cmd);
213 static void StartLogicalReplication(StartReplicationCmd *cmd);
214 static void ProcessStandbyMessage(void);
215 static void ProcessStandbyReplyMessage(void);
216 static void ProcessStandbyHSFeedbackMessage(void);
217 static void ProcessRepliesIfAny(void);
218 static void WalSndKeepalive(bool requestReply);
219 static void WalSndKeepaliveIfNecessary(void);
220 static void WalSndCheckTimeOut(void);
221 static long WalSndComputeSleeptime(TimestampTz now);
222 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
223 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
224 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
225
226 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
227
228
229 /* Initialize walsender process before entering the main command loop */
230 void
InitWalSender(void)231 InitWalSender(void)
232 {
233 am_cascading_walsender = RecoveryInProgress();
234
235 /* Create a per-walsender data structure in shared memory */
236 InitWalSenderSlot();
237
238 /* Set up resource owner */
239 CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
240
241 /*
242 * Let postmaster know that we're a WAL sender. Once we've declared us as
243 * a WAL sender process, postmaster will let us outlive the bgwriter and
244 * kill us last in the shutdown sequence, so we get a chance to stream all
245 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
246 * there's no going back, and we mustn't write any WAL records after this.
247 */
248 MarkPostmasterChildWalSender();
249 SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
250 }
251
252 /*
253 * Clean up after an error.
254 *
255 * WAL sender processes don't use transactions like regular backends do.
256 * This function does any cleanup requited after an error in a WAL sender
257 * process, similar to what transaction abort does in a regular backend.
258 */
259 void
WalSndErrorCleanup(void)260 WalSndErrorCleanup(void)
261 {
262 LWLockReleaseAll();
263 pgstat_report_wait_end();
264
265 if (sendFile >= 0)
266 {
267 close(sendFile);
268 sendFile = -1;
269 }
270
271 if (MyReplicationSlot != NULL)
272 ReplicationSlotRelease();
273
274 replication_active = false;
275
276 if (got_STOPPING || got_SIGUSR2)
277 proc_exit(0);
278
279 /* Revert back to startup state */
280 WalSndSetState(WALSNDSTATE_STARTUP);
281 }
282
283 /*
284 * Handle a client's connection abort in an orderly manner.
285 */
286 static void
WalSndShutdown(void)287 WalSndShutdown(void)
288 {
289 /*
290 * Reset whereToSendOutput to prevent ereport from attempting to send any
291 * more messages to the standby.
292 */
293 if (whereToSendOutput == DestRemote)
294 whereToSendOutput = DestNone;
295
296 proc_exit(0);
297 abort(); /* keep the compiler quiet */
298 }
299
300 /*
301 * Handle the IDENTIFY_SYSTEM command.
302 */
303 static void
IdentifySystem(void)304 IdentifySystem(void)
305 {
306 StringInfoData buf;
307 char sysid[32];
308 char tli[11];
309 char xpos[MAXFNAMELEN];
310 XLogRecPtr logptr;
311 char *dbname = NULL;
312 Size len;
313
314 /*
315 * Reply with a result set with one row, four columns. First col is system
316 * ID, second is timeline ID, third is current xlog location and the
317 * fourth contains the database name if we are connected to one.
318 */
319
320 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
321 GetSystemIdentifier());
322
323 am_cascading_walsender = RecoveryInProgress();
324 if (am_cascading_walsender)
325 {
326 /* this also updates ThisTimeLineID */
327 logptr = GetStandbyFlushRecPtr();
328 }
329 else
330 logptr = GetFlushRecPtr();
331
332 snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
333
334 snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
335
336 if (MyDatabaseId != InvalidOid)
337 {
338 MemoryContext cur = CurrentMemoryContext;
339
340 /* syscache access needs a transaction env. */
341 StartTransactionCommand();
342 /* make dbname live outside TX context */
343 MemoryContextSwitchTo(cur);
344 dbname = get_database_name(MyDatabaseId);
345 CommitTransactionCommand();
346 /* CommitTransactionCommand switches to TopMemoryContext */
347 MemoryContextSwitchTo(cur);
348 }
349
350 /* Send a RowDescription message */
351 pq_beginmessage(&buf, 'T');
352 pq_sendint(&buf, 4, 2); /* 4 fields */
353
354 /* first field */
355 pq_sendstring(&buf, "systemid"); /* col name */
356 pq_sendint(&buf, 0, 4); /* table oid */
357 pq_sendint(&buf, 0, 2); /* attnum */
358 pq_sendint(&buf, TEXTOID, 4); /* type oid */
359 pq_sendint(&buf, -1, 2); /* typlen */
360 pq_sendint(&buf, 0, 4); /* typmod */
361 pq_sendint(&buf, 0, 2); /* format code */
362
363 /* second field */
364 pq_sendstring(&buf, "timeline"); /* col name */
365 pq_sendint(&buf, 0, 4); /* table oid */
366 pq_sendint(&buf, 0, 2); /* attnum */
367 pq_sendint(&buf, INT4OID, 4); /* type oid */
368 pq_sendint(&buf, 4, 2); /* typlen */
369 pq_sendint(&buf, 0, 4); /* typmod */
370 pq_sendint(&buf, 0, 2); /* format code */
371
372 /* third field */
373 pq_sendstring(&buf, "xlogpos"); /* col name */
374 pq_sendint(&buf, 0, 4); /* table oid */
375 pq_sendint(&buf, 0, 2); /* attnum */
376 pq_sendint(&buf, TEXTOID, 4); /* type oid */
377 pq_sendint(&buf, -1, 2); /* typlen */
378 pq_sendint(&buf, 0, 4); /* typmod */
379 pq_sendint(&buf, 0, 2); /* format code */
380
381 /* fourth field */
382 pq_sendstring(&buf, "dbname"); /* col name */
383 pq_sendint(&buf, 0, 4); /* table oid */
384 pq_sendint(&buf, 0, 2); /* attnum */
385 pq_sendint(&buf, TEXTOID, 4); /* type oid */
386 pq_sendint(&buf, -1, 2); /* typlen */
387 pq_sendint(&buf, 0, 4); /* typmod */
388 pq_sendint(&buf, 0, 2); /* format code */
389 pq_endmessage(&buf);
390
391 /* Send a DataRow message */
392 pq_beginmessage(&buf, 'D');
393 pq_sendint(&buf, 4, 2); /* # of columns */
394
395 /* column 1: system identifier */
396 len = strlen(sysid);
397 pq_sendint(&buf, len, 4);
398 pq_sendbytes(&buf, (char *) &sysid, len);
399
400 /* column 2: timeline */
401 len = strlen(tli);
402 pq_sendint(&buf, len, 4);
403 pq_sendbytes(&buf, (char *) tli, len);
404
405 /* column 3: xlog position */
406 len = strlen(xpos);
407 pq_sendint(&buf, len, 4);
408 pq_sendbytes(&buf, (char *) xpos, len);
409
410 /* column 4: database name, or NULL if none */
411 if (dbname)
412 {
413 len = strlen(dbname);
414 pq_sendint(&buf, len, 4);
415 pq_sendbytes(&buf, (char *) dbname, len);
416 }
417 else
418 {
419 pq_sendint(&buf, -1, 4);
420 }
421
422 pq_endmessage(&buf);
423 }
424
425
426 /*
427 * Handle TIMELINE_HISTORY command.
428 */
429 static void
SendTimeLineHistory(TimeLineHistoryCmd * cmd)430 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
431 {
432 StringInfoData buf;
433 char histfname[MAXFNAMELEN];
434 char path[MAXPGPATH];
435 int fd;
436 off_t histfilelen;
437 off_t bytesleft;
438 Size len;
439
440 /*
441 * Reply with a result set with one row, and two columns. The first col is
442 * the name of the history file, 2nd is the contents.
443 */
444
445 TLHistoryFileName(histfname, cmd->timeline);
446 TLHistoryFilePath(path, cmd->timeline);
447
448 /* Send a RowDescription message */
449 pq_beginmessage(&buf, 'T');
450 pq_sendint(&buf, 2, 2); /* 2 fields */
451
452 /* first field */
453 pq_sendstring(&buf, "filename"); /* col name */
454 pq_sendint(&buf, 0, 4); /* table oid */
455 pq_sendint(&buf, 0, 2); /* attnum */
456 pq_sendint(&buf, TEXTOID, 4); /* type oid */
457 pq_sendint(&buf, -1, 2); /* typlen */
458 pq_sendint(&buf, 0, 4); /* typmod */
459 pq_sendint(&buf, 0, 2); /* format code */
460
461 /* second field */
462 pq_sendstring(&buf, "content"); /* col name */
463 pq_sendint(&buf, 0, 4); /* table oid */
464 pq_sendint(&buf, 0, 2); /* attnum */
465 /*
466 * While this is labeled as BYTEAOID, it is the same output format
467 * as TEXTOID above.
468 */
469 pq_sendint(&buf, BYTEAOID, 4); /* type oid */
470 pq_sendint(&buf, -1, 2); /* typlen */
471 pq_sendint(&buf, 0, 4); /* typmod */
472 pq_sendint(&buf, 0, 2); /* format code */
473 pq_endmessage(&buf);
474
475 /* Send a DataRow message */
476 pq_beginmessage(&buf, 'D');
477 pq_sendint(&buf, 2, 2); /* # of columns */
478 len = strlen(histfname);
479 pq_sendint(&buf, len, 4); /* col1 len */
480 pq_sendbytes(&buf, histfname, len);
481
482 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
483 if (fd < 0)
484 ereport(ERROR,
485 (errcode_for_file_access(),
486 errmsg("could not open file \"%s\": %m", path)));
487
488 /* Determine file length and send it to client */
489 histfilelen = lseek(fd, 0, SEEK_END);
490 if (histfilelen < 0)
491 ereport(ERROR,
492 (errcode_for_file_access(),
493 errmsg("could not seek to end of file \"%s\": %m", path)));
494 if (lseek(fd, 0, SEEK_SET) != 0)
495 ereport(ERROR,
496 (errcode_for_file_access(),
497 errmsg("could not seek to beginning of file \"%s\": %m", path)));
498
499 pq_sendint(&buf, histfilelen, 4); /* col2 len */
500
501 bytesleft = histfilelen;
502 while (bytesleft > 0)
503 {
504 PGAlignedBlock rbuf;
505 int nread;
506
507 nread = read(fd, rbuf.data, sizeof(rbuf));
508 if (nread <= 0)
509 ereport(ERROR,
510 (errcode_for_file_access(),
511 errmsg("could not read file \"%s\": %m",
512 path)));
513 pq_sendbytes(&buf, rbuf.data, nread);
514 bytesleft -= nread;
515 }
516 CloseTransientFile(fd);
517
518 pq_endmessage(&buf);
519 }
520
521 /*
522 * Handle START_REPLICATION command.
523 *
524 * At the moment, this never returns, but an ereport(ERROR) will take us back
525 * to the main loop.
526 */
527 static void
StartReplication(StartReplicationCmd * cmd)528 StartReplication(StartReplicationCmd *cmd)
529 {
530 StringInfoData buf;
531 XLogRecPtr FlushPtr;
532
533 /*
534 * We assume here that we're logging enough information in the WAL for
535 * log-shipping, since this is checked in PostmasterMain().
536 *
537 * NOTE: wal_level can only change at shutdown, so in most cases it is
538 * difficult for there to be WAL data that we can still see that was
539 * written at wal_level='minimal'.
540 */
541
542 if (cmd->slotname)
543 {
544 ReplicationSlotAcquire(cmd->slotname);
545 if (SlotIsLogical(MyReplicationSlot))
546 ereport(ERROR,
547 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548 (errmsg("cannot use a logical replication slot for physical replication"))));
549 }
550
551 /*
552 * Select the timeline. If it was given explicitly by the client, use
553 * that. Otherwise use the timeline of the last replayed record, which is
554 * kept in ThisTimeLineID.
555 */
556 if (am_cascading_walsender)
557 {
558 /* this also updates ThisTimeLineID */
559 FlushPtr = GetStandbyFlushRecPtr();
560 }
561 else
562 FlushPtr = GetFlushRecPtr();
563
564 if (cmd->timeline != 0)
565 {
566 XLogRecPtr switchpoint;
567
568 sendTimeLine = cmd->timeline;
569 if (sendTimeLine == ThisTimeLineID)
570 {
571 sendTimeLineIsHistoric = false;
572 sendTimeLineValidUpto = InvalidXLogRecPtr;
573 }
574 else
575 {
576 List *timeLineHistory;
577
578 sendTimeLineIsHistoric = true;
579
580 /*
581 * Check that the timeline the client requested for exists, and
582 * the requested start location is on that timeline.
583 */
584 timeLineHistory = readTimeLineHistory(ThisTimeLineID);
585 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
586 &sendTimeLineNextTLI);
587 list_free_deep(timeLineHistory);
588
589 /*
590 * Found the requested timeline in the history. Check that
591 * requested startpoint is on that timeline in our history.
592 *
593 * This is quite loose on purpose. We only check that we didn't
594 * fork off the requested timeline before the switchpoint. We
595 * don't check that we switched *to* it before the requested
596 * starting point. This is because the client can legitimately
597 * request to start replication from the beginning of the WAL
598 * segment that contains switchpoint, but on the new timeline, so
599 * that it doesn't end up with a partial segment. If you ask for a
600 * too old starting point, you'll get an error later when we fail
601 * to find the requested WAL segment in pg_xlog.
602 *
603 * XXX: we could be more strict here and only allow a startpoint
604 * that's older than the switchpoint, if it's still in the same
605 * WAL segment.
606 */
607 if (!XLogRecPtrIsInvalid(switchpoint) &&
608 switchpoint < cmd->startpoint)
609 {
610 ereport(ERROR,
611 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
612 (uint32) (cmd->startpoint >> 32),
613 (uint32) (cmd->startpoint),
614 cmd->timeline),
615 errdetail("This server's history forked from timeline %u at %X/%X.",
616 cmd->timeline,
617 (uint32) (switchpoint >> 32),
618 (uint32) (switchpoint))));
619 }
620 sendTimeLineValidUpto = switchpoint;
621 }
622 }
623 else
624 {
625 sendTimeLine = ThisTimeLineID;
626 sendTimeLineValidUpto = InvalidXLogRecPtr;
627 sendTimeLineIsHistoric = false;
628 }
629
630 streamingDoneSending = streamingDoneReceiving = false;
631
632 /* If there is nothing to stream, don't even enter COPY mode */
633 if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
634 {
635 /*
636 * When we first start replication the standby will be behind the
637 * primary. For some applications, for example, synchronous
638 * replication, it is important to have a clear state for this initial
639 * catchup mode, so we can trigger actions when we change streaming
640 * state later. We may stay in this state for a long time, which is
641 * exactly why we want to be able to monitor whether or not we are
642 * still here.
643 */
644 WalSndSetState(WALSNDSTATE_CATCHUP);
645
646 /* Send a CopyBothResponse message, and start streaming */
647 pq_beginmessage(&buf, 'W');
648 pq_sendbyte(&buf, 0);
649 pq_sendint(&buf, 0, 2);
650 pq_endmessage(&buf);
651 pq_flush();
652
653 /*
654 * Don't allow a request to stream from a future point in WAL that
655 * hasn't been flushed to disk in this server yet.
656 */
657 if (FlushPtr < cmd->startpoint)
658 {
659 ereport(ERROR,
660 (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
661 (uint32) (cmd->startpoint >> 32),
662 (uint32) (cmd->startpoint),
663 (uint32) (FlushPtr >> 32),
664 (uint32) (FlushPtr))));
665 }
666
667 /* Start streaming from the requested point */
668 sentPtr = cmd->startpoint;
669
670 /* Initialize shared memory status, too */
671 {
672 WalSnd *walsnd = MyWalSnd;
673
674 SpinLockAcquire(&walsnd->mutex);
675 walsnd->sentPtr = sentPtr;
676 SpinLockRelease(&walsnd->mutex);
677 }
678
679 SyncRepInitConfig();
680
681 /* Main loop of walsender */
682 replication_active = true;
683
684 WalSndLoop(XLogSendPhysical);
685
686 replication_active = false;
687 if (got_STOPPING)
688 proc_exit(0);
689 WalSndSetState(WALSNDSTATE_STARTUP);
690
691 Assert(streamingDoneSending && streamingDoneReceiving);
692 }
693
694 if (cmd->slotname)
695 ReplicationSlotRelease();
696
697 /*
698 * Copy is finished now. Send a single-row result set indicating the next
699 * timeline.
700 */
701 if (sendTimeLineIsHistoric)
702 {
703 char tli_str[11];
704 char startpos_str[8 + 1 + 8 + 1];
705 Size len;
706
707 snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
708 snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
709 (uint32) (sendTimeLineValidUpto >> 32),
710 (uint32) sendTimeLineValidUpto);
711
712 pq_beginmessage(&buf, 'T'); /* RowDescription */
713 pq_sendint(&buf, 2, 2); /* 2 fields */
714
715 /* Field header */
716 pq_sendstring(&buf, "next_tli");
717 pq_sendint(&buf, 0, 4); /* table oid */
718 pq_sendint(&buf, 0, 2); /* attnum */
719
720 /*
721 * int8 may seem like a surprising data type for this, but in theory
722 * int4 would not be wide enough for this, as TimeLineID is unsigned.
723 */
724 pq_sendint(&buf, INT8OID, 4); /* type oid */
725 pq_sendint(&buf, -1, 2);
726 pq_sendint(&buf, 0, 4);
727 pq_sendint(&buf, 0, 2);
728
729 pq_sendstring(&buf, "next_tli_startpos");
730 pq_sendint(&buf, 0, 4); /* table oid */
731 pq_sendint(&buf, 0, 2); /* attnum */
732 pq_sendint(&buf, TEXTOID, 4); /* type oid */
733 pq_sendint(&buf, -1, 2);
734 pq_sendint(&buf, 0, 4);
735 pq_sendint(&buf, 0, 2);
736 pq_endmessage(&buf);
737
738 /* Data row */
739 pq_beginmessage(&buf, 'D');
740 pq_sendint(&buf, 2, 2); /* number of columns */
741
742 len = strlen(tli_str);
743 pq_sendint(&buf, len, 4); /* length */
744 pq_sendbytes(&buf, tli_str, len);
745
746 len = strlen(startpos_str);
747 pq_sendint(&buf, len, 4); /* length */
748 pq_sendbytes(&buf, startpos_str, len);
749
750 pq_endmessage(&buf);
751 }
752
753 /* Send CommandComplete message */
754 pq_puttextmessage('C', "START_STREAMING");
755 }
756
757 /*
758 * read_page callback for logical decoding contexts, as a walsender process.
759 *
760 * Inside the walsender we can do better than logical_read_local_xlog_page,
761 * which has to do a plain sleep/busy loop, because the walsender's latch gets
762 * set every time WAL is flushed.
763 */
764 static int
logical_read_xlog_page(XLogReaderState * state,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char * cur_page,TimeLineID * pageTLI)765 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
766 XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
767 {
768 XLogRecPtr flushptr;
769 int count;
770
771 /* make sure we have enough WAL available */
772 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
773
774 /* fail if not (implies we are going to shut down) */
775 if (flushptr < targetPagePtr + reqLen)
776 return -1;
777
778 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
779 count = XLOG_BLCKSZ; /* more than one block available */
780 else
781 count = flushptr - targetPagePtr; /* part of the page available */
782
783 /* now actually read the data, we know it's there */
784 XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
785
786 return count;
787 }
788
789 /*
790 * Create a new replication slot.
791 */
792 static void
CreateReplicationSlot(CreateReplicationSlotCmd * cmd)793 CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
794 {
795 const char *snapshot_name = NULL;
796 char xpos[MAXFNAMELEN];
797 StringInfoData buf;
798 Size len;
799
800 Assert(!MyReplicationSlot);
801
802 /* setup state for XLogReadPage */
803 sendTimeLineIsHistoric = false;
804 sendTimeLine = ThisTimeLineID;
805
806 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
807 {
808 ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
809 }
810 else
811 {
812 CheckLogicalDecodingRequirements();
813
814 /*
815 * Initially create the slot as ephemeral - that allows us to nicely
816 * handle errors during initialization because it'll get dropped if
817 * this transaction fails. We'll make it persistent at the end.
818 */
819 ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
820 }
821
822 if (cmd->kind == REPLICATION_KIND_LOGICAL)
823 {
824 LogicalDecodingContext *ctx;
825
826 ctx = CreateInitDecodingContext(cmd->plugin, NIL,
827 true, /* build snapshot */
828 logical_read_xlog_page,
829 WalSndPrepareWrite, WalSndWriteData);
830
831 /*
832 * Signal that we don't need the timeout mechanism. We're just
833 * creating the replication slot and don't yet accept feedback
834 * messages or send keepalives. As we possibly need to wait for
835 * further WAL the walsender would otherwise possibly be killed too
836 * soon.
837 */
838 last_reply_timestamp = 0;
839
840 /* build initial snapshot, might take a while */
841 DecodingContextFindStartpoint(ctx);
842
843 /*
844 * Export a plain (not of the snapbuild.c type) snapshot to the user
845 * that can be imported into another session.
846 */
847 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
848
849 /* don't need the decoding context anymore */
850 FreeDecodingContext(ctx);
851
852 ReplicationSlotPersist();
853 }
854 else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
855 {
856 ReplicationSlotReserveWal();
857
858 /* Write this slot to disk */
859 ReplicationSlotMarkDirty();
860 ReplicationSlotSave();
861 }
862
863 snprintf(xpos, sizeof(xpos), "%X/%X",
864 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
865 (uint32) MyReplicationSlot->data.confirmed_flush);
866
867 pq_beginmessage(&buf, 'T');
868 pq_sendint(&buf, 4, 2); /* 4 fields */
869
870 /* first field: slot name */
871 pq_sendstring(&buf, "slot_name"); /* col name */
872 pq_sendint(&buf, 0, 4); /* table oid */
873 pq_sendint(&buf, 0, 2); /* attnum */
874 pq_sendint(&buf, TEXTOID, 4); /* type oid */
875 pq_sendint(&buf, -1, 2); /* typlen */
876 pq_sendint(&buf, 0, 4); /* typmod */
877 pq_sendint(&buf, 0, 2); /* format code */
878
879 /* second field: LSN at which we became consistent */
880 pq_sendstring(&buf, "consistent_point"); /* col name */
881 pq_sendint(&buf, 0, 4); /* table oid */
882 pq_sendint(&buf, 0, 2); /* attnum */
883 pq_sendint(&buf, TEXTOID, 4); /* type oid */
884 pq_sendint(&buf, -1, 2); /* typlen */
885 pq_sendint(&buf, 0, 4); /* typmod */
886 pq_sendint(&buf, 0, 2); /* format code */
887
888 /* third field: exported snapshot's name */
889 pq_sendstring(&buf, "snapshot_name"); /* col name */
890 pq_sendint(&buf, 0, 4); /* table oid */
891 pq_sendint(&buf, 0, 2); /* attnum */
892 pq_sendint(&buf, TEXTOID, 4); /* type oid */
893 pq_sendint(&buf, -1, 2); /* typlen */
894 pq_sendint(&buf, 0, 4); /* typmod */
895 pq_sendint(&buf, 0, 2); /* format code */
896
897 /* fourth field: output plugin */
898 pq_sendstring(&buf, "output_plugin"); /* col name */
899 pq_sendint(&buf, 0, 4); /* table oid */
900 pq_sendint(&buf, 0, 2); /* attnum */
901 pq_sendint(&buf, TEXTOID, 4); /* type oid */
902 pq_sendint(&buf, -1, 2); /* typlen */
903 pq_sendint(&buf, 0, 4); /* typmod */
904 pq_sendint(&buf, 0, 2); /* format code */
905
906 pq_endmessage(&buf);
907
908 /* Send a DataRow message */
909 pq_beginmessage(&buf, 'D');
910 pq_sendint(&buf, 4, 2); /* # of columns */
911
912 /* slot_name */
913 len = strlen(NameStr(MyReplicationSlot->data.name));
914 pq_sendint(&buf, len, 4); /* col1 len */
915 pq_sendbytes(&buf, NameStr(MyReplicationSlot->data.name), len);
916
917 /* consistent wal location */
918 len = strlen(xpos);
919 pq_sendint(&buf, len, 4);
920 pq_sendbytes(&buf, xpos, len);
921
922 /* snapshot name, or NULL if none */
923 if (snapshot_name != NULL)
924 {
925 len = strlen(snapshot_name);
926 pq_sendint(&buf, len, 4);
927 pq_sendbytes(&buf, snapshot_name, len);
928 }
929 else
930 pq_sendint(&buf, -1, 4);
931
932 /* plugin, or NULL if none */
933 if (cmd->plugin != NULL)
934 {
935 len = strlen(cmd->plugin);
936 pq_sendint(&buf, len, 4);
937 pq_sendbytes(&buf, cmd->plugin, len);
938 }
939 else
940 pq_sendint(&buf, -1, 4);
941
942 pq_endmessage(&buf);
943
944 /*
945 * release active status again, START_REPLICATION will reacquire it
946 */
947 ReplicationSlotRelease();
948 }
949
950 /*
951 * Get rid of a replication slot that is no longer wanted.
952 */
953 static void
DropReplicationSlot(DropReplicationSlotCmd * cmd)954 DropReplicationSlot(DropReplicationSlotCmd *cmd)
955 {
956 ReplicationSlotDrop(cmd->slotname);
957 EndCommand("DROP_REPLICATION_SLOT", DestRemote);
958 }
959
/*
 * Handle START_REPLICATION ... LOGICAL.
 *
 * Load previously initiated logical slot and prepare for sending data (via
 * WalSndLoop).  Concretely: acquire the slot named in the command, announce
 * COPY BOTH mode to the client, build a logical decoding context starting at
 * cmd->startpoint with cmd->options, and stream decoded output through
 * WalSndLoop(XLogSendLogical).
 *
 * When the loop returns, the decoding context is freed and the slot is
 * released.  If got_STOPPING is set the process exits with proc_exit(0);
 * otherwise the walsender returns to STARTUP state and sends a
 * CommandComplete ("COPY 0") so the client can issue further commands.
 */
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;

	/* make sure that our requirements are still fulfilled */
	CheckLogicalDecodingRequirements();

	/* No slot may still be held from an earlier command at this point. */
	Assert(!MyReplicationSlot);

	ReplicationSlotAcquire(cmd->slotname);

	/*
	 * Force a disconnect, so that the decoding code doesn't need to care
	 * about an eventual switch from running in recovery, to running in a
	 * normal environment. Client code is expected to handle reconnects.
	 *
	 * Setting got_STOPPING here is what leads to the proc_exit(0) after
	 * WalSndLoop returns below.
	 */
	if (am_cascading_walsender && !RecoveryInProgress())
	{
		ereport(LOG,
				(errmsg("terminating walsender process after promotion")));
		got_STOPPING = true;
	}

	WalSndSetState(WALSNDSTATE_CATCHUP);

	/*
	 * Send a CopyBothResponse message ('W': overall format byte 0 and a
	 * column count of 0), and flush it so the client has it before any
	 * CopyData arrives; then start streaming.
	 */
	pq_beginmessage(&buf, 'W');
	pq_sendbyte(&buf, 0);
	pq_sendint(&buf, 0, 2);
	pq_endmessage(&buf);
	pq_flush();

	/* setup state for XLogReadPage: read from the current timeline */
	sendTimeLineIsHistoric = false;
	sendTimeLine = ThisTimeLineID;

	/*
	 * Initialize position to the last ack'ed one, then the xlog records begin
	 * to be shipped from that position.  WAL pages are read through
	 * logical_read_xlog_page; decoded output goes out through the
	 * WalSndPrepareWrite / WalSndWriteData callbacks.
	 */
	logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
												 logical_read_xlog_page,
												 WalSndPrepareWrite,
												 WalSndWriteData);

	/* Start reading WAL from the oldest required WAL. */
	logical_startptr = MyReplicationSlot->data.restart_lsn;

	/*
	 * Report the location after which we'll send out further commits as the
	 * current sentPtr.
	 */
	sentPtr = MyReplicationSlot->data.confirmed_flush;

	/*
	 * Also update the sent position status in shared memory.
	 *
	 * NOTE(review): the shared value is seeded from restart_lsn while the
	 * local sentPtr above comes from confirmed_flush; confirm that the two
	 * are intended to differ.
	 */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
		SpinLockRelease(&walsnd->mutex);
	}

	replication_active = true;

	SyncRepInitConfig();

	/* Main loop of walsender: stream decoded changes until streaming ends */
	WalSndLoop(XLogSendLogical);

	FreeDecodingContext(logical_decoding_ctx);
	ReplicationSlotRelease();

	replication_active = false;

	/* A stop was requested (e.g. the post-promotion case above): exit. */
	if (got_STOPPING)
		proc_exit(0);
	WalSndSetState(WALSNDSTATE_STARTUP);

	/* Get out of COPY mode (CommandComplete). */
	EndCommand("COPY 0", DestRemote);
}
1046
1047 /*
1048 * LogicalDecodingContext 'prepare_write' callback.
1049 *
1050 * Prepare a write into a StringInfo.
1051 *
1052 * Don't do anything lasting in here, it's quite possible that nothing will done
1053 * with the data.
1054 */
1055 static void
WalSndPrepareWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1056 WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1057 {
1058 /* can't have sync rep confused by sending the same LSN several times */
1059 if (!last_write)
1060 lsn = InvalidXLogRecPtr;
1061
1062 resetStringInfo(ctx->out);
1063
1064 pq_sendbyte(ctx->out, 'w');
1065 pq_sendint64(ctx->out, lsn); /* dataStart */
1066 pq_sendint64(ctx->out, lsn); /* walEnd */
1067
1068 /*
1069 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1070 * reserve space here.
1071 */
1072 pq_sendint64(ctx->out, 0); /* sendtime */
1073 }
1074
/*
 * LogicalDecodingContext 'write' callback.
 *
 * Actually write out data previously prepared by WalSndPrepareWrite out to
 * the network.  The send timestamp is patched into the reserved header slot,
 * the buffer is queued as a CopyData ('d') message, and a non-blocking flush
 * is attempted.
 *
 * Fast path: return at once if nothing is left pending and we are not yet
 * halfway to wal_sender_timeout since last_reply_timestamp.  Otherwise loop,
 * processing replies from the other side, checking the timeout, sending
 * keepalives and sleeping on latch/socket, until the output buffer drains.
 * Take as long as needed.
 */
static void
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
				bool last_write)
{
	TimestampTz now;
	int64		now_int;

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * This is somewhat ugly, but the protocol's set as it's already used for
	 * several releases by streaming physical replication.
	 *
	 * The sendtime field follows the 1-byte message type and the two 8-byte
	 * LSNs written by WalSndPrepareWrite, hence the offset used below.  It
	 * must be patched in before the buffer is queued for sending.
	 */
	resetStringInfo(&tmpbuf);
	now_int = GetCurrentIntegerTimestamp();
	now = IntegerTimestampToTimestampTz(now_int);
	pq_sendint64(&tmpbuf, now_int);
	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	/* output previously gathered data in a CopyData packet */
	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);

	CHECK_FOR_INTERRUPTS();

	/* Try to flush pending output to the client */
	if (pq_flush_if_writable() != 0)
		WalSndShutdown();

	/*
	 * Try taking fast path unless we get too close to walsender timeout.
	 * Past the halfway point we go through the loop below so that client
	 * replies are processed and a keepalive can be sent if due.
	 */
	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
										  wal_sender_timeout / 2) &&
		!pq_is_send_pending())
	{
		return;
	}

	/*
	 * Slow path: output is still pending, or we are close to the timeout.
	 * Keep servicing the connection until the output buffer is empty.
	 */
	for (;;)
	{
		int			wakeEvents;
		long		sleeptime;

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/* Done once nothing is left in the output buffer */
		if (!pq_is_send_pending())
			break;

		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		/* Wake on writability (to flush) as well as on client input */
		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;

		/* Sleep until something happens or we time out */
		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime);

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();
	}

	/*
	 * reactivate latch so WalSndLoop knows to continue (presumably needed
	 * because the ResetLatch() calls above may have consumed a wakeup that
	 * was meant for it)
	 */
	SetLatch(MyLatch);
}
1173
/*
 * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
 *
 * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
 * if we detect a shutdown request (either from postmaster or client)
 * we will return early, so caller must always check.
 *
 * "Flushed" means GetFlushRecPtr() when not in recovery, and the replay
 * position (GetXLogReplayRecPtr) while recovery is in progress.  The most
 * recently observed value is cached in a function-static variable across
 * calls, so a request already covered by it returns immediately.
 *
 * While waiting, the loop also services the connection: it handles
 * interrupts and config reloads, processes client replies, sends keepalives,
 * flushes pending output and enforces wal_sender_timeout.
 */
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
	int			wakeEvents;

	/* Persists across calls; InvalidXLogRecPtr until first computed. */
	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;

	/*
	 * Fast path to avoid acquiring the spinlock in case we already know we
	 * have enough WAL available.  This is particularly interesting if we're
	 * far behind.
	 */
	if (RecentFlushPtr != InvalidXLogRecPtr &&
		loc <= RecentFlushPtr)
		return RecentFlushPtr;

	/* Get a more recent flush pointer. */
	if (!RecoveryInProgress())
		RecentFlushPtr = GetFlushRecPtr();
	else
		RecentFlushPtr = GetXLogReplayRecPtr(NULL);

	for (;;)
	{
		long		sleeptime;

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we're shutting down, trigger pending WAL to be written out,
		 * otherwise we'd possibly end up waiting for WAL that never gets
		 * written, because walwriter has shut down already.
		 */
		if (got_STOPPING)
			XLogBackgroundFlush();

		/* Update our idea of the currently flushed position. */
		if (!RecoveryInProgress())
			RecentFlushPtr = GetFlushRecPtr();
		else
			RecentFlushPtr = GetXLogReplayRecPtr(NULL);

		/*
		 * If postmaster asked us to stop, don't wait anymore.
		 *
		 * It's important to do this check after the recomputation of
		 * RecentFlushPtr, so we can send all remaining data before shutting
		 * down.
		 */
		if (got_STOPPING)
			break;

		/*
		 * We only send regular messages to the client for full decoded
		 * transactions, but a synchronous replication and walsender shutdown
		 * possibly are waiting for a later location.  So, before sleeping, we
		 * send a ping containing the flush location.  If the receiver is
		 * otherwise idle, this keepalive will trigger a reply.  Processing the
		 * reply will update these MyWalSnd locations.
		 *
		 * No ping is sent while one that demanded a reply is still
		 * outstanding (waiting_for_ping_response).
		 */
		if (MyWalSnd->flush < sentPtr &&
			MyWalSnd->write < sentPtr &&
			!waiting_for_ping_response)
			WalSndKeepalive(false);

		/* check whether we're done */
		if (loc <= RecentFlushPtr)
			break;

		/* Waiting for new WAL.  Since we need to wait, we're now caught up. */
		WalSndCaughtUp = true;

		/*
		 * Try to flush any pending output to the client.
		 */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming, so fail the current WAL fetch request (the returned
		 * RecentFlushPtr is then still < loc).
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * Sleep until something happens or we time out.  Also wait for the
		 * socket becoming writable, if there's still pending output.
		 * Otherwise we might sit on sendable output data while waiting for
		 * new WAL to be generated.  (But if we have nothing to send, we don't
		 * want to wake on socket-writable.)
		 */
		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_READABLE | WL_TIMEOUT;

		if (pq_is_send_pending())
			wakeEvents |= WL_SOCKET_WRITEABLE;

		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime);
	}

	/*
	 * reactivate latch so WalSndLoop knows to continue (the ResetLatch() in
	 * the loop above may have cleared a pending wakeup)
	 */
	SetLatch(MyLatch);
	return RecentFlushPtr;
}
1317
1318 /*
1319 * Execute an incoming replication command.
1320 */
1321 void
exec_replication_command(const char * cmd_string)1322 exec_replication_command(const char *cmd_string)
1323 {
1324 int parse_rc;
1325 Node *cmd_node;
1326 MemoryContext cmd_context;
1327 MemoryContext old_context;
1328
1329 /*
1330 * If WAL sender has been told that shutdown is getting close, switch its
1331 * status accordingly to handle the next replication commands correctly.
1332 */
1333 if (got_STOPPING)
1334 WalSndSetState(WALSNDSTATE_STOPPING);
1335
1336 /*
1337 * Throw error if in stopping mode. We need prevent commands that could
1338 * generate WAL while the shutdown checkpoint is being written. To be
1339 * safe, we just prohibit all new commands.
1340 */
1341 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1342 ereport(ERROR,
1343 (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1344
1345 /*
1346 * Log replication command if log_replication_commands is enabled. Even
1347 * when it's disabled, log the command with DEBUG1 level for backward
1348 * compatibility.
1349 */
1350 ereport(log_replication_commands ? LOG : DEBUG1,
1351 (errmsg("received replication command: %s", cmd_string)));
1352
1353 /*
1354 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1355 * command arrives. Clean up the old stuff if there's anything.
1356 */
1357 SnapBuildClearExportedSnapshot();
1358
1359 CHECK_FOR_INTERRUPTS();
1360
1361 cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1362 "Replication command context",
1363 ALLOCSET_DEFAULT_SIZES);
1364 old_context = MemoryContextSwitchTo(cmd_context);
1365
1366 replication_scanner_init(cmd_string);
1367 parse_rc = replication_yyparse();
1368 if (parse_rc != 0)
1369 ereport(ERROR,
1370 (errcode(ERRCODE_SYNTAX_ERROR),
1371 (errmsg_internal("replication command parser returned %d",
1372 parse_rc))));
1373
1374 cmd_node = replication_parse_result;
1375
1376 /*
1377 * Allocate buffers that will be used for each outgoing and incoming
1378 * message. We do this just once per command to reduce palloc overhead.
1379 */
1380 initStringInfo(&output_message);
1381 initStringInfo(&reply_message);
1382 initStringInfo(&tmpbuf);
1383
1384 switch (cmd_node->type)
1385 {
1386 case T_IdentifySystemCmd:
1387 IdentifySystem();
1388 break;
1389
1390 case T_BaseBackupCmd:
1391 SendBaseBackup((BaseBackupCmd *) cmd_node);
1392 break;
1393
1394 case T_CreateReplicationSlotCmd:
1395 CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1396 break;
1397
1398 case T_DropReplicationSlotCmd:
1399 DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1400 break;
1401
1402 case T_StartReplicationCmd:
1403 {
1404 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1405
1406 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1407 StartReplication(cmd);
1408 else
1409 StartLogicalReplication(cmd);
1410 break;
1411 }
1412
1413 case T_TimeLineHistoryCmd:
1414 SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1415 break;
1416
1417 default:
1418 elog(ERROR, "unrecognized replication command node tag: %u",
1419 cmd_node->type);
1420 }
1421
1422 /* done */
1423 MemoryContextSwitchTo(old_context);
1424 MemoryContextDelete(cmd_context);
1425
1426 /* Send CommandComplete message */
1427 EndCommand("SELECT", DestRemote);
1428 }
1429
1430 /*
1431 * Process any incoming messages while streaming. Also checks if the remote
1432 * end has closed the connection.
1433 */
1434 static void
ProcessRepliesIfAny(void)1435 ProcessRepliesIfAny(void)
1436 {
1437 unsigned char firstchar;
1438 int r;
1439 bool received = false;
1440
1441 last_processing = GetCurrentTimestamp();
1442
1443 /*
1444 * If we already received a CopyDone from the frontend, any subsequent
1445 * message is the beginning of a new command, and should be processed in
1446 * the main processing loop.
1447 */
1448 while (!streamingDoneReceiving)
1449 {
1450 pq_startmsgread();
1451 r = pq_getbyte_if_available(&firstchar);
1452 if (r < 0)
1453 {
1454 /* unexpected error or EOF */
1455 ereport(COMMERROR,
1456 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1457 errmsg("unexpected EOF on standby connection")));
1458 proc_exit(0);
1459 }
1460 if (r == 0)
1461 {
1462 /* no data available without blocking */
1463 pq_endmsgread();
1464 break;
1465 }
1466
1467 /* Read the message contents */
1468 resetStringInfo(&reply_message);
1469 if (pq_getmessage(&reply_message, 0))
1470 {
1471 ereport(COMMERROR,
1472 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1473 errmsg("unexpected EOF on standby connection")));
1474 proc_exit(0);
1475 }
1476
1477 /* Handle the very limited subset of commands expected in this phase */
1478 switch (firstchar)
1479 {
1480 /*
1481 * 'd' means a standby reply wrapped in a CopyData packet.
1482 */
1483 case 'd':
1484 ProcessStandbyMessage();
1485 received = true;
1486 break;
1487
1488 /*
1489 * CopyDone means the standby requested to finish streaming.
1490 * Reply with CopyDone, if we had not sent that already.
1491 */
1492 case 'c':
1493 if (!streamingDoneSending)
1494 {
1495 pq_putmessage_noblock('c', NULL, 0);
1496 streamingDoneSending = true;
1497 }
1498
1499 streamingDoneReceiving = true;
1500 received = true;
1501 break;
1502
1503 /*
1504 * 'X' means that the standby is closing down the socket.
1505 */
1506 case 'X':
1507 proc_exit(0);
1508
1509 default:
1510 ereport(FATAL,
1511 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1512 errmsg("invalid standby message type \"%c\"",
1513 firstchar)));
1514 }
1515 }
1516
1517 /*
1518 * Save the last reply timestamp if we've received at least one reply.
1519 */
1520 if (received)
1521 {
1522 last_reply_timestamp = last_processing;
1523 waiting_for_ping_response = false;
1524 }
1525 }
1526
1527 /*
1528 * Process a status update message received from standby.
1529 */
1530 static void
ProcessStandbyMessage(void)1531 ProcessStandbyMessage(void)
1532 {
1533 char msgtype;
1534
1535 /*
1536 * Check message type from the first byte.
1537 */
1538 msgtype = pq_getmsgbyte(&reply_message);
1539
1540 switch (msgtype)
1541 {
1542 case 'r':
1543 ProcessStandbyReplyMessage();
1544 break;
1545
1546 case 'h':
1547 ProcessStandbyHSFeedbackMessage();
1548 break;
1549
1550 default:
1551 ereport(COMMERROR,
1552 (errcode(ERRCODE_PROTOCOL_VIOLATION),
1553 errmsg("unexpected message type \"%c\"", msgtype)));
1554 proc_exit(0);
1555 }
1556 }
1557
1558 /*
1559 * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1560 */
1561 static void
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)1562 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1563 {
1564 bool changed = false;
1565 ReplicationSlot *slot = MyReplicationSlot;
1566
1567 Assert(lsn != InvalidXLogRecPtr);
1568 SpinLockAcquire(&slot->mutex);
1569 if (slot->data.restart_lsn != lsn)
1570 {
1571 changed = true;
1572 slot->data.restart_lsn = lsn;
1573 }
1574 SpinLockRelease(&slot->mutex);
1575
1576 if (changed)
1577 {
1578 ReplicationSlotMarkDirty();
1579 ReplicationSlotsComputeRequiredLSN();
1580 }
1581
1582 /*
1583 * One could argue that the slot should be saved to disk now, but that'd
1584 * be energy wasted - the worst lost information can do here is give us
1585 * wrong information in a statistics view - we'll just potentially be more
1586 * conservative in removing files.
1587 */
1588 }
1589
1590 /*
1591 * Regular reply from standby advising of WAL positions on standby server.
1592 */
1593 static void
ProcessStandbyReplyMessage(void)1594 ProcessStandbyReplyMessage(void)
1595 {
1596 XLogRecPtr writePtr,
1597 flushPtr,
1598 applyPtr;
1599 bool replyRequested;
1600
1601 /* the caller already consumed the msgtype byte */
1602 writePtr = pq_getmsgint64(&reply_message);
1603 flushPtr = pq_getmsgint64(&reply_message);
1604 applyPtr = pq_getmsgint64(&reply_message);
1605 (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1606 replyRequested = pq_getmsgbyte(&reply_message);
1607
1608 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1609 (uint32) (writePtr >> 32), (uint32) writePtr,
1610 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1611 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1612 replyRequested ? " (reply requested)" : "");
1613
1614 /* Send a reply if the standby requested one. */
1615 if (replyRequested)
1616 WalSndKeepalive(false);
1617
1618 /*
1619 * Update shared state for this WalSender process based on reply data from
1620 * standby.
1621 */
1622 {
1623 WalSnd *walsnd = MyWalSnd;
1624
1625 SpinLockAcquire(&walsnd->mutex);
1626 walsnd->write = writePtr;
1627 walsnd->flush = flushPtr;
1628 walsnd->apply = applyPtr;
1629 SpinLockRelease(&walsnd->mutex);
1630 }
1631
1632 if (!am_cascading_walsender)
1633 SyncRepReleaseWaiters();
1634
1635 /*
1636 * Advance our local xmin horizon when the client confirmed a flush.
1637 */
1638 if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1639 {
1640 if (SlotIsLogical(MyReplicationSlot))
1641 LogicalConfirmReceivedLocation(flushPtr);
1642 else
1643 PhysicalConfirmReceivedLocation(flushPtr);
1644 }
1645 }
1646
1647 /* compute new replication slot xmin horizon if needed */
1648 static void
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)1649 PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
1650 {
1651 bool changed = false;
1652 ReplicationSlot *slot = MyReplicationSlot;
1653
1654 SpinLockAcquire(&slot->mutex);
1655 MyPgXact->xmin = InvalidTransactionId;
1656
1657 /*
1658 * For physical replication we don't need the interlock provided by xmin
1659 * and effective_xmin since the consequences of a missed increase are
1660 * limited to query cancellations, so set both at once.
1661 */
1662 if (!TransactionIdIsNormal(slot->data.xmin) ||
1663 !TransactionIdIsNormal(feedbackXmin) ||
1664 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1665 {
1666 changed = true;
1667 slot->data.xmin = feedbackXmin;
1668 slot->effective_xmin = feedbackXmin;
1669 }
1670 SpinLockRelease(&slot->mutex);
1671
1672 if (changed)
1673 {
1674 ReplicationSlotMarkDirty();
1675 ReplicationSlotsComputeRequiredXmin(false);
1676 }
1677 }
1678
1679 /*
1680 * Hot Standby feedback
1681 */
1682 static void
ProcessStandbyHSFeedbackMessage(void)1683 ProcessStandbyHSFeedbackMessage(void)
1684 {
1685 TransactionId nextXid;
1686 uint32 nextEpoch;
1687 TransactionId feedbackXmin;
1688 uint32 feedbackEpoch;
1689
1690 /*
1691 * Decipher the reply message. The caller already consumed the msgtype
1692 * byte.
1693 */
1694 (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
1695 feedbackXmin = pq_getmsgint(&reply_message, 4);
1696 feedbackEpoch = pq_getmsgint(&reply_message, 4);
1697
1698 elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
1699 feedbackXmin,
1700 feedbackEpoch);
1701
1702 /* Unset WalSender's xmin if the feedback message value is invalid */
1703 if (!TransactionIdIsNormal(feedbackXmin))
1704 {
1705 MyPgXact->xmin = InvalidTransactionId;
1706 if (MyReplicationSlot != NULL)
1707 PhysicalReplicationSlotNewXmin(feedbackXmin);
1708 return;
1709 }
1710
1711 /*
1712 * Check that the provided xmin/epoch are sane, that is, not in the future
1713 * and not so far back as to be already wrapped around. Ignore if not.
1714 *
1715 * Epoch of nextXid should be same as standby, or if the counter has
1716 * wrapped, then one greater than standby.
1717 */
1718 GetNextXidAndEpoch(&nextXid, &nextEpoch);
1719
1720 if (feedbackXmin <= nextXid)
1721 {
1722 if (feedbackEpoch != nextEpoch)
1723 return;
1724 }
1725 else
1726 {
1727 if (feedbackEpoch + 1 != nextEpoch)
1728 return;
1729 }
1730
1731 if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
1732 return; /* epoch OK, but it's wrapped around */
1733
1734 /*
1735 * Set the WalSender's xmin equal to the standby's requested xmin, so that
1736 * the xmin will be taken into account by GetOldestXmin. This will hold
1737 * back the removal of dead rows and thereby prevent the generation of
1738 * cleanup conflicts on the standby server.
1739 *
1740 * There is a small window for a race condition here: although we just
1741 * checked that feedbackXmin precedes nextXid, the nextXid could have
1742 * gotten advanced between our fetching it and applying the xmin below,
1743 * perhaps far enough to make feedbackXmin wrap around. In that case the
1744 * xmin we set here would be "in the future" and have no effect. No point
1745 * in worrying about this since it's too late to save the desired data
1746 * anyway. Assuming that the standby sends us an increasing sequence of
1747 * xmins, this could only happen during the first reply cycle, else our
1748 * own xmin would prevent nextXid from advancing so far.
1749 *
1750 * We don't bother taking the ProcArrayLock here. Setting the xmin field
1751 * is assumed atomic, and there's no real need to prevent a concurrent
1752 * GetOldestXmin. (If we're moving our xmin forward, this is obviously
1753 * safe, and if we're moving it backwards, well, the data is at risk
1754 * already since a VACUUM could have just finished calling GetOldestXmin.)
1755 *
1756 * If we're using a replication slot we reserve the xmin via that,
1757 * otherwise via the walsender's PGXACT entry.
1758 *
1759 * XXX: It might make sense to generalize the ephemeral slot concept and
1760 * always use the slot mechanism to handle the feedback xmin.
1761 */
1762 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
1763 PhysicalReplicationSlotNewXmin(feedbackXmin);
1764 else
1765 MyPgXact->xmin = feedbackXmin;
1766 }
1767
1768 /*
1769 * Compute how long send/receive loops should sleep.
1770 *
1771 * If wal_sender_timeout is enabled we want to wake up in time to send
1772 * keepalives and to abort the connection if wal_sender_timeout has been
1773 * reached.
1774 */
1775 static long
WalSndComputeSleeptime(TimestampTz now)1776 WalSndComputeSleeptime(TimestampTz now)
1777 {
1778 long sleeptime = 10000; /* 10 s */
1779
1780 if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
1781 {
1782 TimestampTz wakeup_time;
1783
1784 /*
1785 * At the latest stop sleeping once wal_sender_timeout has been
1786 * reached.
1787 */
1788 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
1789 wal_sender_timeout);
1790
1791 /*
1792 * If no ping has been sent yet, wakeup when it's time to do so.
1793 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
1794 * the timeout passed without a response.
1795 */
1796 if (!waiting_for_ping_response)
1797 wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
1798 wal_sender_timeout / 2);
1799
1800 /* Compute relative time until wakeup. */
1801 sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
1802 }
1803
1804 return sleeptime;
1805 }
1806
1807 /*
1808 * Check whether there have been responses by the client within
1809 * wal_sender_timeout and shutdown if not. Using last_processing as the
1810 * reference point avoids counting server-side stalls against the client.
1811 * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
1812 * postdate last_processing by more than wal_sender_timeout. If that happens,
1813 * the client must reply almost immediately to avoid a timeout. This rarely
1814 * affects the default configuration, under which clients spontaneously send a
1815 * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
1816 * could eliminate that problem by recognizing timeout expiration at
1817 * wal_sender_timeout/2 after the keepalive.
1818 */
1819 static void
WalSndCheckTimeOut(void)1820 WalSndCheckTimeOut(void)
1821 {
1822 TimestampTz timeout;
1823
1824 /* don't bail out if we're doing something that doesn't require timeouts */
1825 if (last_reply_timestamp <= 0)
1826 return;
1827
1828 timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
1829 wal_sender_timeout);
1830
1831 if (wal_sender_timeout > 0 && last_processing >= timeout)
1832 {
1833 /*
1834 * Since typically expiration of replication timeout means
1835 * communication problem, we don't send the error message to the
1836 * standby.
1837 */
1838 ereport(COMMERROR,
1839 (errmsg("terminating walsender process due to replication timeout")));
1840
1841 WalSndShutdown();
1842 }
1843 }
1844
/*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * send_data is the producer callback: it is invoked to generate more output
 * whenever the output buffer is empty.  This loop reads WalSndCaughtUp after
 * calling it, and forces it to false while output is still pending, so
 * presumably send_data is what sets it.  TODO confirm against the XLogSend*
 * callbacks.
 *
 * Each iteration handles postmaster death, interrupts, config reloads and
 * client replies, then produces and flushes output.  Once caught up, it
 * moves the walsender from CATCHUP to STREAMING and reacts to SIGUSR2.
 * Finally it checks the replication timeout, sends a keepalive if due, and
 * sleeps only when caught up or when output is waiting for the socket.
 *
 * Returns once CopyDone has been both received and sent and the output
 * buffer is empty.  Other ways out (WalSndShutdown on flush failure or
 * timeout, WalSndDone after SIGUSR2) are left to those routines.
 */
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
	/*
	 * Initialize the last reply timestamp.  That enables timeout processing
	 * from hereon.
	 */
	last_reply_timestamp = GetCurrentTimestamp();
	waiting_for_ping_response = false;

	/*
	 * Loop until we reach the end of this timeline or the client requests to
	 * stop streaming.
	 */
	for (;;)
	{
		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/*
		 * If we don't have any pending data in the output buffer, try to send
		 * some more.  If there is some, we don't bother to call send_data
		 * again until we've flushed it ... but we'd better assume we are not
		 * caught up.
		 */
		if (!pq_is_send_pending())
			send_data();
		else
			WalSndCaughtUp = false;

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/* If nothing remains to be sent right now ... */
		if (WalSndCaughtUp && !pq_is_send_pending())
		{
			/*
			 * If we're in catchup state, move to streaming.  This is an
			 * important state change for users to know about, since before
			 * this point data loss might occur if the primary dies and we
			 * need to failover to the standby.  The state change is also
			 * important for synchronous replication, since commits that
			 * started to wait at that point might wait for some time.
			 */
			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
			{
				ereport(DEBUG1,
						(errmsg("\"%s\" has now caught up with upstream server",
								application_name)));
				WalSndSetState(WALSNDSTATE_STREAMING);
			}

			/*
			 * When SIGUSR2 arrives, we send any outstanding logs up to the
			 * shutdown checkpoint record (i.e., the latest record), wait for
			 * them to be replicated to the standby, and exit.  This may be a
			 * normal termination at shutdown, or a promotion; the walsender
			 * is not sure which.
			 */
			if (got_SIGUSR2)
				WalSndDone(send_data);
		}

		/* Check for replication timeout. */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * We don't block if not caught up, unless there is unsent data
		 * pending in which case we'd better block until the socket is
		 * write-ready.  This test is only needed for the case where the
		 * send_data callback handled a subset of the available data but then
		 * pq_flush_if_writable flushed it all --- we should immediately try
		 * to send more.
		 */
		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
		{
			long		sleeptime;
			int			wakeEvents;

			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;

			/* After CopyDone from the client, its input is not ours to read */
			if (!streamingDoneReceiving)
				wakeEvents |= WL_SOCKET_READABLE;

			/*
			 * Use fresh timestamp, not last_processing, to reduce the chance
			 * of reaching wal_sender_timeout before sending a keepalive.
			 */
			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

			if (pq_is_send_pending())
				wakeEvents |= WL_SOCKET_WRITEABLE;

			/* Sleep until something happens or we time out */
			WaitLatchOrSocket(MyLatch, wakeEvents,
							  MyProcPort->sock, sleeptime);
		}
	}
	return;
}
1979
1980 /* Initialize a per-walsender data structure for this walsender process */
1981 static void
InitWalSenderSlot(void)1982 InitWalSenderSlot(void)
1983 {
1984 int i;
1985
1986 /*
1987 * WalSndCtl should be set up already (we inherit this by fork() or
1988 * EXEC_BACKEND mechanism from the postmaster).
1989 */
1990 Assert(WalSndCtl != NULL);
1991 Assert(MyWalSnd == NULL);
1992
1993 /*
1994 * Find a free walsender slot and reserve it. If this fails, we must be
1995 * out of WalSnd structures.
1996 */
1997 for (i = 0; i < max_wal_senders; i++)
1998 {
1999 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2000
2001 SpinLockAcquire(&walsnd->mutex);
2002
2003 if (walsnd->pid != 0)
2004 {
2005 SpinLockRelease(&walsnd->mutex);
2006 continue;
2007 }
2008 else
2009 {
2010 /*
2011 * Found a free slot. Reserve it for us.
2012 */
2013 walsnd->pid = MyProcPid;
2014 walsnd->sentPtr = InvalidXLogRecPtr;
2015 walsnd->write = InvalidXLogRecPtr;
2016 walsnd->flush = InvalidXLogRecPtr;
2017 walsnd->apply = InvalidXLogRecPtr;
2018 walsnd->state = WALSNDSTATE_STARTUP;
2019 walsnd->latch = &MyProc->procLatch;
2020 SpinLockRelease(&walsnd->mutex);
2021 /* don't need the lock anymore */
2022 MyWalSnd = (WalSnd *) walsnd;
2023
2024 break;
2025 }
2026 }
2027 if (MyWalSnd == NULL)
2028 ereport(FATAL,
2029 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2030 errmsg("number of requested standby connections "
2031 "exceeds max_wal_senders (currently %d)",
2032 max_wal_senders)));
2033
2034 /* Arrange to clean up at walsender exit */
2035 on_shmem_exit(WalSndKill, 0);
2036 }
2037
2038 /* Destroy the per-walsender data structure for this walsender process */
2039 static void
WalSndKill(int code,Datum arg)2040 WalSndKill(int code, Datum arg)
2041 {
2042 WalSnd *walsnd = MyWalSnd;
2043
2044 Assert(walsnd != NULL);
2045
2046 MyWalSnd = NULL;
2047
2048 SpinLockAcquire(&walsnd->mutex);
2049 /* clear latch while holding the spinlock, so it can safely be read */
2050 walsnd->latch = NULL;
2051 /* Mark WalSnd struct as no longer being in use. */
2052 walsnd->pid = 0;
2053 SpinLockRelease(&walsnd->mutex);
2054 }
2055
2056 /*
2057 * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2058 *
2059 * XXX probably this should be improved to suck data directly from the
2060 * WAL buffers when possible.
2061 *
2062 * Will open, and keep open, one WAL segment stored in the global file
2063 * descriptor sendFile. This means if XLogRead is used once, there will
2064 * always be one descriptor left open until the process ends, but never
2065 * more than one.
2066 */
2067 static void
XLogRead(char * buf,XLogRecPtr startptr,Size count)2068 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2069 {
2070 char *p;
2071 XLogRecPtr recptr;
2072 Size nbytes;
2073 XLogSegNo segno;
2074
2075 retry:
2076 p = buf;
2077 recptr = startptr;
2078 nbytes = count;
2079
2080 while (nbytes > 0)
2081 {
2082 uint32 startoff;
2083 int segbytes;
2084 int readbytes;
2085
2086 startoff = recptr % XLogSegSize;
2087
2088 if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2089 {
2090 char path[MAXPGPATH];
2091
2092 /* Switch to another logfile segment */
2093 if (sendFile >= 0)
2094 close(sendFile);
2095
2096 XLByteToSeg(recptr, sendSegNo);
2097
2098 /*-------
2099 * When reading from a historic timeline, and there is a timeline
2100 * switch within this segment, read from the WAL segment belonging
2101 * to the new timeline.
2102 *
2103 * For example, imagine that this server is currently on timeline
2104 * 5, and we're streaming timeline 4. The switch from timeline 4
2105 * to 5 happened at 0/13002088. In pg_xlog, we have these files:
2106 *
2107 * ...
2108 * 000000040000000000000012
2109 * 000000040000000000000013
2110 * 000000050000000000000013
2111 * 000000050000000000000014
2112 * ...
2113 *
2114 * In this situation, when requested to send the WAL from
2115 * segment 0x13, on timeline 4, we read the WAL from file
2116 * 000000050000000000000013. Archive recovery prefers files from
2117 * newer timelines, so if the segment was restored from the
2118 * archive on this server, the file belonging to the old timeline,
2119 * 000000040000000000000013, might not exist. Their contents are
2120 * equal up to the switchpoint, because at a timeline switch, the
2121 * used portion of the old segment is copied to the new file.
2122 *-------
2123 */
2124 curFileTimeLine = sendTimeLine;
2125 if (sendTimeLineIsHistoric)
2126 {
2127 XLogSegNo endSegNo;
2128
2129 XLByteToSeg(sendTimeLineValidUpto, endSegNo);
2130 if (sendSegNo == endSegNo)
2131 curFileTimeLine = sendTimeLineNextTLI;
2132 }
2133
2134 XLogFilePath(path, curFileTimeLine, sendSegNo);
2135
2136 sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2137 if (sendFile < 0)
2138 {
2139 /*
2140 * If the file is not found, assume it's because the standby
2141 * asked for a too old WAL segment that has already been
2142 * removed or recycled.
2143 */
2144 if (errno == ENOENT)
2145 ereport(ERROR,
2146 (errcode_for_file_access(),
2147 errmsg("requested WAL segment %s has already been removed",
2148 XLogFileNameP(curFileTimeLine, sendSegNo))));
2149 else
2150 ereport(ERROR,
2151 (errcode_for_file_access(),
2152 errmsg("could not open file \"%s\": %m",
2153 path)));
2154 }
2155 sendOff = 0;
2156 }
2157
2158 /* Need to seek in the file? */
2159 if (sendOff != startoff)
2160 {
2161 if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2162 ereport(ERROR,
2163 (errcode_for_file_access(),
2164 errmsg("could not seek in log segment %s to offset %u: %m",
2165 XLogFileNameP(curFileTimeLine, sendSegNo),
2166 startoff)));
2167 sendOff = startoff;
2168 }
2169
2170 /* How many bytes are within this segment? */
2171 if (nbytes > (XLogSegSize - startoff))
2172 segbytes = XLogSegSize - startoff;
2173 else
2174 segbytes = nbytes;
2175
2176 readbytes = read(sendFile, p, segbytes);
2177 if (readbytes <= 0)
2178 {
2179 ereport(ERROR,
2180 (errcode_for_file_access(),
2181 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2182 XLogFileNameP(curFileTimeLine, sendSegNo),
2183 sendOff, (unsigned long) segbytes)));
2184 }
2185
2186 /* Update state for read */
2187 recptr += readbytes;
2188
2189 sendOff += readbytes;
2190 nbytes -= readbytes;
2191 p += readbytes;
2192 }
2193
2194 /*
2195 * After reading into the buffer, check that what we read was valid. We do
2196 * this after reading, because even though the segment was present when we
2197 * opened it, it might get recycled or removed while we read it. The
2198 * read() succeeds in that case, but the data we tried to read might
2199 * already have been overwritten with new WAL records.
2200 */
2201 XLByteToSeg(startptr, segno);
2202 CheckXLogRemoved(segno, ThisTimeLineID);
2203
2204 /*
2205 * During recovery, the currently-open WAL file might be replaced with the
2206 * file of the same name retrieved from archive. So we always need to
2207 * check what we read was valid after reading into the buffer. If it's
2208 * invalid, we try to open and read the file again.
2209 */
2210 if (am_cascading_walsender)
2211 {
2212 WalSnd *walsnd = MyWalSnd;
2213 bool reload;
2214
2215 SpinLockAcquire(&walsnd->mutex);
2216 reload = walsnd->needreload;
2217 walsnd->needreload = false;
2218 SpinLockRelease(&walsnd->mutex);
2219
2220 if (reload && sendFile >= 0)
2221 {
2222 close(sendFile);
2223 sendFile = -1;
2224
2225 goto retry;
2226 }
2227 }
2228 }
2229
2230 /*
2231 * Send out the WAL in its normal physical/stored form.
2232 *
2233 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2234 * but not yet sent to the client, and buffer it in the libpq output
2235 * buffer.
2236 *
2237 * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2238 * otherwise WalSndCaughtUp is set to false.
2239 */
2240 static void
XLogSendPhysical(void)2241 XLogSendPhysical(void)
2242 {
2243 XLogRecPtr SendRqstPtr;
2244 XLogRecPtr startptr;
2245 XLogRecPtr endptr;
2246 Size nbytes;
2247
2248 /* If requested switch the WAL sender to the stopping state. */
2249 if (got_STOPPING)
2250 WalSndSetState(WALSNDSTATE_STOPPING);
2251
2252 if (streamingDoneSending)
2253 {
2254 WalSndCaughtUp = true;
2255 return;
2256 }
2257
2258 /* Figure out how far we can safely send the WAL. */
2259 if (sendTimeLineIsHistoric)
2260 {
2261 /*
2262 * Streaming an old timeline that's in this server's history, but is
2263 * not the one we're currently inserting or replaying. It can be
2264 * streamed up to the point where we switched off that timeline.
2265 */
2266 SendRqstPtr = sendTimeLineValidUpto;
2267 }
2268 else if (am_cascading_walsender)
2269 {
2270 /*
2271 * Streaming the latest timeline on a standby.
2272 *
2273 * Attempt to send all WAL that has already been replayed, so that we
2274 * know it's valid. If we're receiving WAL through streaming
2275 * replication, it's also OK to send any WAL that has been received
2276 * but not replayed.
2277 *
2278 * The timeline we're recovering from can change, or we can be
2279 * promoted. In either case, the current timeline becomes historic. We
2280 * need to detect that so that we don't try to stream past the point
2281 * where we switched to another timeline. We check for promotion or
2282 * timeline switch after calculating FlushPtr, to avoid a race
2283 * condition: if the timeline becomes historic just after we checked
2284 * that it was still current, it's still be OK to stream it up to the
2285 * FlushPtr that was calculated before it became historic.
2286 */
2287 bool becameHistoric = false;
2288
2289 SendRqstPtr = GetStandbyFlushRecPtr();
2290
2291 if (!RecoveryInProgress())
2292 {
2293 /*
2294 * We have been promoted. RecoveryInProgress() updated
2295 * ThisTimeLineID to the new current timeline.
2296 */
2297 am_cascading_walsender = false;
2298 becameHistoric = true;
2299 }
2300 else
2301 {
2302 /*
2303 * Still a cascading standby. But is the timeline we're sending
2304 * still the one recovery is recovering from? ThisTimeLineID was
2305 * updated by the GetStandbyFlushRecPtr() call above.
2306 */
2307 if (sendTimeLine != ThisTimeLineID)
2308 becameHistoric = true;
2309 }
2310
2311 if (becameHistoric)
2312 {
2313 /*
2314 * The timeline we were sending has become historic. Read the
2315 * timeline history file of the new timeline to see where exactly
2316 * we forked off from the timeline we were sending.
2317 */
2318 List *history;
2319
2320 history = readTimeLineHistory(ThisTimeLineID);
2321 sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
2322
2323 Assert(sendTimeLine < sendTimeLineNextTLI);
2324 list_free_deep(history);
2325
2326 sendTimeLineIsHistoric = true;
2327
2328 SendRqstPtr = sendTimeLineValidUpto;
2329 }
2330 }
2331 else
2332 {
2333 /*
2334 * Streaming the current timeline on a master.
2335 *
2336 * Attempt to send all data that's already been written out and
2337 * fsync'd to disk. We cannot go further than what's been written out
2338 * given the current implementation of XLogRead(). And in any case
2339 * it's unsafe to send WAL that is not securely down to disk on the
2340 * master: if the master subsequently crashes and restarts, slaves
2341 * must not have applied any WAL that gets lost on the master.
2342 */
2343 SendRqstPtr = GetFlushRecPtr();
2344 }
2345
2346 /*
2347 * If this is a historic timeline and we've reached the point where we
2348 * forked to the next timeline, stop streaming.
2349 *
2350 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2351 * startup process will normally replay all WAL that has been received
2352 * from the master, before promoting, but if the WAL streaming is
2353 * terminated at a WAL page boundary, the valid portion of the timeline
2354 * might end in the middle of a WAL record. We might've already sent the
2355 * first half of that partial WAL record to the cascading standby, so that
2356 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2357 * replay the partial WAL record either, so it can still follow our
2358 * timeline switch.
2359 */
2360 if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
2361 {
2362 /* close the current file. */
2363 if (sendFile >= 0)
2364 close(sendFile);
2365 sendFile = -1;
2366
2367 /* Send CopyDone */
2368 pq_putmessage_noblock('c', NULL, 0);
2369 streamingDoneSending = true;
2370
2371 WalSndCaughtUp = true;
2372
2373 elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2374 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
2375 (uint32) (sentPtr >> 32), (uint32) sentPtr);
2376 return;
2377 }
2378
2379 /* Do we have any work to do? */
2380 Assert(sentPtr <= SendRqstPtr);
2381 if (SendRqstPtr <= sentPtr)
2382 {
2383 WalSndCaughtUp = true;
2384 return;
2385 }
2386
2387 /*
2388 * Figure out how much to send in one message. If there's no more than
2389 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2390 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2391 *
2392 * The rounding is not only for performance reasons. Walreceiver relies on
2393 * the fact that we never split a WAL record across two messages. Since a
2394 * long WAL record is split at page boundary into continuation records,
2395 * page boundary is always a safe cut-off point. We also assume that
2396 * SendRqstPtr never points to the middle of a WAL record.
2397 */
2398 startptr = sentPtr;
2399 endptr = startptr;
2400 endptr += MAX_SEND_SIZE;
2401
2402 /* if we went beyond SendRqstPtr, back off */
2403 if (SendRqstPtr <= endptr)
2404 {
2405 endptr = SendRqstPtr;
2406 if (sendTimeLineIsHistoric)
2407 WalSndCaughtUp = false;
2408 else
2409 WalSndCaughtUp = true;
2410 }
2411 else
2412 {
2413 /* round down to page boundary. */
2414 endptr -= (endptr % XLOG_BLCKSZ);
2415 WalSndCaughtUp = false;
2416 }
2417
2418 nbytes = endptr - startptr;
2419 Assert(nbytes <= MAX_SEND_SIZE);
2420
2421 /*
2422 * OK to read and send the slice.
2423 */
2424 resetStringInfo(&output_message);
2425 pq_sendbyte(&output_message, 'w');
2426
2427 pq_sendint64(&output_message, startptr); /* dataStart */
2428 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2429 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2430
2431 /*
2432 * Read the log directly into the output buffer to avoid extra memcpy
2433 * calls.
2434 */
2435 enlargeStringInfo(&output_message, nbytes);
2436 XLogRead(&output_message.data[output_message.len], startptr, nbytes);
2437 output_message.len += nbytes;
2438 output_message.data[output_message.len] = '\0';
2439
2440 /*
2441 * Fill the send timestamp last, so that it is taken as late as possible.
2442 */
2443 resetStringInfo(&tmpbuf);
2444 pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
2445 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
2446 tmpbuf.data, sizeof(int64));
2447
2448 pq_putmessage_noblock('d', output_message.data, output_message.len);
2449
2450 sentPtr = endptr;
2451
2452 /* Update shared memory status */
2453 {
2454 WalSnd *walsnd = MyWalSnd;
2455
2456 SpinLockAcquire(&walsnd->mutex);
2457 walsnd->sentPtr = sentPtr;
2458 SpinLockRelease(&walsnd->mutex);
2459 }
2460
2461 /* Report progress of XLOG streaming in PS display */
2462 if (update_process_title)
2463 {
2464 char activitymsg[50];
2465
2466 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
2467 (uint32) (sentPtr >> 32), (uint32) sentPtr);
2468 set_ps_display(activitymsg, false);
2469 }
2470
2471 return;
2472 }
2473
2474 /*
2475 * Stream out logically decoded data.
2476 */
2477 static void
XLogSendLogical(void)2478 XLogSendLogical(void)
2479 {
2480 XLogRecord *record;
2481 char *errm;
2482 XLogRecPtr flushPtr;
2483
2484 /*
2485 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2486 * true in WalSndWaitForWal, if we're actually waiting. We also set to
2487 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2488 * didn't wait - i.e. when we're shutting down.
2489 */
2490 WalSndCaughtUp = false;
2491
2492 record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2493 logical_startptr = InvalidXLogRecPtr;
2494
2495 /* xlog record was invalid */
2496 if (errm != NULL)
2497 elog(ERROR, "%s", errm);
2498
2499 /*
2500 * We'll use the current flush point to determine whether we've caught up.
2501 */
2502 flushPtr = GetFlushRecPtr();
2503
2504 if (record != NULL)
2505 {
2506 LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2507
2508 sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2509 }
2510
2511 /* Set flag if we're caught up. */
2512 if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2513 WalSndCaughtUp = true;
2514
2515 /*
2516 * If we're caught up and have been requested to stop, have WalSndLoop()
2517 * terminate the connection in an orderly manner, after writing out all
2518 * the pending data.
2519 */
2520 if (WalSndCaughtUp && got_STOPPING)
2521 got_SIGUSR2 = true;
2522
2523 /* Update shared memory status */
2524 {
2525 WalSnd *walsnd = MyWalSnd;
2526
2527 SpinLockAcquire(&walsnd->mutex);
2528 walsnd->sentPtr = sentPtr;
2529 SpinLockRelease(&walsnd->mutex);
2530 }
2531 }
2532
2533 /*
2534 * Shutdown if the sender is caught up.
2535 *
2536 * NB: This should only be called when the shutdown signal has been received
2537 * from postmaster.
2538 *
2539 * Note that if we determine that there's still more data to send, this
2540 * function will return control to the caller.
2541 */
2542 static void
WalSndDone(WalSndSendDataCallback send_data)2543 WalSndDone(WalSndSendDataCallback send_data)
2544 {
2545 XLogRecPtr replicatedPtr;
2546
2547 /* ... let's just be real sure we're caught up ... */
2548 send_data();
2549
2550 /*
2551 * To figure out whether all WAL has successfully been replicated, check
2552 * flush location if valid, write otherwise. Tools like pg_receivexlog
2553 * will usually (unless in synchronous mode) return an invalid flush
2554 * location.
2555 */
2556 replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2557 MyWalSnd->write : MyWalSnd->flush;
2558
2559 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2560 !pq_is_send_pending())
2561 {
2562 /* Inform the standby that XLOG streaming is done */
2563 EndCommand("COPY 0", DestRemote);
2564 pq_flush();
2565
2566 proc_exit(0);
2567 }
2568 if (!waiting_for_ping_response)
2569 WalSndKeepalive(true);
2570 }
2571
2572 /*
2573 * Returns the latest point in WAL that has been safely flushed to disk, and
2574 * can be sent to the standby. This should only be called when in recovery,
2575 * ie. we're streaming to a cascaded standby.
2576 *
2577 * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2578 * replayed WAL record.
2579 */
2580 static XLogRecPtr
GetStandbyFlushRecPtr(void)2581 GetStandbyFlushRecPtr(void)
2582 {
2583 XLogRecPtr replayPtr;
2584 TimeLineID replayTLI;
2585 XLogRecPtr receivePtr;
2586 TimeLineID receiveTLI;
2587 XLogRecPtr result;
2588
2589 /*
2590 * We can safely send what's already been replayed. Also, if walreceiver
2591 * is streaming WAL from the same timeline, we can send anything that it
2592 * has streamed, but hasn't been replayed yet.
2593 */
2594
2595 receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2596 replayPtr = GetXLogReplayRecPtr(&replayTLI);
2597
2598 ThisTimeLineID = replayTLI;
2599
2600 result = replayPtr;
2601 if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2602 result = receivePtr;
2603
2604 return result;
2605 }
2606
2607 /*
2608 * Request walsenders to reload the currently-open WAL file
2609 */
2610 void
WalSndRqstFileReload(void)2611 WalSndRqstFileReload(void)
2612 {
2613 int i;
2614
2615 for (i = 0; i < max_wal_senders; i++)
2616 {
2617 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2618
2619 if (walsnd->pid == 0)
2620 continue;
2621
2622 SpinLockAcquire(&walsnd->mutex);
2623 walsnd->needreload = true;
2624 SpinLockRelease(&walsnd->mutex);
2625 }
2626 }
2627
2628 /*
2629 * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2630 */
2631 void
HandleWalSndInitStopping(void)2632 HandleWalSndInitStopping(void)
2633 {
2634 Assert(am_walsender);
2635
2636 /*
2637 * If replication has not yet started, die like with SIGTERM. If
2638 * replication is active, only set a flag and wake up the main loop. It
2639 * will send any outstanding WAL, wait for it to be replicated to the
2640 * standby, and then exit gracefully.
2641 */
2642 if (!replication_active)
2643 kill(MyProcPid, SIGTERM);
2644 else
2645 got_STOPPING = true;
2646 }
2647
2648 /*
2649 * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2650 * sender should already have been switched to WALSNDSTATE_STOPPING at
2651 * this point.
2652 */
2653 static void
WalSndLastCycleHandler(SIGNAL_ARGS)2654 WalSndLastCycleHandler(SIGNAL_ARGS)
2655 {
2656 int save_errno = errno;
2657
2658 got_SIGUSR2 = true;
2659 SetLatch(MyLatch);
2660
2661 errno = save_errno;
2662 }
2663
2664 /* Set up signal handlers */
2665 void
WalSndSignals(void)2666 WalSndSignals(void)
2667 {
2668 /* Set up signal handlers */
2669 pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config
2670 * file */
2671 pqsignal(SIGINT, SIG_IGN); /* not used */
2672 pqsignal(SIGTERM, die); /* request shutdown */
2673 pqsignal(SIGQUIT, quickdie); /* hard crash time */
2674 InitializeTimeouts(); /* establishes SIGALRM handler */
2675 pqsignal(SIGPIPE, SIG_IGN);
2676 pqsignal(SIGUSR1, procsignal_sigusr1_handler);
2677 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
2678 * shutdown */
2679
2680 /* Reset some signals that are accepted by postmaster but not here */
2681 pqsignal(SIGCHLD, SIG_DFL);
2682 pqsignal(SIGTTIN, SIG_DFL);
2683 pqsignal(SIGTTOU, SIG_DFL);
2684 pqsignal(SIGCONT, SIG_DFL);
2685 pqsignal(SIGWINCH, SIG_DFL);
2686 }
2687
2688 /* Report shared-memory space needed by WalSndShmemInit */
2689 Size
WalSndShmemSize(void)2690 WalSndShmemSize(void)
2691 {
2692 Size size = 0;
2693
2694 size = offsetof(WalSndCtlData, walsnds);
2695 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2696
2697 return size;
2698 }
2699
2700 /* Allocate and initialize walsender-related shared memory */
2701 void
WalSndShmemInit(void)2702 WalSndShmemInit(void)
2703 {
2704 bool found;
2705 int i;
2706
2707 WalSndCtl = (WalSndCtlData *)
2708 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2709
2710 if (!found)
2711 {
2712 /* First time through, so initialize */
2713 MemSet(WalSndCtl, 0, WalSndShmemSize());
2714
2715 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2716 SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2717
2718 for (i = 0; i < max_wal_senders; i++)
2719 {
2720 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2721
2722 SpinLockInit(&walsnd->mutex);
2723 }
2724 }
2725 }
2726
2727 /*
2728 * Wake up all walsenders
2729 *
2730 * This will be called inside critical sections, so throwing an error is not
2731 * adviseable.
2732 */
2733 void
WalSndWakeup(void)2734 WalSndWakeup(void)
2735 {
2736 int i;
2737
2738 for (i = 0; i < max_wal_senders; i++)
2739 {
2740 Latch *latch;
2741 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2742
2743 /*
2744 * Get latch pointer with spinlock held, for the unlikely case that
2745 * pointer reads aren't atomic (as they're 8 bytes).
2746 */
2747 SpinLockAcquire(&walsnd->mutex);
2748 latch = walsnd->latch;
2749 SpinLockRelease(&walsnd->mutex);
2750
2751 if (latch != NULL)
2752 SetLatch(latch);
2753 }
2754 }
2755
2756 /*
2757 * Signal all walsenders to move to stopping state.
2758 *
2759 * This will trigger walsenders to move to a state where no further WAL can be
2760 * generated. See this file's header for details.
2761 */
2762 void
WalSndInitStopping(void)2763 WalSndInitStopping(void)
2764 {
2765 int i;
2766
2767 for (i = 0; i < max_wal_senders; i++)
2768 {
2769 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2770 pid_t pid;
2771
2772 SpinLockAcquire(&walsnd->mutex);
2773 pid = walsnd->pid;
2774 SpinLockRelease(&walsnd->mutex);
2775
2776 if (pid == 0)
2777 continue;
2778
2779 SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
2780 }
2781 }
2782
2783 /*
2784 * Wait that all the WAL senders have quit or reached the stopping state. This
2785 * is used by the checkpointer to control when the shutdown checkpoint can
2786 * safely be performed.
2787 */
2788 void
WalSndWaitStopping(void)2789 WalSndWaitStopping(void)
2790 {
2791 for (;;)
2792 {
2793 int i;
2794 bool all_stopped = true;
2795
2796 for (i = 0; i < max_wal_senders; i++)
2797 {
2798 WalSndState state;
2799 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2800
2801 SpinLockAcquire(&walsnd->mutex);
2802
2803 if (walsnd->pid == 0)
2804 {
2805 SpinLockRelease(&walsnd->mutex);
2806 continue;
2807 }
2808
2809 state = walsnd->state;
2810 SpinLockRelease(&walsnd->mutex);
2811
2812 if (state != WALSNDSTATE_STOPPING)
2813 {
2814 all_stopped = false;
2815 break;
2816 }
2817 }
2818
2819 /* safe to leave if confirmation is done for all WAL senders */
2820 if (all_stopped)
2821 return;
2822
2823 pg_usleep(10000L); /* wait for 10 msec */
2824 }
2825 }
2826
2827 /* Set state for current walsender (only called in walsender) */
2828 void
WalSndSetState(WalSndState state)2829 WalSndSetState(WalSndState state)
2830 {
2831 WalSnd *walsnd = MyWalSnd;
2832
2833 Assert(am_walsender);
2834
2835 if (walsnd->state == state)
2836 return;
2837
2838 SpinLockAcquire(&walsnd->mutex);
2839 walsnd->state = state;
2840 SpinLockRelease(&walsnd->mutex);
2841 }
2842
2843 /*
2844 * Return a string constant representing the state. This is used
2845 * in system views, and should *not* be translated.
2846 */
2847 static const char *
WalSndGetStateString(WalSndState state)2848 WalSndGetStateString(WalSndState state)
2849 {
2850 switch (state)
2851 {
2852 case WALSNDSTATE_STARTUP:
2853 return "startup";
2854 case WALSNDSTATE_BACKUP:
2855 return "backup";
2856 case WALSNDSTATE_CATCHUP:
2857 return "catchup";
2858 case WALSNDSTATE_STREAMING:
2859 return "streaming";
2860 case WALSNDSTATE_STOPPING:
2861 return "stopping";
2862 }
2863 return "UNKNOWN";
2864 }
2865
2866
2867 /*
2868 * Returns activity of walsenders, including pids and xlog locations sent to
2869 * standby servers.
2870 */
2871 Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)2872 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
2873 {
2874 #define PG_STAT_GET_WAL_SENDERS_COLS 8
2875 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2876 TupleDesc tupdesc;
2877 Tuplestorestate *tupstore;
2878 MemoryContext per_query_ctx;
2879 MemoryContext oldcontext;
2880 List *sync_standbys;
2881 int i;
2882
2883 /* check to see if caller supports us returning a tuplestore */
2884 if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
2885 ereport(ERROR,
2886 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2887 errmsg("set-valued function called in context that cannot accept a set")));
2888 if (!(rsinfo->allowedModes & SFRM_Materialize))
2889 ereport(ERROR,
2890 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2891 errmsg("materialize mode required, but it is not " \
2892 "allowed in this context")));
2893
2894 /* Build a tuple descriptor for our result type */
2895 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
2896 elog(ERROR, "return type must be a row type");
2897
2898 per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
2899 oldcontext = MemoryContextSwitchTo(per_query_ctx);
2900
2901 tupstore = tuplestore_begin_heap(true, false, work_mem);
2902 rsinfo->returnMode = SFRM_Materialize;
2903 rsinfo->setResult = tupstore;
2904 rsinfo->setDesc = tupdesc;
2905
2906 MemoryContextSwitchTo(oldcontext);
2907
2908 /*
2909 * Get the currently active synchronous standbys.
2910 */
2911 LWLockAcquire(SyncRepLock, LW_SHARED);
2912 sync_standbys = SyncRepGetSyncStandbys(NULL);
2913 LWLockRelease(SyncRepLock);
2914
2915 for (i = 0; i < max_wal_senders; i++)
2916 {
2917 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2918 XLogRecPtr sentPtr;
2919 XLogRecPtr write;
2920 XLogRecPtr flush;
2921 XLogRecPtr apply;
2922 int priority;
2923 WalSndState state;
2924 Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
2925 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
2926
2927 if (walsnd->pid == 0)
2928 continue;
2929
2930 SpinLockAcquire(&walsnd->mutex);
2931 sentPtr = walsnd->sentPtr;
2932 state = walsnd->state;
2933 write = walsnd->write;
2934 flush = walsnd->flush;
2935 apply = walsnd->apply;
2936 priority = walsnd->sync_standby_priority;
2937 SpinLockRelease(&walsnd->mutex);
2938
2939 memset(nulls, 0, sizeof(nulls));
2940 values[0] = Int32GetDatum(walsnd->pid);
2941
2942 if (!superuser())
2943 {
2944 /*
2945 * Only superusers can see details. Other users only get the pid
2946 * value to know it's a walsender, but no details.
2947 */
2948 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
2949 }
2950 else
2951 {
2952 values[1] = CStringGetTextDatum(WalSndGetStateString(state));
2953
2954 if (XLogRecPtrIsInvalid(sentPtr))
2955 nulls[2] = true;
2956 values[2] = LSNGetDatum(sentPtr);
2957
2958 if (XLogRecPtrIsInvalid(write))
2959 nulls[3] = true;
2960 values[3] = LSNGetDatum(write);
2961
2962 if (XLogRecPtrIsInvalid(flush))
2963 nulls[4] = true;
2964 values[4] = LSNGetDatum(flush);
2965
2966 if (XLogRecPtrIsInvalid(apply))
2967 nulls[5] = true;
2968 values[5] = LSNGetDatum(apply);
2969
2970 /*
2971 * Treat a standby such as a pg_basebackup background process
2972 * which always returns an invalid flush location, as an
2973 * asynchronous standby.
2974 */
2975 priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
2976
2977 values[6] = Int32GetDatum(priority);
2978
2979 /*
2980 * More easily understood version of standby state. This is purely
2981 * informational, not different from priority.
2982 */
2983 if (priority == 0)
2984 values[7] = CStringGetTextDatum("async");
2985 else if (list_member_int(sync_standbys, i))
2986 values[7] = CStringGetTextDatum("sync");
2987 else
2988 values[7] = CStringGetTextDatum("potential");
2989 }
2990
2991 tuplestore_putvalues(tupstore, tupdesc, values, nulls);
2992 }
2993
2994 /* clean up and return the tuplestore */
2995 tuplestore_donestoring(tupstore);
2996
2997 return (Datum) 0;
2998 }
2999
3000 /*
3001 * Send a keepalive message to standby.
3002 *
3003 * If requestReply is set, the message requests the other party to send
3004 * a message back to us, for heartbeat purposes. We also set a flag to
3005 * let nearby code that we're waiting for that response, to avoid
3006 * repeated requests.
3007 */
3008 static void
WalSndKeepalive(bool requestReply)3009 WalSndKeepalive(bool requestReply)
3010 {
3011 elog(DEBUG2, "sending replication keepalive");
3012
3013 /* construct the message... */
3014 resetStringInfo(&output_message);
3015 pq_sendbyte(&output_message, 'k');
3016 pq_sendint64(&output_message, sentPtr);
3017 pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
3018 pq_sendbyte(&output_message, requestReply ? 1 : 0);
3019
3020 /* ... and send it wrapped in CopyData */
3021 pq_putmessage_noblock('d', output_message.data, output_message.len);
3022
3023 /* Set local flag */
3024 if (requestReply)
3025 waiting_for_ping_response = true;
3026 }
3027
3028 /*
3029 * Send keepalive message if too much time has elapsed.
3030 */
3031 static void
WalSndKeepaliveIfNecessary(void)3032 WalSndKeepaliveIfNecessary(void)
3033 {
3034 TimestampTz ping_time;
3035
3036 /*
3037 * Don't send keepalive messages if timeouts are globally disabled or
3038 * we're doing something not partaking in timeouts.
3039 */
3040 if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3041 return;
3042
3043 if (waiting_for_ping_response)
3044 return;
3045
3046 /*
3047 * If half of wal_sender_timeout has lapsed without receiving any reply
3048 * from the standby, send a keep-alive message to the standby requesting
3049 * an immediate reply.
3050 */
3051 ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3052 wal_sender_timeout / 2);
3053 if (last_processing >= ping_time)
3054 {
3055 WalSndKeepalive(true);
3056
3057 /* Try to flush pending output to the client */
3058 if (pq_flush_if_writable() != 0)
3059 WalSndShutdown();
3060 }
3061 }
3062