1 /*-------------------------------------------------------------------------
2  *
3  * walreceiverfuncs.c
4  *
5  * This file contains functions used by the startup process to communicate
6  * with the walreceiver process. Functions implementing walreceiver itself
7  * are in walreceiver.c.
8  *
9  * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  *	  src/backend/replication/walreceiverfuncs.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18 
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
24 
25 #include "access/xlog_internal.h"
26 #include "postmaster/startup.h"
27 #include "replication/walreceiver.h"
28 #include "storage/pmsignal.h"
29 #include "storage/shmem.h"
30 #include "utils/timestamp.h"
31 
/*
 * Pointer to the walreceiver control struct in shared memory.  It is NULL
 * until WalRcvShmemInit() points it at the "Wal Receiver Ctl" shared struct;
 * every function in this file dereferences it, so that must happen first.
 */
WalRcvData *WalRcv = NULL;

/*
 * How long to wait for walreceiver to start up after requesting
 * postmaster to launch it. In seconds.
 *
 * WalRcvRunning() and WalRcvStreaming() compare this against
 * WalRcv->startTime; a walreceiver still in WALRCV_STARTING after this long
 * is given up on by forcing its state to WALRCV_STOPPED.
 */
#define WALRCV_STARTUP_TIMEOUT 10
39 
40 /* Report shared memory space needed by WalRcvShmemInit */
41 Size
WalRcvShmemSize(void)42 WalRcvShmemSize(void)
43 {
44 	Size		size = 0;
45 
46 	size = add_size(size, sizeof(WalRcvData));
47 
48 	return size;
49 }
50 
51 /* Allocate and initialize walreceiver-related shared memory */
52 void
WalRcvShmemInit(void)53 WalRcvShmemInit(void)
54 {
55 	bool		found;
56 
57 	WalRcv = (WalRcvData *)
58 		ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
59 
60 	if (!found)
61 	{
62 		/* First time through, so initialize */
63 		MemSet(WalRcv, 0, WalRcvShmemSize());
64 		WalRcv->walRcvState = WALRCV_STOPPED;
65 		SpinLockInit(&WalRcv->mutex);
66 		pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
67 		WalRcv->latch = NULL;
68 	}
69 }
70 
71 /* Is walreceiver running (or starting up)? */
72 bool
WalRcvRunning(void)73 WalRcvRunning(void)
74 {
75 	WalRcvData *walrcv = WalRcv;
76 	WalRcvState state;
77 	pg_time_t	startTime;
78 
79 	SpinLockAcquire(&walrcv->mutex);
80 
81 	state = walrcv->walRcvState;
82 	startTime = walrcv->startTime;
83 
84 	SpinLockRelease(&walrcv->mutex);
85 
86 	/*
87 	 * If it has taken too long for walreceiver to start up, give up. Setting
88 	 * the state to STOPPED ensures that if walreceiver later does start up
89 	 * after all, it will see that it's not supposed to be running and die
90 	 * without doing anything.
91 	 */
92 	if (state == WALRCV_STARTING)
93 	{
94 		pg_time_t	now = (pg_time_t) time(NULL);
95 
96 		if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
97 		{
98 			SpinLockAcquire(&walrcv->mutex);
99 
100 			if (walrcv->walRcvState == WALRCV_STARTING)
101 				state = walrcv->walRcvState = WALRCV_STOPPED;
102 
103 			SpinLockRelease(&walrcv->mutex);
104 		}
105 	}
106 
107 	if (state != WALRCV_STOPPED)
108 		return true;
109 	else
110 		return false;
111 }
112 
113 /*
114  * Is walreceiver running and streaming (or at least attempting to connect,
115  * or starting up)?
116  */
117 bool
WalRcvStreaming(void)118 WalRcvStreaming(void)
119 {
120 	WalRcvData *walrcv = WalRcv;
121 	WalRcvState state;
122 	pg_time_t	startTime;
123 
124 	SpinLockAcquire(&walrcv->mutex);
125 
126 	state = walrcv->walRcvState;
127 	startTime = walrcv->startTime;
128 
129 	SpinLockRelease(&walrcv->mutex);
130 
131 	/*
132 	 * If it has taken too long for walreceiver to start up, give up. Setting
133 	 * the state to STOPPED ensures that if walreceiver later does start up
134 	 * after all, it will see that it's not supposed to be running and die
135 	 * without doing anything.
136 	 */
137 	if (state == WALRCV_STARTING)
138 	{
139 		pg_time_t	now = (pg_time_t) time(NULL);
140 
141 		if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
142 		{
143 			SpinLockAcquire(&walrcv->mutex);
144 
145 			if (walrcv->walRcvState == WALRCV_STARTING)
146 				state = walrcv->walRcvState = WALRCV_STOPPED;
147 
148 			SpinLockRelease(&walrcv->mutex);
149 		}
150 	}
151 
152 	if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
153 		state == WALRCV_RESTARTING)
154 		return true;
155 	else
156 		return false;
157 }
158 
159 /*
160  * Stop walreceiver (if running) and wait for it to die.
161  * Executed by the Startup process.
162  */
163 void
ShutdownWalRcv(void)164 ShutdownWalRcv(void)
165 {
166 	WalRcvData *walrcv = WalRcv;
167 	pid_t		walrcvpid = 0;
168 
169 	/*
170 	 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
171 	 * mode once it's finished, and will also request postmaster to not
172 	 * restart itself.
173 	 */
174 	SpinLockAcquire(&walrcv->mutex);
175 	switch (walrcv->walRcvState)
176 	{
177 		case WALRCV_STOPPED:
178 			break;
179 		case WALRCV_STARTING:
180 			walrcv->walRcvState = WALRCV_STOPPED;
181 			break;
182 
183 		case WALRCV_STREAMING:
184 		case WALRCV_WAITING:
185 		case WALRCV_RESTARTING:
186 			walrcv->walRcvState = WALRCV_STOPPING;
187 			/* fall through */
188 		case WALRCV_STOPPING:
189 			walrcvpid = walrcv->pid;
190 			break;
191 	}
192 	SpinLockRelease(&walrcv->mutex);
193 
194 	/*
195 	 * Signal walreceiver process if it was still running.
196 	 */
197 	if (walrcvpid != 0)
198 		kill(walrcvpid, SIGTERM);
199 
200 	/*
201 	 * Wait for walreceiver to acknowledge its death by setting state to
202 	 * WALRCV_STOPPED.
203 	 */
204 	while (WalRcvRunning())
205 	{
206 		/*
207 		 * This possibly-long loop needs to handle interrupts of startup
208 		 * process.
209 		 */
210 		HandleStartupProcInterrupts();
211 
212 		pg_usleep(100000);		/* 100ms */
213 	}
214 }
215 
216 /*
217  * Request postmaster to start walreceiver.
218  *
219  * "recptr" indicates the position where streaming should begin.  "conninfo"
220  * is a libpq connection string to use.  "slotname" is, optionally, the name
221  * of a replication slot to acquire.  "create_temp_slot" indicates to create
222  * a temporary slot when no "slotname" is given.
223  *
224  * WAL receivers do not directly load GUC parameters used for the connection
225  * to the primary, and rely on the values passed down by the caller of this
226  * routine instead.  Hence, the addition of any new parameters should happen
227  * through this code path.
228  */
229 void
RequestXLogStreaming(TimeLineID tli,XLogRecPtr recptr,const char * conninfo,const char * slotname,bool create_temp_slot)230 RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
231 					 const char *slotname, bool create_temp_slot)
232 {
233 	WalRcvData *walrcv = WalRcv;
234 	bool		launch = false;
235 	pg_time_t	now = (pg_time_t) time(NULL);
236 	Latch	   *latch;
237 
238 	/*
239 	 * We always start at the beginning of the segment. That prevents a broken
240 	 * segment (i.e., with no records in the first half of a segment) from
241 	 * being created by XLOG streaming, which might cause trouble later on if
242 	 * the segment is e.g archived.
243 	 */
244 	if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
245 		recptr -= XLogSegmentOffset(recptr, wal_segment_size);
246 
247 	SpinLockAcquire(&walrcv->mutex);
248 
249 	/* It better be stopped if we try to restart it */
250 	Assert(walrcv->walRcvState == WALRCV_STOPPED ||
251 		   walrcv->walRcvState == WALRCV_WAITING);
252 
253 	if (conninfo != NULL)
254 		strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
255 	else
256 		walrcv->conninfo[0] = '\0';
257 
258 	/*
259 	 * Use configured replication slot if present, and ignore the value of
260 	 * create_temp_slot as the slot name should be persistent.  Otherwise, use
261 	 * create_temp_slot to determine whether this WAL receiver should create a
262 	 * temporary slot by itself and use it, or not.
263 	 */
264 	if (slotname != NULL && slotname[0] != '\0')
265 	{
266 		strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
267 		walrcv->is_temp_slot = false;
268 	}
269 	else
270 	{
271 		walrcv->slotname[0] = '\0';
272 		walrcv->is_temp_slot = create_temp_slot;
273 	}
274 
275 	if (walrcv->walRcvState == WALRCV_STOPPED)
276 	{
277 		launch = true;
278 		walrcv->walRcvState = WALRCV_STARTING;
279 	}
280 	else
281 		walrcv->walRcvState = WALRCV_RESTARTING;
282 	walrcv->startTime = now;
283 
284 	/*
285 	 * If this is the first startup of walreceiver (on this timeline),
286 	 * initialize flushedUpto and latestChunkStart to the starting point.
287 	 */
288 	if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
289 	{
290 		walrcv->flushedUpto = recptr;
291 		walrcv->receivedTLI = tli;
292 		walrcv->latestChunkStart = recptr;
293 	}
294 	walrcv->receiveStart = recptr;
295 	walrcv->receiveStartTLI = tli;
296 
297 	latch = walrcv->latch;
298 
299 	SpinLockRelease(&walrcv->mutex);
300 
301 	if (launch)
302 		SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
303 	else if (latch)
304 		SetLatch(latch);
305 }
306 
307 /*
308  * Returns the last+1 byte position that walreceiver has flushed.
309  *
310  * Optionally, returns the previous chunk start, that is the first byte
311  * written in the most recent walreceiver flush cycle.  Callers not
312  * interested in that value may pass NULL for latestChunkStart. Same for
313  * receiveTLI.
314  */
315 XLogRecPtr
GetWalRcvFlushRecPtr(XLogRecPtr * latestChunkStart,TimeLineID * receiveTLI)316 GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
317 {
318 	WalRcvData *walrcv = WalRcv;
319 	XLogRecPtr	recptr;
320 
321 	SpinLockAcquire(&walrcv->mutex);
322 	recptr = walrcv->flushedUpto;
323 	if (latestChunkStart)
324 		*latestChunkStart = walrcv->latestChunkStart;
325 	if (receiveTLI)
326 		*receiveTLI = walrcv->receivedTLI;
327 	SpinLockRelease(&walrcv->mutex);
328 
329 	return recptr;
330 }
331 
332 /*
333  * Returns the last+1 byte position that walreceiver has written.
334  * This returns a recently written value without taking a lock.
335  */
336 XLogRecPtr
GetWalRcvWriteRecPtr(void)337 GetWalRcvWriteRecPtr(void)
338 {
339 	WalRcvData *walrcv = WalRcv;
340 
341 	return pg_atomic_read_u64(&walrcv->writtenUpto);
342 }
343 
344 /*
345  * Returns the replication apply delay in ms or -1
346  * if the apply delay info is not available
347  */
348 int
GetReplicationApplyDelay(void)349 GetReplicationApplyDelay(void)
350 {
351 	WalRcvData *walrcv = WalRcv;
352 	XLogRecPtr	receivePtr;
353 	XLogRecPtr	replayPtr;
354 	TimestampTz chunkReplayStartTime;
355 
356 	SpinLockAcquire(&walrcv->mutex);
357 	receivePtr = walrcv->flushedUpto;
358 	SpinLockRelease(&walrcv->mutex);
359 
360 	replayPtr = GetXLogReplayRecPtr(NULL);
361 
362 	if (receivePtr == replayPtr)
363 		return 0;
364 
365 	chunkReplayStartTime = GetCurrentChunkReplayStartTime();
366 
367 	if (chunkReplayStartTime == 0)
368 		return -1;
369 
370 	return TimestampDifferenceMilliseconds(chunkReplayStartTime,
371 										   GetCurrentTimestamp());
372 }
373 
374 /*
375  * Returns the network latency in ms, note that this includes any
376  * difference in clock settings between the servers, as well as timezone.
377  */
378 int
GetReplicationTransferLatency(void)379 GetReplicationTransferLatency(void)
380 {
381 	WalRcvData *walrcv = WalRcv;
382 	TimestampTz lastMsgSendTime;
383 	TimestampTz lastMsgReceiptTime;
384 
385 	SpinLockAcquire(&walrcv->mutex);
386 	lastMsgSendTime = walrcv->lastMsgSendTime;
387 	lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
388 	SpinLockRelease(&walrcv->mutex);
389 
390 	return TimestampDifferenceMilliseconds(lastMsgSendTime,
391 										   lastMsgReceiptTime);
392 }
393