1 /*-------------------------------------------------------------------------
2 *
3 * walreceiverfuncs.c
4 *
5 * This file contains functions used by the startup process to communicate
6 * with the walreceiver process. Functions implementing walreceiver itself
7 * are in walreceiver.c.
8 *
9 * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
10 *
11 *
12 * IDENTIFICATION
13 * src/backend/replication/walreceiverfuncs.c
14 *
15 *-------------------------------------------------------------------------
16 */
#include "postgres.h"

#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#include "access/xlog_internal.h"
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
31
/*
 * Pointer to the walreceiver control struct in shared memory.  NULL until
 * WalRcvShmemInit() attaches to (or creates) the struct.
 */
WalRcvData *WalRcv = NULL;

/*
 * How long to wait for walreceiver to start up after requesting
 * postmaster to launch it. In seconds.  A walreceiver that stays in
 * WALRCV_STARTING for longer than this is given up on and marked
 * WALRCV_STOPPED.
 */
#define WALRCV_STARTUP_TIMEOUT 10
39
40 /* Report shared memory space needed by WalRcvShmemInit */
41 Size
WalRcvShmemSize(void)42 WalRcvShmemSize(void)
43 {
44 Size size = 0;
45
46 size = add_size(size, sizeof(WalRcvData));
47
48 return size;
49 }
50
51 /* Allocate and initialize walreceiver-related shared memory */
52 void
WalRcvShmemInit(void)53 WalRcvShmemInit(void)
54 {
55 bool found;
56
57 WalRcv = (WalRcvData *)
58 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
59
60 if (!found)
61 {
62 /* First time through, so initialize */
63 MemSet(WalRcv, 0, WalRcvShmemSize());
64 WalRcv->walRcvState = WALRCV_STOPPED;
65 SpinLockInit(&WalRcv->mutex);
66 WalRcv->latch = NULL;
67 }
68 }
69
70 /* Is walreceiver running (or starting up)? */
71 bool
WalRcvRunning(void)72 WalRcvRunning(void)
73 {
74 WalRcvData *walrcv = WalRcv;
75 WalRcvState state;
76 pg_time_t startTime;
77
78 SpinLockAcquire(&walrcv->mutex);
79
80 state = walrcv->walRcvState;
81 startTime = walrcv->startTime;
82
83 SpinLockRelease(&walrcv->mutex);
84
85 /*
86 * If it has taken too long for walreceiver to start up, give up. Setting
87 * the state to STOPPED ensures that if walreceiver later does start up
88 * after all, it will see that it's not supposed to be running and die
89 * without doing anything.
90 */
91 if (state == WALRCV_STARTING)
92 {
93 pg_time_t now = (pg_time_t) time(NULL);
94
95 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
96 {
97 SpinLockAcquire(&walrcv->mutex);
98
99 if (walrcv->walRcvState == WALRCV_STARTING)
100 state = walrcv->walRcvState = WALRCV_STOPPED;
101
102 SpinLockRelease(&walrcv->mutex);
103 }
104 }
105
106 if (state != WALRCV_STOPPED)
107 return true;
108 else
109 return false;
110 }
111
112 /*
113 * Is walreceiver running and streaming (or at least attempting to connect,
114 * or starting up)?
115 */
116 bool
WalRcvStreaming(void)117 WalRcvStreaming(void)
118 {
119 WalRcvData *walrcv = WalRcv;
120 WalRcvState state;
121 pg_time_t startTime;
122
123 SpinLockAcquire(&walrcv->mutex);
124
125 state = walrcv->walRcvState;
126 startTime = walrcv->startTime;
127
128 SpinLockRelease(&walrcv->mutex);
129
130 /*
131 * If it has taken too long for walreceiver to start up, give up. Setting
132 * the state to STOPPED ensures that if walreceiver later does start up
133 * after all, it will see that it's not supposed to be running and die
134 * without doing anything.
135 */
136 if (state == WALRCV_STARTING)
137 {
138 pg_time_t now = (pg_time_t) time(NULL);
139
140 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
141 {
142 SpinLockAcquire(&walrcv->mutex);
143
144 if (walrcv->walRcvState == WALRCV_STARTING)
145 state = walrcv->walRcvState = WALRCV_STOPPED;
146
147 SpinLockRelease(&walrcv->mutex);
148 }
149 }
150
151 if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
152 state == WALRCV_RESTARTING)
153 return true;
154 else
155 return false;
156 }
157
158 /*
159 * Stop walreceiver (if running) and wait for it to die.
160 * Executed by the Startup process.
161 */
162 void
ShutdownWalRcv(void)163 ShutdownWalRcv(void)
164 {
165 WalRcvData *walrcv = WalRcv;
166 pid_t walrcvpid = 0;
167
168 /*
169 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
170 * mode once it's finished, and will also request postmaster to not
171 * restart itself.
172 */
173 SpinLockAcquire(&walrcv->mutex);
174 switch (walrcv->walRcvState)
175 {
176 case WALRCV_STOPPED:
177 break;
178 case WALRCV_STARTING:
179 walrcv->walRcvState = WALRCV_STOPPED;
180 break;
181
182 case WALRCV_STREAMING:
183 case WALRCV_WAITING:
184 case WALRCV_RESTARTING:
185 walrcv->walRcvState = WALRCV_STOPPING;
186 /* fall through */
187 case WALRCV_STOPPING:
188 walrcvpid = walrcv->pid;
189 break;
190 }
191 SpinLockRelease(&walrcv->mutex);
192
193 /*
194 * Signal walreceiver process if it was still running.
195 */
196 if (walrcvpid != 0)
197 kill(walrcvpid, SIGTERM);
198
199 /*
200 * Wait for walreceiver to acknowledge its death by setting state to
201 * WALRCV_STOPPED.
202 */
203 while (WalRcvRunning())
204 {
205 /*
206 * This possibly-long loop needs to handle interrupts of startup
207 * process.
208 */
209 HandleStartupProcInterrupts();
210
211 pg_usleep(100000); /* 100ms */
212 }
213 }
214
215 /*
216 * Request postmaster to start walreceiver.
217 *
218 * recptr indicates the position where streaming should begin, conninfo
219 * is a libpq connection string to use, and slotname is, optionally, the name
220 * of a replication slot to acquire.
221 */
222 void
RequestXLogStreaming(TimeLineID tli,XLogRecPtr recptr,const char * conninfo,const char * slotname)223 RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
224 const char *slotname)
225 {
226 WalRcvData *walrcv = WalRcv;
227 bool launch = false;
228 pg_time_t now = (pg_time_t) time(NULL);
229 Latch *latch;
230
231 /*
232 * We always start at the beginning of the segment. That prevents a broken
233 * segment (i.e., with no records in the first half of a segment) from
234 * being created by XLOG streaming, which might cause trouble later on if
235 * the segment is e.g archived.
236 */
237 if (recptr % XLogSegSize != 0)
238 recptr -= recptr % XLogSegSize;
239
240 SpinLockAcquire(&walrcv->mutex);
241
242 /* It better be stopped if we try to restart it */
243 Assert(walrcv->walRcvState == WALRCV_STOPPED ||
244 walrcv->walRcvState == WALRCV_WAITING);
245
246 if (conninfo != NULL)
247 strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
248 else
249 walrcv->conninfo[0] = '\0';
250
251 if (slotname != NULL)
252 strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
253 else
254 walrcv->slotname[0] = '\0';
255
256 if (walrcv->walRcvState == WALRCV_STOPPED)
257 {
258 launch = true;
259 walrcv->walRcvState = WALRCV_STARTING;
260 }
261 else
262 walrcv->walRcvState = WALRCV_RESTARTING;
263 walrcv->startTime = now;
264
265 /*
266 * If this is the first startup of walreceiver (on this timeline),
267 * initialize receivedUpto and latestChunkStart to the starting point.
268 */
269 if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
270 {
271 walrcv->receivedUpto = recptr;
272 walrcv->receivedTLI = tli;
273 walrcv->latestChunkStart = recptr;
274 }
275 walrcv->receiveStart = recptr;
276 walrcv->receiveStartTLI = tli;
277
278 latch = walrcv->latch;
279
280 SpinLockRelease(&walrcv->mutex);
281
282 if (launch)
283 SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
284 else if (latch)
285 SetLatch(latch);
286 }
287
288 /*
289 * Returns the last+1 byte position that walreceiver has written.
290 *
291 * Optionally, returns the previous chunk start, that is the first byte
292 * written in the most recent walreceiver flush cycle. Callers not
293 * interested in that value may pass NULL for latestChunkStart. Same for
294 * receiveTLI.
295 */
296 XLogRecPtr
GetWalRcvWriteRecPtr(XLogRecPtr * latestChunkStart,TimeLineID * receiveTLI)297 GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
298 {
299 WalRcvData *walrcv = WalRcv;
300 XLogRecPtr recptr;
301
302 SpinLockAcquire(&walrcv->mutex);
303 recptr = walrcv->receivedUpto;
304 if (latestChunkStart)
305 *latestChunkStart = walrcv->latestChunkStart;
306 if (receiveTLI)
307 *receiveTLI = walrcv->receivedTLI;
308 SpinLockRelease(&walrcv->mutex);
309
310 return recptr;
311 }
312
313 /*
314 * Returns the replication apply delay in ms or -1
315 * if the apply delay info is not available
316 */
317 int
GetReplicationApplyDelay(void)318 GetReplicationApplyDelay(void)
319 {
320 WalRcvData *walrcv = WalRcv;
321 XLogRecPtr receivePtr;
322 XLogRecPtr replayPtr;
323 TimestampTz chunkReplayStartTime;
324
325 SpinLockAcquire(&walrcv->mutex);
326 receivePtr = walrcv->receivedUpto;
327 SpinLockRelease(&walrcv->mutex);
328
329 replayPtr = GetXLogReplayRecPtr(NULL);
330
331 if (receivePtr == replayPtr)
332 return 0;
333
334 chunkReplayStartTime = GetCurrentChunkReplayStartTime();
335
336 if (chunkReplayStartTime == 0)
337 return -1;
338
339 return TimestampDifferenceMilliseconds(chunkReplayStartTime,
340 GetCurrentTimestamp());
341 }
342
343 /*
344 * Returns the network latency in ms, note that this includes any
345 * difference in clock settings between the servers, as well as timezone.
346 */
347 int
GetReplicationTransferLatency(void)348 GetReplicationTransferLatency(void)
349 {
350 WalRcvData *walrcv = WalRcv;
351 TimestampTz lastMsgSendTime;
352 TimestampTz lastMsgReceiptTime;
353
354 SpinLockAcquire(&walrcv->mutex);
355 lastMsgSendTime = walrcv->lastMsgSendTime;
356 lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
357 SpinLockRelease(&walrcv->mutex);
358
359 return TimestampDifferenceMilliseconds(lastMsgSendTime,
360 lastMsgReceiptTime);
361 }
362