1 /*-------------------------------------------------------------------------
2 *
3 * syncrep.c
4 *
5 * Synchronous replication is new as of PostgreSQL 9.1.
6 *
7 * If requested, transaction commits wait until their commit LSNs are
8 * acknowledged by the synchronous standbys.
9 *
10 * This module contains the code for waiting and release of backends.
11 * All code in this module executes on the primary. The core streaming
12 * replication transport remains within WALreceiver/WALsender modules.
13 *
14 * The essence of this design is that it isolates all logic about
15 * waiting/releasing onto the primary. The primary defines which standbys
16 * it wishes to wait for. The standbys are completely unaware of the
17 * durability requirements of transactions on the primary, reducing the
18 * complexity of the code and saving both standby-side work and
19 * network bandwidth, because there is no requirement to ship
20 * per-transaction state information.
21 *
22 * Replication is either synchronous or not synchronous (async). If it is
23 * async, we just fastpath out of here. If it is sync, then we wait for
24 * the write, flush or apply location on the standby before releasing
25 * the waiting backend. Further complexity in that interaction is
26 * expected in later releases.
27 *
28 * The best performing way to manage the waiting backends is to have a
29 * single ordered queue of waiting backends, so that we can avoid
30 * searching through all waiters each time we receive a reply.
31 *
32 * In 9.5 or before, only a single standby could be considered
33 * synchronous. In 9.6 we added support for multiple synchronous standbys
34 * chosen by priority. In 10.0, multiple synchronous standbys chosen by quorum are also
35 * supported. The number of synchronous standbys that transactions
36 * must wait for replies from is specified in synchronous_standby_names.
37 * This parameter also specifies a list of standby names and the method
38 * (FIRST and ANY) to choose synchronous standbys from the listed ones.
39 *
40 * The method FIRST specifies a priority-based synchronous replication
41 * and makes transaction commits wait until their WAL records are
42 * replicated to the requested number of synchronous standbys chosen based
43 * on their priorities. The standbys whose names appear earlier in the list
44 * are given higher priority and will be considered as synchronous.
45 * Other standby servers appearing later in this list represent potential
46 * synchronous standbys. If any of the current synchronous standbys
47 * disconnects for whatever reason, it will be replaced immediately with
48 * the next-highest-priority standby.
49 *
50 * The method ANY specifies a quorum-based synchronous replication
51 * and makes transaction commits wait until their WAL records are
52 * replicated to at least the requested number of synchronous standbys
53 * in the list. All the standbys appearing in the list are considered as
54 * candidates for quorum synchronous standbys.
55 *
56 * If neither FIRST nor ANY is specified, FIRST is used as the method.
57 * This is for backward compatibility with 9.6 or before where only a
58 * priority-based sync replication was supported.
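 *
 * For example (an illustrative setting with hypothetical standby names),
 * synchronous_standby_names = 'FIRST 2 (s1, s2, s3)' makes commits wait
 * for replies from s1 and s2, with s3 acting as a potential synchronous
 * standby, while 'ANY 2 (s1, s2, s3)' makes commits wait until any two of
 * the three listed standbys have replied.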
59 *
60 * Before the standbys chosen from synchronous_standby_names can
61 * become the synchronous standbys they must have caught up with
62 * the primary; that may take some time. Once caught up,
63 * the standbys which are considered as synchronous at that moment
64 * will release waiters from the queue.
65 *
66 * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
67 *
68 * IDENTIFICATION
69 * src/backend/replication/syncrep.c
70 *
71 *-------------------------------------------------------------------------
72 */
73 #include "postgres.h"
74
75 #include <unistd.h>
76
77 #include "access/xact.h"
78 #include "miscadmin.h"
79 #include "pgstat.h"
80 #include "replication/syncrep.h"
81 #include "replication/walsender.h"
82 #include "replication/walsender_private.h"
83 #include "storage/pmsignal.h"
84 #include "storage/proc.h"
85 #include "tcop/tcopprot.h"
86 #include "utils/builtins.h"
87 #include "utils/ps_status.h"
88
89 /* User-settable parameters for sync rep */
90 char *SyncRepStandbyNames;
91
92 #define SyncStandbysDefined() \
93 (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')
94
95 static bool announce_next_takeover = true;
96
97 SyncRepConfigData *SyncRepConfig = NULL;
98 static int SyncRepWaitMode = SYNC_REP_NO_WAIT;
99
100 static void SyncRepQueueInsert(int mode);
101 static void SyncRepCancelWait(void);
102 static int SyncRepWakeQueue(bool all, int mode);
103
104 static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
105 XLogRecPtr *flushPtr,
106 XLogRecPtr *applyPtr,
107 bool *am_sync);
108 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
109 XLogRecPtr *flushPtr,
110 XLogRecPtr *applyPtr,
111 SyncRepStandbyData *sync_standbys,
112 int num_standbys);
113 static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
114 XLogRecPtr *flushPtr,
115 XLogRecPtr *applyPtr,
116 SyncRepStandbyData *sync_standbys,
117 int num_standbys,
118 uint8 nth);
119 static int SyncRepGetStandbyPriority(void);
120 static int standby_priority_comparator(const void *a, const void *b);
121 static int cmp_lsn(const void *a, const void *b);
122
123 #ifdef USE_ASSERT_CHECKING
124 static bool SyncRepQueueIsOrderedByLSN(int mode);
125 #endif
126
127 /*
128 * ===========================================================
129 * Synchronous Replication functions for normal user backends
130 * ===========================================================
131 */
132
133 /*
134 * Wait for synchronous replication, if requested by user.
135 *
136 * Initially backends start in state SYNC_REP_NOT_WAITING and then
137 * change that state to SYNC_REP_WAITING before adding ourselves
138 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
139 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
140 * This backend then resets its state to SYNC_REP_NOT_WAITING.
141 *
142 * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
143 * represents a commit record. If it doesn't, then we wait only for the WAL
144 * to be flushed if synchronous_commit is set to the higher level of
145 * remote_apply, because only commit records provide apply feedback.
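 *
 * For example, if synchronous_commit = remote_apply but 'commit' is false
 * (say, for a PREPARE record), the effective wait level is capped at
 * SYNC_REP_WAIT_FLUSH rather than SYNC_REP_WAIT_APPLY.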
146 */
147 void
148 SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
149 {
150 char *new_status = NULL;
151 const char *old_status;
152 int mode;
153
154 /*
155 * This should be called while holding interrupts during a transaction
156 * commit to prevent the follow-up shared memory queue cleanups from
157 * being influenced by external interruptions.
158 */
159 Assert(InterruptHoldoffCount > 0);
160
161 /*
162 * Fast exit if user has not requested sync replication, or there are no
163 * sync replication standby names defined.
164 *
165 * Since this routine gets called at every commit, it's important to
166 * exit quickly if sync replication is not requested. So we check
167 * WalSndCtl->sync_standbys_defined flag without the lock and exit
168 * immediately if it's false. If it's true, we need to check it again
169 * later while holding the lock, to check the flag and operate the sync
170 * rep queue atomically. This is necessary to avoid the race condition
171 * described in SyncRepUpdateSyncStandbysDefined(). On the other hand, if
172 * it's false, the lock is not necessary because we don't touch the queue.
173 */
174 if (!SyncRepRequested() ||
175 !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
176 return;
177
178 /* Cap the level for anything other than commit to remote flush only. */
179 if (commit)
180 mode = SyncRepWaitMode;
181 else
182 mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
183
184 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
185 Assert(WalSndCtl != NULL);
186
187 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
188 Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
189
190 /*
191 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
192 * set. See SyncRepUpdateSyncStandbysDefined.
193 *
194 * Also check that the standby hasn't already replied. Unlikely race
195 * condition but we'll be fetching that cache line anyway so it's likely
196 * to be a low cost check.
197 */
198 if (!WalSndCtl->sync_standbys_defined ||
199 lsn <= WalSndCtl->lsn[mode])
200 {
201 LWLockRelease(SyncRepLock);
202 return;
203 }
204
205 /*
206 * Set our waitLSN so WALSender will know when to wake us, and add
207 * ourselves to the queue.
208 */
209 MyProc->waitLSN = lsn;
210 MyProc->syncRepState = SYNC_REP_WAITING;
211 SyncRepQueueInsert(mode);
212 Assert(SyncRepQueueIsOrderedByLSN(mode));
213 LWLockRelease(SyncRepLock);
214
215 /* Alter ps display to show waiting for sync rep. */
216 if (update_process_title)
217 {
218 int len;
219
220 old_status = get_ps_display(&len);
221 new_status = (char *) palloc(len + 32 + 1);
222 memcpy(new_status, old_status, len);
223 sprintf(new_status + len, " waiting for %X/%X",
224 LSN_FORMAT_ARGS(lsn));
225 set_ps_display(new_status);
226 new_status[len] = '\0'; /* truncate off " waiting ..." */
227 }
228
229 /*
230 * Wait for specified LSN to be confirmed.
231 *
232 * Each proc has its own wait latch, so we perform a normal latch
233 * check/wait loop here.
234 */
235 for (;;)
236 {
237 int rc;
238
239 /* Must reset the latch before testing state. */
240 ResetLatch(MyLatch);
241
242 /*
243 * Acquiring the lock is not needed, the latch ensures proper
244 * barriers. If it looks like we're done, we must really be done,
245 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
246 * it will never update it again, so we can't be seeing a stale value
247 * in that case.
248 */
249 if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
250 break;
251
252 /*
253 * If a wait for synchronous replication is pending, we can neither
254 * acknowledge the commit nor raise ERROR or FATAL. The latter would
255 * lead the client to believe that the transaction aborted, which is
256 * not true: it's already committed locally. The former is no good
257 * either: the client has requested synchronous replication, and is
258 * entitled to assume that an acknowledged commit is also replicated,
259 * which might not be true. So in this case we issue a WARNING (which
260 * some clients may be able to interpret) and shut off further output.
261 * We do NOT reset ProcDiePending, so that the process will die after
262 * the commit is cleaned up.
263 */
264 if (ProcDiePending)
265 {
266 ereport(WARNING,
267 (errcode(ERRCODE_ADMIN_SHUTDOWN),
268 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
269 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
270 whereToSendOutput = DestNone;
271 SyncRepCancelWait();
272 break;
273 }
274
275 /*
276 * It's unclear what to do if a query cancel interrupt arrives. We
277 * can't actually abort at this point, but ignoring the interrupt
278 * altogether is not helpful, so we just terminate the wait with a
279 * suitable warning.
280 */
281 if (QueryCancelPending)
282 {
283 QueryCancelPending = false;
284 ereport(WARNING,
285 (errmsg("canceling wait for synchronous replication due to user request"),
286 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
287 SyncRepCancelWait();
288 break;
289 }
290
291 /*
292 * Wait on latch. Any condition that should wake us up will set the
293 * latch, so no need for timeout.
294 */
295 rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
296 WAIT_EVENT_SYNC_REP);
297
298 /*
299 * If the postmaster dies, we'll probably never get an acknowledgment,
300 * because all the wal sender processes will exit. So just bail out.
301 */
302 if (rc & WL_POSTMASTER_DEATH)
303 {
304 ProcDiePending = true;
305 whereToSendOutput = DestNone;
306 SyncRepCancelWait();
307 break;
308 }
309 }
310
311 /*
312 * WalSender has checked our LSN and has removed us from queue. Clean up
313 * state and leave. It's OK to reset these shared memory fields without
314 * holding SyncRepLock, because any walsenders will ignore us anyway when
315 * we're not on the queue. We need a read barrier to make sure we see the
316 * changes to the queue link (this might be unnecessary without
317 * assertions, but better safe than sorry).
318 */
319 pg_read_barrier();
320 Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
321 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
322 MyProc->waitLSN = 0;
323
324 if (new_status)
325 {
326 /* Reset ps display */
327 set_ps_display(new_status);
328 pfree(new_status);
329 }
330 }
331
332 /*
333 * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
334 *
335 * Usually we will go at the tail of the queue, though it's possible that we
336 * arrive here out of order, so start at the tail and work back to the insertion point.
337 */
338 static void
339 SyncRepQueueInsert(int mode)
340 {
341 PGPROC *proc;
342
343 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
344 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
345 &(WalSndCtl->SyncRepQueue[mode]),
346 offsetof(PGPROC, syncRepLinks));
347
348 while (proc)
349 {
350 /*
351 * Stop at the queue element that we should insert after, to ensure the queue
352 * is ordered by LSN.
353 */
354 if (proc->waitLSN < MyProc->waitLSN)
355 break;
356
357 proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
358 &(proc->syncRepLinks),
359 offsetof(PGPROC, syncRepLinks));
360 }
361
362 if (proc)
363 SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
364 else
365 SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
366 }
367
368 /*
369 * Acquire SyncRepLock and cancel any wait currently in progress.
370 */
371 static void
372 SyncRepCancelWait(void)
373 {
374 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
375 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
376 SHMQueueDelete(&(MyProc->syncRepLinks));
377 MyProc->syncRepState = SYNC_REP_NOT_WAITING;
378 LWLockRelease(SyncRepLock);
379 }
380
381 void
382 SyncRepCleanupAtProcExit(void)
383 {
384 /*
385 * First check without the lock whether we have already been removed from
386 * the queue, so as not to slow down backend exit.
387 */
388 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
389 {
390 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
391
392 /* maybe we have just been removed, so recheck */
393 if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
394 SHMQueueDelete(&(MyProc->syncRepLinks));
395
396 LWLockRelease(SyncRepLock);
397 }
398 }
399
400 /*
401 * ===========================================================
402 * Synchronous Replication functions for wal sender processes
403 * ===========================================================
404 */
405
406 /*
407 * Take any action required to initialise sync rep state from config
408 * data. Called at WALSender startup and after each SIGHUP.
409 */
410 void
411 SyncRepInitConfig(void)
412 {
413 int priority;
414
415 /*
416 * Determine if we are a potential sync standby and remember the result
417 * for handling replies from standby.
418 */
419 priority = SyncRepGetStandbyPriority();
420 if (MyWalSnd->sync_standby_priority != priority)
421 {
422 SpinLockAcquire(&MyWalSnd->mutex);
423 MyWalSnd->sync_standby_priority = priority;
424 SpinLockRelease(&MyWalSnd->mutex);
425
426 ereport(DEBUG1,
427 (errmsg_internal("standby \"%s\" now has synchronous standby priority %u",
428 application_name, priority)));
429 }
430 }
431
432 /*
433 * Update the LSNs on each queue based upon our latest state. This
434 * implements a simple policy of first-valid-sync-standby-releases-waiter.
435 *
436 * Other policies are possible, which would change what we do here and
437 * perhaps also which information we store as well.
438 */
439 void
440 SyncRepReleaseWaiters(void)
441 {
442 volatile WalSndCtlData *walsndctl = WalSndCtl;
443 XLogRecPtr writePtr;
444 XLogRecPtr flushPtr;
445 XLogRecPtr applyPtr;
446 bool got_recptr;
447 bool am_sync;
448 int numwrite = 0;
449 int numflush = 0;
450 int numapply = 0;
451
452 /*
453 * If this WALSender is serving a standby that is not on the list of
454 * potential sync standbys then we have nothing to do. If we are still
455 * starting up, still running a base backup, or the current flush position is
456 * still invalid, then leave quickly also. Streaming or stopping WAL
457 * senders are allowed to release waiters.
458 */
459 if (MyWalSnd->sync_standby_priority == 0 ||
460 (MyWalSnd->state != WALSNDSTATE_STREAMING &&
461 MyWalSnd->state != WALSNDSTATE_STOPPING) ||
462 XLogRecPtrIsInvalid(MyWalSnd->flush))
463 {
464 announce_next_takeover = true;
465 return;
466 }
467
468 /*
469 * We're a potential sync standby. Release waiters if there are enough
470 * sync standbys and we are considered as sync.
471 */
472 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
473
474 /*
475 * Check whether we are a sync standby or not, and calculate the synced
476 * positions among all sync standbys. (Note: although this step does not
477 * of itself require holding SyncRepLock, it seems like a good idea to do
478 * it after acquiring the lock. This ensures that the WAL pointers we use
479 * to release waiters are newer than any previous execution of this
480 * routine used.)
481 */
482 got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
483
484 /*
485 * If we are managing a sync standby, though we weren't prior to this,
486 * then announce we are now a sync standby.
487 */
488 if (announce_next_takeover && am_sync)
489 {
490 announce_next_takeover = false;
491
492 if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
493 ereport(LOG,
494 (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
495 application_name, MyWalSnd->sync_standby_priority)));
496 else
497 ereport(LOG,
498 (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
499 application_name)));
500 }
501
502 /*
503 * If the number of sync standbys is less than requested or we aren't
504 * managing a sync standby then just leave.
505 */
506 if (!got_recptr || !am_sync)
507 {
508 LWLockRelease(SyncRepLock);
509 announce_next_takeover = !am_sync;
510 return;
511 }
512
513 /*
514 * Set the lsn first so that when we wake backends they will release up to
515 * this location.
516 */
517 if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
518 {
519 walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
520 numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
521 }
522 if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
523 {
524 walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
525 numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
526 }
527 if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
528 {
529 walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
530 numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
531 }
532
533 LWLockRelease(SyncRepLock);
534
535 elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
536 numwrite, LSN_FORMAT_ARGS(writePtr),
537 numflush, LSN_FORMAT_ARGS(flushPtr),
538 numapply, LSN_FORMAT_ARGS(applyPtr));
539 }
540
541 /*
542 * Calculate the synced Write, Flush and Apply positions among sync standbys.
543 *
544 * Return false if the number of sync standbys is less than
545 * synchronous_standby_names specifies. Otherwise return true and
546 * store the positions into *writePtr, *flushPtr and *applyPtr.
547 *
548 * On return, *am_sync is set to true if this walsender is connected to a
549 * sync standby. Otherwise it's set to false.
550 */
551 static bool
552 SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
553 XLogRecPtr *applyPtr, bool *am_sync)
554 {
555 SyncRepStandbyData *sync_standbys;
556 int num_standbys;
557 int i;
558
559 /* Initialize default results */
560 *writePtr = InvalidXLogRecPtr;
561 *flushPtr = InvalidXLogRecPtr;
562 *applyPtr = InvalidXLogRecPtr;
563 *am_sync = false;
564
565 /* Quick out if not even configured to be synchronous */
566 if (SyncRepConfig == NULL)
567 return false;
568
569 /* Get standbys that are considered as synchronous at this moment */
570 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
571
572 /* Am I among the candidate sync standbys? */
573 for (i = 0; i < num_standbys; i++)
574 {
575 if (sync_standbys[i].is_me)
576 {
577 *am_sync = true;
578 break;
579 }
580 }
581
582 /*
583 * Nothing more to do if we are not managing a sync standby or there are
584 * not enough synchronous standbys.
585 */
586 if (!(*am_sync) ||
587 num_standbys < SyncRepConfig->num_sync)
588 {
589 pfree(sync_standbys);
590 return false;
591 }
592
593 /*
594 * In a priority-based sync replication, the synced positions are the
595 * oldest ones among sync standbys. In a quorum-based, they are the Nth
596 * latest ones.
597 *
598 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
599 * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
600 * because it's a bit more efficient.
601 *
602 * XXX If the numbers of current and requested sync standbys are the same,
603 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
604 * positions even in a quorum-based sync replication.
605 */
606 if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
607 {
608 SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
609 sync_standbys, num_standbys);
610 }
611 else
612 {
613 SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
614 sync_standbys, num_standbys,
615 SyncRepConfig->num_sync);
616 }
617
618 pfree(sync_standbys);
619 return true;
620 }
621
622 /*
623 * Calculate the oldest Write, Flush and Apply positions among sync standbys.
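 *
 * For example, if the sync standbys report flush positions 0/7000 and
 * 0/5000, the synced flush position is 0/5000: both standbys are known to
 * have flushed at least that far.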
624 */
625 static void
626 SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
627 XLogRecPtr *flushPtr,
628 XLogRecPtr *applyPtr,
629 SyncRepStandbyData *sync_standbys,
630 int num_standbys)
631 {
632 int i;
633
634 /*
635 * Scan through all sync standbys and calculate the oldest Write, Flush
636 * and Apply positions. We assume *writePtr et al were initialized to
637 * InvalidXLogRecPtr.
638 */
639 for (i = 0; i < num_standbys; i++)
640 {
641 XLogRecPtr write = sync_standbys[i].write;
642 XLogRecPtr flush = sync_standbys[i].flush;
643 XLogRecPtr apply = sync_standbys[i].apply;
644
645 if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
646 *writePtr = write;
647 if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
648 *flushPtr = flush;
649 if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
650 *applyPtr = apply;
651 }
652 }
653
654 /*
655 * Calculate the Nth latest Write, Flush and Apply positions among sync
656 * standbys.
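 *
 * For example, if three candidates report flush positions 0/7000, 0/5000
 * and 0/3000 and nth is 2, the result for flush is 0/5000, since at least
 * two standbys have flushed up to that point.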
657 */
658 static void
659 SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
660 XLogRecPtr *flushPtr,
661 XLogRecPtr *applyPtr,
662 SyncRepStandbyData *sync_standbys,
663 int num_standbys,
664 uint8 nth)
665 {
666 XLogRecPtr *write_array;
667 XLogRecPtr *flush_array;
668 XLogRecPtr *apply_array;
669 int i;
670
671 /* Should have enough candidates, or somebody messed up */
672 Assert(nth > 0 && nth <= num_standbys);
673
674 write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
675 flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
676 apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
677
678 for (i = 0; i < num_standbys; i++)
679 {
680 write_array[i] = sync_standbys[i].write;
681 flush_array[i] = sync_standbys[i].flush;
682 apply_array[i] = sync_standbys[i].apply;
683 }
684
685 /* Sort each array in descending order */
686 qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
687 qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
688 qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
689
690 /* Get Nth latest Write, Flush, Apply positions */
691 *writePtr = write_array[nth - 1];
692 *flushPtr = flush_array[nth - 1];
693 *applyPtr = apply_array[nth - 1];
694
695 pfree(write_array);
696 pfree(flush_array);
697 pfree(apply_array);
698 }
699
700 /*
701 * Compare lsn in order to sort array in descending order.
702 */
703 static int
704 cmp_lsn(const void *a, const void *b)
705 {
706 XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
707 XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
708
709 if (lsn1 > lsn2)
710 return -1;
711 else if (lsn1 == lsn2)
712 return 0;
713 else
714 return 1;
715 }
716
717 /*
718 * Return data about walsenders that are candidates to be sync standbys.
719 *
720 * *standbys is set to a palloc'd array of structs of per-walsender data,
721 * and the number of valid entries (candidate sync senders) is returned.
722 * (This might be more or fewer than num_sync; caller must check.)
723 */
724 int
725 SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
726 {
727 int i;
728 int n;
729
730 /* Create result array */
731 *standbys = (SyncRepStandbyData *)
732 palloc(max_wal_senders * sizeof(SyncRepStandbyData));
733
734 /* Quick exit if sync replication is not requested */
735 if (SyncRepConfig == NULL)
736 return 0;
737
738 /* Collect raw data from shared memory */
739 n = 0;
740 for (i = 0; i < max_wal_senders; i++)
741 {
742 volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
743 * rearrangement */
744 SyncRepStandbyData *stby;
745 WalSndState state; /* not included in SyncRepStandbyData */
746
747 walsnd = &WalSndCtl->walsnds[i];
748 stby = *standbys + n;
749
750 SpinLockAcquire(&walsnd->mutex);
751 stby->pid = walsnd->pid;
752 state = walsnd->state;
753 stby->write = walsnd->write;
754 stby->flush = walsnd->flush;
755 stby->apply = walsnd->apply;
756 stby->sync_standby_priority = walsnd->sync_standby_priority;
757 SpinLockRelease(&walsnd->mutex);
758
759 /* Must be active */
760 if (stby->pid == 0)
761 continue;
762
763 /* Must be streaming or stopping */
764 if (state != WALSNDSTATE_STREAMING &&
765 state != WALSNDSTATE_STOPPING)
766 continue;
767
768 /* Must be synchronous */
769 if (stby->sync_standby_priority == 0)
770 continue;
771
772 /* Must have a valid flush position */
773 if (XLogRecPtrIsInvalid(stby->flush))
774 continue;
775
776 /* OK, it's a candidate */
777 stby->walsnd_index = i;
778 stby->is_me = (walsnd == MyWalSnd);
779 n++;
780 }
781
782 /*
783 * In quorum mode, we return all the candidates. In priority mode, if we
784 * have too many candidates then return only the num_sync ones of highest
785 * priority.
786 */
787 if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
788 n > SyncRepConfig->num_sync)
789 {
790 /* Sort by priority ... */
791 qsort(*standbys, n, sizeof(SyncRepStandbyData),
792 standby_priority_comparator);
793 /* ... then report just the first num_sync ones */
794 n = SyncRepConfig->num_sync;
795 }
796
797 return n;
798 }
799
800 /*
801 * qsort comparator to sort SyncRepStandbyData entries by priority
802 */
803 static int
804 standby_priority_comparator(const void *a, const void *b)
805 {
806 const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
807 const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
808
809 /* First, sort by increasing priority value */
810 if (sa->sync_standby_priority != sb->sync_standby_priority)
811 return sa->sync_standby_priority - sb->sync_standby_priority;
812
813 /*
814 * We might have equal priority values; arbitrarily break ties by position
815 * in the WALSnd array. (This is utterly bogus, since that is arrival
816 * order dependent, but there are regression tests that rely on it.)
817 */
818 return sa->walsnd_index - sb->walsnd_index;
819 }
820
821
822 /*
823 * Check if we are in the list of sync standbys, and if so, determine
824 * priority sequence. Return priority if set, or zero to indicate that
825 * we are not a potential sync standby.
826 *
827 * Compare the parameter SyncRepStandbyNames against the application_name
828 * for this WALSender, or allow any name if we find a wildcard "*".
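 *
 * For example, with synchronous_standby_names = 'FIRST 2 (s1, s2, s3)'
 * (hypothetical names), a walsender whose application_name is "s2" gets
 * priority 2, while under 'ANY 2 (s1, s2, s3)' every listed standby gets
 * priority 1.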
829 */
830 static int
831 SyncRepGetStandbyPriority(void)
832 {
833 const char *standby_name;
834 int priority;
835 bool found = false;
836
837 /*
838 * Since synchronous cascade replication is not allowed, we always set the
839 * priority of a cascading walsender to zero.
840 */
841 if (am_cascading_walsender)
842 return 0;
843
844 if (!SyncStandbysDefined() || SyncRepConfig == NULL)
845 return 0;
846
847 standby_name = SyncRepConfig->member_names;
848 for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
849 {
850 if (pg_strcasecmp(standby_name, application_name) == 0 ||
851 strcmp(standby_name, "*") == 0)
852 {
853 found = true;
854 break;
855 }
856 standby_name += strlen(standby_name) + 1;
857 }
858
859 if (!found)
860 return 0;
861
862 /*
863 * In quorum-based sync replication, all the standbys in the list have the
864 * same priority, one.
865 */
866 return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
867 }
868
869 /*
870 * Walk the specified queue from head. Set the state of any backends that
871 * need to be woken, remove them from the queue, and then wake them.
872 * Pass all = true to wake the whole queue; otherwise, just wake up to
873 * the walsender's LSN.
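 * For example, if walsndctl->lsn[mode] has advanced to 0/5000, a backend
 * waiting for 0/4000 is woken and removed from the queue, while one
 * waiting for 0/6000 stays queued.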
874 *
875 * The caller must hold SyncRepLock in exclusive mode.
876 */
877 static int
878 SyncRepWakeQueue(bool all, int mode)
879 {
880 volatile WalSndCtlData *walsndctl = WalSndCtl;
881 PGPROC *proc = NULL;
882 PGPROC *thisproc = NULL;
883 int numprocs = 0;
884
885 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
886 Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
887 Assert(SyncRepQueueIsOrderedByLSN(mode));
888
889 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
890 &(WalSndCtl->SyncRepQueue[mode]),
891 offsetof(PGPROC, syncRepLinks));
892
893 while (proc)
894 {
895 /*
896 * Assume the queue is ordered by LSN
897 */
898 if (!all && walsndctl->lsn[mode] < proc->waitLSN)
899 return numprocs;
900
901 /*
902 * Move to next proc, so we can delete thisproc from the queue.
903 * thisproc is valid, proc may be NULL after this.
904 */
905 thisproc = proc;
906 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
907 &(proc->syncRepLinks),
908 offsetof(PGPROC, syncRepLinks));
909
910 /*
911 * Remove thisproc from queue.
912 */
913 SHMQueueDelete(&(thisproc->syncRepLinks));
914
915 /*
916 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
917 * make sure that it sees the queue link being removed before the
918 * syncRepState change.
919 */
920 pg_write_barrier();
921
922 /*
923 * Set state to complete; see SyncRepWaitForLSN() for discussion of
924 * the various states.
925 */
926 thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
927
928 /*
929 * Wake only when we have set state and removed from queue.
930 */
931 SetLatch(&(thisproc->procLatch));
932
933 numprocs++;
934 }
935
936 return numprocs;
937 }
938
939 /*
940 * The checkpointer calls this as needed to update the shared
941 * sync_standbys_defined flag, so that backends don't remain permanently wedged
942 * if synchronous_standby_names is unset. It's safe to check the current value
943 * without the lock, because it's only ever updated by one process. But we
944 * must take the lock to change it.
945 */
946 void
947 SyncRepUpdateSyncStandbysDefined(void)
948 {
949 bool sync_standbys_defined = SyncStandbysDefined();
950
951 if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
952 {
953 LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
954
955 /*
956 * If synchronous_standby_names has been reset to empty, it's futile
957 * for backends to continue waiting. Since the user no longer wants
958 * synchronous replication, we'd better wake them up.
959 */
960 if (!sync_standbys_defined)
961 {
962 int i;
963
964 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
965 SyncRepWakeQueue(true, i);
966 }
967
968 /*
969 * Only allow people to join the queue when there are synchronous
970 * standbys defined. Without this interlock, there's a race
971 * condition: we might wake up all the current waiters; then, some
972 * backend that hasn't yet reloaded its config might go to sleep on
973 * the queue (and never wake up). This prevents that.
974 */
975 WalSndCtl->sync_standbys_defined = sync_standbys_defined;
976
977 LWLockRelease(SyncRepLock);
978 }
979 }
980
981 #ifdef USE_ASSERT_CHECKING
982 static bool
983 SyncRepQueueIsOrderedByLSN(int mode)
984 {
985 PGPROC *proc = NULL;
986 XLogRecPtr lastLSN;
987
988 Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
989
990 lastLSN = 0;
991
992 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
993 &(WalSndCtl->SyncRepQueue[mode]),
994 offsetof(PGPROC, syncRepLinks));
995
996 while (proc)
997 {
998 /*
999 * Check the queue is ordered by LSN and that multiple procs don't
1000 * have matching LSNs
1001 */
1002 if (proc->waitLSN <= lastLSN)
1003 return false;
1004
1005 lastLSN = proc->waitLSN;
1006
1007 proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
1008 &(proc->syncRepLinks),
1009 offsetof(PGPROC, syncRepLinks));
1010 }
1011
1012 return true;
1013 }
1014 #endif
1015
1016 /*
1017 * ===========================================================
1018 * Synchronous Replication functions executed by any process
1019 * ===========================================================
1020 */
1021
1022 bool
1023 check_synchronous_standby_names(char **newval, void **extra, GucSource source)
1024 {
1025 if (*newval != NULL && (*newval)[0] != '\0')
1026 {
1027 int parse_rc;
1028 SyncRepConfigData *pconf;
1029
1030 /* Reset communication variables to ensure a fresh start */
1031 syncrep_parse_result = NULL;
1032 syncrep_parse_error_msg = NULL;
1033
1034 /* Parse the synchronous_standby_names string */
1035 syncrep_scanner_init(*newval);
1036 parse_rc = syncrep_yyparse();
1037 syncrep_scanner_finish();
1038
1039 if (parse_rc != 0 || syncrep_parse_result == NULL)
1040 {
1041 GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
1042 if (syncrep_parse_error_msg)
1043 GUC_check_errdetail("%s", syncrep_parse_error_msg);
1044 else
1045 GUC_check_errdetail("synchronous_standby_names parser failed");
1046 return false;
1047 }
1048
1049 if (syncrep_parse_result->num_sync <= 0)
1050 {
1051 GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
1052 syncrep_parse_result->num_sync);
1053 return false;
1054 }
1055
1056 /* GUC extra value must be malloc'd, not palloc'd */
1057 pconf = (SyncRepConfigData *)
1058 malloc(syncrep_parse_result->config_size);
1059 if (pconf == NULL)
1060 return false;
1061 memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
1062
1063 *extra = (void *) pconf;
1064
1065 /*
1066 * We need not explicitly clean up syncrep_parse_result. It, and any
1067 * other cruft generated during parsing, will be freed when the
1068 * current memory context is deleted. (This code is generally run in
1069 * a short-lived context used for config file processing, so that will
1070 * not be very long.)
1071 */
1072 }
1073 else
1074 *extra = NULL;
1075
1076 return true;
1077 }
1078
1079 void
1080 assign_synchronous_standby_names(const char *newval, void *extra)
1081 {
1082 SyncRepConfig = (SyncRepConfigData *) extra;
1083 }
1084
1085 void
1086 assign_synchronous_commit(int newval, void *extra)
1087 {
1088 switch (newval)
1089 {
1090 case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
1091 SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
1092 break;
1093 case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
1094 SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
1095 break;
1096 case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
1097 SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
1098 break;
1099 default:
1100 SyncRepWaitMode = SYNC_REP_NO_WAIT;
1101 break;
1102 }
1103 }
1104