1 /*-------------------------------------------------------------------------
2 *
3 * walreceiverfuncs.c
4 *
5 * This file contains functions used by the startup process to communicate
6 * with the walreceiver process. Functions implementing walreceiver itself
7 * are in walreceiver.c.
8 *
9 * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
10 *
11 *
12 * IDENTIFICATION
13 * src/backend/replication/walreceiverfuncs.c
14 *
15 *-------------------------------------------------------------------------
16 */
17 #include "postgres.h"
18
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <sys/time.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <signal.h>
25
26 #include "access/xlog_internal.h"
27 #include "postmaster/startup.h"
28 #include "replication/walreceiver.h"
29 #include "storage/pmsignal.h"
30 #include "storage/shmem.h"
31 #include "utils/timestamp.h"
32
33 WalRcvData *WalRcv = NULL;
34
35 /*
36 * How long to wait for walreceiver to start up after requesting
37 * postmaster to launch it. In seconds.
38 */
39 #define WALRCV_STARTUP_TIMEOUT 10
40
41 /* Report shared memory space needed by WalRcvShmemInit */
42 Size
WalRcvShmemSize(void)43 WalRcvShmemSize(void)
44 {
45 Size size = 0;
46
47 size = add_size(size, sizeof(WalRcvData));
48
49 return size;
50 }
51
52 /* Allocate and initialize walreceiver-related shared memory */
53 void
WalRcvShmemInit(void)54 WalRcvShmemInit(void)
55 {
56 bool found;
57
58 WalRcv = (WalRcvData *)
59 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
60
61 if (!found)
62 {
63 /* First time through, so initialize */
64 MemSet(WalRcv, 0, WalRcvShmemSize());
65 WalRcv->walRcvState = WALRCV_STOPPED;
66 SpinLockInit(&WalRcv->mutex);
67 InitSharedLatch(&WalRcv->latch);
68 }
69 }
70
71 /* Is walreceiver running (or starting up)? */
72 bool
WalRcvRunning(void)73 WalRcvRunning(void)
74 {
75 WalRcvData *walrcv = WalRcv;
76 WalRcvState state;
77 pg_time_t startTime;
78
79 SpinLockAcquire(&walrcv->mutex);
80
81 state = walrcv->walRcvState;
82 startTime = walrcv->startTime;
83
84 SpinLockRelease(&walrcv->mutex);
85
86 /*
87 * If it has taken too long for walreceiver to start up, give up. Setting
88 * the state to STOPPED ensures that if walreceiver later does start up
89 * after all, it will see that it's not supposed to be running and die
90 * without doing anything.
91 */
92 if (state == WALRCV_STARTING)
93 {
94 pg_time_t now = (pg_time_t) time(NULL);
95
96 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
97 {
98 SpinLockAcquire(&walrcv->mutex);
99
100 if (walrcv->walRcvState == WALRCV_STARTING)
101 state = walrcv->walRcvState = WALRCV_STOPPED;
102
103 SpinLockRelease(&walrcv->mutex);
104 }
105 }
106
107 if (state != WALRCV_STOPPED)
108 return true;
109 else
110 return false;
111 }
112
113 /*
114 * Is walreceiver running and streaming (or at least attempting to connect,
115 * or starting up)?
116 */
117 bool
WalRcvStreaming(void)118 WalRcvStreaming(void)
119 {
120 WalRcvData *walrcv = WalRcv;
121 WalRcvState state;
122 pg_time_t startTime;
123
124 SpinLockAcquire(&walrcv->mutex);
125
126 state = walrcv->walRcvState;
127 startTime = walrcv->startTime;
128
129 SpinLockRelease(&walrcv->mutex);
130
131 /*
132 * If it has taken too long for walreceiver to start up, give up. Setting
133 * the state to STOPPED ensures that if walreceiver later does start up
134 * after all, it will see that it's not supposed to be running and die
135 * without doing anything.
136 */
137 if (state == WALRCV_STARTING)
138 {
139 pg_time_t now = (pg_time_t) time(NULL);
140
141 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
142 {
143 SpinLockAcquire(&walrcv->mutex);
144
145 if (walrcv->walRcvState == WALRCV_STARTING)
146 state = walrcv->walRcvState = WALRCV_STOPPED;
147
148 SpinLockRelease(&walrcv->mutex);
149 }
150 }
151
152 if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
153 state == WALRCV_RESTARTING)
154 return true;
155 else
156 return false;
157 }
158
159 /*
160 * Stop walreceiver (if running) and wait for it to die.
161 * Executed by the Startup process.
162 */
163 void
ShutdownWalRcv(void)164 ShutdownWalRcv(void)
165 {
166 WalRcvData *walrcv = WalRcv;
167 pid_t walrcvpid = 0;
168
169 /*
170 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
171 * mode once it's finished, and will also request postmaster to not
172 * restart itself.
173 */
174 SpinLockAcquire(&walrcv->mutex);
175 switch (walrcv->walRcvState)
176 {
177 case WALRCV_STOPPED:
178 break;
179 case WALRCV_STARTING:
180 walrcv->walRcvState = WALRCV_STOPPED;
181 break;
182
183 case WALRCV_STREAMING:
184 case WALRCV_WAITING:
185 case WALRCV_RESTARTING:
186 walrcv->walRcvState = WALRCV_STOPPING;
187 /* fall through */
188 case WALRCV_STOPPING:
189 walrcvpid = walrcv->pid;
190 break;
191 }
192 SpinLockRelease(&walrcv->mutex);
193
194 /*
195 * Signal walreceiver process if it was still running.
196 */
197 if (walrcvpid != 0)
198 kill(walrcvpid, SIGTERM);
199
200 /*
201 * Wait for walreceiver to acknowledge its death by setting state to
202 * WALRCV_STOPPED.
203 */
204 while (WalRcvRunning())
205 {
206 /*
207 * This possibly-long loop needs to handle interrupts of startup
208 * process.
209 */
210 HandleStartupProcInterrupts();
211
212 pg_usleep(100000); /* 100ms */
213 }
214 }
215
216 /*
217 * Request postmaster to start walreceiver.
218 *
219 * recptr indicates the position where streaming should begin, conninfo
220 * is a libpq connection string to use, and slotname is, optionally, the name
221 * of a replication slot to acquire.
222 */
223 void
RequestXLogStreaming(TimeLineID tli,XLogRecPtr recptr,const char * conninfo,const char * slotname)224 RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
225 const char *slotname)
226 {
227 WalRcvData *walrcv = WalRcv;
228 bool launch = false;
229 pg_time_t now = (pg_time_t) time(NULL);
230
231 /*
232 * We always start at the beginning of the segment. That prevents a broken
233 * segment (i.e., with no records in the first half of a segment) from
234 * being created by XLOG streaming, which might cause trouble later on if
235 * the segment is e.g archived.
236 */
237 if (recptr % XLogSegSize != 0)
238 recptr -= recptr % XLogSegSize;
239
240 SpinLockAcquire(&walrcv->mutex);
241
242 /* It better be stopped if we try to restart it */
243 Assert(walrcv->walRcvState == WALRCV_STOPPED ||
244 walrcv->walRcvState == WALRCV_WAITING);
245
246 if (conninfo != NULL)
247 strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
248 else
249 walrcv->conninfo[0] = '\0';
250
251 if (slotname != NULL)
252 strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
253 else
254 walrcv->slotname[0] = '\0';
255
256 if (walrcv->walRcvState == WALRCV_STOPPED)
257 {
258 launch = true;
259 walrcv->walRcvState = WALRCV_STARTING;
260 }
261 else
262 walrcv->walRcvState = WALRCV_RESTARTING;
263 walrcv->startTime = now;
264
265 /*
266 * If this is the first startup of walreceiver (on this timeline),
267 * initialize receivedUpto and latestChunkStart to the starting point.
268 */
269 if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
270 {
271 walrcv->receivedUpto = recptr;
272 walrcv->receivedTLI = tli;
273 walrcv->latestChunkStart = recptr;
274 }
275 walrcv->receiveStart = recptr;
276 walrcv->receiveStartTLI = tli;
277
278 SpinLockRelease(&walrcv->mutex);
279
280 if (launch)
281 SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
282 else
283 SetLatch(&walrcv->latch);
284 }
285
286 /*
287 * Returns the last+1 byte position that walreceiver has written.
288 *
289 * Optionally, returns the previous chunk start, that is the first byte
290 * written in the most recent walreceiver flush cycle. Callers not
291 * interested in that value may pass NULL for latestChunkStart. Same for
292 * receiveTLI.
293 */
294 XLogRecPtr
GetWalRcvWriteRecPtr(XLogRecPtr * latestChunkStart,TimeLineID * receiveTLI)295 GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
296 {
297 WalRcvData *walrcv = WalRcv;
298 XLogRecPtr recptr;
299
300 SpinLockAcquire(&walrcv->mutex);
301 recptr = walrcv->receivedUpto;
302 if (latestChunkStart)
303 *latestChunkStart = walrcv->latestChunkStart;
304 if (receiveTLI)
305 *receiveTLI = walrcv->receivedTLI;
306 SpinLockRelease(&walrcv->mutex);
307
308 return recptr;
309 }
310
311 /*
312 * Returns the replication apply delay in ms or -1
313 * if the apply delay info is not available
314 */
315 int
GetReplicationApplyDelay(void)316 GetReplicationApplyDelay(void)
317 {
318 WalRcvData *walrcv = WalRcv;
319 XLogRecPtr receivePtr;
320 XLogRecPtr replayPtr;
321 TimestampTz chunkReplayStartTime;
322
323 SpinLockAcquire(&walrcv->mutex);
324 receivePtr = walrcv->receivedUpto;
325 SpinLockRelease(&walrcv->mutex);
326
327 replayPtr = GetXLogReplayRecPtr(NULL);
328
329 if (receivePtr == replayPtr)
330 return 0;
331
332 chunkReplayStartTime = GetCurrentChunkReplayStartTime();
333
334 if (chunkReplayStartTime == 0)
335 return -1;
336
337 return TimestampDifferenceMilliseconds(chunkReplayStartTime,
338 GetCurrentTimestamp());
339 }
340
341 /*
342 * Returns the network latency in ms, note that this includes any
343 * difference in clock settings between the servers, as well as timezone.
344 */
345 int
GetReplicationTransferLatency(void)346 GetReplicationTransferLatency(void)
347 {
348 WalRcvData *walrcv = WalRcv;
349 TimestampTz lastMsgSendTime;
350 TimestampTz lastMsgReceiptTime;
351
352 SpinLockAcquire(&walrcv->mutex);
353 lastMsgSendTime = walrcv->lastMsgSendTime;
354 lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
355 SpinLockRelease(&walrcv->mutex);
356
357 return TimestampDifferenceMilliseconds(lastMsgSendTime,
358 lastMsgReceiptTime);
359 }
360