1 /*-------------------------------------------------------------------------
2 *
3 * syncrep.c
4 *
5 * Synchronous replication is new as of PostgreSQL 9.1.
6 *
7 * If requested, transaction commits wait until their commit LSNs are
8 * acknowledged by the synchronous standbys.
9 *
10 * This module contains the code for waiting and release of backends.
11 * All code in this module executes on the primary. The core streaming
12 * replication transport remains within WALreceiver/WALsender modules.
13 *
14 * The essence of this design is that it isolates all logic about
15 * waiting/releasing onto the primary. The primary defines which standbys
16 * it wishes to wait for. The standbys are completely unaware of the
17 * durability requirements of transactions on the primary, reducing the
18 * complexity of the code and streamlining both standby operations and
19 * network bandwidth because there is no requirement to ship
20 * per-transaction state information.
21 *
22 * Replication is either synchronous or not synchronous (async). If it is
23 * async, we just fastpath out of here. If it is sync, then we wait for
24 * the write, flush or apply location on the standby before releasing
25 * the waiting backend. Further complexity in that interaction is
26 * expected in later releases.
27 *
28 * The best performing way to manage the waiting backends is to have a
29 * single ordered queue of waiting backends, so that we can avoid
30 * searching through all waiters each time we receive a reply.
31 *
32 * In 9.5 or before only a single standby could be considered as
33 * synchronous. In 9.6 we support multiple synchronous standbys.
34 * The number of synchronous standbys that transactions must wait for
35 * replies from is specified in synchronous_standby_names.
36 * This parameter also specifies a list of standby names,
37 * which determines the priority of each standby for being chosen as
38 * a synchronous standby. The standbys whose names appear earlier
39 * in the list are given higher priority and will be considered as
40 * synchronous. Other standby servers appearing later in this list
41 * represent potential synchronous standbys. If any of the current
42 * synchronous standbys disconnects for whatever reason, it will be
43 * replaced immediately with the next-highest-priority standby.
44 *
45 * Before the standbys chosen from synchronous_standby_names can
46 * become the synchronous standbys they must have caught up with
47 * the primary; that may take some time. Once caught up,
48 * the current higher priority standbys which are considered as
49 * synchronous at that moment will release waiters from the queue.
50 *
51 * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group
52 *
53 * IDENTIFICATION
54 * src/backend/replication/syncrep.c
55 *
56 *-------------------------------------------------------------------------
57 */
58 #include "postgres.h"
59
60 #include <unistd.h>
61
62 #include "access/xact.h"
63 #include "miscadmin.h"
64 #include "replication/syncrep.h"
65 #include "replication/walsender.h"
66 #include "replication/walsender_private.h"
67 #include "storage/pmsignal.h"
68 #include "storage/proc.h"
69 #include "tcop/tcopprot.h"
70 #include "utils/builtins.h"
71 #include "utils/ps_status.h"
72
73 /* User-settable parameters for sync rep */
74 char *SyncRepStandbyNames;
75
76 #define SyncStandbysDefined() \
77 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
78
79 static bool announce_next_takeover = true;
80
81 static SyncRepConfigData *SyncRepConfig = NULL;
82 static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
83
84 static void SyncRepQueueInsert(int mode);
85 static void SyncRepCancelWait(void);
86 static int SyncRepWakeQueue(bool all, int mode);
87
88 static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
89 XLogRecPtr *flushPtr,
90 XLogRecPtr *applyPtr,
91 bool *am_sync);
92 static int SyncRepGetStandbyPriority(void);
93
94 #ifdef USE_ASSERT_CHECKING
95 static bool SyncRepQueueIsOrderedByLSN(int mode);
96 #endif
97
98 /*
99 * ===========================================================
100 * Synchronous Replication functions for normal user backends
101 * ===========================================================
102 */
103
104 /*
105 * Wait for synchronous replication, if requested by user.
106 *
107 * Initially backends start in state SYNC_REP_NOT_WAITING and then
108 * change that state to SYNC_REP_WAITING before adding ourselves
109 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
110 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
111 * This backend then resets its state to SYNC_REP_NOT_WAITING.
112 *
113 * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
114 * represents a commit record. If it doesn't, then we wait only for the WAL
115 * to be flushed if synchronous_commit is set to the higher level of
116 * remote_apply, because only commit records provide apply feedback.
117 */
118 void
SyncRepWaitForLSN(XLogRecPtr lsn,bool commit)119 SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
120 {
121 char *new_status = NULL;
122 const char *old_status;
123 int mode;
124
125 /* Cap the level for anything other than commit to remote flush only. */
126 if (commit)
127 mode = SyncRepWaitMode;
128 else
129 mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
130
131 /*
132 * Fast exit if user has not requested sync replication, or there are no
133 * sync replication standby names defined. Note that those standbys don't
134 * need to be connected.
135 */
136 if (!SyncRepRequested() || !SyncStandbysDefined())
137 return;
138
139 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
140 Assert(WalSndCtl != NULL);
141
142 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
143 Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
144
145 /*
146 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
147 * set. See SyncRepUpdateSyncStandbysDefined.
148 *
149 * Also check that the standby hasn't already replied. Unlikely race
150 * condition but we'll be fetching that cache line anyway so it's likely
151 * to be a low cost check.
152 */
153 if (!WalSndCtl->sync_standbys_defined ||
154 lsn <= WalSndCtl->lsn[mode])
155 {
156 LWLockRelease(SyncRepLock);
157 return;
158 }
159
160 /*
161 * Set our waitLSN so WALSender will know when to wake us, and add
162 * ourselves to the queue.
163 */
164 MyProc->waitLSN = lsn;
165 MyProc->syncRepState = SYNC_REP_WAITING;
166 SyncRepQueueInsert(mode);
167 Assert(SyncRepQueueIsOrderedByLSN(mode));
168 LWLockRelease(SyncRepLock);
169
170 /* Alter ps display to show waiting for sync rep. */
171 if (update_process_title)
172 {
173 int len;
174
175 old_status = get_ps_display(&len);
176 new_status = (char *) palloc(len + 32 + 1);
177 memcpy(new_status, old_status, len);
178 sprintf(new_status + len, " waiting for %X/%X",
179 (uint32) (lsn >> 32), (uint32) lsn);
180 set_ps_display(new_status, false);
181 new_status[len] = '\0'; /* truncate off " waiting ..." */
182 }
183
184 /*
185 * Wait for specified LSN to be confirmed.
186 *
187 * Each proc has its own wait latch, so we perform a normal latch
188 * check/wait loop here.
189 */
190 for (;;)
191 {
192 /* Must reset the latch before testing state. */
193 ResetLatch(MyLatch);
194
195 /*
196 * Acquiring the lock is not needed, the latch ensures proper
197 * barriers. If it looks like we're done, we must really be done,
198 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
199 * it will never update it again, so we can't be seeing a stale value
200 * in that case.
201 */
202 if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
203 break;
204
205 /*
206 * If a wait for synchronous replication is pending, we can neither
207 * acknowledge the commit nor raise ERROR or FATAL. The latter would
208 * lead the client to believe that the transaction aborted, which is
209 * not true: it's already committed locally. The former is no good
210 * either: the client has requested synchronous replication, and is
211 * entitled to assume that an acknowledged commit is also replicated,
212 * which might not be true. So in this case we issue a WARNING (which
213 * some clients may be able to interpret) and shut off further output.
214 * We do NOT reset ProcDiePending, so that the process will die after
215 * the commit is cleaned up.
216 */
217 if (ProcDiePending)
218 {
219 ereport(WARNING,
220 (errcode(ERRCODE_ADMIN_SHUTDOWN),
221 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
222 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
223 whereToSendOutput = DestNone;
224 SyncRepCancelWait();
225 break;
226 }
227
228 /*
229 * It's unclear what to do if a query cancel interrupt arrives. We
230 * can't actually abort at this point, but ignoring the interrupt
231 * altogether is not helpful, so we just terminate the wait with a
232 * suitable warning.
233 */
234 if (QueryCancelPending)
235 {
236 QueryCancelPending = false;
237 ereport(WARNING,
238 (errmsg("canceling wait for synchronous replication due to user request"),
239 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
240 SyncRepCancelWait();
241 break;
242 }
243
244 /*
245 * If the postmaster dies, we'll probably never get an
246 * acknowledgement, because all the wal sender processes will exit. So
247 * just bail out.
248 */
249 if (!PostmasterIsAlive())
250 {
251 ProcDiePending = true;
252 whereToSendOutput = DestNone;
253 SyncRepCancelWait();
254 break;
255 }
256
257 /*
258 * Wait on latch. Any condition that should wake us up will set the
259 * latch, so no need for timeout.
260 */
261 WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
262 }
263
264 /*
265 * WalSender has checked our LSN and has removed us from queue. Clean up
266 * state and leave. It's OK to reset these shared memory fields without
267 * holding SyncRepLock, because any walsenders will ignore us anyway when
268 * we're not on the queue. We need a read barrier to make sure we see
269 * the changes to the queue link (this might be unnecessary without
270 * assertions, but better safe than sorry).
271 */
272 pg_read_barrier();
273 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
274 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
275 MyProc->waitLSN = 0;
276
277 if (new_status)
278 {
279 /* Reset ps display */
280 set_ps_display(new_status, false);
281 pfree(new_status);
282 }
283 }
284
285 /*
286 * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
287 *
288 * Usually we will go at tail of queue, though it's possible that we arrive
289 * here out of order, so start at tail and work back to insertion point.
290 */
291 static void
SyncRepQueueInsert(int mode)292 SyncRepQueueInsert(int mode)
293 {
294 PGPROC *proc;
295
296 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
297 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
298 &(WalSndCtl->SyncRepQueue[mode]),
299 offsetof(PGPROC, syncRepLinks));
300
301 while (proc)
302 {
303 /*
304 * Stop at the queue element that we should after to ensure the queue
305 * is ordered by LSN.
306 */
307 if (proc->waitLSN < MyProc->waitLSN)
308 break;
309
310 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
311 &(proc->syncRepLinks),
312 offsetof(PGPROC, syncRepLinks));
313 }
314
315 if (proc)
316 SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
317 else
318 SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
319 }
320
321 /*
322 * Acquire SyncRepLock and cancel any wait currently in progress.
323 */
324 static void
SyncRepCancelWait(void)325 SyncRepCancelWait(void)
326 {
327 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
328 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
329 SHMQueueDelete(&(MyProc->syncRepLinks));
330 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
331 LWLockRelease(SyncRepLock);
332 }
333
334 void
SyncRepCleanupAtProcExit(void)335 SyncRepCleanupAtProcExit(void)
336 {
337 /*
338 * First check if we are removed from the queue without the lock to not
339 * slow down backend exit.
340 */
341 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
342 {
343 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
344
345 /* maybe we have just been removed, so recheck */
346 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
347 SHMQueueDelete(&(MyProc->syncRepLinks));
348
349 LWLockRelease(SyncRepLock);
350 }
351 }
352
353 /*
354 * ===========================================================
355 * Synchronous Replication functions for wal sender processes
356 * ===========================================================
357 */
358
359 /*
360 * Take any action required to initialise sync rep state from config
361 * data. Called at WALSender startup and after each SIGHUP.
362 */
363 void
SyncRepInitConfig(void)364 SyncRepInitConfig(void)
365 {
366 int priority;
367
368 /*
369 * Determine if we are a potential sync standby and remember the result
370 * for handling replies from standby.
371 */
372 priority = SyncRepGetStandbyPriority();
373 if (MyWalSnd->sync_standby_priority != priority)
374 {
375 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
376 MyWalSnd->sync_standby_priority = priority;
377 LWLockRelease(SyncRepLock);
378 ereport(DEBUG1,
379 (errmsg("standby \"%s\" now has synchronous standby priority %u",
380 application_name, priority)));
381 }
382 }
383
384 /*
385 * Update the LSNs on each queue based upon our latest state. This
386 * implements a simple policy of first-valid-sync-standby-releases-waiter.
387 *
388 * Other policies are possible, which would change what we do here and
389 * perhaps also which information we store as well.
390 */
391 void
SyncRepReleaseWaiters(void)392 SyncRepReleaseWaiters(void)
393 {
394 volatile WalSndCtlData *walsndctl = WalSndCtl;
395 XLogRecPtr writePtr;
396 XLogRecPtr flushPtr;
397 XLogRecPtr applyPtr;
398 bool got_oldest;
399 bool am_sync;
400 int numwrite = 0;
401 int numflush = 0;
402 int numapply = 0;
403
404 /*
405 * If this WALSender is serving a standby that is not on the list of
406 * potential sync standbys then we have nothing to do. If we are still
407 * starting up, still running base backup or the current flush position is
408 * still invalid, then leave quickly also. Streaming or stopping WAL
409 * senders are allowed to release waiters.
410 */
411 if (MyWalSnd->sync_standby_priority == 0 ||
412 (MyWalSnd->state != WALSNDSTATE_STREAMING &&
413 MyWalSnd->state != WALSNDSTATE_STOPPING) ||
414 XLogRecPtrIsInvalid(MyWalSnd->flush))
415 {
416 announce_next_takeover = true;
417 return;
418 }
419
420 /*
421 * We're a potential sync standby. Release waiters if there are enough
422 * sync standbys and we are considered as sync.
423 */
424 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
425
426 /*
427 * Check whether we are a sync standby or not, and calculate the oldest
428 * positions among all sync standbys.
429 */
430 got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr,
431 &applyPtr, &am_sync);
432
433 /*
434 * If we are managing a sync standby, though we weren't prior to this,
435 * then announce we are now a sync standby.
436 */
437 if (announce_next_takeover && am_sync)
438 {
439 announce_next_takeover = false;
440 ereport(LOG,
441 (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
442 application_name, MyWalSnd->sync_standby_priority)));
443 }
444
445 /*
446 * If the number of sync standbys is less than requested or we aren't
447 * managing a sync standby then just leave.
448 */
449 if (!got_oldest || !am_sync)
450 {
451 LWLockRelease(SyncRepLock);
452 announce_next_takeover = !am_sync;
453 return;
454 }
455
456 /*
457 * Set the lsn first so that when we wake backends they will release up to
458 * this location.
459 */
460 if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
461 {
462 walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
463 numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
464 }
465 if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
466 {
467 walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
468 numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
469 }
470 if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
471 {
472 walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
473 numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
474 }
475
476 LWLockRelease(SyncRepLock);
477
478 elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
479 numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
480 numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
481 numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
482 }
483
484 /*
485 * Calculate the oldest Write, Flush and Apply positions among sync standbys.
486 *
487 * Return false if the number of sync standbys is less than
488 * synchronous_standby_names specifies. Otherwise return true and
489 * store the oldest positions into *writePtr, *flushPtr and *applyPtr.
490 *
491 * On return, *am_sync is set to true if this walsender is connecting to
492 * sync standby. Otherwise it's set to false.
493 */
494 static bool
SyncRepGetOldestSyncRecPtr(XLogRecPtr * writePtr,XLogRecPtr * flushPtr,XLogRecPtr * applyPtr,bool * am_sync)495 SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
496 XLogRecPtr *applyPtr, bool *am_sync)
497 {
498 List *sync_standbys;
499 ListCell *cell;
500
501 *writePtr = InvalidXLogRecPtr;
502 *flushPtr = InvalidXLogRecPtr;
503 *applyPtr = InvalidXLogRecPtr;
504 *am_sync = false;
505
506 /* Get standbys that are considered as synchronous at this moment */
507 sync_standbys = SyncRepGetSyncStandbys(am_sync);
508
509 /*
510 * Quick exit if we are not managing a sync standby or there are not
511 * enough synchronous standbys.
512 */
513 if (!(*am_sync) ||
514 SyncRepConfig == NULL ||
515 list_length(sync_standbys) < SyncRepConfig->num_sync)
516 {
517 list_free(sync_standbys);
518 return false;
519 }
520
521 /*
522 * Scan through all sync standbys and calculate the oldest Write, Flush
523 * and Apply positions.
524 */
525 foreach(cell, sync_standbys)
526 {
527 WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
528 XLogRecPtr write;
529 XLogRecPtr flush;
530 XLogRecPtr apply;
531
532 SpinLockAcquire(&walsnd->mutex);
533 write = walsnd->write;
534 flush = walsnd->flush;
535 apply = walsnd->apply;
536 SpinLockRelease(&walsnd->mutex);
537
538 if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
539 *writePtr = write;
540 if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
541 *flushPtr = flush;
542 if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
543 *applyPtr = apply;
544 }
545
546 list_free(sync_standbys);
547 return true;
548 }
549
550 /*
551 * Return the list of sync standbys, or NIL if no sync standby is connected.
552 *
553 * If there are multiple standbys with the same priority,
554 * the first one found is selected preferentially.
555 * The caller must hold SyncRepLock.
556 *
557 * On return, *am_sync is set to true if this walsender is connecting to
558 * sync standby. Otherwise it's set to false.
559 */
560 List *
SyncRepGetSyncStandbys(bool * am_sync)561 SyncRepGetSyncStandbys(bool *am_sync)
562 {
563 List *result = NIL;
564 List *pending = NIL;
565 int lowest_priority;
566 int next_highest_priority;
567 int this_priority;
568 int priority;
569 int i;
570 bool am_in_pending = false;
571 volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
572 * rearrangement */
573
574 /* Set default result */
575 if (am_sync != NULL)
576 *am_sync = false;
577
578 /* Quick exit if sync replication is not requested */
579 if (SyncRepConfig == NULL)
580 return NIL;
581
582 lowest_priority = SyncRepConfig->nmembers;
583 next_highest_priority = lowest_priority + 1;
584
585 /*
586 * Find the sync standbys which have the highest priority (i.e, 1). Also
587 * store all the other potential sync standbys into the pending list, in
588 * order to scan it later and find other sync standbys from it quickly.
589 */
590 for (i = 0; i < max_wal_senders; i++)
591 {
592 walsnd = &WalSndCtl->walsnds[i];
593
594 /* Must be active */
595 if (walsnd->pid == 0)
596 continue;
597
598 /* Must be streaming or stopping */
599 if (walsnd->state != WALSNDSTATE_STREAMING &&
600 walsnd->state != WALSNDSTATE_STOPPING)
601 continue;
602
603 /* Must be synchronous */
604 this_priority = walsnd->sync_standby_priority;
605 if (this_priority == 0)
606 continue;
607
608 /* Must have a valid flush position */
609 if (XLogRecPtrIsInvalid(walsnd->flush))
610 continue;
611
612 /*
613 * If the priority is equal to 1, consider this standby as sync and
614 * append it to the result. Otherwise append this standby to the
615 * pending list to check if it's actually sync or not later.
616 */
617 if (this_priority == 1)
618 {
619 result = lappend_int(result, i);
620 if (am_sync != NULL && walsnd == MyWalSnd)
621 *am_sync = true;
622 if (list_length(result) == SyncRepConfig->num_sync)
623 {
624 list_free(pending);
625 return result; /* Exit if got enough sync standbys */
626 }
627 }
628 else
629 {
630 pending = lappend_int(pending, i);
631 if (am_sync != NULL && walsnd == MyWalSnd)
632 am_in_pending = true;
633
634 /*
635 * Track the highest priority among the standbys in the pending
636 * list, in order to use it as the starting priority for later
637 * scan of the list. This is useful to find quickly the sync
638 * standbys from the pending list later because we can skip
639 * unnecessary scans for the unused priorities.
640 */
641 if (this_priority < next_highest_priority)
642 next_highest_priority = this_priority;
643 }
644 }
645
646 /*
647 * Consider all pending standbys as sync if the number of them plus
648 * already-found sync ones is lower than the configuration requests.
649 */
650 if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
651 {
652 bool needfree = (result != NIL && pending != NIL);
653
654 /*
655 * Set *am_sync to true if this walsender is in the pending list
656 * because all pending standbys are considered as sync.
657 */
658 if (am_sync != NULL && !(*am_sync))
659 *am_sync = am_in_pending;
660
661 result = list_concat(result, pending);
662 if (needfree)
663 pfree(pending);
664 return result;
665 }
666
667 /*
668 * Find the sync standbys from the pending list.
669 */
670 priority = next_highest_priority;
671 while (priority <= lowest_priority)
672 {
673 ListCell *cell;
674 ListCell *prev = NULL;
675 ListCell *next;
676
677 next_highest_priority = lowest_priority + 1;
678
679 for (cell = list_head(pending); cell != NULL; cell = next)
680 {
681 i = lfirst_int(cell);
682 walsnd = &WalSndCtl->walsnds[i];
683
684 next = lnext(cell);
685
686 this_priority = walsnd->sync_standby_priority;
687 if (this_priority == priority)
688 {
689 result = lappend_int(result, i);
690 if (am_sync != NULL && walsnd == MyWalSnd)
691 *am_sync = true;
692
693 /*
694 * We should always exit here after the scan of pending list
695 * starts because we know that the list has enough elements to
696 * reach SyncRepConfig->num_sync.
697 */
698 if (list_length(result) == SyncRepConfig->num_sync)
699 {
700 list_free(pending);
701 return result; /* Exit if got enough sync standbys */
702 }
703
704 /*
705 * Remove the entry for this sync standby from the list to
706 * prevent us from looking at the same entry again.
707 */
708 pending = list_delete_cell(pending, cell, prev);
709
710 continue;
711 }
712
713 if (this_priority < next_highest_priority)
714 next_highest_priority = this_priority;
715
716 prev = cell;
717 }
718
719 priority = next_highest_priority;
720 }
721
722 /*
723 * We might get here if the set of sync_standby_priority values in shared
724 * memory is inconsistent, as can happen transiently after a change in the
725 * synchronous_standby_names setting. In that case, just return the
726 * incomplete list we have so far. That will cause the caller to decide
727 * there aren't enough synchronous candidates, which should be a safe
728 * choice until the priority values become consistent again.
729 */
730 list_free(pending);
731 return result;
732 }
733
734 /*
735 * Check if we are in the list of sync standbys, and if so, determine
736 * priority sequence. Return priority if set, or zero to indicate that
737 * we are not a potential sync standby.
738 *
739 * Compare the parameter SyncRepStandbyNames against the application_name
740 * for this WALSender, or allow any name if we find a wildcard "*".
741 */
742 static int
SyncRepGetStandbyPriority(void)743 SyncRepGetStandbyPriority(void)
744 {
745 const char *standby_name;
746 int priority;
747 bool found = false;
748
749 /*
750 * Since synchronous cascade replication is not allowed, we always set the
751 * priority of cascading walsender to zero.
752 */
753 if (am_cascading_walsender)
754 return 0;
755
756 if (!SyncStandbysDefined() || SyncRepConfig == NULL)
757 return 0;
758
759 standby_name = SyncRepConfig->member_names;
760 for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
761 {
762 if (pg_strcasecmp(standby_name, application_name) == 0 ||
763 strcmp(standby_name, "*") == 0)
764 {
765 found = true;
766 break;
767 }
768 standby_name += strlen(standby_name) + 1;
769 }
770
771 return (found ? priority : 0);
772 }
773
774 /*
775 * Walk the specified queue from head. Set the state of any backends that
776 * need to be woken, remove them from the queue, and then wake them.
777 * Pass all = true to wake whole queue; otherwise, just wake up to
778 * the walsender's LSN.
779 *
780 * Must hold SyncRepLock.
781 */
782 static int
SyncRepWakeQueue(bool all,int mode)783 SyncRepWakeQueue(bool all, int mode)
784 {
785 volatile WalSndCtlData *walsndctl = WalSndCtl;
786 PGPROC *proc = NULL;
787 PGPROC *thisproc = NULL;
788 int numprocs = 0;
789
790 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
791 Assert(SyncRepQueueIsOrderedByLSN(mode));
792
793 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
794 &(WalSndCtl->SyncRepQueue[mode]),
795 offsetof(PGPROC, syncRepLinks));
796
797 while (proc)
798 {
799 /*
800 * Assume the queue is ordered by LSN
801 */
802 if (!all && walsndctl->lsn[mode] < proc->waitLSN)
803 return numprocs;
804
805 /*
806 * Move to next proc, so we can delete thisproc from the queue.
807 * thisproc is valid, proc may be NULL after this.
808 */
809 thisproc = proc;
810 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
811 &(proc->syncRepLinks),
812 offsetof(PGPROC, syncRepLinks));
813
814 /*
815 * Remove thisproc from queue.
816 */
817 SHMQueueDelete(&(thisproc->syncRepLinks));
818
819 /*
820 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
821 * make sure that it sees the queue link being removed before the
822 * syncRepState change.
823 */
824 pg_write_barrier();
825
826 /*
827 * Set state to complete; see SyncRepWaitForLSN() for discussion of
828 * the various states.
829 */
830 thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
831
832 /*
833 * Wake only when we have set state and removed from queue.
834 */
835 SetLatch(&(thisproc->procLatch));
836
837 numprocs++;
838 }
839
840 return numprocs;
841 }
842
843 /*
844 * The checkpointer calls this as needed to update the shared
845 * sync_standbys_defined flag, so that backends don't remain permanently wedged
846 * if synchronous_standby_names is unset. It's safe to check the current value
847 * without the lock, because it's only ever updated by one process. But we
848 * must take the lock to change it.
849 */
850 void
SyncRepUpdateSyncStandbysDefined(void)851 SyncRepUpdateSyncStandbysDefined(void)
852 {
853 bool sync_standbys_defined = SyncStandbysDefined();
854
855 if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
856 {
857 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
858
859 /*
860 * If synchronous_standby_names has been reset to empty, it's futile
861 * for backends to continue to waiting. Since the user no longer
862 * wants synchronous replication, we'd better wake them up.
863 */
864 if (!sync_standbys_defined)
865 {
866 int i;
867
868 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
869 SyncRepWakeQueue(true, i);
870 }
871
872 /*
873 * Only allow people to join the queue when there are synchronous
874 * standbys defined. Without this interlock, there's a race
875 * condition: we might wake up all the current waiters; then, some
876 * backend that hasn't yet reloaded its config might go to sleep on
877 * the queue (and never wake up). This prevents that.
878 */
879 WalSndCtl->sync_standbys_defined = sync_standbys_defined;
880
881 LWLockRelease(SyncRepLock);
882 }
883 }
884
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: report whether the SyncRepQueue for 'mode' is in
 * strictly increasing waitLSN order when walked from its head.  Returns
 * false as soon as an entry's waitLSN is less than or equal to that of the
 * entry before it (so duplicate LSNs also fail), true otherwise.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
    PGPROC     *cur = NULL;
    XLogRecPtr  prev_lsn = 0;

    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

    for (cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                       &(WalSndCtl->SyncRepQueue[mode]),
                                       offsetof(PGPROC, syncRepLinks));
         cur != NULL;
         cur = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                       &(cur->syncRepLinks),
                                       offsetof(PGPROC, syncRepLinks)))
    {
        /* Each entry must wait for a strictly larger LSN than the last. */
        if (cur->waitLSN <= prev_lsn)
            return false;

        prev_lsn = cur->waitLSN;
    }

    return true;
}
#endif
919
920 /*
921 * ===========================================================
922 * Synchronous Replication functions executed by any process
923 * ===========================================================
924 */
925
926 bool
check_synchronous_standby_names(char ** newval,void ** extra,GucSource source)927 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
928 {
929 if (*newval != NULL && (*newval)[0] != '\0')
930 {
931 int parse_rc;
932 SyncRepConfigData *pconf;
933
934 /* Reset communication variables to ensure a fresh start */
935 syncrep_parse_result = NULL;
936 syncrep_parse_error_msg = NULL;
937
938 /* Parse the synchronous_standby_names string */
939 syncrep_scanner_init(*newval);
940 parse_rc = syncrep_yyparse();
941 syncrep_scanner_finish();
942
943 if (parse_rc != 0 || syncrep_parse_result == NULL)
944 {
945 GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
946 if (syncrep_parse_error_msg)
947 GUC_check_errdetail("%s", syncrep_parse_error_msg);
948 else
949 GUC_check_errdetail("synchronous_standby_names parser failed");
950 return false;
951 }
952
953 if (syncrep_parse_result->num_sync <= 0)
954 {
955 GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
956 syncrep_parse_result->num_sync);
957 return false;
958 }
959
960 /* GUC extra value must be malloc'd, not palloc'd */
961 pconf = (SyncRepConfigData *)
962 malloc(syncrep_parse_result->config_size);
963 if (pconf == NULL)
964 return false;
965 memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
966
967 *extra = (void *) pconf;
968
969 /*
970 * We need not explicitly clean up syncrep_parse_result. It, and any
971 * other cruft generated during parsing, will be freed when the
972 * current memory context is deleted. (This code is generally run in
973 * a short-lived context used for config file processing, so that will
974 * not be very long.)
975 */
976 }
977 else
978 *extra = NULL;
979
980 return true;
981 }
982
983 void
assign_synchronous_standby_names(const char * newval,void * extra)984 assign_synchronous_standby_names(const char *newval, void *extra)
985 {
986 SyncRepConfig = (SyncRepConfigData *) extra;
987 }
988
989 void
assign_synchronous_commit(int newval,void * extra)990 assign_synchronous_commit(int newval, void *extra)
991 {
992 switch (newval)
993 {
994 case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
995 SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
996 break;
997 case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
998 SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
999 break;
1000 case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1001 SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1002 break;
1003 default:
1004 SyncRepWaitMode = SYNC_REP_NO_WAIT;
1005 break;
1006 }
1007 }
1008