1 /*-------------------------------------------------------------------------
2 *
3 * walreceiverfuncs.c
4 *
5 * This file contains functions used by the startup process to communicate
6 * with the walreceiver process. Functions implementing walreceiver itself
7 * are in walreceiver.c.
8 *
9 * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
10 *
11 *
12 * IDENTIFICATION
13 * src/backend/replication/walreceiverfuncs.c
14 *
15 *-------------------------------------------------------------------------
16 */
17 #include "postgres.h"
18
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
24
25 #include "access/xlog_internal.h"
26 #include "postmaster/startup.h"
27 #include "replication/walreceiver.h"
28 #include "storage/pmsignal.h"
29 #include "storage/shmem.h"
30 #include "utils/timestamp.h"
31
32 WalRcvData *WalRcv = NULL;
33
34 /*
35 * How long to wait for walreceiver to start up after requesting
36 * postmaster to launch it. In seconds.
37 */
38 #define WALRCV_STARTUP_TIMEOUT 10
39
40 /* Report shared memory space needed by WalRcvShmemInit */
41 Size
WalRcvShmemSize(void)42 WalRcvShmemSize(void)
43 {
44 Size size = 0;
45
46 size = add_size(size, sizeof(WalRcvData));
47
48 return size;
49 }
50
51 /* Allocate and initialize walreceiver-related shared memory */
52 void
WalRcvShmemInit(void)53 WalRcvShmemInit(void)
54 {
55 bool found;
56
57 WalRcv = (WalRcvData *)
58 ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
59
60 if (!found)
61 {
62 /* First time through, so initialize */
63 MemSet(WalRcv, 0, WalRcvShmemSize());
64 WalRcv->walRcvState = WALRCV_STOPPED;
65 SpinLockInit(&WalRcv->mutex);
66 pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
67 WalRcv->latch = NULL;
68 }
69 }
70
71 /* Is walreceiver running (or starting up)? */
72 bool
WalRcvRunning(void)73 WalRcvRunning(void)
74 {
75 WalRcvData *walrcv = WalRcv;
76 WalRcvState state;
77 pg_time_t startTime;
78
79 SpinLockAcquire(&walrcv->mutex);
80
81 state = walrcv->walRcvState;
82 startTime = walrcv->startTime;
83
84 SpinLockRelease(&walrcv->mutex);
85
86 /*
87 * If it has taken too long for walreceiver to start up, give up. Setting
88 * the state to STOPPED ensures that if walreceiver later does start up
89 * after all, it will see that it's not supposed to be running and die
90 * without doing anything.
91 */
92 if (state == WALRCV_STARTING)
93 {
94 pg_time_t now = (pg_time_t) time(NULL);
95
96 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
97 {
98 SpinLockAcquire(&walrcv->mutex);
99
100 if (walrcv->walRcvState == WALRCV_STARTING)
101 state = walrcv->walRcvState = WALRCV_STOPPED;
102
103 SpinLockRelease(&walrcv->mutex);
104 }
105 }
106
107 if (state != WALRCV_STOPPED)
108 return true;
109 else
110 return false;
111 }
112
113 /*
114 * Is walreceiver running and streaming (or at least attempting to connect,
115 * or starting up)?
116 */
117 bool
WalRcvStreaming(void)118 WalRcvStreaming(void)
119 {
120 WalRcvData *walrcv = WalRcv;
121 WalRcvState state;
122 pg_time_t startTime;
123
124 SpinLockAcquire(&walrcv->mutex);
125
126 state = walrcv->walRcvState;
127 startTime = walrcv->startTime;
128
129 SpinLockRelease(&walrcv->mutex);
130
131 /*
132 * If it has taken too long for walreceiver to start up, give up. Setting
133 * the state to STOPPED ensures that if walreceiver later does start up
134 * after all, it will see that it's not supposed to be running and die
135 * without doing anything.
136 */
137 if (state == WALRCV_STARTING)
138 {
139 pg_time_t now = (pg_time_t) time(NULL);
140
141 if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
142 {
143 SpinLockAcquire(&walrcv->mutex);
144
145 if (walrcv->walRcvState == WALRCV_STARTING)
146 state = walrcv->walRcvState = WALRCV_STOPPED;
147
148 SpinLockRelease(&walrcv->mutex);
149 }
150 }
151
152 if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
153 state == WALRCV_RESTARTING)
154 return true;
155 else
156 return false;
157 }
158
159 /*
160 * Stop walreceiver (if running) and wait for it to die.
161 * Executed by the Startup process.
162 */
163 void
ShutdownWalRcv(void)164 ShutdownWalRcv(void)
165 {
166 WalRcvData *walrcv = WalRcv;
167 pid_t walrcvpid = 0;
168
169 /*
170 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
171 * mode once it's finished, and will also request postmaster to not
172 * restart itself.
173 */
174 SpinLockAcquire(&walrcv->mutex);
175 switch (walrcv->walRcvState)
176 {
177 case WALRCV_STOPPED:
178 break;
179 case WALRCV_STARTING:
180 walrcv->walRcvState = WALRCV_STOPPED;
181 break;
182
183 case WALRCV_STREAMING:
184 case WALRCV_WAITING:
185 case WALRCV_RESTARTING:
186 walrcv->walRcvState = WALRCV_STOPPING;
187 /* fall through */
188 case WALRCV_STOPPING:
189 walrcvpid = walrcv->pid;
190 break;
191 }
192 SpinLockRelease(&walrcv->mutex);
193
194 /*
195 * Signal walreceiver process if it was still running.
196 */
197 if (walrcvpid != 0)
198 kill(walrcvpid, SIGTERM);
199
200 /*
201 * Wait for walreceiver to acknowledge its death by setting state to
202 * WALRCV_STOPPED.
203 */
204 while (WalRcvRunning())
205 {
206 /*
207 * This possibly-long loop needs to handle interrupts of startup
208 * process.
209 */
210 HandleStartupProcInterrupts();
211
212 pg_usleep(100000); /* 100ms */
213 }
214 }
215
216 /*
217 * Request postmaster to start walreceiver.
218 *
219 * "recptr" indicates the position where streaming should begin. "conninfo"
220 * is a libpq connection string to use. "slotname" is, optionally, the name
221 * of a replication slot to acquire. "create_temp_slot" indicates to create
222 * a temporary slot when no "slotname" is given.
223 *
224 * WAL receivers do not directly load GUC parameters used for the connection
225 * to the primary, and rely on the values passed down by the caller of this
226 * routine instead. Hence, the addition of any new parameters should happen
227 * through this code path.
228 */
229 void
RequestXLogStreaming(TimeLineID tli,XLogRecPtr recptr,const char * conninfo,const char * slotname,bool create_temp_slot)230 RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
231 const char *slotname, bool create_temp_slot)
232 {
233 WalRcvData *walrcv = WalRcv;
234 bool launch = false;
235 pg_time_t now = (pg_time_t) time(NULL);
236 Latch *latch;
237
238 /*
239 * We always start at the beginning of the segment. That prevents a broken
240 * segment (i.e., with no records in the first half of a segment) from
241 * being created by XLOG streaming, which might cause trouble later on if
242 * the segment is e.g archived.
243 */
244 if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
245 recptr -= XLogSegmentOffset(recptr, wal_segment_size);
246
247 SpinLockAcquire(&walrcv->mutex);
248
249 /* It better be stopped if we try to restart it */
250 Assert(walrcv->walRcvState == WALRCV_STOPPED ||
251 walrcv->walRcvState == WALRCV_WAITING);
252
253 if (conninfo != NULL)
254 strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
255 else
256 walrcv->conninfo[0] = '\0';
257
258 /*
259 * Use configured replication slot if present, and ignore the value of
260 * create_temp_slot as the slot name should be persistent. Otherwise, use
261 * create_temp_slot to determine whether this WAL receiver should create a
262 * temporary slot by itself and use it, or not.
263 */
264 if (slotname != NULL && slotname[0] != '\0')
265 {
266 strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
267 walrcv->is_temp_slot = false;
268 }
269 else
270 {
271 walrcv->slotname[0] = '\0';
272 walrcv->is_temp_slot = create_temp_slot;
273 }
274
275 if (walrcv->walRcvState == WALRCV_STOPPED)
276 {
277 launch = true;
278 walrcv->walRcvState = WALRCV_STARTING;
279 }
280 else
281 walrcv->walRcvState = WALRCV_RESTARTING;
282 walrcv->startTime = now;
283
284 /*
285 * If this is the first startup of walreceiver (on this timeline),
286 * initialize flushedUpto and latestChunkStart to the starting point.
287 */
288 if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
289 {
290 walrcv->flushedUpto = recptr;
291 walrcv->receivedTLI = tli;
292 walrcv->latestChunkStart = recptr;
293 }
294 walrcv->receiveStart = recptr;
295 walrcv->receiveStartTLI = tli;
296
297 latch = walrcv->latch;
298
299 SpinLockRelease(&walrcv->mutex);
300
301 if (launch)
302 SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
303 else if (latch)
304 SetLatch(latch);
305 }
306
307 /*
308 * Returns the last+1 byte position that walreceiver has flushed.
309 *
310 * Optionally, returns the previous chunk start, that is the first byte
311 * written in the most recent walreceiver flush cycle. Callers not
312 * interested in that value may pass NULL for latestChunkStart. Same for
313 * receiveTLI.
314 */
315 XLogRecPtr
GetWalRcvFlushRecPtr(XLogRecPtr * latestChunkStart,TimeLineID * receiveTLI)316 GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
317 {
318 WalRcvData *walrcv = WalRcv;
319 XLogRecPtr recptr;
320
321 SpinLockAcquire(&walrcv->mutex);
322 recptr = walrcv->flushedUpto;
323 if (latestChunkStart)
324 *latestChunkStart = walrcv->latestChunkStart;
325 if (receiveTLI)
326 *receiveTLI = walrcv->receivedTLI;
327 SpinLockRelease(&walrcv->mutex);
328
329 return recptr;
330 }
331
332 /*
333 * Returns the last+1 byte position that walreceiver has written.
334 * This returns a recently written value without taking a lock.
335 */
336 XLogRecPtr
GetWalRcvWriteRecPtr(void)337 GetWalRcvWriteRecPtr(void)
338 {
339 WalRcvData *walrcv = WalRcv;
340
341 return pg_atomic_read_u64(&walrcv->writtenUpto);
342 }
343
344 /*
345 * Returns the replication apply delay in ms or -1
346 * if the apply delay info is not available
347 */
348 int
GetReplicationApplyDelay(void)349 GetReplicationApplyDelay(void)
350 {
351 WalRcvData *walrcv = WalRcv;
352 XLogRecPtr receivePtr;
353 XLogRecPtr replayPtr;
354 TimestampTz chunkReplayStartTime;
355
356 SpinLockAcquire(&walrcv->mutex);
357 receivePtr = walrcv->flushedUpto;
358 SpinLockRelease(&walrcv->mutex);
359
360 replayPtr = GetXLogReplayRecPtr(NULL);
361
362 if (receivePtr == replayPtr)
363 return 0;
364
365 chunkReplayStartTime = GetCurrentChunkReplayStartTime();
366
367 if (chunkReplayStartTime == 0)
368 return -1;
369
370 return TimestampDifferenceMilliseconds(chunkReplayStartTime,
371 GetCurrentTimestamp());
372 }
373
374 /*
375 * Returns the network latency in ms, note that this includes any
376 * difference in clock settings between the servers, as well as timezone.
377 */
378 int
GetReplicationTransferLatency(void)379 GetReplicationTransferLatency(void)
380 {
381 WalRcvData *walrcv = WalRcv;
382 TimestampTz lastMsgSendTime;
383 TimestampTz lastMsgReceiptTime;
384
385 SpinLockAcquire(&walrcv->mutex);
386 lastMsgSendTime = walrcv->lastMsgSendTime;
387 lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
388 SpinLockRelease(&walrcv->mutex);
389
390 return TimestampDifferenceMilliseconds(lastMsgSendTime,
391 lastMsgReceiptTime);
392 }
393