1 /*-------------------------------------------------------------------------
2  *
3  * walsender.c
4  *
5  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6  * care of sending XLOG from the primary server to a single recipient.
7  * (Note that there can be more than one walsender process concurrently.)
8  * It is started by the postmaster when the walreceiver of a standby server
9  * connects to the primary server and requests XLOG streaming replication.
10  *
11  * A walsender is similar to a regular backend, ie. there is a one-to-one
12  * relationship between a connection and a walsender process, but instead
13  * of processing SQL queries, it understands a small set of special
14  * replication-mode commands. The START_REPLICATION command begins streaming
15  * WAL to the client. While streaming, the walsender keeps reading XLOG
16  * records from the disk and sends them to the standby server over the
17  * COPY protocol, until either side ends the replication by exiting COPY
18  * mode (or until the connection is closed).
19  *
20  * Normal termination is by SIGTERM, which instructs the walsender to
21  * close the connection and exit(0) at next convenient moment. Emergency
22  * termination is by SIGQUIT; like any backend, the walsender will simply
23  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24  * are treated as not a crash but approximately normal termination;
25  * the walsender will exit quickly without sending any more XLOG records.
26  *
27  * If the server is shut down, checkpointer sends us
28  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited.  If
29  * the backend is idle or runs an SQL query, this causes the backend to
30  * shut down; if logical replication is in progress, all existing WAL records
31  * are processed followed by a shutdown.  Otherwise this causes the walsender
32  * to switch to the "stopping" state. In this state, the walsender will reject
33  * any further replication commands. The checkpointer begins the shutdown
34  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36  * walsender to send any outstanding WAL, including the shutdown checkpoint
37  * record, wait for it to be replicated to the standby, and then exit.
38  *
39  *
40  * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
41  *
42  * IDENTIFICATION
43  *	  src/backend/replication/walsender.c
44  *
45  *-------------------------------------------------------------------------
46  */
47 #include "postgres.h"
48 
49 #include <signal.h>
50 #include <unistd.h>
51 
52 #include "access/timeline.h"
53 #include "access/transam.h"
54 #include "access/xact.h"
55 #include "access/xlog_internal.h"
56 
57 #include "catalog/pg_type.h"
58 #include "commands/dbcommands.h"
59 #include "funcapi.h"
60 #include "libpq/libpq.h"
61 #include "libpq/pqformat.h"
62 #include "miscadmin.h"
63 #include "nodes/replnodes.h"
64 #include "pgstat.h"
65 #include "replication/basebackup.h"
66 #include "replication/decode.h"
67 #include "replication/logical.h"
68 #include "replication/logicalfuncs.h"
69 #include "replication/slot.h"
70 #include "replication/snapbuild.h"
71 #include "replication/syncrep.h"
72 #include "replication/walreceiver.h"
73 #include "replication/walsender.h"
74 #include "replication/walsender_private.h"
75 #include "storage/fd.h"
76 #include "storage/ipc.h"
77 #include "storage/pmsignal.h"
78 #include "storage/proc.h"
79 #include "storage/procarray.h"
80 #include "tcop/tcopprot.h"
81 #include "utils/builtins.h"
82 #include "utils/guc.h"
83 #include "utils/memutils.h"
84 #include "utils/pg_lsn.h"
85 #include "utils/ps_status.h"
86 #include "utils/resowner.h"
87 #include "utils/timeout.h"
88 #include "utils/timestamp.h"
89 
90 /*
91  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
92  *
93  * We don't have a good idea of what a good value would be; there's some
94  * overhead per message in both walsender and walreceiver, but on the other
95  * hand sending large batches makes walsender less responsive to signals
96  * because signals are checked only between messages.  128kB (with
97  * default 8k blocks) seems like a reasonable guess for now.
98  */
99 #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
100 
101 /* Array of WalSnds in shared memory */
102 WalSndCtlData *WalSndCtl = NULL;
103 
104 /* My slot in the shared memory array */
105 WalSnd	   *MyWalSnd = NULL;
106 
107 /* Global state */
108 bool		am_walsender = false;		/* Am I a walsender process? */
109 bool		am_cascading_walsender = false;		/* Am I cascading WAL to
110 												 * another standby? */
111 bool		am_db_walsender = false;	/* Connected to a database? */
112 
113 /* User-settable parameters for walsender */
114 int			max_wal_senders = 0;	/* the maximum number of concurrent walsenders */
115 int			wal_sender_timeout = 60 * 1000;		/* maximum time to send one
116 												 * WAL data message */
117 bool		log_replication_commands = false;
118 
119 /*
120  * State for WalSndWakeupRequest
121  */
122 bool		wake_wal_senders = false;
123 
124 /*
125  * These variables are used similarly to openLogFile/SegNo/Off,
126  * but for walsender to read the XLOG.
127  */
128 static int	sendFile = -1;
129 static XLogSegNo sendSegNo = 0;
130 static uint32 sendOff = 0;
131 
132 /* Timeline ID of the currently open file */
133 static TimeLineID curFileTimeLine = 0;
134 
135 /*
136  * These variables keep track of the state of the timeline we're currently
137  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
138  * the timeline is not the latest timeline on this server, and the server's
139  * history forked off from that timeline at sendTimeLineValidUpto.
 * sendTimeLineNextTLI is the timeline reported to the client as next_tli
 * once streaming of a historic sendTimeLine has finished.
140  */
141 static TimeLineID sendTimeLine = 0;
142 static TimeLineID sendTimeLineNextTLI = 0;
143 static bool sendTimeLineIsHistoric = false;
144 static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
145 
146 /*
147  * How far have we sent WAL already? This is also advertised in
148  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
149  */
150 static XLogRecPtr sentPtr = InvalidXLogRecPtr;
151 
152 /* Buffers for constructing outgoing messages and processing reply messages. */
153 static StringInfoData output_message;
154 static StringInfoData reply_message;
155 static StringInfoData tmpbuf;
156 
157 /* Timestamp of last ProcessRepliesIfAny(). */
158 static TimestampTz last_processing = 0;
159 
160 /*
161  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
162  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
163  */
164 static TimestampTz last_reply_timestamp = 0;
165 
166 /* Have we sent a heartbeat message asking for reply, since last reply? */
167 static bool waiting_for_ping_response = false;
168 
169 /*
170  * While streaming WAL in Copy mode, streamingDoneSending is set to true
171  * after we have sent CopyDone. We should not send any more CopyData messages
172  * after that. streamingDoneReceiving is set to true when we receive CopyDone
173  * from the other end. When both become true, it's time to exit Copy mode.
174  */
175 static bool streamingDoneSending;
176 static bool streamingDoneReceiving;
177 
178 /* Are we there yet? */
179 static bool WalSndCaughtUp = false;
180 
181 /* Flags set by signal handlers for later service in main loop */
182 static volatile sig_atomic_t got_SIGUSR2 = false;
183 static volatile sig_atomic_t got_STOPPING = false;
184 
185 /*
186  * This is set while we are streaming. When not set
187  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
188  * the main loop is responsible for checking got_STOPPING and terminating when
189  * it's set (after streaming any remaining WAL).
190  */
191 static volatile sig_atomic_t replication_active = false;
192 
/*
 * Logical decoding state.  NOTE(review): presumably set up by
 * StartLogicalReplication and consumed by XLogSendLogical; confirm.
 */
193 static LogicalDecodingContext *logical_decoding_ctx = NULL;
194 static XLogRecPtr logical_startptr = InvalidXLogRecPtr;
195 
196 /* Signal handlers */
197 static void WalSndLastCycleHandler(SIGNAL_ARGS);
198 
199 /* Prototypes for private functions */
/*
 * Type of the send function handed to WalSndLoop() and WalSndDone(), e.g.
 * XLogSendPhysical for physical replication.
 */
200 typedef void (*WalSndSendDataCallback) (void);
201 static void WalSndLoop(WalSndSendDataCallback send_data);
202 static void InitWalSenderSlot(void);
203 static void WalSndKill(int code, Datum arg);
204 static void WalSndShutdown(void) pg_attribute_noreturn();
205 static void XLogSendPhysical(void);
206 static void XLogSendLogical(void);
207 static void WalSndDone(WalSndSendDataCallback send_data);
208 static XLogRecPtr GetStandbyFlushRecPtr(void);
209 static void IdentifySystem(void);
210 static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
211 static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
212 static void StartReplication(StartReplicationCmd *cmd);
213 static void StartLogicalReplication(StartReplicationCmd *cmd);
214 static void ProcessStandbyMessage(void);
215 static void ProcessStandbyReplyMessage(void);
216 static void ProcessStandbyHSFeedbackMessage(void);
217 static void ProcessRepliesIfAny(void);
218 static void WalSndKeepalive(bool requestReply);
219 static void WalSndKeepaliveIfNecessary(void);
220 static void WalSndCheckTimeOut(void);
221 static long WalSndComputeSleeptime(TimestampTz now);
222 static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
223 static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
224 static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
225 
226 static void XLogRead(char *buf, XLogRecPtr startptr, Size count);
227 
228 
229 /* Initialize walsender process before entering the main command loop */
230 void
InitWalSender(void)231 InitWalSender(void)
232 {
233 	am_cascading_walsender = RecoveryInProgress();
234 
235 	/* Create a per-walsender data structure in shared memory */
236 	InitWalSenderSlot();
237 
238 	/* Set up resource owner */
239 	CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
240 
241 	/*
242 	 * Let postmaster know that we're a WAL sender. Once we've declared us as
243 	 * a WAL sender process, postmaster will let us outlive the bgwriter and
244 	 * kill us last in the shutdown sequence, so we get a chance to stream all
245 	 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
246 	 * there's no going back, and we mustn't write any WAL records after this.
247 	 */
248 	MarkPostmasterChildWalSender();
249 	SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
250 }
251 
252 /*
253  * Clean up after an error.
254  *
255  * WAL sender processes don't use transactions like regular backends do.
256  * This function does any cleanup requited after an error in a WAL sender
257  * process, similar to what transaction abort does in a regular backend.
258  */
259 void
WalSndErrorCleanup(void)260 WalSndErrorCleanup(void)
261 {
262 	LWLockReleaseAll();
263 	pgstat_report_wait_end();
264 
265 	if (sendFile >= 0)
266 	{
267 		close(sendFile);
268 		sendFile = -1;
269 	}
270 
271 	if (MyReplicationSlot != NULL)
272 		ReplicationSlotRelease();
273 
274 	replication_active = false;
275 
276 	if (got_STOPPING || got_SIGUSR2)
277 		proc_exit(0);
278 
279 	/* Revert back to startup state */
280 	WalSndSetState(WALSNDSTATE_STARTUP);
281 }
282 
283 /*
284  * Handle a client's connection abort in an orderly manner.
285  */
286 static void
WalSndShutdown(void)287 WalSndShutdown(void)
288 {
289 	/*
290 	 * Reset whereToSendOutput to prevent ereport from attempting to send any
291 	 * more messages to the standby.
292 	 */
293 	if (whereToSendOutput == DestRemote)
294 		whereToSendOutput = DestNone;
295 
296 	proc_exit(0);
297 	abort();					/* keep the compiler quiet */
298 }
299 
300 /*
301  * Handle the IDENTIFY_SYSTEM command.
302  */
303 static void
IdentifySystem(void)304 IdentifySystem(void)
305 {
306 	StringInfoData buf;
307 	char		sysid[32];
308 	char		tli[11];
309 	char		xpos[MAXFNAMELEN];
310 	XLogRecPtr	logptr;
311 	char	   *dbname = NULL;
312 	Size		len;
313 
314 	/*
315 	 * Reply with a result set with one row, four columns. First col is system
316 	 * ID, second is timeline ID, third is current xlog location and the
317 	 * fourth contains the database name if we are connected to one.
318 	 */
319 
320 	snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
321 			 GetSystemIdentifier());
322 
323 	am_cascading_walsender = RecoveryInProgress();
324 	if (am_cascading_walsender)
325 	{
326 		/* this also updates ThisTimeLineID */
327 		logptr = GetStandbyFlushRecPtr();
328 	}
329 	else
330 		logptr = GetFlushRecPtr();
331 
332 	snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
333 
334 	snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
335 
336 	if (MyDatabaseId != InvalidOid)
337 	{
338 		MemoryContext cur = CurrentMemoryContext;
339 
340 		/* syscache access needs a transaction env. */
341 		StartTransactionCommand();
342 		/* make dbname live outside TX context */
343 		MemoryContextSwitchTo(cur);
344 		dbname = get_database_name(MyDatabaseId);
345 		CommitTransactionCommand();
346 		/* CommitTransactionCommand switches to TopMemoryContext */
347 		MemoryContextSwitchTo(cur);
348 	}
349 
350 	/* Send a RowDescription message */
351 	pq_beginmessage(&buf, 'T');
352 	pq_sendint(&buf, 4, 2);		/* 4 fields */
353 
354 	/* first field */
355 	pq_sendstring(&buf, "systemid");	/* col name */
356 	pq_sendint(&buf, 0, 4);		/* table oid */
357 	pq_sendint(&buf, 0, 2);		/* attnum */
358 	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
359 	pq_sendint(&buf, -1, 2);	/* typlen */
360 	pq_sendint(&buf, 0, 4);		/* typmod */
361 	pq_sendint(&buf, 0, 2);		/* format code */
362 
363 	/* second field */
364 	pq_sendstring(&buf, "timeline");	/* col name */
365 	pq_sendint(&buf, 0, 4);		/* table oid */
366 	pq_sendint(&buf, 0, 2);		/* attnum */
367 	pq_sendint(&buf, INT4OID, 4);		/* type oid */
368 	pq_sendint(&buf, 4, 2);		/* typlen */
369 	pq_sendint(&buf, 0, 4);		/* typmod */
370 	pq_sendint(&buf, 0, 2);		/* format code */
371 
372 	/* third field */
373 	pq_sendstring(&buf, "xlogpos");		/* col name */
374 	pq_sendint(&buf, 0, 4);		/* table oid */
375 	pq_sendint(&buf, 0, 2);		/* attnum */
376 	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
377 	pq_sendint(&buf, -1, 2);	/* typlen */
378 	pq_sendint(&buf, 0, 4);		/* typmod */
379 	pq_sendint(&buf, 0, 2);		/* format code */
380 
381 	/* fourth field */
382 	pq_sendstring(&buf, "dbname");		/* col name */
383 	pq_sendint(&buf, 0, 4);		/* table oid */
384 	pq_sendint(&buf, 0, 2);		/* attnum */
385 	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
386 	pq_sendint(&buf, -1, 2);	/* typlen */
387 	pq_sendint(&buf, 0, 4);		/* typmod */
388 	pq_sendint(&buf, 0, 2);		/* format code */
389 	pq_endmessage(&buf);
390 
391 	/* Send a DataRow message */
392 	pq_beginmessage(&buf, 'D');
393 	pq_sendint(&buf, 4, 2);		/* # of columns */
394 
395 	/* column 1: system identifier */
396 	len = strlen(sysid);
397 	pq_sendint(&buf, len, 4);
398 	pq_sendbytes(&buf, (char *) &sysid, len);
399 
400 	/* column 2: timeline */
401 	len = strlen(tli);
402 	pq_sendint(&buf, len, 4);
403 	pq_sendbytes(&buf, (char *) tli, len);
404 
405 	/* column 3: xlog position */
406 	len = strlen(xpos);
407 	pq_sendint(&buf, len, 4);
408 	pq_sendbytes(&buf, (char *) xpos, len);
409 
410 	/* column 4: database name, or NULL if none */
411 	if (dbname)
412 	{
413 		len = strlen(dbname);
414 		pq_sendint(&buf, len, 4);
415 		pq_sendbytes(&buf, (char *) dbname, len);
416 	}
417 	else
418 	{
419 		pq_sendint(&buf, -1, 4);
420 	}
421 
422 	pq_endmessage(&buf);
423 }
424 
425 
426 /*
427  * Handle TIMELINE_HISTORY command.
428  */
429 static void
SendTimeLineHistory(TimeLineHistoryCmd * cmd)430 SendTimeLineHistory(TimeLineHistoryCmd *cmd)
431 {
432 	StringInfoData buf;
433 	char		histfname[MAXFNAMELEN];
434 	char		path[MAXPGPATH];
435 	int			fd;
436 	off_t		histfilelen;
437 	off_t		bytesleft;
438 	Size		len;
439 
440 	/*
441 	 * Reply with a result set with one row, and two columns. The first col is
442 	 * the name of the history file, 2nd is the contents.
443 	 */
444 
445 	TLHistoryFileName(histfname, cmd->timeline);
446 	TLHistoryFilePath(path, cmd->timeline);
447 
448 	/* Send a RowDescription message */
449 	pq_beginmessage(&buf, 'T');
450 	pq_sendint(&buf, 2, 2);		/* 2 fields */
451 
452 	/* first field */
453 	pq_sendstring(&buf, "filename");	/* col name */
454 	pq_sendint(&buf, 0, 4);		/* table oid */
455 	pq_sendint(&buf, 0, 2);		/* attnum */
456 	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
457 	pq_sendint(&buf, -1, 2);	/* typlen */
458 	pq_sendint(&buf, 0, 4);		/* typmod */
459 	pq_sendint(&buf, 0, 2);		/* format code */
460 
461 	/* second field */
462 	pq_sendstring(&buf, "content");		/* col name */
463 	pq_sendint(&buf, 0, 4);		/* table oid */
464 	pq_sendint(&buf, 0, 2);		/* attnum */
465 	/*
466 	 * While this is labeled as BYTEAOID, it is the same output format
467 	 * as TEXTOID above.
468 	 */
469 	pq_sendint(&buf, BYTEAOID, 4);		/* type oid */
470 	pq_sendint(&buf, -1, 2);	/* typlen */
471 	pq_sendint(&buf, 0, 4);		/* typmod */
472 	pq_sendint(&buf, 0, 2);		/* format code */
473 	pq_endmessage(&buf);
474 
475 	/* Send a DataRow message */
476 	pq_beginmessage(&buf, 'D');
477 	pq_sendint(&buf, 2, 2);		/* # of columns */
478 	len = strlen(histfname);
479 	pq_sendint(&buf, len, 4);	/* col1 len */
480 	pq_sendbytes(&buf, histfname, len);
481 
482 	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
483 	if (fd < 0)
484 		ereport(ERROR,
485 				(errcode_for_file_access(),
486 				 errmsg("could not open file \"%s\": %m", path)));
487 
488 	/* Determine file length and send it to client */
489 	histfilelen = lseek(fd, 0, SEEK_END);
490 	if (histfilelen < 0)
491 		ereport(ERROR,
492 				(errcode_for_file_access(),
493 				 errmsg("could not seek to end of file \"%s\": %m", path)));
494 	if (lseek(fd, 0, SEEK_SET) != 0)
495 		ereport(ERROR,
496 				(errcode_for_file_access(),
497 			errmsg("could not seek to beginning of file \"%s\": %m", path)));
498 
499 	pq_sendint(&buf, histfilelen, 4);	/* col2 len */
500 
501 	bytesleft = histfilelen;
502 	while (bytesleft > 0)
503 	{
504 		PGAlignedBlock rbuf;
505 		int			nread;
506 
507 		nread = read(fd, rbuf.data, sizeof(rbuf));
508 		if (nread <= 0)
509 			ereport(ERROR,
510 					(errcode_for_file_access(),
511 					 errmsg("could not read file \"%s\": %m",
512 							path)));
513 		pq_sendbytes(&buf, rbuf.data, nread);
514 		bytesleft -= nread;
515 	}
516 	CloseTransientFile(fd);
517 
518 	pq_endmessage(&buf);
519 }
520 
521 /*
522  * Handle START_REPLICATION command (physical replication).
523  *
 * Acquires cmd->slotname if one was given (logical slots are rejected),
 * chooses the timeline to send, and - unless the requested historic timeline
 * has nothing left to send from cmd->startpoint - enters COPY mode and runs
 * WalSndLoop(XLogSendPhysical) until both sides have ended the COPY.  After a
 * historic timeline, a single-row result set (next_tli, next_tli_startpos)
 * tells the client where the next timeline starts.  A START_STREAMING
 * CommandComplete is sent last and the function returns.
 *
 * Errors are raised with ereport(ERROR), which takes us back to the main
 * loop.  If got_STOPPING is set when the streaming loop finishes, the process
 * exits instead of returning.
526  */
527 static void
StartReplication(StartReplicationCmd * cmd)528 StartReplication(StartReplicationCmd *cmd)
529 {
530 	StringInfoData buf;
531 	XLogRecPtr	FlushPtr;
532 
533 	/*
534 	 * We assume here that we're logging enough information in the WAL for
535 	 * log-shipping, since this is checked in PostmasterMain().
536 	 *
537 	 * NOTE: wal_level can only change at shutdown, so in most cases it is
538 	 * difficult for there to be WAL data that we can still see that was
539 	 * written at wal_level='minimal'.
540 	 */
541 
542 	if (cmd->slotname)
543 	{
544 		ReplicationSlotAcquire(cmd->slotname);
545 		if (SlotIsLogical(MyReplicationSlot))
546 			ereport(ERROR,
547 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
548 					 (errmsg("cannot use a logical replication slot for physical replication"))));
549 	}
550 
551 	/*
552 	 * Select the timeline. If it was given explicitly by the client, use
553 	 * that. Otherwise use the timeline of the last replayed record, which is
554 	 * kept in ThisTimeLineID.
555 	 */
556 	if (am_cascading_walsender)
557 	{
558 		/* this also updates ThisTimeLineID */
559 		FlushPtr = GetStandbyFlushRecPtr();
560 	}
561 	else
562 		FlushPtr = GetFlushRecPtr();
563 
564 	if (cmd->timeline != 0)
565 	{
566 		XLogRecPtr	switchpoint;
567 
568 		sendTimeLine = cmd->timeline;
569 		if (sendTimeLine == ThisTimeLineID)
570 		{
571 			sendTimeLineIsHistoric = false;
572 			sendTimeLineValidUpto = InvalidXLogRecPtr;
573 		}
574 		else
575 		{
576 			List	   *timeLineHistory;
577 
578 			sendTimeLineIsHistoric = true;
579 
580 			/*
581 			 * Check that the timeline the client requested for exists, and
582 			 * the requested start location is on that timeline.
583 			 */
584 			timeLineHistory = readTimeLineHistory(ThisTimeLineID);
585 			switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
586 										 &sendTimeLineNextTLI);
587 			list_free_deep(timeLineHistory);
588 
589 			/*
590 			 * Found the requested timeline in the history. Check that
591 			 * requested startpoint is on that timeline in our history.
592 			 *
593 			 * This is quite loose on purpose. We only check that we didn't
594 			 * fork off the requested timeline before the switchpoint. We
595 			 * don't check that we switched *to* it before the requested
596 			 * starting point. This is because the client can legitimately
597 			 * request to start replication from the beginning of the WAL
598 			 * segment that contains switchpoint, but on the new timeline, so
599 			 * that it doesn't end up with a partial segment. If you ask for a
600 			 * too old starting point, you'll get an error later when we fail
601 			 * to find the requested WAL segment in pg_xlog.
602 			 *
603 			 * XXX: we could be more strict here and only allow a startpoint
604 			 * that's older than the switchpoint, if it's still in the same
605 			 * WAL segment.
606 			 */
607 			if (!XLogRecPtrIsInvalid(switchpoint) &&
608 				switchpoint < cmd->startpoint)
609 			{
610 				ereport(ERROR,
611 						(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
612 								(uint32) (cmd->startpoint >> 32),
613 								(uint32) (cmd->startpoint),
614 								cmd->timeline),
615 						 errdetail("This server's history forked from timeline %u at %X/%X.",
616 								   cmd->timeline,
617 								   (uint32) (switchpoint >> 32),
618 								   (uint32) (switchpoint))));
619 			}
620 			sendTimeLineValidUpto = switchpoint;
621 		}
622 	}
623 	else
624 	{
625 		sendTimeLine = ThisTimeLineID;
626 		sendTimeLineValidUpto = InvalidXLogRecPtr;
627 		sendTimeLineIsHistoric = false;
628 	}
629 
630 	streamingDoneSending = streamingDoneReceiving = false;
631 
632 	/* If there is nothing to stream, don't even enter COPY mode */
633 	if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
634 	{
635 		/*
636 		 * When we first start replication the standby will be behind the
637 		 * primary. For some applications, for example, synchronous
638 		 * replication, it is important to have a clear state for this initial
639 		 * catchup mode, so we can trigger actions when we change streaming
640 		 * state later. We may stay in this state for a long time, which is
641 		 * exactly why we want to be able to monitor whether or not we are
642 		 * still here.
643 		 */
644 		WalSndSetState(WALSNDSTATE_CATCHUP);
645 
646 		/* Send a CopyBothResponse message, and start streaming */
647 		pq_beginmessage(&buf, 'W');
648 		pq_sendbyte(&buf, 0);
649 		pq_sendint(&buf, 0, 2);
650 		pq_endmessage(&buf);
651 		pq_flush();
652 
653 		/*
654 		 * Don't allow a request to stream from a future point in WAL that
655 		 * hasn't been flushed to disk in this server yet.
		 *
		 * Note that this check runs after the CopyBothResponse above has
		 * already been sent and flushed.
656 		 */
657 		if (FlushPtr < cmd->startpoint)
658 		{
659 			ereport(ERROR,
660 					(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
661 							(uint32) (cmd->startpoint >> 32),
662 							(uint32) (cmd->startpoint),
663 							(uint32) (FlushPtr >> 32),
664 							(uint32) (FlushPtr))));
665 		}
666 
667 		/* Start streaming from the requested point */
668 		sentPtr = cmd->startpoint;
669 
670 		/* Initialize shared memory status, too */
671 		{
672 			WalSnd	   *walsnd = MyWalSnd;
673 
674 			SpinLockAcquire(&walsnd->mutex);
675 			walsnd->sentPtr = sentPtr;
676 			SpinLockRelease(&walsnd->mutex);
677 		}
678 
679 		SyncRepInitConfig();
680 
681 		/* Main loop of walsender */
682 		replication_active = true;
683 
684 		WalSndLoop(XLogSendPhysical);
685 
686 		replication_active = false;
		/* A pending shutdown request means we exit now instead of returning. */
687 		if (got_STOPPING)
688 			proc_exit(0);
689 		WalSndSetState(WALSNDSTATE_STARTUP);
690 
691 		Assert(streamingDoneSending && streamingDoneReceiving);
692 	}
693 
	/* Release the slot acquired at the top of this function. */
694 	if (cmd->slotname)
695 		ReplicationSlotRelease();
696 
697 	/*
698 	 * Copy is finished now (or was skipped because there was nothing to
	 * stream).  For a historic timeline, send a single-row result set
	 * indicating the next timeline and the position where it starts.
700 	 */
701 	if (sendTimeLineIsHistoric)
702 	{
703 		char		tli_str[11];
704 		char		startpos_str[8 + 1 + 8 + 1];
705 		Size		len;
706 
707 		snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
708 		snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
709 				 (uint32) (sendTimeLineValidUpto >> 32),
710 				 (uint32) sendTimeLineValidUpto);
711 
712 		pq_beginmessage(&buf, 'T');		/* RowDescription */
713 		pq_sendint(&buf, 2, 2); /* 2 fields */
714 
715 		/* Field header */
716 		pq_sendstring(&buf, "next_tli");
717 		pq_sendint(&buf, 0, 4); /* table oid */
718 		pq_sendint(&buf, 0, 2); /* attnum */
719 
720 		/*
721 		 * int8 may seem like a surprising data type for this, but in theory
722 		 * int4 would not be wide enough for this, as TimeLineID is unsigned.
723 		 */
724 		pq_sendint(&buf, INT8OID, 4);	/* type oid */
725 		pq_sendint(&buf, -1, 2);
726 		pq_sendint(&buf, 0, 4);
727 		pq_sendint(&buf, 0, 2);
728 
729 		pq_sendstring(&buf, "next_tli_startpos");
730 		pq_sendint(&buf, 0, 4); /* table oid */
731 		pq_sendint(&buf, 0, 2); /* attnum */
732 		pq_sendint(&buf, TEXTOID, 4);	/* type oid */
733 		pq_sendint(&buf, -1, 2);
734 		pq_sendint(&buf, 0, 4);
735 		pq_sendint(&buf, 0, 2);
736 		pq_endmessage(&buf);
737 
738 		/* Data row */
739 		pq_beginmessage(&buf, 'D');
740 		pq_sendint(&buf, 2, 2); /* number of columns */
741 
742 		len = strlen(tli_str);
743 		pq_sendint(&buf, len, 4);		/* length */
744 		pq_sendbytes(&buf, tli_str, len);
745 
746 		len = strlen(startpos_str);
747 		pq_sendint(&buf, len, 4);		/* length */
748 		pq_sendbytes(&buf, startpos_str, len);
749 
750 		pq_endmessage(&buf);
751 	}
752 
753 	/* Send CommandComplete message */
754 	pq_puttextmessage('C', "START_STREAMING");
755 }
756 
757 /*
758  * read_page callback for logical decoding contexts, as a walsender process.
759  *
760  * Inside the walsender we can do better than logical_read_local_xlog_page,
761  * which has to do a plain sleep/busy loop, because the walsender's latch gets
762  * set every time WAL is flushed.
763  */
764 static int
logical_read_xlog_page(XLogReaderState * state,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char * cur_page,TimeLineID * pageTLI)765 logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
766 				XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
767 {
768 	XLogRecPtr	flushptr;
769 	int			count;
770 
771 	/* make sure we have enough WAL available */
772 	flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
773 
774 	/* fail if not (implies we are going to shut down) */
775 	if (flushptr < targetPagePtr + reqLen)
776 		return -1;
777 
778 	if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
779 		count = XLOG_BLCKSZ;	/* more than one block available */
780 	else
781 		count = flushptr - targetPagePtr;	/* part of the page available */
782 
783 	/* now actually read the data, we know it's there */
784 	XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
785 
786 	return count;
787 }
788 
789 /*
790  * Create a new replication slot.
791  */
792 static void
CreateReplicationSlot(CreateReplicationSlotCmd * cmd)793 CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
794 {
795 	const char *snapshot_name = NULL;
796 	char		xpos[MAXFNAMELEN];
797 	StringInfoData buf;
798 	Size		len;
799 
800 	Assert(!MyReplicationSlot);
801 
802 	/* setup state for XLogReadPage */
803 	sendTimeLineIsHistoric = false;
804 	sendTimeLine = ThisTimeLineID;
805 
806 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
807 	{
808 		ReplicationSlotCreate(cmd->slotname, false, RS_PERSISTENT);
809 	}
810 	else
811 	{
812 		CheckLogicalDecodingRequirements();
813 
814 		/*
815 		 * Initially create the slot as ephemeral - that allows us to nicely
816 		 * handle errors during initialization because it'll get dropped if
817 		 * this transaction fails. We'll make it persistent at the end.
818 		 */
819 		ReplicationSlotCreate(cmd->slotname, true, RS_EPHEMERAL);
820 	}
821 
822 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
823 	{
824 		LogicalDecodingContext *ctx;
825 
826 		ctx = CreateInitDecodingContext(cmd->plugin, NIL,
827 										true, /* build snapshot */
828 										logical_read_xlog_page,
829 										WalSndPrepareWrite, WalSndWriteData);
830 
831 		/*
832 		 * Signal that we don't need the timeout mechanism. We're just
833 		 * creating the replication slot and don't yet accept feedback
834 		 * messages or send keepalives. As we possibly need to wait for
835 		 * further WAL the walsender would otherwise possibly be killed too
836 		 * soon.
837 		 */
838 		last_reply_timestamp = 0;
839 
840 		/* build initial snapshot, might take a while */
841 		DecodingContextFindStartpoint(ctx);
842 
843 		/*
844 		 * Export a plain (not of the snapbuild.c type) snapshot to the user
845 		 * that can be imported into another session.
846 		 */
847 		snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
848 
849 		/* don't need the decoding context anymore */
850 		FreeDecodingContext(ctx);
851 
852 		ReplicationSlotPersist();
853 	}
854 	else if (cmd->kind == REPLICATION_KIND_PHYSICAL && cmd->reserve_wal)
855 	{
856 		ReplicationSlotReserveWal();
857 
858 		/* Write this slot to disk */
859 		ReplicationSlotMarkDirty();
860 		ReplicationSlotSave();
861 	}
862 
863 	snprintf(xpos, sizeof(xpos), "%X/%X",
864 			 (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
865 			 (uint32) MyReplicationSlot->data.confirmed_flush);
866 
867 	pq_beginmessage(&buf, 'T');
868 	pq_sendint(&buf, 4, 2);		/* 4 fields */
869 
870 	/* first field: slot name */
871 	pq_sendstring(&buf, "slot_name");	/* col name */
872 	pq_sendint(&buf, 0, 4);		/* table oid */
873 	pq_sendint(&buf, 0, 2);		/* attnum */
874 	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
875 	pq_sendint(&buf, -1, 2);	/* typlen */
876 	pq_sendint(&buf, 0, 4);		/* typmod */
877 	pq_sendint(&buf, 0, 2);		/* format code */
878 
879 	/* second field: LSN at which we became consistent */
880 	pq_sendstring(&buf, "consistent_point");	/* col name */
881 	pq_sendint(&buf, 0, 4);		/* table oid */
882 	pq_sendint(&buf, 0, 2);		/* attnum */
883 	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
884 	pq_sendint(&buf, -1, 2);	/* typlen */
885 	pq_sendint(&buf, 0, 4);		/* typmod */
886 	pq_sendint(&buf, 0, 2);		/* format code */
887 
888 	/* third field: exported snapshot's name */
889 	pq_sendstring(&buf, "snapshot_name");		/* col name */
890 	pq_sendint(&buf, 0, 4);		/* table oid */
891 	pq_sendint(&buf, 0, 2);		/* attnum */
892 	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
893 	pq_sendint(&buf, -1, 2);	/* typlen */
894 	pq_sendint(&buf, 0, 4);		/* typmod */
895 	pq_sendint(&buf, 0, 2);		/* format code */
896 
897 	/* fourth field: output plugin */
898 	pq_sendstring(&buf, "output_plugin");		/* col name */
899 	pq_sendint(&buf, 0, 4);		/* table oid */
900 	pq_sendint(&buf, 0, 2);		/* attnum */
901 	pq_sendint(&buf, TEXTOID, 4);		/* type oid */
902 	pq_sendint(&buf, -1, 2);	/* typlen */
903 	pq_sendint(&buf, 0, 4);		/* typmod */
904 	pq_sendint(&buf, 0, 2);		/* format code */
905 
906 	pq_endmessage(&buf);
907 
908 	/* Send a DataRow message */
909 	pq_beginmessage(&buf, 'D');
910 	pq_sendint(&buf, 4, 2);		/* # of columns */
911 
912 	/* slot_name */
913 	len = strlen(NameStr(MyReplicationSlot->data.name));
914 	pq_sendint(&buf, len, 4);	/* col1 len */
915 	pq_sendbytes(&buf, NameStr(MyReplicationSlot->data.name), len);
916 
917 	/* consistent wal location */
918 	len = strlen(xpos);
919 	pq_sendint(&buf, len, 4);
920 	pq_sendbytes(&buf, xpos, len);
921 
922 	/* snapshot name, or NULL if none */
923 	if (snapshot_name != NULL)
924 	{
925 		len = strlen(snapshot_name);
926 		pq_sendint(&buf, len, 4);
927 		pq_sendbytes(&buf, snapshot_name, len);
928 	}
929 	else
930 		pq_sendint(&buf, -1, 4);
931 
932 	/* plugin, or NULL if none */
933 	if (cmd->plugin != NULL)
934 	{
935 		len = strlen(cmd->plugin);
936 		pq_sendint(&buf, len, 4);
937 		pq_sendbytes(&buf, cmd->plugin, len);
938 	}
939 	else
940 		pq_sendint(&buf, -1, 4);
941 
942 	pq_endmessage(&buf);
943 
944 	/*
945 	 * release active status again, START_REPLICATION will reacquire it
946 	 */
947 	ReplicationSlotRelease();
948 }
949 
950 /*
951  * Get rid of a replication slot that is no longer wanted.
952  */
953 static void
DropReplicationSlot(DropReplicationSlotCmd * cmd)954 DropReplicationSlot(DropReplicationSlotCmd *cmd)
955 {
956 	ReplicationSlotDrop(cmd->slotname);
957 	EndCommand("DROP_REPLICATION_SLOT", DestRemote);
958 }
959 
/*
 * Load previously initiated logical slot and prepare for sending data (via
 * WalSndLoop).
 *
 * Handles START_REPLICATION ... LOGICAL.  Acquires the slot named in 'cmd',
 * sends a CopyBothResponse, builds a logical decoding context starting at
 * cmd->startpoint with the plugin options from the command, and streams via
 * WalSndLoop(XLogSendLogical).  Once streaming ends, the decoding context is
 * freed and the slot released.  The process then exits if a stop was
 * requested (got_STOPPING); otherwise it returns to the STARTUP state and
 * finishes the COPY with a "COPY 0" CommandComplete.
 */
static void
StartLogicalReplication(StartReplicationCmd *cmd)
{
	StringInfoData buf;

	/* make sure that our requirements are still fulfilled */
	CheckLogicalDecodingRequirements();

	/* no slot may still be held from an earlier command */
	Assert(!MyReplicationSlot);

	ReplicationSlotAcquire(cmd->slotname);

	/*
	 * Force a disconnect, so that the decoding code doesn't need to care
	 * about an eventual switch from running in recovery, to running in a
	 * normal environment. Client code is expected to handle reconnects.
	 *
	 * Setting got_STOPPING makes the process exit once WalSndLoop returns
	 * (see the proc_exit() at the end of this function).
	 */
	if (am_cascading_walsender && !RecoveryInProgress())
	{
		ereport(LOG,
				(errmsg("terminating walsender process after promotion")));
		got_STOPPING = true;
	}

	WalSndSetState(WALSNDSTATE_CATCHUP);

	/*
	 * Send a CopyBothResponse message, and start streaming.  The body is a
	 * one-byte format code and a two-byte column count, both zero here
	 * (layout per the frontend/backend protocol's CopyBothResponse).
	 */
	pq_beginmessage(&buf, 'W');
	pq_sendbyte(&buf, 0);
	pq_sendint(&buf, 0, 2);
	pq_endmessage(&buf);
	pq_flush();

	/* setup state for XLogReadPage */
	sendTimeLineIsHistoric = false;
	sendTimeLine = ThisTimeLineID;

	/*
	 * Initialize position to the last ack'ed one, then the xlog records begin
	 * to be shipped from that position.  Output is produced through the
	 * WalSndPrepareWrite / WalSndWriteData callbacks.
	 */
	logical_decoding_ctx = CreateDecodingContext(
											   cmd->startpoint, cmd->options,
												 logical_read_xlog_page,
										WalSndPrepareWrite, WalSndWriteData);

	/* Start reading WAL from the oldest required WAL. */
	logical_startptr = MyReplicationSlot->data.restart_lsn;

	/*
	 * Report the location after which we'll send out further commits as the
	 * current sentPtr.
	 */
	sentPtr = MyReplicationSlot->data.confirmed_flush;

	/*
	 * Also update the sent position status in shared memory.
	 *
	 * NOTE(review): the shared walsnd->sentPtr is seeded from restart_lsn,
	 * whereas the local sentPtr above uses confirmed_flush.  Confirm this
	 * difference is intentional.
	 */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
		SpinLockRelease(&walsnd->mutex);
	}

	replication_active = true;

	SyncRepInitConfig();

	/* Main loop of walsender */
	WalSndLoop(XLogSendLogical);

	/* Streaming has ended: tear down decoding state and give up the slot. */
	FreeDecodingContext(logical_decoding_ctx);
	ReplicationSlotRelease();

	replication_active = false;

	/* A pending stop request (shutdown or promotion) ends the process. */
	if (got_STOPPING)
		proc_exit(0);
	WalSndSetState(WALSNDSTATE_STARTUP);

	/* Get out of COPY mode (CommandComplete). */
	EndCommand("COPY 0", DestRemote);
}
1046 
1047 /*
1048  * LogicalDecodingContext 'prepare_write' callback.
1049  *
1050  * Prepare a write into a StringInfo.
1051  *
1052  * Don't do anything lasting in here, it's quite possible that nothing will done
1053  * with the data.
1054  */
1055 static void
WalSndPrepareWrite(LogicalDecodingContext * ctx,XLogRecPtr lsn,TransactionId xid,bool last_write)1056 WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1057 {
1058 	/* can't have sync rep confused by sending the same LSN several times */
1059 	if (!last_write)
1060 		lsn = InvalidXLogRecPtr;
1061 
1062 	resetStringInfo(ctx->out);
1063 
1064 	pq_sendbyte(ctx->out, 'w');
1065 	pq_sendint64(ctx->out, lsn);	/* dataStart */
1066 	pq_sendint64(ctx->out, lsn);	/* walEnd */
1067 
1068 	/*
1069 	 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1070 	 * reserve space here.
1071 	 */
1072 	pq_sendint64(ctx->out, 0);	/* sendtime */
1073 }
1074 
/*
 * LogicalDecodingContext 'write' callback.
 *
 * Actually write out data previously prepared by WalSndPrepareWrite out to
 * the network. Take as long as needed, but process replies from the other
 * side and check timeouts during that.
 *
 * First the current time is patched into the send-time slot of ctx->out, at
 * offset 1 + 8 + 8 (after the message-type byte and two int64 positions).
 * The buffer is then queued as a CopyData ('d') message without blocking,
 * and a flush is attempted.  If nothing is left pending and we are not yet
 * within wal_sender_timeout/2 of the last client reply, return at once.
 * Otherwise loop: service replies, timeouts and keepalives, and wait on the
 * latch/socket until the output buffer has drained.  Any flush failure ends
 * the walsender via WalSndShutdown().
 */
static void
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
				bool last_write)
{
	TimestampTz	now;
	int64 now_int;

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * This is somewhat ugly, but the protocol's set as it's already used for
	 * several releases by streaming physical replication.
	 */
	resetStringInfo(&tmpbuf);
	now_int = GetCurrentIntegerTimestamp();
	now = IntegerTimestampToTimestampTz(now_int);
	pq_sendint64(&tmpbuf, now_int);
	/* overwrite the sendtime placeholder reserved by WalSndPrepareWrite */
	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	/* output previously gathered data in a CopyData packet */
	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);

	CHECK_FOR_INTERRUPTS();

	/* Try to flush pending output to the client */
	if (pq_flush_if_writable() != 0)
		WalSndShutdown();

	/* Try taking fast path unless we get too close to walsender timeout. */
	if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
										  wal_sender_timeout / 2) &&
		!pq_is_send_pending())
	{
		return;
	}

	/* If we have pending write here, go to slow path */
	for (;;)
	{
		int			wakeEvents;
		long		sleeptime;

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/* everything has been handed to the socket: done */
		if (!pq_is_send_pending())
			break;

		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		/* output is still pending, so also wake when the socket is writable */
		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;

		/* Sleep until something happens or we time out */
		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime);

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();
	}

	/* reactivate latch so WalSndLoop knows to continue */
	SetLatch(MyLatch);
}
1173 
/*
 * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
 *
 * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
 * if we detect a shutdown request (either from postmaster or client)
 * we will return early, so caller must always check.
 *
 * While recovery is in progress, the replay position stands in for the
 * flush position.  The last observed position is cached in a function-local
 * static across calls, so a request already covered by it returns at once.
 * While waiting, this services client replies, config reloads, the
 * replication timeout and keepalives, and flushes pending output.
 */
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
	int			wakeEvents;
	/* most recently observed flush (or replay) position; kept across calls */
	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;

	/*
	 * Fast path to avoid acquiring the spinlock in case we already know we
	 * have enough WAL available. This is particularly interesting if we're
	 * far behind.
	 */
	if (RecentFlushPtr != InvalidXLogRecPtr &&
		loc <= RecentFlushPtr)
		return RecentFlushPtr;

	/* Get a more recent flush pointer. */
	if (!RecoveryInProgress())
		RecentFlushPtr = GetFlushRecPtr();
	else
		RecentFlushPtr = GetXLogReplayRecPtr(NULL);

	for (;;)
	{
		long		sleeptime;

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/* Clear any already-pending wakeups */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we're shutting down, trigger pending WAL to be written out,
		 * otherwise we'd possibly end up waiting for WAL that never gets
		 * written, because walwriter has shut down already.
		 */
		if (got_STOPPING)
			XLogBackgroundFlush();

		/* Update our idea of the currently flushed position. */
		if (!RecoveryInProgress())
			RecentFlushPtr = GetFlushRecPtr();
		else
			RecentFlushPtr = GetXLogReplayRecPtr(NULL);

		/*
		 * If postmaster asked us to stop, don't wait anymore.
		 *
		 * It's important to do this check after the recomputation of
		 * RecentFlushPtr, so we can send all remaining data before shutting
		 * down.
		 */
		if (got_STOPPING)
			break;

		/*
		 * We only send regular messages to the client for full decoded
		 * transactions, but a synchronous replication and walsender shutdown
		 * possibly are waiting for a later location. So, before sleeping, we
		 * send a ping containing the flush location. If the receiver is
		 * otherwise idle, this keepalive will trigger a reply. Processing the
		 * reply will update these MyWalSnd locations.
		 */
		if (MyWalSnd->flush < sentPtr &&
			MyWalSnd->write < sentPtr &&
			!waiting_for_ping_response)
			WalSndKeepalive(false);

		/* check whether we're done */
		if (loc <= RecentFlushPtr)
			break;

		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
		WalSndCaughtUp = true;

		/*
		 * Try to flush any pending output to the client.
		 */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming, so fail the current WAL fetch request.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/* die if timeout was reached */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * Sleep until something happens or we time out.  Also wait for the
		 * socket becoming writable, if there's still pending output.
		 * Otherwise we might sit on sendable output data while waiting for
		 * new WAL to be generated.  (But if we have nothing to send, we don't
		 * want to wake on socket-writable.)
		 */
		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
			WL_SOCKET_READABLE | WL_TIMEOUT;

		if (pq_is_send_pending())
			wakeEvents |= WL_SOCKET_WRITEABLE;

		WaitLatchOrSocket(MyLatch, wakeEvents,
						  MyProcPort->sock, sleeptime);
	}

	/* reactivate latch so WalSndLoop knows to continue */
	SetLatch(MyLatch);
	return RecentFlushPtr;
}
1317 
/*
 * Execute an incoming replication command.
 *
 * Once the walsender is in (or has been told to enter) the stopping state,
 * every command is refused with an ERROR.  Otherwise the command string is
 * logged, and any snapshot exported by a previous CREATE_REPLICATION_SLOT
 * ... LOGICAL is cleared.  The command is then parsed inside a short-lived
 * "Replication command context" and dispatched on its node type.
 *
 * The per-command message buffers (output_message, reply_message, tmpbuf)
 * are initialized after switching into that context.  They are therefore
 * released when the context is deleted at the end of the command.  A
 * CommandComplete ("SELECT") is sent last.
 */
void
exec_replication_command(const char *cmd_string)
{
	int			parse_rc;
	Node	   *cmd_node;
	MemoryContext cmd_context;
	MemoryContext old_context;

	/*
	 * If WAL sender has been told that shutdown is getting close, switch its
	 * status accordingly to handle the next replication commands correctly.
	 */
	if (got_STOPPING)
		WalSndSetState(WALSNDSTATE_STOPPING);

	/*
	 * Throw error if in stopping mode.  We need to prevent commands that
	 * could generate WAL while the shutdown checkpoint is being written.  To
	 * be safe, we just prohibit all new commands.
	 */
	if (MyWalSnd->state == WALSNDSTATE_STOPPING)
		ereport(ERROR,
				(errmsg("cannot execute new commands while WAL sender is in stopping mode")));

	/*
	 * Log replication command if log_replication_commands is enabled. Even
	 * when it's disabled, log the command with DEBUG1 level for backward
	 * compatibility.
	 */
	ereport(log_replication_commands ? LOG : DEBUG1,
			(errmsg("received replication command: %s", cmd_string)));

	/*
	 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
	 * command arrives. Clean up the old stuff if there's anything.
	 */
	SnapBuildClearExportedSnapshot();

	CHECK_FOR_INTERRUPTS();

	/* everything allocated for this command lives in cmd_context */
	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
										"Replication command context",
										ALLOCSET_DEFAULT_SIZES);
	old_context = MemoryContextSwitchTo(cmd_context);

	replication_scanner_init(cmd_string);
	parse_rc = replication_yyparse();
	if (parse_rc != 0)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 (errmsg_internal("replication command parser returned %d",
								  parse_rc))));

	cmd_node = replication_parse_result;

	/*
	 * Allocate buffers that will be used for each outgoing and incoming
	 * message.  We do this just once per command to reduce palloc overhead.
	 */
	initStringInfo(&output_message);
	initStringInfo(&reply_message);
	initStringInfo(&tmpbuf);

	switch (cmd_node->type)
	{
		case T_IdentifySystemCmd:
			IdentifySystem();
			break;

		case T_BaseBackupCmd:
			SendBaseBackup((BaseBackupCmd *) cmd_node);
			break;

		case T_CreateReplicationSlotCmd:
			CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
			break;

		case T_DropReplicationSlotCmd:
			DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
			break;

		case T_StartReplicationCmd:
			{
				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;

				/* physical streaming vs. logical decoding */
				if (cmd->kind == REPLICATION_KIND_PHYSICAL)
					StartReplication(cmd);
				else
					StartLogicalReplication(cmd);
				break;
			}

		case T_TimeLineHistoryCmd:
			SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
			break;

		default:
			elog(ERROR, "unrecognized replication command node tag: %u",
				 cmd_node->type);
	}

	/* done */
	MemoryContextSwitchTo(old_context);
	MemoryContextDelete(cmd_context);

	/* Send CommandComplete message */
	EndCommand("SELECT", DestRemote);
}
1429 
/*
 * Process any incoming messages while streaming. Also checks if the remote
 * end has closed the connection.
 *
 * Records the current time in last_processing, then reads messages without
 * blocking until none are available or a CopyDone has been received.
 * Handling per message type:
 *   'd' (CopyData)  - dispatched to ProcessStandbyMessage();
 *   'c' (CopyDone)  - answered with our own CopyDone if not already sent;
 *   'X' (Terminate) - the process exits with status 0;
 *   anything else   - FATAL protocol violation.
 * EOF or a read error also makes the process exit.  If at least one message
 * was handled, last_reply_timestamp is advanced and
 * waiting_for_ping_response is cleared.
 */
static void
ProcessRepliesIfAny(void)
{
	unsigned char firstchar;
	int			r;
	bool		received = false;

	last_processing = GetCurrentTimestamp();

	/*
	 * If we already received a CopyDone from the frontend, any subsequent
	 * message is the beginning of a new command, and should be processed in
	 * the main processing loop.
	 */
	while (!streamingDoneReceiving)
	{
		pq_startmsgread();
		r = pq_getbyte_if_available(&firstchar);
		if (r < 0)
		{
			/* unexpected error or EOF */
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("unexpected EOF on standby connection")));
			proc_exit(0);
		}
		if (r == 0)
		{
			/* no data available without blocking */
			pq_endmsgread();
			break;
		}

		/* Read the message contents */
		resetStringInfo(&reply_message);
		if (pq_getmessage(&reply_message, 0))
		{
			ereport(COMMERROR,
					(errcode(ERRCODE_PROTOCOL_VIOLATION),
					 errmsg("unexpected EOF on standby connection")));
			proc_exit(0);
		}

		/* Handle the very limited subset of commands expected in this phase */
		switch (firstchar)
		{
				/*
				 * 'd' means a standby reply wrapped in a CopyData packet.
				 */
			case 'd':
				ProcessStandbyMessage();
				received = true;
				break;

				/*
				 * CopyDone means the standby requested to finish streaming.
				 * Reply with CopyDone, if we had not sent that already.
				 */
			case 'c':
				if (!streamingDoneSending)
				{
					pq_putmessage_noblock('c', NULL, 0);
					streamingDoneSending = true;
				}

				streamingDoneReceiving = true;
				received = true;
				break;

				/*
				 * 'X' means that the standby is closing down the socket.
				 * (No break: proc_exit() is expected not to return.)
				 */
			case 'X':
				proc_exit(0);

			default:
				ereport(FATAL,
						(errcode(ERRCODE_PROTOCOL_VIOLATION),
						 errmsg("invalid standby message type \"%c\"",
								firstchar)));
		}
	}

	/*
	 * Save the last reply timestamp if we've received at least one reply.
	 */
	if (received)
	{
		last_reply_timestamp = last_processing;
		waiting_for_ping_response = false;
	}
}
1526 
1527 /*
1528  * Process a status update message received from standby.
1529  */
1530 static void
ProcessStandbyMessage(void)1531 ProcessStandbyMessage(void)
1532 {
1533 	char		msgtype;
1534 
1535 	/*
1536 	 * Check message type from the first byte.
1537 	 */
1538 	msgtype = pq_getmsgbyte(&reply_message);
1539 
1540 	switch (msgtype)
1541 	{
1542 		case 'r':
1543 			ProcessStandbyReplyMessage();
1544 			break;
1545 
1546 		case 'h':
1547 			ProcessStandbyHSFeedbackMessage();
1548 			break;
1549 
1550 		default:
1551 			ereport(COMMERROR,
1552 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
1553 					 errmsg("unexpected message type \"%c\"", msgtype)));
1554 			proc_exit(0);
1555 	}
1556 }
1557 
1558 /*
1559  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
1560  */
1561 static void
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)1562 PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
1563 {
1564 	bool		changed = false;
1565 	ReplicationSlot *slot = MyReplicationSlot;
1566 
1567 	Assert(lsn != InvalidXLogRecPtr);
1568 	SpinLockAcquire(&slot->mutex);
1569 	if (slot->data.restart_lsn != lsn)
1570 	{
1571 		changed = true;
1572 		slot->data.restart_lsn = lsn;
1573 	}
1574 	SpinLockRelease(&slot->mutex);
1575 
1576 	if (changed)
1577 	{
1578 		ReplicationSlotMarkDirty();
1579 		ReplicationSlotsComputeRequiredLSN();
1580 	}
1581 
1582 	/*
1583 	 * One could argue that the slot should be saved to disk now, but that'd
1584 	 * be energy wasted - the worst lost information can do here is give us
1585 	 * wrong information in a statistics view - we'll just potentially be more
1586 	 * conservative in removing files.
1587 	 */
1588 }
1589 
1590 /*
1591  * Regular reply from standby advising of WAL positions on standby server.
1592  */
1593 static void
ProcessStandbyReplyMessage(void)1594 ProcessStandbyReplyMessage(void)
1595 {
1596 	XLogRecPtr	writePtr,
1597 				flushPtr,
1598 				applyPtr;
1599 	bool		replyRequested;
1600 
1601 	/* the caller already consumed the msgtype byte */
1602 	writePtr = pq_getmsgint64(&reply_message);
1603 	flushPtr = pq_getmsgint64(&reply_message);
1604 	applyPtr = pq_getmsgint64(&reply_message);
1605 	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
1606 	replyRequested = pq_getmsgbyte(&reply_message);
1607 
1608 	elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
1609 		 (uint32) (writePtr >> 32), (uint32) writePtr,
1610 		 (uint32) (flushPtr >> 32), (uint32) flushPtr,
1611 		 (uint32) (applyPtr >> 32), (uint32) applyPtr,
1612 		 replyRequested ? " (reply requested)" : "");
1613 
1614 	/* Send a reply if the standby requested one. */
1615 	if (replyRequested)
1616 		WalSndKeepalive(false);
1617 
1618 	/*
1619 	 * Update shared state for this WalSender process based on reply data from
1620 	 * standby.
1621 	 */
1622 	{
1623 		WalSnd	   *walsnd = MyWalSnd;
1624 
1625 		SpinLockAcquire(&walsnd->mutex);
1626 		walsnd->write = writePtr;
1627 		walsnd->flush = flushPtr;
1628 		walsnd->apply = applyPtr;
1629 		SpinLockRelease(&walsnd->mutex);
1630 	}
1631 
1632 	if (!am_cascading_walsender)
1633 		SyncRepReleaseWaiters();
1634 
1635 	/*
1636 	 * Advance our local xmin horizon when the client confirmed a flush.
1637 	 */
1638 	if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
1639 	{
1640 		if (SlotIsLogical(MyReplicationSlot))
1641 			LogicalConfirmReceivedLocation(flushPtr);
1642 		else
1643 			PhysicalConfirmReceivedLocation(flushPtr);
1644 	}
1645 }
1646 
1647 /* compute new replication slot xmin horizon if needed */
1648 static void
PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)1649 PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
1650 {
1651 	bool		changed = false;
1652 	ReplicationSlot *slot = MyReplicationSlot;
1653 
1654 	SpinLockAcquire(&slot->mutex);
1655 	MyPgXact->xmin = InvalidTransactionId;
1656 
1657 	/*
1658 	 * For physical replication we don't need the interlock provided by xmin
1659 	 * and effective_xmin since the consequences of a missed increase are
1660 	 * limited to query cancellations, so set both at once.
1661 	 */
1662 	if (!TransactionIdIsNormal(slot->data.xmin) ||
1663 		!TransactionIdIsNormal(feedbackXmin) ||
1664 		TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
1665 	{
1666 		changed = true;
1667 		slot->data.xmin = feedbackXmin;
1668 		slot->effective_xmin = feedbackXmin;
1669 	}
1670 	SpinLockRelease(&slot->mutex);
1671 
1672 	if (changed)
1673 	{
1674 		ReplicationSlotMarkDirty();
1675 		ReplicationSlotsComputeRequiredXmin(false);
1676 	}
1677 }
1678 
/*
 * Hot Standby feedback
 *
 * Handles an 'h' message.  Payload, read in order after the type byte: the
 * standby's send time (int64, ignored), then its xmin and that xmin's epoch
 * (4 bytes each).
 *
 * A non-normal xmin clears any reservation we hold (our PGXACT xmin, and
 * the slot xmin if a slot is in use).  Otherwise the xmin/epoch pair is
 * checked against our nextXid/epoch and silently ignored if it is in the
 * future or already wrapped around.  If it passes, it is reserved through
 * the replication slot if we have one, or else through our PGXACT entry.
 */
static void
ProcessStandbyHSFeedbackMessage(void)
{
	TransactionId nextXid;
	uint32		nextEpoch;
	TransactionId feedbackXmin;
	uint32		feedbackEpoch;

	/*
	 * Decipher the reply message. The caller already consumed the msgtype
	 * byte.
	 */
	(void) pq_getmsgint64(&reply_message);		/* sendTime; not used ATM */
	feedbackXmin = pq_getmsgint(&reply_message, 4);
	feedbackEpoch = pq_getmsgint(&reply_message, 4);

	elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
		 feedbackXmin,
		 feedbackEpoch);

	/* Unset WalSender's xmin if the feedback message value is invalid */
	if (!TransactionIdIsNormal(feedbackXmin))
	{
		MyPgXact->xmin = InvalidTransactionId;
		if (MyReplicationSlot != NULL)
			PhysicalReplicationSlotNewXmin(feedbackXmin);
		return;
	}

	/*
	 * Check that the provided xmin/epoch are sane, that is, not in the future
	 * and not so far back as to be already wrapped around.  Ignore if not.
	 *
	 * Epoch of nextXid should be same as standby, or if the counter has
	 * wrapped, then one greater than standby.
	 */
	GetNextXidAndEpoch(&nextXid, &nextEpoch);

	/*
	 * Plain numeric comparison: a feedbackXmin numerically above nextXid can
	 * only be valid if it belongs to the previous epoch.
	 */
	if (feedbackXmin <= nextXid)
	{
		if (feedbackEpoch != nextEpoch)
			return;
	}
	else
	{
		if (feedbackEpoch + 1 != nextEpoch)
			return;
	}

	if (!TransactionIdPrecedesOrEquals(feedbackXmin, nextXid))
		return;					/* epoch OK, but it's wrapped around */

	/*
	 * Set the WalSender's xmin equal to the standby's requested xmin, so that
	 * the xmin will be taken into account by GetOldestXmin.  This will hold
	 * back the removal of dead rows and thereby prevent the generation of
	 * cleanup conflicts on the standby server.
	 *
	 * There is a small window for a race condition here: although we just
	 * checked that feedbackXmin precedes nextXid, the nextXid could have
	 * gotten advanced between our fetching it and applying the xmin below,
	 * perhaps far enough to make feedbackXmin wrap around.  In that case the
	 * xmin we set here would be "in the future" and have no effect.  No point
	 * in worrying about this since it's too late to save the desired data
	 * anyway.  Assuming that the standby sends us an increasing sequence of
	 * xmins, this could only happen during the first reply cycle, else our
	 * own xmin would prevent nextXid from advancing so far.
	 *
	 * We don't bother taking the ProcArrayLock here.  Setting the xmin field
	 * is assumed atomic, and there's no real need to prevent a concurrent
	 * GetOldestXmin.  (If we're moving our xmin forward, this is obviously
	 * safe, and if we're moving it backwards, well, the data is at risk
	 * already since a VACUUM could have just finished calling GetOldestXmin.)
	 *
	 * If we're using a replication slot we reserve the xmin via that,
	 * otherwise via the walsender's PGXACT entry.
	 *
	 * XXX: It might make sense to generalize the ephemeral slot concept and
	 * always use the slot mechanism to handle the feedback xmin.
	 */
	if (MyReplicationSlot != NULL)		/* XXX: persistency configurable? */
		PhysicalReplicationSlotNewXmin(feedbackXmin);
	else
		MyPgXact->xmin = feedbackXmin;
}
1767 
1768 /*
1769  * Compute how long send/receive loops should sleep.
1770  *
1771  * If wal_sender_timeout is enabled we want to wake up in time to send
1772  * keepalives and to abort the connection if wal_sender_timeout has been
1773  * reached.
1774  */
1775 static long
WalSndComputeSleeptime(TimestampTz now)1776 WalSndComputeSleeptime(TimestampTz now)
1777 {
1778 	long		sleeptime = 10000;		/* 10 s */
1779 
1780 	if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
1781 	{
1782 		TimestampTz wakeup_time;
1783 
1784 		/*
1785 		 * At the latest stop sleeping once wal_sender_timeout has been
1786 		 * reached.
1787 		 */
1788 		wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
1789 												  wal_sender_timeout);
1790 
1791 		/*
1792 		 * If no ping has been sent yet, wakeup when it's time to do so.
1793 		 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
1794 		 * the timeout passed without a response.
1795 		 */
1796 		if (!waiting_for_ping_response)
1797 			wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
1798 													  wal_sender_timeout / 2);
1799 
1800 		/* Compute relative time until wakeup. */
1801 		sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
1802 	}
1803 
1804 	return sleeptime;
1805 }
1806 
1807 /*
1808  * Check whether there have been responses by the client within
1809  * wal_sender_timeout and shutdown if not.  Using last_processing as the
1810  * reference point avoids counting server-side stalls against the client.
1811  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
1812  * postdate last_processing by more than wal_sender_timeout.  If that happens,
1813  * the client must reply almost immediately to avoid a timeout.  This rarely
1814  * affects the default configuration, under which clients spontaneously send a
1815  * message every standby_message_timeout = wal_sender_timeout/6 = 10s.  We
1816  * could eliminate that problem by recognizing timeout expiration at
1817  * wal_sender_timeout/2 after the keepalive.
1818  */
1819 static void
WalSndCheckTimeOut(void)1820 WalSndCheckTimeOut(void)
1821 {
1822 	TimestampTz timeout;
1823 
1824 	/* don't bail out if we're doing something that doesn't require timeouts */
1825 	if (last_reply_timestamp <= 0)
1826 		return;
1827 
1828 	timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
1829 										  wal_sender_timeout);
1830 
1831 	if (wal_sender_timeout > 0 && last_processing >= timeout)
1832 	{
1833 		/*
1834 		 * Since typically expiration of replication timeout means
1835 		 * communication problem, we don't send the error message to the
1836 		 * standby.
1837 		 */
1838 		ereport(COMMERROR,
1839 		(errmsg("terminating walsender process due to replication timeout")));
1840 
1841 		WalSndShutdown();
1842 	}
1843 }
1844 
/*
 * Main loop of walsender process that streams the WAL over Copy messages.
 *
 * Each iteration does the following, in order:
 * - bail out if the postmaster is gone;
 * - service interrupts and config reloads, then read any client replies;
 * - let send_data() queue more output (only when nothing is already pending);
 * - try to flush;
 * - run the caught-up / last-cycle checks;
 * - run the timeout and keepalive checks;
 * - finally sleep on the latch/socket when there is nothing to do right now.
 *
 * send_data is the mode-specific producer (XLogSendPhysical or
 * XLogSendLogical in this file).  It reports through WalSndCaughtUp whether
 * all currently available data has been queued.
 *
 * Returns normally only once CopyDone has been both received and sent and
 * the output buffer has drained.  Other exits from the loop are:
 * - exit(1) on postmaster death;
 * - WalSndDone(), which calls proc_exit(0) once everything is replicated;
 * - WalSndShutdown() after a flush failure (presumably does not return).
 */
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
	/*
	 * Initialize the last reply timestamp. That enables timeout processing
	 * from hereon.
	 */
	last_reply_timestamp = GetCurrentTimestamp();
	waiting_for_ping_response = false;

	/*
	 * Loop until we reach the end of this timeline or the client requests to
	 * stop streaming.
	 */
	for (;;)
	{
		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (!PostmasterIsAlive())
			exit(1);

		/*
		 * Clear any already-pending wakeups.  This is done at the top, before
		 * any work, so that a wakeup arriving while we work below is still
		 * seen by the WaitLatchOrSocket() at the bottom of the loop.
		 */
		ResetLatch(MyLatch);

		CHECK_FOR_INTERRUPTS();

		/* Process any requests or signals received recently */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
			SyncRepInitConfig();
		}

		/* Check for input from the client */
		ProcessRepliesIfAny();

		/*
		 * If we have received CopyDone from the client, sent CopyDone
		 * ourselves, and the output buffer is empty, it's time to exit
		 * streaming.
		 */
		if (streamingDoneReceiving && streamingDoneSending &&
			!pq_is_send_pending())
			break;

		/*
		 * If we don't have any pending data in the output buffer, try to send
		 * some more.  If there is some, we don't bother to call send_data
		 * again until we've flushed it ... but we'd better assume we are not
		 * caught up.
		 */
		if (!pq_is_send_pending())
			send_data();
		else
			WalSndCaughtUp = false;

		/* Try to flush pending output to the client */
		if (pq_flush_if_writable() != 0)
			WalSndShutdown();

		/* If nothing remains to be sent right now ... */
		if (WalSndCaughtUp && !pq_is_send_pending())
		{
			/*
			 * If we're in catchup state, move to streaming.  This is an
			 * important state change for users to know about, since before
			 * this point data loss might occur if the primary dies and we
			 * need to failover to the standby. The state change is also
			 * important for synchronous replication, since commits that
			 * started to wait at that point might wait for some time.
			 */
			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
			{
				ereport(DEBUG1,
						(errmsg("\"%s\" has now caught up with upstream server",
								application_name)));
				WalSndSetState(WALSNDSTATE_STREAMING);
			}

			/*
			 * When SIGUSR2 arrives, we send any outstanding logs up to the
			 * shutdown checkpoint record (i.e., the latest record), wait for
			 * them to be replicated to the standby, and exit. This may be a
			 * normal termination at shutdown, or a promotion, the walsender
			 * is not sure which.  (XLogSendLogical also sets got_SIGUSR2
			 * itself once it is caught up after a stop request.)
			 */
			if (got_SIGUSR2)
				WalSndDone(send_data);
		}

		/* Check for replication timeout. */
		WalSndCheckTimeOut();

		/* Send keepalive if the time has come */
		WalSndKeepaliveIfNecessary();

		/*
		 * We don't block if not caught up, unless there is unsent data
		 * pending in which case we'd better block until the socket is
		 * write-ready.  This test is only needed for the case where the
		 * send_data callback handled a subset of the available data but then
		 * pq_flush_if_writable flushed it all --- we should immediately try
		 * to send more.
		 *
		 * NOTE(review): once streamingDoneSending is set and the output
		 * buffer is empty, this condition is false even though we are only
		 * waiting for the client's CopyDone.  The loop then appears to spin
		 * without sleeping until that CopyDone arrives.  Confirm whether
		 * this is acceptable.
		 */
		if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
		{
			long		sleeptime;
			int			wakeEvents;

			wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;

			/* Only wait for client input while the client may still send. */
			if (!streamingDoneReceiving)
				wakeEvents |= WL_SOCKET_READABLE;

			/*
			 * Use fresh timestamp, not last_processed, to reduce the chance
			 * of reaching wal_sender_timeout before sending a keepalive.
			 */
			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());

			/* Wake up as soon as we can flush the pending output. */
			if (pq_is_send_pending())
				wakeEvents |= WL_SOCKET_WRITEABLE;

			/* Sleep until something happens or we time out */
			WaitLatchOrSocket(MyLatch, wakeEvents,
							  MyProcPort->sock, sleeptime);
		}
	}
	return;
}
1979 
1980 /* Initialize a per-walsender data structure for this walsender process */
1981 static void
InitWalSenderSlot(void)1982 InitWalSenderSlot(void)
1983 {
1984 	int			i;
1985 
1986 	/*
1987 	 * WalSndCtl should be set up already (we inherit this by fork() or
1988 	 * EXEC_BACKEND mechanism from the postmaster).
1989 	 */
1990 	Assert(WalSndCtl != NULL);
1991 	Assert(MyWalSnd == NULL);
1992 
1993 	/*
1994 	 * Find a free walsender slot and reserve it. If this fails, we must be
1995 	 * out of WalSnd structures.
1996 	 */
1997 	for (i = 0; i < max_wal_senders; i++)
1998 	{
1999 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2000 
2001 		SpinLockAcquire(&walsnd->mutex);
2002 
2003 		if (walsnd->pid != 0)
2004 		{
2005 			SpinLockRelease(&walsnd->mutex);
2006 			continue;
2007 		}
2008 		else
2009 		{
2010 			/*
2011 			 * Found a free slot. Reserve it for us.
2012 			 */
2013 			walsnd->pid = MyProcPid;
2014 			walsnd->sentPtr = InvalidXLogRecPtr;
2015 			walsnd->write = InvalidXLogRecPtr;
2016 			walsnd->flush = InvalidXLogRecPtr;
2017 			walsnd->apply = InvalidXLogRecPtr;
2018 			walsnd->state = WALSNDSTATE_STARTUP;
2019 			walsnd->latch = &MyProc->procLatch;
2020 			SpinLockRelease(&walsnd->mutex);
2021 			/* don't need the lock anymore */
2022 			MyWalSnd = (WalSnd *) walsnd;
2023 
2024 			break;
2025 		}
2026 	}
2027 	if (MyWalSnd == NULL)
2028 		ereport(FATAL,
2029 				(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
2030 				 errmsg("number of requested standby connections "
2031 						"exceeds max_wal_senders (currently %d)",
2032 						max_wal_senders)));
2033 
2034 	/* Arrange to clean up at walsender exit */
2035 	on_shmem_exit(WalSndKill, 0);
2036 }
2037 
2038 /* Destroy the per-walsender data structure for this walsender process */
2039 static void
WalSndKill(int code,Datum arg)2040 WalSndKill(int code, Datum arg)
2041 {
2042 	WalSnd	   *walsnd = MyWalSnd;
2043 
2044 	Assert(walsnd != NULL);
2045 
2046 	MyWalSnd = NULL;
2047 
2048 	SpinLockAcquire(&walsnd->mutex);
2049 	/* clear latch while holding the spinlock, so it can safely be read */
2050 	walsnd->latch = NULL;
2051 	/* Mark WalSnd struct as no longer being in use. */
2052 	walsnd->pid = 0;
2053 	SpinLockRelease(&walsnd->mutex);
2054 }
2055 
2056 /*
2057  * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
2058  *
2059  * XXX probably this should be improved to suck data directly from the
2060  * WAL buffers when possible.
2061  *
2062  * Will open, and keep open, one WAL segment stored in the global file
2063  * descriptor sendFile. This means if XLogRead is used once, there will
2064  * always be one descriptor left open until the process ends, but never
2065  * more than one.
2066  */
2067 static void
XLogRead(char * buf,XLogRecPtr startptr,Size count)2068 XLogRead(char *buf, XLogRecPtr startptr, Size count)
2069 {
2070 	char	   *p;
2071 	XLogRecPtr	recptr;
2072 	Size		nbytes;
2073 	XLogSegNo	segno;
2074 
2075 retry:
2076 	p = buf;
2077 	recptr = startptr;
2078 	nbytes = count;
2079 
2080 	while (nbytes > 0)
2081 	{
2082 		uint32		startoff;
2083 		int			segbytes;
2084 		int			readbytes;
2085 
2086 		startoff = recptr % XLogSegSize;
2087 
2088 		if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
2089 		{
2090 			char		path[MAXPGPATH];
2091 
2092 			/* Switch to another logfile segment */
2093 			if (sendFile >= 0)
2094 				close(sendFile);
2095 
2096 			XLByteToSeg(recptr, sendSegNo);
2097 
2098 			/*-------
2099 			 * When reading from a historic timeline, and there is a timeline
2100 			 * switch within this segment, read from the WAL segment belonging
2101 			 * to the new timeline.
2102 			 *
2103 			 * For example, imagine that this server is currently on timeline
2104 			 * 5, and we're streaming timeline 4. The switch from timeline 4
2105 			 * to 5 happened at 0/13002088. In pg_xlog, we have these files:
2106 			 *
2107 			 * ...
2108 			 * 000000040000000000000012
2109 			 * 000000040000000000000013
2110 			 * 000000050000000000000013
2111 			 * 000000050000000000000014
2112 			 * ...
2113 			 *
2114 			 * In this situation, when requested to send the WAL from
2115 			 * segment 0x13, on timeline 4, we read the WAL from file
2116 			 * 000000050000000000000013. Archive recovery prefers files from
2117 			 * newer timelines, so if the segment was restored from the
2118 			 * archive on this server, the file belonging to the old timeline,
2119 			 * 000000040000000000000013, might not exist. Their contents are
2120 			 * equal up to the switchpoint, because at a timeline switch, the
2121 			 * used portion of the old segment is copied to the new file.
2122 			 *-------
2123 			 */
2124 			curFileTimeLine = sendTimeLine;
2125 			if (sendTimeLineIsHistoric)
2126 			{
2127 				XLogSegNo	endSegNo;
2128 
2129 				XLByteToSeg(sendTimeLineValidUpto, endSegNo);
2130 				if (sendSegNo == endSegNo)
2131 					curFileTimeLine = sendTimeLineNextTLI;
2132 			}
2133 
2134 			XLogFilePath(path, curFileTimeLine, sendSegNo);
2135 
2136 			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2137 			if (sendFile < 0)
2138 			{
2139 				/*
2140 				 * If the file is not found, assume it's because the standby
2141 				 * asked for a too old WAL segment that has already been
2142 				 * removed or recycled.
2143 				 */
2144 				if (errno == ENOENT)
2145 					ereport(ERROR,
2146 							(errcode_for_file_access(),
2147 							 errmsg("requested WAL segment %s has already been removed",
2148 								XLogFileNameP(curFileTimeLine, sendSegNo))));
2149 				else
2150 					ereport(ERROR,
2151 							(errcode_for_file_access(),
2152 							 errmsg("could not open file \"%s\": %m",
2153 									path)));
2154 			}
2155 			sendOff = 0;
2156 		}
2157 
2158 		/* Need to seek in the file? */
2159 		if (sendOff != startoff)
2160 		{
2161 			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
2162 				ereport(ERROR,
2163 						(errcode_for_file_access(),
2164 				  errmsg("could not seek in log segment %s to offset %u: %m",
2165 						 XLogFileNameP(curFileTimeLine, sendSegNo),
2166 						 startoff)));
2167 			sendOff = startoff;
2168 		}
2169 
2170 		/* How many bytes are within this segment? */
2171 		if (nbytes > (XLogSegSize - startoff))
2172 			segbytes = XLogSegSize - startoff;
2173 		else
2174 			segbytes = nbytes;
2175 
2176 		readbytes = read(sendFile, p, segbytes);
2177 		if (readbytes <= 0)
2178 		{
2179 			ereport(ERROR,
2180 					(errcode_for_file_access(),
2181 					 errmsg("could not read from log segment %s, offset %u, length %lu: %m",
2182 							XLogFileNameP(curFileTimeLine, sendSegNo),
2183 							sendOff, (unsigned long) segbytes)));
2184 		}
2185 
2186 		/* Update state for read */
2187 		recptr += readbytes;
2188 
2189 		sendOff += readbytes;
2190 		nbytes -= readbytes;
2191 		p += readbytes;
2192 	}
2193 
2194 	/*
2195 	 * After reading into the buffer, check that what we read was valid. We do
2196 	 * this after reading, because even though the segment was present when we
2197 	 * opened it, it might get recycled or removed while we read it. The
2198 	 * read() succeeds in that case, but the data we tried to read might
2199 	 * already have been overwritten with new WAL records.
2200 	 */
2201 	XLByteToSeg(startptr, segno);
2202 	CheckXLogRemoved(segno, ThisTimeLineID);
2203 
2204 	/*
2205 	 * During recovery, the currently-open WAL file might be replaced with the
2206 	 * file of the same name retrieved from archive. So we always need to
2207 	 * check what we read was valid after reading into the buffer. If it's
2208 	 * invalid, we try to open and read the file again.
2209 	 */
2210 	if (am_cascading_walsender)
2211 	{
2212 		WalSnd	   *walsnd = MyWalSnd;
2213 		bool		reload;
2214 
2215 		SpinLockAcquire(&walsnd->mutex);
2216 		reload = walsnd->needreload;
2217 		walsnd->needreload = false;
2218 		SpinLockRelease(&walsnd->mutex);
2219 
2220 		if (reload && sendFile >= 0)
2221 		{
2222 			close(sendFile);
2223 			sendFile = -1;
2224 
2225 			goto retry;
2226 		}
2227 	}
2228 }
2229 
/*
 * Send out the WAL in its normal physical/stored form.
 *
 * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
 * but not yet sent to the client, and buffer it in the libpq output
 * buffer as one 'w' CopyData message.
 *
 * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
 * otherwise WalSndCaughtUp is set to false.
 *
 * When a historic timeline is being streamed and its end has been reached,
 * a CopyDone message is queued instead and streamingDoneSending is set.
 * On a successful send, sentPtr is advanced and published in
 * MyWalSnd->sentPtr.
 */
static void
XLogSendPhysical(void)
{
	XLogRecPtr	SendRqstPtr;
	XLogRecPtr	startptr;
	XLogRecPtr	endptr;
	Size		nbytes;

	/* If requested switch the WAL sender to the stopping state. */
	if (got_STOPPING)
		WalSndSetState(WALSNDSTATE_STOPPING);

	/* CopyDone already sent: there is nothing more we may send. */
	if (streamingDoneSending)
	{
		WalSndCaughtUp = true;
		return;
	}

	/* Figure out how far we can safely send the WAL. */
	if (sendTimeLineIsHistoric)
	{
		/*
		 * Streaming an old timeline that's in this server's history, but is
		 * not the one we're currently inserting or replaying. It can be
		 * streamed up to the point where we switched off that timeline.
		 */
		SendRqstPtr = sendTimeLineValidUpto;
	}
	else if (am_cascading_walsender)
	{
		/*
		 * Streaming the latest timeline on a standby.
		 *
		 * Attempt to send all WAL that has already been replayed, so that we
		 * know it's valid. If we're receiving WAL through streaming
		 * replication, it's also OK to send any WAL that has been received
		 * but not replayed.
		 *
		 * The timeline we're recovering from can change, or we can be
		 * promoted. In either case, the current timeline becomes historic. We
		 * need to detect that so that we don't try to stream past the point
		 * where we switched to another timeline. We check for promotion or
		 * timeline switch after calculating FlushPtr, to avoid a race
		 * condition: if the timeline becomes historic just after we checked
		 * that it was still current, it's still OK to stream it up to the
		 * FlushPtr that was calculated before it became historic.
		 */
		bool		becameHistoric = false;

		SendRqstPtr = GetStandbyFlushRecPtr();

		if (!RecoveryInProgress())
		{
			/*
			 * We have been promoted. RecoveryInProgress() updated
			 * ThisTimeLineID to the new current timeline.
			 */
			am_cascading_walsender = false;
			becameHistoric = true;
		}
		else
		{
			/*
			 * Still a cascading standby. But is the timeline we're sending
			 * still the one recovery is recovering from? ThisTimeLineID was
			 * updated by the GetStandbyFlushRecPtr() call above.
			 */
			if (sendTimeLine != ThisTimeLineID)
				becameHistoric = true;
		}

		if (becameHistoric)
		{
			/*
			 * The timeline we were sending has become historic. Read the
			 * timeline history file of the new timeline to see where exactly
			 * we forked off from the timeline we were sending.
			 */
			List	   *history;

			history = readTimeLineHistory(ThisTimeLineID);
			sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);

			Assert(sendTimeLine < sendTimeLineNextTLI);
			list_free_deep(history);

			sendTimeLineIsHistoric = true;

			SendRqstPtr = sendTimeLineValidUpto;
		}
	}
	else
	{
		/*
		 * Streaming the current timeline on a master.
		 *
		 * Attempt to send all data that's already been written out and
		 * fsync'd to disk.  We cannot go further than what's been written out
		 * given the current implementation of XLogRead().  And in any case
		 * it's unsafe to send WAL that is not securely down to disk on the
		 * master: if the master subsequently crashes and restarts, slaves
		 * must not have applied any WAL that gets lost on the master.
		 */
		SendRqstPtr = GetFlushRecPtr();
	}

	/*
	 * If this is a historic timeline and we've reached the point where we
	 * forked to the next timeline, stop streaming.
	 *
	 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
	 * startup process will normally replay all WAL that has been received
	 * from the master, before promoting, but if the WAL streaming is
	 * terminated at a WAL page boundary, the valid portion of the timeline
	 * might end in the middle of a WAL record. We might've already sent the
	 * first half of that partial WAL record to the cascading standby, so that
	 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
	 * replay the partial WAL record either, so it can still follow our
	 * timeline switch.
	 */
	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
	{
		/* close the current file. */
		if (sendFile >= 0)
			close(sendFile);
		sendFile = -1;

		/* Send CopyDone */
		pq_putmessage_noblock('c', NULL, 0);
		streamingDoneSending = true;

		WalSndCaughtUp = true;

		elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
			 (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
			 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		return;
	}

	/*
	 * Do we have any work to do?  The "<=" (rather than "==") also treats a
	 * request pointer behind sentPtr as "nothing to do" in builds where the
	 * Assert is compiled out.
	 */
	Assert(sentPtr <= SendRqstPtr);
	if (SendRqstPtr <= sentPtr)
	{
		WalSndCaughtUp = true;
		return;
	}

	/*
	 * Figure out how much to send in one message. If there's no more than
	 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
	 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
	 *
	 * The rounding is not only for performance reasons. Walreceiver relies on
	 * the fact that we never split a WAL record across two messages. Since a
	 * long WAL record is split at page boundary into continuation records,
	 * page boundary is always a safe cut-off point. We also assume that
	 * SendRqstPtr never points to the middle of a WAL record.
	 */
	startptr = sentPtr;
	endptr = startptr;
	endptr += MAX_SEND_SIZE;

	/* if we went beyond SendRqstPtr, back off */
	if (SendRqstPtr <= endptr)
	{
		endptr = SendRqstPtr;

		/*
		 * On a historic timeline, report "not caught up" even though all
		 * sendable WAL is now queued.  That way WalSndLoop does not sleep
		 * and calls us again.  That next call sees sendTimeLineValidUpto <=
		 * sentPtr and sends CopyDone.
		 */
		if (sendTimeLineIsHistoric)
			WalSndCaughtUp = false;
		else
			WalSndCaughtUp = true;
	}
	else
	{
		/* round down to page boundary. */
		endptr -= (endptr % XLOG_BLCKSZ);
		WalSndCaughtUp = false;
	}

	nbytes = endptr - startptr;
	Assert(nbytes <= MAX_SEND_SIZE);

	/*
	 * OK to read and send the slice.  Message layout: 'w', int64 dataStart,
	 * int64 walEnd, int64 sendtime, followed by the raw WAL bytes.
	 */
	resetStringInfo(&output_message);
	pq_sendbyte(&output_message, 'w');

	pq_sendint64(&output_message, startptr);	/* dataStart */
	pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
	pq_sendint64(&output_message, 0);	/* sendtime, filled in last */

	/*
	 * Read the log directly into the output buffer to avoid extra memcpy
	 * calls.
	 */
	enlargeStringInfo(&output_message, nbytes);
	XLogRead(&output_message.data[output_message.len], startptr, nbytes);
	output_message.len += nbytes;
	output_message.data[output_message.len] = '\0';

	/*
	 * Fill the send timestamp last, so that it is taken as late as possible.
	 * It overwrites the placeholder after the 'w' byte and the two int64
	 * pointer fields.
	 */
	resetStringInfo(&tmpbuf);
	pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
	memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
		   tmpbuf.data, sizeof(int64));

	pq_putmessage_noblock('d', output_message.data, output_message.len);

	sentPtr = endptr;

	/* Update shared memory status */
	{
		WalSnd	   *walsnd = MyWalSnd;

		SpinLockAcquire(&walsnd->mutex);
		walsnd->sentPtr = sentPtr;
		SpinLockRelease(&walsnd->mutex);
	}

	/* Report progress of XLOG streaming in PS display */
	if (update_process_title)
	{
		char		activitymsg[50];

		snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
				 (uint32) (sentPtr >> 32), (uint32) sentPtr);
		set_ps_display(activitymsg, false);
	}

	return;
}
2473 
2474 /*
2475  * Stream out logically decoded data.
2476  */
2477 static void
XLogSendLogical(void)2478 XLogSendLogical(void)
2479 {
2480 	XLogRecord *record;
2481 	char	   *errm;
2482 	XLogRecPtr	flushPtr;
2483 
2484 	/*
2485 	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
2486 	 * true in WalSndWaitForWal, if we're actually waiting. We also set to
2487 	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
2488 	 * didn't wait - i.e. when we're shutting down.
2489 	 */
2490 	WalSndCaughtUp = false;
2491 
2492 	record = XLogReadRecord(logical_decoding_ctx->reader, logical_startptr, &errm);
2493 	logical_startptr = InvalidXLogRecPtr;
2494 
2495 	/* xlog record was invalid */
2496 	if (errm != NULL)
2497 		elog(ERROR, "%s", errm);
2498 
2499 	/*
2500 	 * We'll use the current flush point to determine whether we've caught up.
2501 	 */
2502 	flushPtr = GetFlushRecPtr();
2503 
2504 	if (record != NULL)
2505 	{
2506 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
2507 
2508 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
2509 	}
2510 
2511 	/* Set flag if we're caught up. */
2512 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
2513 		WalSndCaughtUp = true;
2514 
2515 	/*
2516 	 * If we're caught up and have been requested to stop, have WalSndLoop()
2517 	 * terminate the connection in an orderly manner, after writing out all
2518 	 * the pending data.
2519 	 */
2520 	if (WalSndCaughtUp && got_STOPPING)
2521 		got_SIGUSR2 = true;
2522 
2523 	/* Update shared memory status */
2524 	{
2525 		WalSnd	   *walsnd = MyWalSnd;
2526 
2527 		SpinLockAcquire(&walsnd->mutex);
2528 		walsnd->sentPtr = sentPtr;
2529 		SpinLockRelease(&walsnd->mutex);
2530 	}
2531 }
2532 
2533 /*
2534  * Shutdown if the sender is caught up.
2535  *
2536  * NB: This should only be called when the shutdown signal has been received
2537  * from postmaster.
2538  *
2539  * Note that if we determine that there's still more data to send, this
2540  * function will return control to the caller.
2541  */
2542 static void
WalSndDone(WalSndSendDataCallback send_data)2543 WalSndDone(WalSndSendDataCallback send_data)
2544 {
2545 	XLogRecPtr	replicatedPtr;
2546 
2547 	/* ... let's just be real sure we're caught up ... */
2548 	send_data();
2549 
2550 	/*
2551 	 * To figure out whether all WAL has successfully been replicated, check
2552 	 * flush location if valid, write otherwise. Tools like pg_receivexlog
2553 	 * will usually (unless in synchronous mode) return an invalid flush
2554 	 * location.
2555 	 */
2556 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
2557 		MyWalSnd->write : MyWalSnd->flush;
2558 
2559 	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
2560 		!pq_is_send_pending())
2561 	{
2562 		/* Inform the standby that XLOG streaming is done */
2563 		EndCommand("COPY 0", DestRemote);
2564 		pq_flush();
2565 
2566 		proc_exit(0);
2567 	}
2568 	if (!waiting_for_ping_response)
2569 		WalSndKeepalive(true);
2570 }
2571 
2572 /*
2573  * Returns the latest point in WAL that has been safely flushed to disk, and
2574  * can be sent to the standby. This should only be called when in recovery,
2575  * ie. we're streaming to a cascaded standby.
2576  *
2577  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
2578  * replayed WAL record.
2579  */
2580 static XLogRecPtr
GetStandbyFlushRecPtr(void)2581 GetStandbyFlushRecPtr(void)
2582 {
2583 	XLogRecPtr	replayPtr;
2584 	TimeLineID	replayTLI;
2585 	XLogRecPtr	receivePtr;
2586 	TimeLineID	receiveTLI;
2587 	XLogRecPtr	result;
2588 
2589 	/*
2590 	 * We can safely send what's already been replayed. Also, if walreceiver
2591 	 * is streaming WAL from the same timeline, we can send anything that it
2592 	 * has streamed, but hasn't been replayed yet.
2593 	 */
2594 
2595 	receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
2596 	replayPtr = GetXLogReplayRecPtr(&replayTLI);
2597 
2598 	ThisTimeLineID = replayTLI;
2599 
2600 	result = replayPtr;
2601 	if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
2602 		result = receivePtr;
2603 
2604 	return result;
2605 }
2606 
2607 /*
2608  * Request walsenders to reload the currently-open WAL file
2609  */
2610 void
WalSndRqstFileReload(void)2611 WalSndRqstFileReload(void)
2612 {
2613 	int			i;
2614 
2615 	for (i = 0; i < max_wal_senders; i++)
2616 	{
2617 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2618 
2619 		if (walsnd->pid == 0)
2620 			continue;
2621 
2622 		SpinLockAcquire(&walsnd->mutex);
2623 		walsnd->needreload = true;
2624 		SpinLockRelease(&walsnd->mutex);
2625 	}
2626 }
2627 
2628 /*
2629  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
2630  */
2631 void
HandleWalSndInitStopping(void)2632 HandleWalSndInitStopping(void)
2633 {
2634 	Assert(am_walsender);
2635 
2636 	/*
2637 	 * If replication has not yet started, die like with SIGTERM. If
2638 	 * replication is active, only set a flag and wake up the main loop. It
2639 	 * will send any outstanding WAL, wait for it to be replicated to the
2640 	 * standby, and then exit gracefully.
2641 	 */
2642 	if (!replication_active)
2643 		kill(MyProcPid, SIGTERM);
2644 	else
2645 		got_STOPPING = true;
2646 }
2647 
2648 /*
2649  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
2650  * sender should already have been switched to WALSNDSTATE_STOPPING at
2651  * this point.
2652  */
2653 static void
WalSndLastCycleHandler(SIGNAL_ARGS)2654 WalSndLastCycleHandler(SIGNAL_ARGS)
2655 {
2656 	int			save_errno = errno;
2657 
2658 	got_SIGUSR2 = true;
2659 	SetLatch(MyLatch);
2660 
2661 	errno = save_errno;
2662 }
2663 
2664 /* Set up signal handlers */
2665 void
WalSndSignals(void)2666 WalSndSignals(void)
2667 {
2668 	/* Set up signal handlers */
2669 	pqsignal(SIGHUP, PostgresSigHupHandler);	/* set flag to read config
2670 												 * file */
2671 	pqsignal(SIGINT, SIG_IGN);	/* not used */
2672 	pqsignal(SIGTERM, die);		/* request shutdown */
2673 	pqsignal(SIGQUIT, quickdie);	/* hard crash time */
2674 	InitializeTimeouts();		/* establishes SIGALRM handler */
2675 	pqsignal(SIGPIPE, SIG_IGN);
2676 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
2677 	pqsignal(SIGUSR2, WalSndLastCycleHandler);	/* request a last cycle and
2678 												 * shutdown */
2679 
2680 	/* Reset some signals that are accepted by postmaster but not here */
2681 	pqsignal(SIGCHLD, SIG_DFL);
2682 	pqsignal(SIGTTIN, SIG_DFL);
2683 	pqsignal(SIGTTOU, SIG_DFL);
2684 	pqsignal(SIGCONT, SIG_DFL);
2685 	pqsignal(SIGWINCH, SIG_DFL);
2686 }
2687 
2688 /* Report shared-memory space needed by WalSndShmemInit */
2689 Size
WalSndShmemSize(void)2690 WalSndShmemSize(void)
2691 {
2692 	Size		size = 0;
2693 
2694 	size = offsetof(WalSndCtlData, walsnds);
2695 	size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
2696 
2697 	return size;
2698 }
2699 
2700 /* Allocate and initialize walsender-related shared memory */
2701 void
WalSndShmemInit(void)2702 WalSndShmemInit(void)
2703 {
2704 	bool		found;
2705 	int			i;
2706 
2707 	WalSndCtl = (WalSndCtlData *)
2708 		ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
2709 
2710 	if (!found)
2711 	{
2712 		/* First time through, so initialize */
2713 		MemSet(WalSndCtl, 0, WalSndShmemSize());
2714 
2715 		for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
2716 			SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
2717 
2718 		for (i = 0; i < max_wal_senders; i++)
2719 		{
2720 			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2721 
2722 			SpinLockInit(&walsnd->mutex);
2723 		}
2724 	}
2725 }
2726 
2727 /*
2728  * Wake up all walsenders
2729  *
2730  * This will be called inside critical sections, so throwing an error is not
2731  * adviseable.
2732  */
2733 void
WalSndWakeup(void)2734 WalSndWakeup(void)
2735 {
2736 	int			i;
2737 
2738 	for (i = 0; i < max_wal_senders; i++)
2739 	{
2740 		Latch	   *latch;
2741 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2742 
2743 		/*
2744 		 * Get latch pointer with spinlock held, for the unlikely case that
2745 		 * pointer reads aren't atomic (as they're 8 bytes).
2746 		 */
2747 		SpinLockAcquire(&walsnd->mutex);
2748 		latch = walsnd->latch;
2749 		SpinLockRelease(&walsnd->mutex);
2750 
2751 		if (latch != NULL)
2752 			SetLatch(latch);
2753 	}
2754 }
2755 
2756 /*
2757  * Signal all walsenders to move to stopping state.
2758  *
2759  * This will trigger walsenders to move to a state where no further WAL can be
2760  * generated. See this file's header for details.
2761  */
2762 void
WalSndInitStopping(void)2763 WalSndInitStopping(void)
2764 {
2765 	int			i;
2766 
2767 	for (i = 0; i < max_wal_senders; i++)
2768 	{
2769 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2770 		pid_t		pid;
2771 
2772 		SpinLockAcquire(&walsnd->mutex);
2773 		pid = walsnd->pid;
2774 		SpinLockRelease(&walsnd->mutex);
2775 
2776 		if (pid == 0)
2777 			continue;
2778 
2779 		SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
2780 	}
2781 }
2782 
2783 /*
2784  * Wait that all the WAL senders have quit or reached the stopping state. This
2785  * is used by the checkpointer to control when the shutdown checkpoint can
2786  * safely be performed.
2787  */
2788 void
WalSndWaitStopping(void)2789 WalSndWaitStopping(void)
2790 {
2791 	for (;;)
2792 	{
2793 		int			i;
2794 		bool		all_stopped = true;
2795 
2796 		for (i = 0; i < max_wal_senders; i++)
2797 		{
2798 			WalSndState state;
2799 			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2800 
2801 			SpinLockAcquire(&walsnd->mutex);
2802 
2803 			if (walsnd->pid == 0)
2804 			{
2805 				SpinLockRelease(&walsnd->mutex);
2806 				continue;
2807 			}
2808 
2809 			state = walsnd->state;
2810 			SpinLockRelease(&walsnd->mutex);
2811 
2812 			if (state != WALSNDSTATE_STOPPING)
2813 			{
2814 				all_stopped = false;
2815 				break;
2816 			}
2817 		}
2818 
2819 		/* safe to leave if confirmation is done for all WAL senders */
2820 		if (all_stopped)
2821 			return;
2822 
2823 		pg_usleep(10000L);		/* wait for 10 msec */
2824 	}
2825 }
2826 
2827 /* Set state for current walsender (only called in walsender) */
2828 void
WalSndSetState(WalSndState state)2829 WalSndSetState(WalSndState state)
2830 {
2831 	WalSnd	   *walsnd = MyWalSnd;
2832 
2833 	Assert(am_walsender);
2834 
2835 	if (walsnd->state == state)
2836 		return;
2837 
2838 	SpinLockAcquire(&walsnd->mutex);
2839 	walsnd->state = state;
2840 	SpinLockRelease(&walsnd->mutex);
2841 }
2842 
2843 /*
2844  * Return a string constant representing the state. This is used
2845  * in system views, and should *not* be translated.
2846  */
2847 static const char *
WalSndGetStateString(WalSndState state)2848 WalSndGetStateString(WalSndState state)
2849 {
2850 	switch (state)
2851 	{
2852 		case WALSNDSTATE_STARTUP:
2853 			return "startup";
2854 		case WALSNDSTATE_BACKUP:
2855 			return "backup";
2856 		case WALSNDSTATE_CATCHUP:
2857 			return "catchup";
2858 		case WALSNDSTATE_STREAMING:
2859 			return "streaming";
2860 		case WALSNDSTATE_STOPPING:
2861 			return "stopping";
2862 	}
2863 	return "UNKNOWN";
2864 }
2865 
2866 
2867 /*
2868  * Returns activity of walsenders, including pids and xlog locations sent to
2869  * standby servers.
2870  */
2871 Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)2872 pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
2873 {
2874 #define PG_STAT_GET_WAL_SENDERS_COLS	8
2875 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2876 	TupleDesc	tupdesc;
2877 	Tuplestorestate *tupstore;
2878 	MemoryContext per_query_ctx;
2879 	MemoryContext oldcontext;
2880 	List	   *sync_standbys;
2881 	int			i;
2882 
2883 	/* check to see if caller supports us returning a tuplestore */
2884 	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
2885 		ereport(ERROR,
2886 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2887 				 errmsg("set-valued function called in context that cannot accept a set")));
2888 	if (!(rsinfo->allowedModes & SFRM_Materialize))
2889 		ereport(ERROR,
2890 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2891 				 errmsg("materialize mode required, but it is not " \
2892 						"allowed in this context")));
2893 
2894 	/* Build a tuple descriptor for our result type */
2895 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
2896 		elog(ERROR, "return type must be a row type");
2897 
2898 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
2899 	oldcontext = MemoryContextSwitchTo(per_query_ctx);
2900 
2901 	tupstore = tuplestore_begin_heap(true, false, work_mem);
2902 	rsinfo->returnMode = SFRM_Materialize;
2903 	rsinfo->setResult = tupstore;
2904 	rsinfo->setDesc = tupdesc;
2905 
2906 	MemoryContextSwitchTo(oldcontext);
2907 
2908 	/*
2909 	 * Get the currently active synchronous standbys.
2910 	 */
2911 	LWLockAcquire(SyncRepLock, LW_SHARED);
2912 	sync_standbys = SyncRepGetSyncStandbys(NULL);
2913 	LWLockRelease(SyncRepLock);
2914 
2915 	for (i = 0; i < max_wal_senders; i++)
2916 	{
2917 		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
2918 		XLogRecPtr	sentPtr;
2919 		XLogRecPtr	write;
2920 		XLogRecPtr	flush;
2921 		XLogRecPtr	apply;
2922 		int			priority;
2923 		WalSndState state;
2924 		Datum		values[PG_STAT_GET_WAL_SENDERS_COLS];
2925 		bool		nulls[PG_STAT_GET_WAL_SENDERS_COLS];
2926 
2927 		if (walsnd->pid == 0)
2928 			continue;
2929 
2930 		SpinLockAcquire(&walsnd->mutex);
2931 		sentPtr = walsnd->sentPtr;
2932 		state = walsnd->state;
2933 		write = walsnd->write;
2934 		flush = walsnd->flush;
2935 		apply = walsnd->apply;
2936 		priority = walsnd->sync_standby_priority;
2937 		SpinLockRelease(&walsnd->mutex);
2938 
2939 		memset(nulls, 0, sizeof(nulls));
2940 		values[0] = Int32GetDatum(walsnd->pid);
2941 
2942 		if (!superuser())
2943 		{
2944 			/*
2945 			 * Only superusers can see details. Other users only get the pid
2946 			 * value to know it's a walsender, but no details.
2947 			 */
2948 			MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
2949 		}
2950 		else
2951 		{
2952 			values[1] = CStringGetTextDatum(WalSndGetStateString(state));
2953 
2954 			if (XLogRecPtrIsInvalid(sentPtr))
2955 				nulls[2] = true;
2956 			values[2] = LSNGetDatum(sentPtr);
2957 
2958 			if (XLogRecPtrIsInvalid(write))
2959 				nulls[3] = true;
2960 			values[3] = LSNGetDatum(write);
2961 
2962 			if (XLogRecPtrIsInvalid(flush))
2963 				nulls[4] = true;
2964 			values[4] = LSNGetDatum(flush);
2965 
2966 			if (XLogRecPtrIsInvalid(apply))
2967 				nulls[5] = true;
2968 			values[5] = LSNGetDatum(apply);
2969 
2970 			/*
2971 			 * Treat a standby such as a pg_basebackup background process
2972 			 * which always returns an invalid flush location, as an
2973 			 * asynchronous standby.
2974 			 */
2975 			priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
2976 
2977 			values[6] = Int32GetDatum(priority);
2978 
2979 			/*
2980 			 * More easily understood version of standby state. This is purely
2981 			 * informational, not different from priority.
2982 			 */
2983 			if (priority == 0)
2984 				values[7] = CStringGetTextDatum("async");
2985 			else if (list_member_int(sync_standbys, i))
2986 				values[7] = CStringGetTextDatum("sync");
2987 			else
2988 				values[7] = CStringGetTextDatum("potential");
2989 		}
2990 
2991 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
2992 	}
2993 
2994 	/* clean up and return the tuplestore */
2995 	tuplestore_donestoring(tupstore);
2996 
2997 	return (Datum) 0;
2998 }
2999 
3000 /*
3001  * Send a keepalive message to standby.
3002  *
3003  * If requestReply is set, the message requests the other party to send
3004  * a message back to us, for heartbeat purposes.  We also set a flag to
3005  * let nearby code that we're waiting for that response, to avoid
3006  * repeated requests.
3007  */
3008 static void
WalSndKeepalive(bool requestReply)3009 WalSndKeepalive(bool requestReply)
3010 {
3011 	elog(DEBUG2, "sending replication keepalive");
3012 
3013 	/* construct the message... */
3014 	resetStringInfo(&output_message);
3015 	pq_sendbyte(&output_message, 'k');
3016 	pq_sendint64(&output_message, sentPtr);
3017 	pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
3018 	pq_sendbyte(&output_message, requestReply ? 1 : 0);
3019 
3020 	/* ... and send it wrapped in CopyData */
3021 	pq_putmessage_noblock('d', output_message.data, output_message.len);
3022 
3023 	/* Set local flag */
3024 	if (requestReply)
3025 		waiting_for_ping_response = true;
3026 }
3027 
3028 /*
3029  * Send keepalive message if too much time has elapsed.
3030  */
3031 static void
WalSndKeepaliveIfNecessary(void)3032 WalSndKeepaliveIfNecessary(void)
3033 {
3034 	TimestampTz ping_time;
3035 
3036 	/*
3037 	 * Don't send keepalive messages if timeouts are globally disabled or
3038 	 * we're doing something not partaking in timeouts.
3039 	 */
3040 	if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3041 		return;
3042 
3043 	if (waiting_for_ping_response)
3044 		return;
3045 
3046 	/*
3047 	 * If half of wal_sender_timeout has lapsed without receiving any reply
3048 	 * from the standby, send a keep-alive message to the standby requesting
3049 	 * an immediate reply.
3050 	 */
3051 	ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3052 											wal_sender_timeout / 2);
3053 	if (last_processing >= ping_time)
3054 	{
3055 		WalSndKeepalive(true);
3056 
3057 		/* Try to flush pending output to the client */
3058 		if (pq_flush_if_writable() != 0)
3059 			WalSndShutdown();
3060 	}
3061 }
3062