1 /*-------------------------------------------------------------------------
2  *
3  * walreceiverfuncs.c
4  *
5  * This file contains functions used by the startup process to communicate
6  * with the walreceiver process. Functions implementing walreceiver itself
7  * are in walreceiver.c.
8  *
9  * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  *	  src/backend/replication/walreceiverfuncs.c
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18 
19 #include <sys/stat.h>
20 #include <sys/time.h>
21 #include <time.h>
22 #include <unistd.h>
23 #include <signal.h>
24 
25 #include "access/xlog_internal.h"
26 #include "postmaster/startup.h"
27 #include "replication/walreceiver.h"
28 #include "storage/pmsignal.h"
29 #include "storage/shmem.h"
30 #include "utils/timestamp.h"
31 
/*
 * Pointer to the walreceiver control struct.  It is NULL until
 * WalRcvShmemInit() points it at the struct obtained from ShmemInitStruct().
 */
32 WalRcvData *WalRcv = NULL;
33 
34 /*
35  * How long to wait for walreceiver to start up after requesting
36  * postmaster to launch it. In seconds.
37  */
38 #define WALRCV_STARTUP_TIMEOUT 10
39 
40 /* Report shared memory space needed by WalRcvShmemInit */
41 Size
WalRcvShmemSize(void)42 WalRcvShmemSize(void)
43 {
44 	Size		size = 0;
45 
46 	size = add_size(size, sizeof(WalRcvData));
47 
48 	return size;
49 }
50 
51 /* Allocate and initialize walreceiver-related shared memory */
52 void
WalRcvShmemInit(void)53 WalRcvShmemInit(void)
54 {
55 	bool		found;
56 
57 	WalRcv = (WalRcvData *)
58 		ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
59 
60 	if (!found)
61 	{
62 		/* First time through, so initialize */
63 		MemSet(WalRcv, 0, WalRcvShmemSize());
64 		WalRcv->walRcvState = WALRCV_STOPPED;
65 		SpinLockInit(&WalRcv->mutex);
66 		WalRcv->latch = NULL;
67 	}
68 }
69 
70 /* Is walreceiver running (or starting up)? */
71 bool
WalRcvRunning(void)72 WalRcvRunning(void)
73 {
74 	WalRcvData *walrcv = WalRcv;
75 	WalRcvState state;
76 	pg_time_t	startTime;
77 
78 	SpinLockAcquire(&walrcv->mutex);
79 
80 	state = walrcv->walRcvState;
81 	startTime = walrcv->startTime;
82 
83 	SpinLockRelease(&walrcv->mutex);
84 
85 	/*
86 	 * If it has taken too long for walreceiver to start up, give up. Setting
87 	 * the state to STOPPED ensures that if walreceiver later does start up
88 	 * after all, it will see that it's not supposed to be running and die
89 	 * without doing anything.
90 	 */
91 	if (state == WALRCV_STARTING)
92 	{
93 		pg_time_t	now = (pg_time_t) time(NULL);
94 
95 		if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
96 		{
97 			SpinLockAcquire(&walrcv->mutex);
98 
99 			if (walrcv->walRcvState == WALRCV_STARTING)
100 				state = walrcv->walRcvState = WALRCV_STOPPED;
101 
102 			SpinLockRelease(&walrcv->mutex);
103 		}
104 	}
105 
106 	if (state != WALRCV_STOPPED)
107 		return true;
108 	else
109 		return false;
110 }
111 
112 /*
113  * Is walreceiver running and streaming (or at least attempting to connect,
114  * or starting up)?
115  */
116 bool
WalRcvStreaming(void)117 WalRcvStreaming(void)
118 {
119 	WalRcvData *walrcv = WalRcv;
120 	WalRcvState state;
121 	pg_time_t	startTime;
122 
123 	SpinLockAcquire(&walrcv->mutex);
124 
125 	state = walrcv->walRcvState;
126 	startTime = walrcv->startTime;
127 
128 	SpinLockRelease(&walrcv->mutex);
129 
130 	/*
131 	 * If it has taken too long for walreceiver to start up, give up. Setting
132 	 * the state to STOPPED ensures that if walreceiver later does start up
133 	 * after all, it will see that it's not supposed to be running and die
134 	 * without doing anything.
135 	 */
136 	if (state == WALRCV_STARTING)
137 	{
138 		pg_time_t	now = (pg_time_t) time(NULL);
139 
140 		if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
141 		{
142 			SpinLockAcquire(&walrcv->mutex);
143 
144 			if (walrcv->walRcvState == WALRCV_STARTING)
145 				state = walrcv->walRcvState = WALRCV_STOPPED;
146 
147 			SpinLockRelease(&walrcv->mutex);
148 		}
149 	}
150 
151 	if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
152 		state == WALRCV_RESTARTING)
153 		return true;
154 	else
155 		return false;
156 }
157 
158 /*
159  * Stop walreceiver (if running) and wait for it to die.
160  * Executed by the Startup process.
161  */
162 void
ShutdownWalRcv(void)163 ShutdownWalRcv(void)
164 {
165 	WalRcvData *walrcv = WalRcv;
166 	pid_t		walrcvpid = 0;
167 
168 	/*
169 	 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
170 	 * mode once it's finished, and will also request postmaster to not
171 	 * restart itself.
172 	 */
173 	SpinLockAcquire(&walrcv->mutex);
174 	switch (walrcv->walRcvState)
175 	{
176 		case WALRCV_STOPPED:
177 			break;
178 		case WALRCV_STARTING:
179 			walrcv->walRcvState = WALRCV_STOPPED;
180 			break;
181 
182 		case WALRCV_STREAMING:
183 		case WALRCV_WAITING:
184 		case WALRCV_RESTARTING:
185 			walrcv->walRcvState = WALRCV_STOPPING;
186 			/* fall through */
187 		case WALRCV_STOPPING:
188 			walrcvpid = walrcv->pid;
189 			break;
190 	}
191 	SpinLockRelease(&walrcv->mutex);
192 
193 	/*
194 	 * Signal walreceiver process if it was still running.
195 	 */
196 	if (walrcvpid != 0)
197 		kill(walrcvpid, SIGTERM);
198 
199 	/*
200 	 * Wait for walreceiver to acknowledge its death by setting state to
201 	 * WALRCV_STOPPED.
202 	 */
203 	while (WalRcvRunning())
204 	{
205 		/*
206 		 * This possibly-long loop needs to handle interrupts of startup
207 		 * process.
208 		 */
209 		HandleStartupProcInterrupts();
210 
211 		pg_usleep(100000);		/* 100ms */
212 	}
213 }
214 
215 /*
216  * Request postmaster to start walreceiver.
217  *
218  * recptr indicates the position where streaming should begin, conninfo
219  * is a libpq connection string to use, and slotname is, optionally, the name
220  * of a replication slot to acquire.
221  */
222 void
RequestXLogStreaming(TimeLineID tli,XLogRecPtr recptr,const char * conninfo,const char * slotname)223 RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
224 					 const char *slotname)
225 {
226 	WalRcvData *walrcv = WalRcv;
227 	bool		launch = false;
228 	pg_time_t	now = (pg_time_t) time(NULL);
229 	Latch	   *latch;
230 
231 	/*
232 	 * We always start at the beginning of the segment. That prevents a broken
233 	 * segment (i.e., with no records in the first half of a segment) from
234 	 * being created by XLOG streaming, which might cause trouble later on if
235 	 * the segment is e.g archived.
236 	 */
237 	if (recptr % XLogSegSize != 0)
238 		recptr -= recptr % XLogSegSize;
239 
240 	SpinLockAcquire(&walrcv->mutex);
241 
242 	/* It better be stopped if we try to restart it */
243 	Assert(walrcv->walRcvState == WALRCV_STOPPED ||
244 		   walrcv->walRcvState == WALRCV_WAITING);
245 
246 	if (conninfo != NULL)
247 		strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
248 	else
249 		walrcv->conninfo[0] = '\0';
250 
251 	if (slotname != NULL)
252 		strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
253 	else
254 		walrcv->slotname[0] = '\0';
255 
256 	if (walrcv->walRcvState == WALRCV_STOPPED)
257 	{
258 		launch = true;
259 		walrcv->walRcvState = WALRCV_STARTING;
260 	}
261 	else
262 		walrcv->walRcvState = WALRCV_RESTARTING;
263 	walrcv->startTime = now;
264 
265 	/*
266 	 * If this is the first startup of walreceiver (on this timeline),
267 	 * initialize receivedUpto and latestChunkStart to the starting point.
268 	 */
269 	if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
270 	{
271 		walrcv->receivedUpto = recptr;
272 		walrcv->receivedTLI = tli;
273 		walrcv->latestChunkStart = recptr;
274 	}
275 	walrcv->receiveStart = recptr;
276 	walrcv->receiveStartTLI = tli;
277 
278 	latch = walrcv->latch;
279 
280 	SpinLockRelease(&walrcv->mutex);
281 
282 	if (launch)
283 		SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
284 	else if (latch)
285 		SetLatch(latch);
286 }
287 
288 /*
289  * Returns the last+1 byte position that walreceiver has written.
290  *
291  * Optionally, returns the previous chunk start, that is the first byte
292  * written in the most recent walreceiver flush cycle.  Callers not
293  * interested in that value may pass NULL for latestChunkStart. Same for
294  * receiveTLI.
295  */
296 XLogRecPtr
GetWalRcvWriteRecPtr(XLogRecPtr * latestChunkStart,TimeLineID * receiveTLI)297 GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
298 {
299 	WalRcvData *walrcv = WalRcv;
300 	XLogRecPtr	recptr;
301 
302 	SpinLockAcquire(&walrcv->mutex);
303 	recptr = walrcv->receivedUpto;
304 	if (latestChunkStart)
305 		*latestChunkStart = walrcv->latestChunkStart;
306 	if (receiveTLI)
307 		*receiveTLI = walrcv->receivedTLI;
308 	SpinLockRelease(&walrcv->mutex);
309 
310 	return recptr;
311 }
312 
313 /*
314  * Returns the replication apply delay in ms or -1
315  * if the apply delay info is not available
316  */
317 int
GetReplicationApplyDelay(void)318 GetReplicationApplyDelay(void)
319 {
320 	WalRcvData *walrcv = WalRcv;
321 	XLogRecPtr	receivePtr;
322 	XLogRecPtr	replayPtr;
323 	TimestampTz chunkReplayStartTime;
324 
325 	SpinLockAcquire(&walrcv->mutex);
326 	receivePtr = walrcv->receivedUpto;
327 	SpinLockRelease(&walrcv->mutex);
328 
329 	replayPtr = GetXLogReplayRecPtr(NULL);
330 
331 	if (receivePtr == replayPtr)
332 		return 0;
333 
334 	chunkReplayStartTime = GetCurrentChunkReplayStartTime();
335 
336 	if (chunkReplayStartTime == 0)
337 		return -1;
338 
339 	return TimestampDifferenceMilliseconds(chunkReplayStartTime,
340 										   GetCurrentTimestamp());
341 }
342 
343 /*
344  * Returns the network latency in ms, note that this includes any
345  * difference in clock settings between the servers, as well as timezone.
346  */
347 int
GetReplicationTransferLatency(void)348 GetReplicationTransferLatency(void)
349 {
350 	WalRcvData *walrcv = WalRcv;
351 	TimestampTz lastMsgSendTime;
352 	TimestampTz lastMsgReceiptTime;
353 
354 	SpinLockAcquire(&walrcv->mutex);
355 	lastMsgSendTime = walrcv->lastMsgSendTime;
356 	lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
357 	SpinLockRelease(&walrcv->mutex);
358 
359 	return TimestampDifferenceMilliseconds(lastMsgSendTime,
360 										   lastMsgReceiptTime);
361 }
362